How to schedule a task with airflow

kidman01

Unfortunately, even after reading the many questions here and the FAQ page of the Airflow website, I still don't understand how Airflow schedules tasks. Here is a very simple example task:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    "depends_on_past": False,
    "start_date": datetime(2020, 5, 29),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "example_dag_one",
    schedule_interval="30 8 * * *",
    catchup=False,
    default_args=default_args,
)

with dag:
    # dag= is implied inside the "with dag:" context manager
    t1 = BashOperator(task_id="print_hello", bash_command="echo hello")

My naive view was that this task would run on May 29th at 08:30. But as time passed, Airflow did not schedule the task. If I change the cron expression to something like '* 8 * * *', it schedules a task every minute (between 08:00 and 08:59).

However, when I use the same DAG with a start date of yesterday (May 28th in this case), the task is scheduled at 08:30, yet its execution date is the 28th (even though it ran on May 29th) and the start date shown in the web UI is May 29th. This is VERY confusing.

What I want from Airflow in the end is simple: "here is Python code, run it at this time each day". How can I achieve that? Again, let's say I want to schedule a task at 08:30 every day, starting tomorrow.

Javier López Tomás

The answer can be found in the official Airflow documentation:

Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended.

Let's repeat that: the scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.

So, applied to your case: if you set the start date to May 29th with the original cron expression, it will run every day at 08:30 starting tomorrow, May 30th.
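To make the rule concrete, here is a small pure-Python sketch of when the scheduler actually fires the first run. It needs no Airflow install; `first_trigger_time` is a hypothetical helper written for illustration, not an Airflow API:

```python
from datetime import datetime, timedelta

def first_trigger_time(start_date, schedule_interval):
    """The first run is *stamped* with execution_date == start_date,
    but it is only triggered once the interval it covers has ended,
    i.e. one schedule_interval later."""
    return start_date + schedule_interval

# start_date aligned to the cron time (May 29th 08:30), daily schedule:
start = datetime(2020, 5, 29, 8, 30)
print(first_trigger_time(start, timedelta(days=1)))  # 2020-05-30 08:30:00
```

This is exactly why the question's DAG seemed to do nothing on May 29th: its first run would only fire on May 30th at 08:30, stamped with the 29th as its execution date.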

Anyway, if you don't need the DAG to run at a specific time of day, you can just set schedule_interval to '@daily', and it will be triggered at the beginning (00:00) of each day. If there are a lot of DAGs with @daily, don't worry: the scheduler and the workers will handle executing all of them. If you have DAGs that depend on other DAGs, there are mechanisms to chain them so that you still don't have to worry about specifying hours.
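As a concrete sketch of that alternative (assuming the same Airflow 1.10-style imports as the question), a `@daily` version of the DAG would look like this; note the comment about when the first run actually fires:

```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

# With @daily, the run stamped with day N is triggered at the start of
# day N+1, so a start_date of May 28th means the first run fires shortly
# after 2020-05-29T00:00.
dag = DAG(
    "example_dag_daily",  # hypothetical DAG id for illustration
    schedule_interval="@daily",
    start_date=datetime(2020, 5, 28),
    catchup=False,
)

with dag:
    t1 = BashOperator(task_id="print_hello", bash_command="echo hello")
```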
