TL; DR如何实现,以便dag中的任务可以访问资产(SQL文件,文本文件)
大家好,
我正在运行单节点Airflow安装(1.10.12)。我有一个使用SQL语句的dag。为了便于阅读,我想将这些语句保留在自己的SQL文件中。所以我的项目看起来像这样:
MyProject
- airflow_ftp_DAG.py
- airflow_ftp_assets (directory)
- WEX_HH_Upsert_SQL.sql
当我的dag运行时,它抱怨找不到SQL文件。这是由于dag在不同的文件夹中运行任务(如通过运行可见os.getcwd()
)
如何使我的SQL文件可被任务引用?
def insert_hh_records(**kwargs):
key = 'WEX_FILETYPE_HH'
collated_data_file_path = kwargs['ti'].xcom_pull(key=key)
sql_path = "airflow_wex_ftp_assets/WEX_HH_Upsert_SQL.sql"
sql_string = ''
<!--- Error below --->
with open(sql_path,"r") as f:
sql_string = f.read()
任何帮助是极大的赞赏
我将Dag文件作为zip文件上传到dag文件夹中。在此过程中的某个地方,它正在解压缩它们,但没有保留SQL资产文件。一旦我停止压缩它们,问题就消失了。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句