分支任务的气流问题

2024-04-26 12:51:02 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试设置一个DAG,其中每分钟运行一个任务,然后在第5分钟运行另一个任务(就在1分钟任务之前)。这只是测试,我不打算在这么短的时间内运行作业。在

从视觉上看,我的DAG是这样的:

DAG

代码本身是这样的:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 9)
}

now = datetime.now()
minute_check = now.minute % 5

dag = DAG(
    dag_id='test3',
    default_args=default_args,
    schedule_interval='* * * * *',
    dagrun_timeout=timedelta(minutes=5),
    catchup=False,
    max_active_runs=99
        )

def check_minute():
    if minute_check == 0:
        return "branch_fiveminute"
    else:
        return "branch_minute"

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=check_minute,
    trigger_rule='all_done',
    dag=dag)

branch_minute = BashOperator(
    task_id='branch_minute',
    bash_command='test1min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_fiveminute = BashOperator(
    task_id='branch_fiveminute',
    bash_command='test5min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_task.set_downstream(branch_minute)
branch_task.set_downstream(branch_fiveminute)
branch_fiveminute.set_downstream(branch_minute)

我遇到的问题是,在第5分钟,气流跳过了1分钟的任务:

enter image description here

我尝试过使用触发器规则设置,但没有成功。在

有什么好主意吗?如果需要的话,我使用1.10气流。在


Tags: fromimportbashbranchiddefaulttaskdatetime
1条回答
网友
1楼 · 发布于 2024-04-26 12:51:02

由于5分钟任务的执行路径不同,因此跳过一分钟的任务。从图中看,这有点违反直觉,但只有一条带有execute的路径。在

所以你要做的是在开始的时候有一个分支,一个路径通向一个假操作符,一个路径指向一个5分钟的任务,但是5分钟的任务和虚拟的操作符都会导致1分钟的任务。在

这样,虚拟任务将被跳过,但无论选择了哪个执行路径,执行流程都将以1分钟的任务结束。在

from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator  import DummyOperator
from airflow.operators.bash_operator   import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 9)
}

now = datetime.now()
minute_check = now.minute % 5

dag = DAG(
    dag_id='test3',
    default_args=default_args,
    schedule_interval='* * * * *',
    dagrun_timeout=timedelta(minutes=5),
    catchup=False,
    max_active_runs=99
        )

def check_minute():
    if minute_check == 0:
        return "branch_fiveminute"
    else:
        return "branch_false_1"

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=check_minute,
    trigger_rule='all_done',
    dag=dag)

branch_minute = BashOperator(
    task_id='branch_minute',
    bash_command='test1min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_fiveminute = BashOperator(
    task_id='branch_fiveminute',
    bash_command='test5min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_false_1 = DummyOperator( task_id= "branch_false_1", dag=dag )

branch_task.set_downstream(branch_false_1)
branch_task.set_downstream(branch_fiveminute)
branch_fiveminute.set_downstream(branch_minute)
branch_false_1.set_downstream(branch_minute)

相关问题 更多 >