在Apache Ai中的自定义操作符中访问params参数

2024-05-16 01:00:02 发布

您现在位置:Python中文网/ 问答频道 /正文

问题

我想将一个值列表或任何值作为参数传递给自定义运算符,修改运算符中的值,然后通过{{ params }}宏在sql模板中访问这些值。在

当前设置

以下是我设置的相关部分,为了清晰起见,稍微做了些改动。在

DAG定义:

from airflow import DAG
from datetime import timedelta, datetime
from acme.operators.dwh_operators import ProcessDimensionOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 2, 27),
    'provide_context': True,
    'depends_on_past': True
}

dag = DAG(
    'etl',
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
    template_searchpath=tmpl_search_path,
    default_args=default_args,
    max_active_runs=1)

process_product_dim = ProcessDimensionOperator(
    task_id='process_product_dim',
    mysql_conn_id='mysql_dwh',
    sql='process_dimension.sql',
    database='dwh',
    col_names=[
        'id',
        'name',
        'category',
        'price',
        'available',
        'country',
    ],
    t_name='products',
    dag=dag)

运算符定义:

^{pr2}$

过程_维度.sql在

create table if not exists staging.{{ params.t_name }};

select
    *
from
    source.{{ params.t_name }} as source
join
    target.{{ params.t_name }} as target
    on source.id = target.id {{ params.match_statement }}

但是这会抛出错误,因为{{ params.t_name }}和{}呈现为null。在

我所做的一切

  • 在任务定义的t_name参数中设置t_name和{},并将映射/联接逻辑保留在sql模板中。这是可行的,但如果可能的话,我希望将逻辑排除在模板之外
  • params={xxx}传入super(ProcessDimensionOperator, self).__init__(params=params, *args, **kwargs)
  • 将参数作为parameters={xxx}传入hook.run()方法,并用%(x)s对它们进行模板化,但这会导致在变量周围使用引号呈现时出现问题,这会使各种sql语句混乱

我对python和flow比较陌生,所以我很可能会错过一些明显的东西,任何帮助都将不胜感激,谢谢!在


Tags: namefromimport模板iddefaultsqldatetime
1条回答
网友
1楼 · 发布于 2024-05-16 01:00:02

这里也一样。我刚花了几个小时(几天?)找出问题的原因(上帝保佑IPython.embed和伐木)。根据气流1.10.3,它是由TaskInstance.render_模板(),在呈现任何template_字段或template_ext之后,它不会更新Jinja上下文,只更新任务属性。看到了here!在

所以你只要用

{{ task.params.whatever }}

而不是

{{ params.whatever }}

在.sql模板文件中。在

事实上,如果Jinja上下文要不断更新,那么就必须注意模板的顺序和依赖性。这是一种嵌套/递归渲染。它也可能带来性能上的负面影响。在

另外,我不建议使用“parameters”(它与“params”不同),因为它们似乎是要作为参数传递给数据库游标的,然后就不能传递数字/整数、列或表名,或者仅仅传递一个SQL片段(例如where、having、limit…)。在

相关问题 更多 >