人工智能中的分区动态生成

2024-04-24 17:26:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我在DAG中有一组关于气流的任务,用于ETL作业,该作业从开放源代码获取数据,对其进行转换,然后在AWS Athena上可用。数据被分成5个表,其中包含关于类似事件的消息。我按日期和组id对数据进行分区

我的当前设置有一个任务'add\{0}}}{1}',其中{0}是表,{1}是组id。此任务存在于嵌套循环中。外部循环循环遍历组ID,内部循环循环遍历表。在

我目前使用Jinja来创建add的模板来进行分区_分区.sql文件。我们修改了标准的雅典娜操作符,使其能够直接读取文件。但是,循环的项的大小已经增加了,因此现在正在创建大约150个分区任务。我希望能够有一个单独的add_partitions任务,它可以获取.sql文件并生成一个语句列表,以便在单个命令中添加每个分区。我将把group_id的列表、表的列表和日期传递给模板并创建语句。在

ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
  PARTITION (group_id= '{{ params.group }}', date = '{{ ds }}')

我希望发送给接线员的声明是

^{pr2}$

等等。使用金贾模板可以做到吗?我已经看到循环是可能的使用金贾,但我不确定如何应用它在这里?在


Tags: 文件数据add模板id列表sql作业
1条回答
网友
1楼 · 发布于 2024-04-24 17:26:55

结果发现这是非常容易做一些非常简单的金贾模板。在

{% for group in params.groups %}
ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
  PARTITION (group_id= '{{ group }}', date = '{{ ds }}')
{% endfor %}

然后在python代码中,我使用AWSAthenaOperator遍历表,并将组id、数据库和表名的列表作为参数传递(ds由flow宏提供)。在

相关问题 更多 >