我在DAG中有一组关于气流的任务,用于ETL作业,该作业从开放源代码获取数据,对其进行转换,然后在AWS Athena上可用。数据被分成5个表,其中包含关于类似事件的消息。我按日期和组id对数据进行分区
我的当前设置有一个任务'add\{0}}}{1}',其中{0}是表,{1}是组id。此任务存在于嵌套循环中。外部循环循环遍历组ID,内部循环循环遍历表。在
我目前使用Jinja来创建add的模板来进行分区_分区.sql文件。我们修改了标准的雅典娜操作符,使其能够直接读取文件。但是,循环的项的大小已经增加了,因此现在正在创建大约150个分区任务。我希望能够有一个单独的add_partitions任务,它可以获取.sql文件并生成一个语句列表,以便在单个命令中添加每个分区。我将把group_id的列表、表的列表和日期传递给模板并创建语句。在
ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
PARTITION (group_id= '{{ params.group }}', date = '{{ ds }}')
我希望发送给接线员的声明是
^{pr2}$等等。使用金贾模板可以做到吗?我已经看到循环是可能的使用金贾,但我不确定如何应用它在这里?在
结果发现这是非常容易做一些非常简单的金贾模板。在
然后在python代码中,我使用AWSAthenaOperator遍历表,并将组id、数据库和表名的列表作为参数传递(ds由flow宏提供)。在
相关问题 更多 >
编程相关推荐