我想将一个值列表或任何值作为参数传递给自定义运算符,修改运算符中的值,然后通过{{ params }}
宏在sql模板中访问这些值。在
以下是我设置的相关部分,为了清晰起见,稍微做了些改动。在
DAG定义:
from airflow import DAG
from datetime import timedelta, datetime
from acme.operators.dwh_operators import ProcessDimensionOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2019, 2, 27),
'provide_context': True,
'depends_on_past': True
}
dag = DAG(
'etl',
schedule_interval=None,
dagrun_timeout=timedelta(minutes=60),
template_searchpath=tmpl_search_path,
default_args=default_args,
max_active_runs=1)
process_product_dim = ProcessDimensionOperator(
task_id='process_product_dim',
mysql_conn_id='mysql_dwh',
sql='process_dimension.sql',
database='dwh',
col_names=[
'id',
'name',
'category',
'price',
'available',
'country',
],
t_name='products',
dag=dag)
运算符定义:
^{pr2}$过程_维度.sql在
create table if not exists staging.{{ params.t_name }};
select
*
from
source.{{ params.t_name }} as source
join
target.{{ params.t_name }} as target
on source.id = target.id {{ params.match_statement }}
但是这会抛出错误,因为{{ params.t_name }}
和{
t_name
参数中设置t_name
和{params={xxx}
传入super(ProcessDimensionOperator, self).__init__(params=params, *args, **kwargs)
parameters={xxx}
传入hook.run()
方法,并用%(x)s
对它们进行模板化,但这会导致在变量周围使用引号呈现时出现问题,这会使各种sql语句混乱我对python和flow比较陌生,所以我很可能会错过一些明显的东西,任何帮助都将不胜感激,谢谢!在
这里也一样。我刚花了几个小时(几天?)找出问题的原因(上帝保佑IPython.embed和伐木)。根据气流1.10.3,它是由TaskInstance.render_模板(),在呈现任何template_字段或template_ext之后,它不会更新Jinja上下文,只更新任务属性。看到了here!在
所以你只要用
{{ task.params.whatever }}
而不是
{{ params.whatever }}
在.sql模板文件中。在
事实上,如果Jinja上下文要不断更新,那么就必须注意模板的顺序和依赖性。这是一种嵌套/递归渲染。它也可能带来性能上的负面影响。在
另外,我不建议使用“parameters”(它与“params”不同),因为它们似乎是要作为参数传递给数据库游标的,然后就不能传递数字/整数、列或表名,或者仅仅传递一个SQL片段(例如where、having、limit…)。在
相关问题 更多 >
编程相关推荐