我是 Python 的新手,也是 Airflow 的新手。
我正在使用雪花数据库。
我创建了一个SnowflakeGetDataOperator
返回雪花hook.get_records
方法的运算符(我正在返回少量的 kines - 通常是单个单元格)
所以现在我在 dag 中有这个任务:
check_last_run_date=SnowflakeGetDataOperator(
task_id='check_last_run_date',
sql="SELECT COALESCE (max(update_date), '2000-01-01') FROM poc.dwh.fact_collector",
snowflake_conn_id='snowflake_default',
dag=dag)
当此任务运行时,我在 Airfow 后端看到此任务的 xcom 对象(运算符的返回值 - 我没有使用xcom.push()
)
我的问题是如何从下一个下游任务访问这个值?
我需要将它用作下一个 sql 运算符的参数。
我在 dag 代码中尝试了以下行
{{ task_instance.xcom_pull(task_ids='check_last_run_date') }}
但代码无法识别 task_instance 属性。
编辑
下一个任务应该是这样的
fill_agg_table = SnowflakeOperator(
task_id='fill_cust_agg_data',
sql= str.replace ("""INSERT INTO oc.TEMP_COMPUTING.collector_customer_aggregative_data
( SELECT * FROM POC.STG."stg_atg_data" WHERE XXXXX < current_date)""",
'XXXXX',
{{ task_instance.xcom_pull(task_ids='check_last_run_date') }},
snowflake_conn_id='snowflake_default',
dag=dag ))
你的第二个任务看起来有点不寻常。如果字段是模板化的,您可以简单地将字段放入字符串中。
事实上,使用string.replace
orstring.format
会弄乱你的宏并且在 Airflow 中不能很好地工作。其他宏在这里:https : //airflow.apache.org/code.html#macros
确保您在自己的运算符中模板化了 sql 字段。如何做到这一点,请参阅此示例代码https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/bigquery_operator.py并检查变量templated_fields
。
建议:
sql= """INSERT INTO oc.TEMP_COMPUTING.collector_customer_aggregative_data
( SELECT * FROM POC.STG."stg_atg_data" WHERE {{ task_instance.xcom_pull(task_ids='check_last_run_date') }} < current_date)""",
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句