我可以使用TriggerDagRunOperator将参数传递给触发的dag吗?气流

2024-06-17 09:36:02 发布

您现在位置:Python中文网/ 问答频道 /正文

Im使用气流1.10.11

我可以读到我可以使用UI和CLI(https://airflow.apache.org/docs/apache-airflow/1.10.11/dag-run.html)向dag执行发送参数,但是我可以使用TriggerDagRunOperator吗?(https://airflow.apache.org/docs/apache-airflow/1.10.11/_api/airflow/operators/dagrun_operator/index.html

阅读https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_target_dag.pyhttps://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py似乎我可以,但当我自己尝试这个脚本时,我无法检索conf消息(可能是因为他们似乎在使用airflow 2.0,而我在使用1.10)

所以我的问题是:当TriggerDagRunOperator启动目标时,有没有一种方法可以将参数发送到目标dag

提前谢谢


Tags: httpsorggithubmastercomdocs参数example
2条回答

下面的示例演示了如何设置通过TriggerDagRunOperator (in 1.10.11)触发的dagruns发送的conf

trigger.py

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

dag = DAG(
    dag_id='trigger',
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1)
)


def modify_dro(context, dagrun_order):
    print(context)
    print(dagrun_order)
    dagrun_order.payload = {
        "message": "This is my conf message"
    }
    return dagrun_order


run_this = TriggerDagRunOperator(
    task_id='run_this',
    trigger_dag_id='my_dag',
    python_callable=modify_dro,
    dag=dag
)

达格·皮

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='my_dag',
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1)
)


def run_this_func(**context):
    print(context["dag_run"].conf)


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

以下是my_dagDAG中run_this任务的输出

[2021-03-05 16:39:15,649] {logging_mixin.py:112} INFO - {'message': 'This is my conf message'}

TriggerDagRunOnOperator接受python_可调用函数,允许您修改DagRunOrder。DagRunOrder是一个抽象构造,它保存运行id和有效负载,该id和有效负载将为sent as conf when the target DAG is triggered

对于airflow2.0.x,您需要使用conf来传递参数

如PR中所述,不推荐使用python_callable

https://github.com/apache/airflow/pull/6317

有关更多详细信息,请参阅

https://stackoverflow.com/a/67254524/3152654

相关问题 更多 >