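Utilities for creating ETL pipelines with mara
Detailed description of the Python project mara-etl-tools
Mara ETL Tools
A collection of utilities around Project A's best practices for creating data integration pipelines with mara. The package is intended as a starting point for new projects. Forks/copies are preferred over PRs.
For more details on how to use this package, have a look at the mara example project.
The package consists of multiple modules, all of which can be used independently: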
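SQL utility functions
The function utils_pipeline in etl_tools/initialize_utils/__init__.py returns a pipeline that creates a util schema with a number of PostgreSQL functions for organizing data pipelines. Add it to your root pipeline like this:
from etl_tools import initialize_utils

my_pipeline.add(initialize_utils.utils_pipeline(with_hll=True, with_cstore_fdw=True))
For the available functions, have a look at the .sql files in etl_tools/initialize_utils.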
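Schema copying
The file etl_tools/schema_copying.py contains the function add_schema_copying_to_pipeline, which copies a PostgreSQL database schema from one host to another at the end of a pipeline run. This is useful for running the ETL and the frontend tools on different database servers, so that a running ETL does not affect the performance of dashboard queries.
Assume there is a pipeline my_pipeline with a number of child pipelines, each carrying a "Schema" label set to the schema that should be copied. Schema copying can then be added to those child pipelines like this: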
from mara_db import dbs
from data_integration.commands.sql import ExecuteSQL
from data_integration.pipelines import Task
from etl_tools.schema_copying import add_schema_copying_to_pipeline

# when etl and frontend db are different, add schema copying
if dbs.db('mdwh-etl').database != dbs.db('mdwh-frontend').database \
        or dbs.db('mdwh-etl').host != dbs.db('mdwh-frontend').host:
    # run some of the files from etl_tools/initialize_utils in frontend db
    initialize_frontend_db_commands = [
        ExecuteSQL(sql_statement="DROP SCHEMA IF EXISTS util CASCADE; CREATE SCHEMA util;",
                   db_alias='mdwh-frontend')]

    for file_name in ['schema_switching.sql', 'data_sets.sql', 'hll.sql', 'cstore_fdw.sql']:
        initialize_frontend_db_commands.append(
            ExecuteSQL(sql_file_name=str(my_pipeline.nodes['utils'].nodes['initialize_utils'].base_path() / file_name),
                       db_alias='mdwh-frontend'))

    my_pipeline.nodes['utils'].add(
        Task(id='initialize_frontend_db',
             description='Adds some functions to the frontend db so that schema copying works',
             commands=initialize_frontend_db_commands))

    # Add schema copying for time schema
    # (the aliases below are spelled 'dwh-...' while the ones above are 'mdwh-...';
    # use whichever aliases your project actually defines)
    add_schema_copying_to_pipeline(
        pipeline=my_pipeline.nodes['utils'].nodes['create_time_dimensions'],
        schema_name='time',
        source_db_alias='dwh-etl', target_db_alias='dwh-frontend')

    # Add schema copying to all root pipelines
    for pipeline in my_pipeline.nodes.values():
        if "Schema" in pipeline.labels:
            schema = pipeline.labels['Schema']
            add_schema_copying_to_pipeline(
                pipeline=pipeline, schema_name=schema + '_next',
                source_db_alias='dwh-etl', target_db_alias='dwh-frontend')

            # after copying, swap the freshly copied schema in on the frontend db
            pipeline.final_node.commands_after.append(
                ExecuteSQL(sql_statement=f"SELECT util.replace_schema('{schema}', '{schema}_next')",
                           db_alias='mdwh-frontend'))
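To make it easier to see which child pipelines the loop above touches, here is a library-free sketch of the label-based selection. StandInPipeline and plan_schema_swaps are hypothetical names invented for this illustration; they are not part of mara or etl_tools, and nothing here talks to a database.

```python
from dataclasses import dataclass, field


@dataclass
class StandInPipeline:
    """Hypothetical stand-in for a mara pipeline; only id and labels are modelled."""
    id: str
    labels: dict = field(default_factory=dict)


def plan_schema_swaps(pipelines):
    """Return (pipeline id, schema to copy, swap statement) for each labelled pipeline."""
    plan = []
    for pipeline in pipelines:
        if "Schema" in pipeline.labels:
            schema = pipeline.labels["Schema"]
            plan.append((
                pipeline.id,
                schema + "_next",  # the schema that would be copied to the frontend db
                f"SELECT util.replace_schema('{schema}', '{schema}_next')",
            ))
    return plan


# Made-up child pipelines: only those with a "Schema" label are selected.
children = [
    StandInPipeline("utils"),
    StandInPipeline("orders", {"Schema": "os_dim"}),
    StandInPipeline("marketing", {"Schema": "m_dim"}),
]

for row in plan_schema_swaps(children):
    print(row)
```

Pipelines without a "Schema" label (such as the stand-in "utils" above) are skipped, which mirrors the `if "Schema" in pipeline.labels` check in the original snippet.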
Time dimensions
The file etl_tools/create_time_dimensions/__init__.py defines a pipeline that creates and updates a time schema with the tables day and duration:
select * from time.day order by _date desc limit 10;
day_id | day_name | year_id | iso_year_id | quarter_id | quarter_name | month_id | month_name | week_id | week_name | day_of_week_id | day_of_week_name | day_of_month_id | _date
----------+------------------+---------+-------------+------------+--------------+----------+------------+---------+--------------+----------------+------------------+-----------------+------------
20190815 | Thu, Aug 15 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201933 | 2019 - CW 33 | 4 | Thursday | 15 | 2019-08-15
20190814 | Wed, Aug 14 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201933 | 2019 - CW 33 | 3 | Wednesday | 14 | 2019-08-14
20190813 | Tue, Aug 13 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201933 | 2019 - CW 33 | 2 | Tuesday | 13 | 2019-08-13
20190812 | Mon, Aug 12 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201933 | 2019 - CW 33 | 1 | Monday | 12 | 2019-08-12
20190811 | Sun, Aug 11 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201932 | 2019 - CW 32 | 7 | Sunday | 11 | 2019-08-11
20190810 | Sat, Aug 10 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201932 | 2019 - CW 32 | 6 | Saturday | 10 | 2019-08-10
20190809 | Fri, Aug 09 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201932 | 2019 - CW 32 | 5 | Friday | 9 | 2019-08-09
20190808 | Thu, Aug 08 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201932 | 2019 - CW 32 | 4 | Thursday | 8 | 2019-08-08
20190807 | Wed, Aug 07 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201932 | 2019 - CW 32 | 3 | Wednesday | 7 | 2019-08-07
20190806 | Tue, Aug 06 2019 | 2019 | 2019 | 20193 | 2019 Q3 | 201908 | 2019 Aug | 201932 | 2019 - CW 32 | 2 | Tuesday | 6 | 2019-08-06
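As a reading aid for the columns above, the following sketch derives one such row from a calendar date in plain Python. It is reverse-engineered from the sample output only and is not the package's SQL. In particular, building week_id from the ISO year and ISO week, and zero-padding the week number in week_name, are assumptions that the sample rows cannot confirm or refute. The function name day_row is hypothetical.

```python
from datetime import date

# Spelled out explicitly so the result does not depend on the system locale.
DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
MONTH_ABBR = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
              "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]


def day_row(d: date) -> dict:
    """Build a dict with the same keys as the columns of time.day (sketch)."""
    iso_year, iso_week, iso_weekday = d.isocalendar()  # Monday = 1 ... Sunday = 7
    quarter = (d.month - 1) // 3 + 1
    day_name = DAY_NAMES[iso_weekday - 1]
    month = MONTH_ABBR[d.month - 1]
    return {
        "day_id": d.year * 10000 + d.month * 100 + d.day,
        "day_name": f"{day_name[:3]}, {month} {d.day:02d} {d.year}",
        "year_id": d.year,
        "iso_year_id": iso_year,
        "quarter_id": d.year * 10 + quarter,
        "quarter_name": f"{d.year} Q{quarter}",
        "month_id": d.year * 100 + d.month,
        "month_name": f"{d.year} {month}",
        "week_id": iso_year * 100 + iso_week,           # assumption: ISO year + ISO week
        "week_name": f"{iso_year} - CW {iso_week:02d}",  # assumption: zero-padded week
        "day_of_week_id": iso_weekday,
        "day_of_week_name": day_name,
        "day_of_month_id": d.day,
        "_date": d,
    }


print(day_row(date(2019, 8, 15)))
```

For 2019-08-15 and 2019-08-11 this reproduces the corresponding rows of the sample output, including the week change from CW 33 to CW 32 on the Sunday.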
select * from time.duration where duration_id >= 0 order by duration_id limit 10;
duration_id | days | days_name | weeks | weeks_name | four_weeks | four_weeks_name | months | months_name | sixth_years | sixth_years_name | half_years | half_years_name | years | years_name
-------------+------+-----------+-------+------------+------------+-----------------+--------+-------------+-------------+------------------+------------+-----------------+-------+------------
0 | 0 | 0 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days
1 | 1 | 1 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days
2 | 2 | 2 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days
3 | 3 | 3 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days
4 | 4 | 4 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days
5 | 5 | 5 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days
6 | 6 | 6 days | 0 | 0-6 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days
7 | 7 | 7 days | 1 | 7-13 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days
8 | 8 | 8 days | 1 | 7-13 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days
9 | 9 | 9 days | 1 | 7-13 days | 0 | 0-27 days | 0 | 0-29 days | 0 | 0-59 days | 0 | 0-179 days | 0 | 0-359 days
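The bucket columns in the output above look like integer divisions of the day count by fixed bucket sizes. The sketch below reproduces the ten sample rows under that assumption. The bucket sizes (7, 28, 30, 60, 180, 360 days) are inferred from the "0-6 days", "0-27 days", ... labels rather than taken from the package's SQL, negative durations are not handled, and duration_row is a hypothetical name.

```python
# Bucket sizes in days, inferred from the range labels in the sample output.
BUCKET_SIZES = {
    "weeks": 7,
    "four_weeks": 28,
    "months": 30,
    "sixth_years": 60,
    "half_years": 180,
    "years": 360,
}


def duration_row(days: int) -> dict:
    """Build a dict with the same keys as the columns of time.duration (sketch)."""
    if days < 0:
        raise ValueError("this sketch only covers non-negative durations")
    row = {"duration_id": days, "days": days, "days_name": f"{days} days"}
    for column, size in BUCKET_SIZES.items():
        index = days // size      # which bucket the duration falls into
        start = index * size      # first day count covered by that bucket
        row[column] = index
        row[f"{column}_name"] = f"{start}-{start + size - 1} days"
    return row


for n in (0, 6, 7, 9):
    print(duration_row(n))
```

Day 6 is still in the "0-6 days" week bucket and day 7 is the first one in "7-13 days", which matches the transition visible in the sample rows.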
Usage:
from etl_tools import create_time_dimensions

my_pipeline.add(create_time_dimensions.pipeline)
Set the minimum and maximum dates by overriding first_date_in_time_dimensions and last_date_in_time_dimensions in etl_tools/config.py.
Euro exchange rates
The file etl_tools/load_euro_exchange_rates/__init__.py contains a pipeline that loads (historic) Euro exchange rates from the European Central Bank.
Usage:
from etl_tools import load_euro_exchange_rates

my_pipeline.add(load_euro_exchange_rates.euro_exchange_rates_pipeline('db-alias'))
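How the pipeline fetches and stores the rates is not described here. Purely as an illustration of the kind of data involved, the sketch below parses a small embedded document in the XML layout that the European Central Bank uses for its Euro reference rates (nested Cube elements). The rate values are made up for the example and are not real exchange rates, parse_rates is a hypothetical helper, and no network access takes place.

```python
import xml.etree.ElementTree as ET
from datetime import date

# Tiny sample in the ECB reference-rate layout; the rate values are invented.
SAMPLE_XML = """
<gesmes:Envelope xmlns:gesmes="http://www.gesmes.org/xml/2002-08-01"
                 xmlns="http://www.ecb.int/vocabulary/2002-08-01/eurofxref">
  <Cube>
    <Cube time="2019-08-15">
      <Cube currency="USD" rate="1.1"/>
      <Cube currency="GBP" rate="0.9"/>
    </Cube>
    <Cube time="2019-08-14">
      <Cube currency="USD" rate="1.2"/>
    </Cube>
  </Cube>
</gesmes:Envelope>
"""

NS = {"fx": "http://www.ecb.int/vocabulary/2002-08-01/eurofxref"}


def parse_rates(xml_text: str) -> list:
    """Flatten the nested Cube elements into (date, currency, rate) tuples."""
    root = ET.fromstring(xml_text.strip())
    rows = []
    # Outer Cube -> one Cube per day -> one Cube per currency
    for day in root.findall("./fx:Cube/fx:Cube", NS):
        day_date = date.fromisoformat(day.get("time"))
        for entry in day.findall("fx:Cube", NS):
            rows.append((day_date, entry.get("currency"), float(entry.get("rate"))))
    return rows


for row in parse_rates(SAMPLE_XML):
    print(row)
```

Each tuple reads as "1 EUR was worth this many units of the given currency on that day", which is the shape a rates table keyed by date and currency would typically need.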