自以为是的轻量级etl管道框架
data-integration的Python项目详细描述
MARA数据集成
这个包包含一个轻量级ETL框架,重点是透明性和复杂性降低。它有许多成熟的假设/原则:
数据集成管道作为代码:管道、任务和命令是使用声明性python代码创建的。
PostgreSQL作为数据处理引擎。
广泛的网络用户界面。web浏览器作为管道检测、运行和调试的主要工具。
GNU生成语义。节点依赖于上游节点的完成。没有数据依赖关系或数据流。
无应用内数据处理:命令行工具是与数据库和数据交互的主要工具。
基于python的multiprocessing的单机管道执行。不需要分布式任务队列。易于调试和输出日志记录。
基于成本的优先级队列:首先运行成本较高的节点(基于记录的运行时间)。
安装
要直接使用库,请使用pip:
pip install data-integration
或
pip install git+https://github.com/mara/data-integration.git
对于集成到flask应用程序中的示例,请查看mara example project。
示例
这里是一个管道“演示”,由三个相互依赖的节点组成:任务ping_localhost
、管道sub_pipeline
和任务sleep
:
fromdata_integration.commands.bashimportRunBashfromdata_integration.pipelinesimportPipeline,Taskfromdata_integration.ui.cliimportrun_pipeline,run_interactivelypipeline=Pipeline(id='demo',description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')pipeline.add(Task(id='ping_localhost',description='Pings localhost',commands=[RunBash('ping -c 3 localhost')]))sub_pipeline=Pipeline(id='sub_pipeline',description='Pings a number of hosts')forhostin['google','amazon','facebook']:sub_pipeline.add(Task(id=f'ping_{host}',description=f'Pings {host}',commands=[RunBash(f'ping -c 3 {host}.com')]))sub_pipeline.add_dependency('ping_amazon','ping_facebook')sub_pipeline.add(Task(id='ping_foo',description='Pings foo',commands=[RunBash('ping foo')]),['ping_amazon'])pipeline.add(sub_pipeline,['ping_localhost'])pipeline.add(Task(id='sleep',description='Sleeps for 2 seconds',commands=[RunBash('sleep 2')]),['sub_pipeline'])
任务包含执行实际工作的命令列表(在本例中运行ping各种主机的bash命令)。
为了运行管道,需要配置postgresql数据库来存储运行时信息、运行输出和增量处理的状态:
importmara_db.auto_migrationimportmara_db.configimportmara_db.dbsmara_db.config.databases \ =lambda:{'mara':mara_db.dbs.PostgreSQLDB(host='localhost',user='root',database='example_etl_mara')}mara_db.auto_migration.auto_discover_models_and_migrate()
假设postgressql正在运行并且凭据正常工作,则输出如下所示(创建一个包含多个表的数据库):
Created database "postgresql+psycopg2://root@localhost/example_etl_mara"
CREATE TABLE data_integration_file_dependency (
node_path TEXT[] NOT NULL,
dependency_type VARCHAR NOT NULL,
hash VARCHAR,
timestamp TIMESTAMP WITHOUT TIME ZONE,
PRIMARY KEY (node_path, dependency_type)
);
.. more tables
客户端用户界面
这将运行一个输出到stdout的管道:
fromdata_integration.ui.cliimportrun_pipelinerun_pipeline(pipeline)
它运行管道的单个节点sub_pipeline
,以及它所依赖的所有节点:
run_pipeline(sub_pipeline,nodes=[sub_pipeline.nodes['ping_amazon']],with_upstreams=True)
最后,还有一种基于pythondialog的菜单,允许导航和运行这样的管道:
fromdata_integration.ui.cliimportrun_interactivelyrun_interactively()
网络用户界面
更重要的是,这个包提供了一个广泛的web界面。它可以很容易地集成到任何基于Flask的应用程序中,并且mara example project演示了如何使用mara-app实现这一点。
对于每个管道,都有一个显示
- 所有子节点及其依赖关系的图
- 管道的总体运行时间图表以及过去30天内最昂贵的节点(可配置)
- 所有管道节点及其平均运行时间和由此产生的队列优先级的表
- 管道最后一次运行的输出和时间线
对于每个任务,都有一个页面显示
- 管道中任务的上游和下游
- 过去30天内任务的运行时间
- 任务的所有命令
- 任务最后一次运行的输出
管道和任务可以直接从Web UI运行,这可能是此软件包的主要功能之一:
入门
文档目前正在处理中。请使用mara example project作为入门参考。