在气流中评估模板后如何返回对象?

2024-06-06 16:08:29 发布

您现在位置:Python中文网/ 问答频道 /正文

我们正在设计一个变量选择和参数设置逻辑,当DAG被触发时需要评估什么。我们的DAG是在执行之前生成的。我们决定将静态代码修改为自定义宏


在此之前,在运算符定义之间定义了一个代码,因此,当DAG生成器代码生成DAG时,该代码正在运行。此代码无法处理用于选择正确气流变量的运行时参数

for table_name in ast.literal_eval(Variable.get('PYTHON_LIST_OF_TABLES')):
    dag_id = "TableLoader_" + str(table_name)
    default_dag_args={...}
    schedule = None
    globals()[dag_id] = create_dag(dag_id, schedule, default_dag_args)
def create_dag(dag_id, schedule, default_dag_args):

    with DAG(
        default_args=default_dag_args,
        dag_id=dag_id,
        schedule_interval=schedule,
        user_defined_macros={ "load_type_handler": load_type_handler }
    ) as dag:

        # static python code which sets pipeline_rt_args for all generated DAGs the same way
        # this static code could set only one type (INITIAL or INCREMENTAL)
        # but we want to decide during the execution now

        # Operator Definitions
        OP_K = CloudDataFusionStartPipelineOperator(
            task_id='PREFIX_'+str(table),

            # ---> Can't handle runtime parameters <---
            runtime_args=pipeline_rt_args,                             
            # ...
        )

        OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
        return dag

现在,我们希望在从UI或REST API触发DAG时传递load_type(例如:INITIALINCREMENTAL),因此我们需要修改这个旧的(静态)行为(只处理一种情况,但不处理两种情况),以获得适当的气流变量,并为CloudDataFusionStartPipelineOperator创建适当的对象:

例如:

{"load_type":"INCREMENTAL"}
# or
{"load_type":"INITIAL"}

但是如果我们做一些类似的事情:


def create_dag(dag_id, schedule, default_dag_args):

    def extend_runtime_args(prefix, param, field_name, date, job_id):

        # reading the Trigger-time parameter
        load_type = param.conf["load_type"]
        
        # getting the proper Airflow Variable (depending on current load type)
        result = eval(Variable.get(prefix+'_'+load_type+'_'+dag_id))[field_name]

        # setting 'job_id', 'dateValue', 'date', 'GCS_Input_Path' for CloudDataFusionStartPipelineOperator
        # ...

        return rt_args


    with DAG( #...
        user_defined_macros={
            "extend_runtime_args": extend_runtime_args
        }) as dag:
        # removed static code (which executes only in generation time)

        # Operator Definitions
        OP_K = CloudDataFusionStartPipelineOperator(
            task_id='PREFIX_'+str(table),

            # ---> handles runtime arguments with custom macro <---
            runtime_args="""{{ extend_runtime_args('PREFIX', dag_run, 'runtime_args', macros.ds_format(yesterday_ds_nodash,"%Y%m%d","%Y_%m_%d"), ti.job_id) }}""",
            # ...
        )

        OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
        return dag

注意:

这里我们需要的是自定义逻辑的“未来”评估(不在DAG生成时进行评估),它将随对象返回,这就是为什么我们在这里使用模板

我们经历了以下几点:

  • 在自定义宏函数extend_runtime_args中,返回类型是一个对象
  • 对Jinja模板求值后,返回类型更改为string
  • CloudDataFusionStartPipelineOperator失败,因为runtime_args属性是字符串而不是对象

问题:

  • 我们如何在评估Jinja模板后返回一个对象(并在“将来”执行此操作)?
    • 我们能转换字符串吗
  • 我们如何确保此处的逻辑将在执行DAG之后执行,而不是在生成DAG之后执行
  • Jinja模板/自定义宏在这里处理触发时间参数的模式是好还是坏

Tags: 对象代码iddefaulttypetableargsload
1条回答
网友
1楼 · 发布于 2024-06-06 16:08:29

How could we return with an object after evaluating the Jinja template (and do this in the 'future')?

您可以创建自己的从CloudDataFusionStartPipelineOperator派生的自定义运算符,使其接受字符串并将其转换为CloudDataFusionStartPipelineOperator所需的对象,然后使用此新运算符。“runtime_args”是一个字典,所以我相信它应该像json.loads()一样容易拿回

Can we convert the string somehow?

是的。只要上面的json.loads()代码就可以了。此外,如果您在运行时参数中只有几个参数可以更改,那么使用多个宏并直接在字典中的多个JINJA字符串中返回更改的值可能会更容易。比如:

runtime_args = {
   'PREFIX' = "{{ dag_run }}",
   'date' = "{{ macros.ds_format(....) }}",
}

当存在模板化字段时,Airflow递归地处理基本结构,如dict或list,因此您可以保留对象结构,并使用jinja宏作为值(实际上您也可以将jinja宏作为键等)

How could we ensure that the logic here will be executed after the DAG is executed and not right after the DAG was generated?

JINJA模板仅在执行任务时进行评估。所以你在这里很好

Are the Jinja templates / custom macros good or bad patterns here for handling the trigger-time arguments?

图案很好。这就是他们的目的

相关问题 更多 >