气流 xcom.pull() 访问上游任务的隐式返回值

沙比

我是 Python 的新手,也是 Airflow 的新手。

我正在使用雪花数据库。

我创建了一个SnowflakeGetDataOperator返回雪花hook.get_records方法的运算符(我正在返回少量的 kines - 通常是单个单元格)

所以现在我在 dag 中有这个任务:

check_last_run_date=SnowflakeGetDataOperator(
    task_id='check_last_run_date',
    sql="SELECT COALESCE (max(update_date), '2000-01-01') FROM poc.dwh.fact_collector",
    snowflake_conn_id='snowflake_default',
    dag=dag)

当此任务运行时,我在 Airfow 后端看到此任务的 xcom 对象(运算符的返回值 - 我没有使用xcom.push()

我的问题是如何从下一个下游任务访问这个值?

我需要将它用作下一个 sql 运算符的参数。

我在 dag 代码中尝试了以下行

{{ task_instance.xcom_pull(task_ids='check_last_run_date') }}

但代码无法识别 task_instance 属性。

编辑

下一个任务应该是这样的

fill_agg_table = SnowflakeOperator( 
task_id='fill_cust_agg_data', 
sql= str.replace ("""INSERT INTO oc.TEMP_COMPUTING.collector_customer_aggregative_data 
  ( SELECT * FROM POC.STG."stg_atg_data" WHERE XXXXX < current_date)""", 
    'XXXXX', 
    {{ task_instance.xcom_pull(task_ids='check_last_run_date') }}, 
snowflake_conn_id='snowflake_default', 
dag=dag )) 
托比6

你的第二个任务看起来有点不寻常。如果字段是模板化的,您可以简单地将字段放入字符串中。

事实上,使用string.replaceorstring.format会弄乱你的宏并且在 Airflow 中不能很好地工作。其他宏在这里:https : //airflow.apache.org/code.html#macros

确保您在自己的运算符中模板化了 sql 字段。如何做到这一点,请参阅此示例代码https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/bigquery_operator.py并检查变量templated_fields

建议:

sql= """INSERT INTO oc.TEMP_COMPUTING.collector_customer_aggregative_data 
  ( SELECT * FROM POC.STG."stg_atg_data" WHERE {{ task_instance.xcom_pull(task_ids='check_last_run_date') }} < current_date)""", 

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

气流GKEPodOperator xcom_push返回None

来自分类Dev

气流xcom Job是否隔离?

来自分类Dev

Airflow XCOM Pull仅返回字符串

来自分类Dev

使用气流测试与DebugExecutor调试气流任务

来自分类Dev

气流中任务的粒度

来自分类Dev

气流 - 任务之间的变量

来自分类Dev

气流任务失败

来自分类Dev

在Airflow的EmailOperator中访问Xcom

来自分类Dev

气流-任务的下游任务列表

来自分类Dev

气流并行运行任务

来自分类Dev

气流-任务内的并行执行

来自分类Dev

如何在PostgresOperator中提取XCOM值

来自分类Dev

我可以在Airflow脚本的MAIN部分(PythonOperator外部)中获取()或xcom.pull()变量吗?

来自分类Dev

气流传递参数到相关任务

来自分类Dev

气流计划程序无法启动任务

来自分类Dev

气流DAG成功,但任务未运行

来自分类Dev

如何通过气流安排任务

来自分类Dev

组织DAG任务在气流中的依赖关系

来自分类Dev

如何使用条件任务运行气流DAG

来自分类Dev

气流中的任务调度不起作用

来自分类Dev

气流任务按计划运行延迟

来自分类Dev

气流-BigQuery模式字段中的值无效

来自分类Dev

运行tweepy的气流任务退出,返回码为-6

来自分类Dev

上游跳过时气流“ none_failed”跳过

来自分类Dev

将气流任务依赖关系表达给非直接父任务

来自分类Dev

即使我不向上游或向下游提供气流,任务也会显示出来

来自分类Dev

气流:有效地执行等待(睡眠)任务

来自分类Dev

气流-使用CLI将失败任务的状态更改为成功

来自分类Dev

气流-跳过将来的任务实例而不更改dag文件