Utilities for creating ETL pipelines with mara


Mara ETL Tools

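A collection of utilities around Project A's best practices for creating data integration pipelines with mara. The package is intended as a starting point for new projects. Forks and copies are preferred over PRs.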

For more details on how to use this package, have a look at the mara example project.

The package consists of several modules, all of which can be used independently:

SQL utility functions

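The function utils_pipeline in etl_tools/initialize_utils/__init__.py returns a pipeline that creates a util schema with a number of PostgreSQL functions for organizing data pipelines. Add it to your root pipeline like this: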

from etl_tools import initialize_utils

my_pipeline.add(initialize_utils.utils_pipeline(with_hll=True, with_cstore_fdw=True))

For the available functions, see the .sql files in etl_tools/initialize_utils.

Schema copying

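The file etl_tools/schema_copying.py contains the function add_schema_copying_to_pipeline, which copies a PostgreSQL database schema from one host to another at the end of a pipeline run. This is useful for running the ETL and the frontend tools on different database servers, so that a running ETL does not affect the performance of dashboard queries.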

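Assuming there is a pipeline my_pipeline with a number of sub-pipelines, each of which has a Schema label set to the schema that should be copied, this is how schema copying can be added to those sub-pipelines: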

from mara_db import dbs
from data_integration.commands.sql import ExecuteSQL
from data_integration.pipelines import Task
from etl_tools.schema_copying import add_schema_copying_to_pipeline

# when etl and frontend db are different, add schema copying
if dbs.db('mdwh-etl').database != dbs.db('mdwh-frontend').database \
        or dbs.db('mdwh-etl').host != dbs.db('mdwh-frontend').host:
    # run some of the files from etl_tools/initialize_utils in frontend db
    initialize_frontend_db_commands = [
        ExecuteSQL(sql_statement="DROP SCHEMA IF EXISTS util CASCADE; CREATE SCHEMA util;",
                   db_alias='mdwh-frontend')]

    for file_name in ['schema_switching.sql', 'data_sets.sql', 'hll.sql', 'cstore_fdw.sql']:
        initialize_frontend_db_commands.append(
            ExecuteSQL(sql_file_name=str(my_pipeline.nodes['utils'].nodes['initialize_utils'].base_path() / file_name),
                       db_alias='mdwh-frontend'))

    my_pipeline.nodes['utils'].add(
        Task(id='initialize_frontend_db',
             description='Adds some functions to the frontend db so that schema copying works',
             commands=initialize_frontend_db_commands))

    # Add schema copying for time schema
    add_schema_copying_to_pipeline(
        pipeline=my_pipeline.nodes['utils'].nodes['create_time_dimensions'],
        schema_name='time', source_db_alias='dwh-etl', target_db_alias='dwh-frontend')

    # Add schema copying to all root pipelines
    for pipeline in my_pipeline.nodes.values():
        if "Schema" in pipeline.labels:
            schema = pipeline.labels['Schema']
            add_schema_copying_to_pipeline(
                pipeline=pipeline, schema_name=schema + '_next',
                source_db_alias='dwh-etl', target_db_alias='dwh-frontend')
            pipeline.final_node.commands_after.append(
                ExecuteSQL(sql_statement=f"SELECT util.replace_schema('{schema}', '{schema}_next')",
                           db_alias='mdwh-frontend'))
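
The final loop in the example relies on a naming convention: a sub-pipeline labelled with Schema X is copied to the frontend as X_next and then swapped in with util.replace_schema. The following is a minimal sketch of that convention only, written without any mara imports. The helper schema_copy_plan and the plain dictionaries standing in for pipeline labels are hypothetical and not part of etl_tools.

```python
from typing import Dict, List, Tuple


def schema_copy_plan(labels_by_pipeline: Dict[str, Dict[str, str]]) -> List[Tuple[str, str, str]]:
    """Derive (pipeline id, schema to copy, swap statement) for labelled pipelines.

    Hypothetical helper for illustration; pipelines without a 'Schema'
    label are skipped, mirroring the `if "Schema" in pipeline.labels` check.
    """
    plan = []
    for pipeline_id, labels in labels_by_pipeline.items():
        if 'Schema' in labels:
            schema = labels['Schema']
            # the ETL writes to <schema>_next, which is what gets copied
            next_schema = schema + '_next'
            # after copying, the frontend replaces <schema> with <schema>_next
            swap_sql = f"SELECT util.replace_schema('{schema}', '{next_schema}')"
            plan.append((pipeline_id, next_schema, swap_sql))
    return plan


if __name__ == '__main__':
    # 'orders' and 'os_dim' are made-up names for this sketch
    for entry in schema_copy_plan({'orders': {'Schema': 'os_dim'}, 'utils': {}}):
        print(entry)
```

Copying into a _next schema first means dashboards keep querying the old schema until the copy has finished.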

Time dimensions

The file etl_tools/create_time_dimensions/__init__.py defines a pipeline that creates and updates a time schema with the tables day and duration:

select * from time.day order by _date desc limit 10;
     day_id  |     day_name     | year_id | iso_year_id | quarter_id | quarter_name | month_id | month_name | week_id |  week_name   | day_of_week_id | day_of_week_name | day_of_month_id |   _date    
   ----------+------------------+---------+-------------+------------+--------------+----------+------------+---------+--------------+----------------+------------------+-----------------+------------
    20190815 | Thu, Aug 15 2019 |    2019 |        2019 |      20193 | 2019 Q3      |   201908 | 2019 Aug   |  201933 | 2019 - CW 33 |              4 | Thursday         |              15 | 2019-08-15
    20190814 | Wed, Aug 14 2019 |    2019 |        2019 |      20193 | 2019 Q3      |   201908 | 2019 Aug   |  201933 | 2019 - CW 33 |              3 | Wednesday        |              14 | 2019-08-14
    20190813 | Tue, Aug 13 2019 |    2019 |        2019 |      20193 | 2019 Q3      |   201908 | 2019 Aug   |  201933 | 2019 - CW 33 |              2 | Tuesday          |              13 | 2019-08-13
    20190812 | Mon, Aug 12 2019 |    2019 |        2019 |      20193 | 2019 Q3      |   201908 | 2019 Aug   |  201933 | 2019 - CW 33 |              1 | Monday           |              12 | 2019-08-12
    20190811 | Sun, Aug 11 2019 |    2019 |        2019 |      20193 | 2019 Q3      |   201908 | 2019 Aug   |  201932 | 2019 - CW 32 |              7 | Sunday           |              11 | 2019-08-11
    20190810 | Sat, Aug 10 2019 |    2019 |        2019 |      20193 | 2019 Q3      |   201908 | 2019 Aug   |  201932 | 2019 - CW 32 |              6 | Saturday         |              10 | 2019-08-10
    20190809 | Fri, Aug 09 2019 |    2019 |        2019 |      20193 | 2019 Q3      |   201908 | 2019 Aug   |  201932 | 2019 - CW 32 |              5 | Friday           |               9 | 2019-08-09
    20190808 | Thu, Aug 08 2019 |    2019 |        2019 |      20193 | 2019 Q3      |   201908 | 2019 Aug   |  201932 | 2019 - CW 32 |              4 | Thursday         |               8 | 2019-08-08
    20190807 | Wed, Aug 07 2019 |    2019 |        2019 |      20193 | 2019 Q3      |   201908 | 2019 Aug   |  201932 | 2019 - CW 32 |              3 | Wednesday        |               7 | 2019-08-07
    20190806 | Tue, Aug 06 2019 |    2019 |        2019 |      20193 | 2019 Q3      |   201908 | 2019 Aug   |  201932 | 2019 - CW 32 |              2 | Tuesday          |               6 | 2019-08-06
select * from time.duration where duration_id >= 0 order by duration_id limit 10;
 duration_id | days | days_name | weeks | weeks_name | four_weeks | four_weeks_name | months | months_name | sixth_years | sixth_years_name | half_years | half_years_name | years | years_name 
-------------+------+-----------+-------+------------+------------+-----------------+--------+-------------+-------------+------------------+------------+-----------------+-------+------------
           0 |    0 | 0 days    |     0 | 0-6 days   |          0 | 0-27 days       |      0 | 0-29 days   |           0 | 0-59 days        |          0 | 0-179 days      |     0 | 0-359 days
           1 |    1 | 1 days    |     0 | 0-6 days   |          0 | 0-27 days       |      0 | 0-29 days   |           0 | 0-59 days        |          0 | 0-179 days      |     0 | 0-359 days
           2 |    2 | 2 days    |     0 | 0-6 days   |          0 | 0-27 days       |      0 | 0-29 days   |           0 | 0-59 days        |          0 | 0-179 days      |     0 | 0-359 days
           3 |    3 | 3 days    |     0 | 0-6 days   |          0 | 0-27 days       |      0 | 0-29 days   |           0 | 0-59 days        |          0 | 0-179 days      |     0 | 0-359 days
           4 |    4 | 4 days    |     0 | 0-6 days   |          0 | 0-27 days       |      0 | 0-29 days   |           0 | 0-59 days        |          0 | 0-179 days      |     0 | 0-359 days
           5 |    5 | 5 days    |     0 | 0-6 days   |          0 | 0-27 days       |      0 | 0-29 days   |           0 | 0-59 days        |          0 | 0-179 days      |     0 | 0-359 days
           6 |    6 | 6 days    |     0 | 0-6 days   |          0 | 0-27 days       |      0 | 0-29 days   |           0 | 0-59 days        |          0 | 0-179 days      |     0 | 0-359 days
           7 |    7 | 7 days    |     1 | 7-13 days  |          0 | 0-27 days       |      0 | 0-29 days   |           0 | 0-59 days        |          0 | 0-179 days      |     0 | 0-359 days
           8 |    8 | 8 days    |     1 | 7-13 days  |          0 | 0-27 days       |      0 | 0-29 days   |           0 | 0-59 days        |          0 | 0-179 days      |     0 | 0-359 days
           9 |    9 | 9 days    |     1 | 7-13 days  |          0 | 0-27 days       |      0 | 0-29 days   |           0 | 0-59 days        |          0 | 0-179 days      |     0 | 0-359 days
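
To make the column semantics of the two tables easier to read, here is a minimal Python sketch that reproduces the sample rows shown. It is not the package's SQL. The functions day_row and duration_row are hypothetical, and the rules (ISO week numbering for week_id, two-digit calendar weeks, fixed bucket widths of 7, 28, 30, 60, 180 and 360 days, non-negative durations only) are inferred from the rows printed above.

```python
import datetime

DAY_NAMES = ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday')
MONTH_ABBR = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')

# bucket widths in days, inferred from the *_name columns of time.duration
DURATION_BUCKETS = (('weeks', 7), ('four_weeks', 28), ('months', 30),
                    ('sixth_years', 60), ('half_years', 180), ('years', 360))


def day_row(d: datetime.date) -> dict:
    """Build one row shaped like time.day (assumed rules, see lead-in)."""
    iso_year, iso_week, iso_weekday = d.isocalendar()  # Monday = 1 ... Sunday = 7
    quarter = (d.month - 1) // 3 + 1
    day_name = DAY_NAMES[iso_weekday - 1]
    month_abbr = MONTH_ABBR[d.month - 1]
    return {
        'day_id': d.year * 10000 + d.month * 100 + d.day,
        'day_name': f'{day_name[:3]}, {month_abbr} {d.day:02d} {d.year}',
        'year_id': d.year,
        'iso_year_id': iso_year,
        'quarter_id': d.year * 10 + quarter,
        'quarter_name': f'{d.year} Q{quarter}',
        'month_id': d.year * 100 + d.month,
        'month_name': f'{d.year} {month_abbr}',
        'week_id': iso_year * 100 + iso_week,          # assumption: ISO year and week
        'week_name': f'{iso_year} - CW {iso_week:02d}',  # assumption: zero-padded week
        'day_of_week_id': iso_weekday,
        'day_of_week_name': day_name,
        'day_of_month_id': d.day,
        '_date': d,
    }


def duration_row(days: int) -> dict:
    """Build one row shaped like time.duration for a non-negative day count."""
    row = {'duration_id': days, 'days': days, 'days_name': f'{days} days'}
    for name, width in DURATION_BUCKETS:
        bucket = days // width
        row[name] = bucket
        row[f'{name}_name'] = f'{bucket * width}-{bucket * width + width - 1} days'
    return row


if __name__ == '__main__':
    print(day_row(datetime.date(2019, 8, 15)))
    print(duration_row(7))
```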

Usage:

from etl_tools import create_time_dimensions

my_pipeline.add(create_time_dimensions.pipeline)

Set the minimum and maximum dates by overriding first_date_in_time_dimensions and last_date_in_time_dimensions in etl_tools/config.py.

Euro exchange rates

The file etl_tools/load_euro_exchange_rates/__init__.py contains a pipeline that loads (historical) euro exchange rates from the European Central Bank.

Usage:

from etl_tools import load_euro_exchange_rates

my_pipeline.add(load_euro_exchange_rates.euro_exchange_rates_pipeline('db-alias'))
