Suppose task_1 (a1.py) runs at 1:00 pm daily, task_2 (a2.py) runs at 5:00 pm daily, and task_3 depends on both task_1 and task_2.
How can I design them in one DAG?
I tried setting a different schedule_interval at the task level, but it showed a warning to the effect that schedule_interval is only effective at the DAG level.
This is the code I used:
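from datetime import datetime, timedelta
from airflow import DAG
from airflow import utils
# Import path for Airflow 1.x; in Airflow 2.x it is airflow.operators.bash
from airflow.operators.bash_operator import BashOperator

DAG_NAME = 'my_dag'  # placeholder; the original post does not show the DAG name

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': utils.dates.days_ago(2),
}
dag = DAG(DAG_NAME, default_args=default_args)

# schedule_interval is not honoured at the task level;
# passing it here is what triggers the warning.
task_1 = BashOperator(
    task_id='task_1',
    bash_command='python3 a1.py',
    schedule_interval='0 13 * * *',  # intended: 1:00 pm daily
    dag=dag,
)
task_2 = BashOperator(
    task_id='task_2',
    bash_command='python3 a2.py',
    schedule_interval='0 17 * * *',  # intended: 5:00 pm daily
    dag=dag,
)
task_3 = BashOperator(
    task_id='task_3',
    bash_command='python3 a3.py',
    dag=dag,
)

# task_3 runs only after both task_1 and task_2 have finished
[task_1, task_2] >> task_3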
This is what I expect:
- task_3 waits until task_1 and task_2 have both finished, and then starts running.
- task_1, task_2, and task_3 must be defined in one DAG.
You can add a TimeDeltaSensor that waits for a timedelta equal to the time between the two tasks.
The other option is to create a delay task between the two tasks, using PythonOperator (time.sleep()) or BashOperator (bash_command='sleep 10m').
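As a rough illustration of the first option, the delta handed to the sensor is just the gap between the two clock times. The sketch below uses only the standard library; the helper name delay_between is hypothetical, and the commented wiring at the end assumes the DAG itself is scheduled at 1:00 pm ('0 13 * * *') and that the sensor counts its delta from roughly the moment the run starts.

```python
from datetime import date, datetime, time, timedelta


def delay_between(first: time, second: time) -> timedelta:
    """Return how long after `first` the clock time `second` occurs on the same day."""
    day = date(2000, 1, 1)  # arbitrary date; only the clock times matter
    delta = datetime.combine(day, second) - datetime.combine(day, first)
    if delta < timedelta(0):
        raise ValueError("second must not be earlier than first")
    return delta


# task_1 runs at 1:00 pm, task_2 should not start before 5:00 pm
delta = delay_between(time(13, 0), time(17, 0))
print(delta)  # 4:00:00

# Assumed wiring inside the DAG (not executed here):
#   wait = TimeDeltaSensor(task_id='wait_until_5pm', delta=delta, dag=dag)
#   wait >> task_2
#   [task_1, task_2] >> task_3
```

With this layout task_1 starts as soon as the daily run begins, task_2 is held back by the sensor, and task_3 still waits for both. A sensor is probably preferable to a long sleep for a gap of several hours, but either approach keeps all three tasks in one DAG.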