我们正在设计一个变量选择和参数设置逻辑,当DAG被触发时需要评估什么。我们的DAG是在执行之前生成的。我们决定将静态代码修改为自定义宏
在此之前,在运算符定义之间定义了一个代码,因此,当DAG生成器代码生成DAG时,该代码正在运行。此代码无法处理用于选择正确气流变量的运行时参数
for table_name in ast.literal_eval(Variable.get('PYTHON_LIST_OF_TABLES')):
dag_id = "TableLoader_" + str(table_name)
default_dag_args={...}
schedule = None
globals()[dag_id] = create_dag(dag_id, schedule, default_dag_args)
def create_dag(dag_id, schedule, default_dag_args):
with DAG(
default_args=default_dag_args,
dag_id=dag_id,
schedule_interval=schedule,
user_defined_macros={ "load_type_handler": load_type_handler }
) as dag:
# static python code which sets pipeline_rt_args for all generated DAGs the same way
# this static code could set only one type (INITIAL or INCREMENTAL)
# but we want to decide during the execution now
# Operator Definitions
OP_K = CloudDataFusionStartPipelineOperator(
task_id='PREFIX_'+str(table),
# ---> Can't handle runtime parameters <---
runtime_args=pipeline_rt_args,
# ...
)
OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
return dag
现在,我们希望在从UI或REST API触发DAG时传递load_type
(例如:INITIAL
,INCREMENTAL
),因此我们需要修改这个旧的(静态)行为(只处理一种情况,但不处理两种情况),以获得适当的气流变量,并为CloudDataFusionStartPipelineOperator
创建适当的对象:
例如:
{"load_type":"INCREMENTAL"}
# or
{"load_type":"INITIAL"}
但是如果我们做一些类似的事情:
def create_dag(dag_id, schedule, default_dag_args):
def extend_runtime_args(prefix, param, field_name, date, job_id):
# reading the Trigger-time parameter
load_type = param.conf["load_type"]
# getting the proper Airflow Variable (depending on current load type)
result = eval(Variable.get(prefix+'_'+load_type+'_'+dag_id))[field_name]
# setting 'job_id', 'dateValue', 'date', 'GCS_Input_Path' for CloudDataFusionStartPipelineOperator
# ...
return rt_args
with DAG( #...
user_defined_macros={
"extend_runtime_args": extend_runtime_args
}) as dag:
# removed static code (which executes only in generation time)
# Operator Definitions
OP_K = CloudDataFusionStartPipelineOperator(
task_id='PREFIX_'+str(table),
# ---> handles runtime arguments with custom macro <---
runtime_args="""{{ extend_runtime_args('PREFIX', dag_run, 'runtime_args', macros.ds_format(yesterday_ds_nodash,"%Y%m%d","%Y_%m_%d"), ti.job_id) }}""",
# ...
)
OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
return dag
注意:
这里我们需要的是自定义逻辑的“未来”评估(不在DAG生成时进行评估),它将随对象返回,这就是为什么我们在这里使用模板
我们经历了以下几点:
extend_runtime_args
中,返回类型是一个对象CloudDataFusionStartPipelineOperator
失败,因为runtime_args
属性是字符串而不是对象问题:
您可以创建自己的从
CloudDataFusionStartPipelineOperator
派生的自定义运算符,使其接受字符串并将其转换为CloudDataFusionStartPipelineOperator
所需的对象,然后使用此新运算符。“runtime_args”是一个字典,所以我相信它应该像json.loads()
一样容易拿回是的。只要上面的json.loads()代码就可以了。此外,如果您在运行时参数中只有几个参数可以更改,那么使用多个宏并直接在字典中的多个JINJA字符串中返回更改的值可能会更容易。比如:
当存在模板化字段时,Airflow递归地处理基本结构,如dict或list,因此您可以保留对象结构,并使用jinja宏作为值(实际上您也可以将jinja宏作为键等)
JINJA模板仅在执行任务时进行评估。所以你在这里很好
图案很好。这就是他们的目的
相关问题 更多 >
编程相关推荐