变量在气流DAG中的应用

2024-05-23 17:14:45 发布

您现在位置:Python中文网/ 问答频道 /正文

我用cli中的“airflow variables”命令设置变量 我想在DAG中使用这个变量。 我在终端上执行了以下命令 错误继续发生。

 Broken DAG: [/root/airflow/dags/param_test.py] invalid syntax (param_test.py, line 13) 

airflow variables -s sh_path = "/tmp/echo_test.sh"
airflow scheduler

代码如下:

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator

tmpl_search_path = Variable.get ("sh_path")

dag = DAG ('param_test', schedule_interval = '* / 5 * * * *'
           start_date = datetime (2018,9,4), catchup = False)

bash_task = BashOperator (
      task_id = "bash_task"
      bash_command = 'sh '+ {{var.value.tmpl_search_path}},
      dag = dag)

bash_task.set_downstream (python_task)

bash_task1 = BashOperator (
      task_id = 'echo',
      bash_command = 'echo 1',
      dag = dag)

bash_task.set_downstream (bash_task1)

Tags: pathfromtestimport命令echobashtask
1条回答
网友
1楼 · 发布于 2024-05-23 17:14:45

你需要引用金贾模板。使用方法如下:

bash_task = BashOperator (
      task_id = "bash_task"
      bash_command = "sh {{var.value.tmpl_search_path}}",
      dag = dag)

相关问题 更多 >