自以为是的轻量级ELT管道框架
mara-pipelines的Python项目详细描述
马拉管道
这个包包含一个轻量级的数据转换框架,重点是透明性和复杂性的降低。它有许多现成的假设/原则:
- 在
作为代码的数据集成管道:管道、任务和命令是使用声明性Python代码创建的。在
在 - 在
PostgreSQL作为数据处理引擎。在
在 - 在
广泛的web用户界面。web浏览器作为检查、运行和调试管道的主要工具。在
在 - 在
GNU生成语义。节点依赖于上游节点的完成。没有数据依赖关系或数据流。在
在 - 在
无应用内数据处理:命令行工具作为与数据库和数据交互的主要工具。在
在 - 在
基于Python的multiprocessing的单机管道执行。不需要分布式任务队列。易于调试和输出日志记录。在
在 - 在
基于成本的优先级队列:首先运行成本较高的节点(基于记录的运行时间)。在
在
在
安装
要直接使用库,请使用pip:
pip install mara-pipelines
或者
^{pr2}$对于一个集成到flask应用程序的示例,请看一下mara example project 1和{a7}。在
由于大量使用分叉,Mara管道不能在Windows上运行。如果您想在Windows上运行它,请使用Docker或Windows Subsystem for Linux。在
在
示例
下面是一个由三个相互依赖的节点组成的管道“demo”:任务ping_localhost
、管道{
frommara_pipelines.commands.bashimportRunBashfrommara_pipelines.pipelinesimportPipeline,Taskfrommara_pipelines.ui.cliimportrun_pipeline,run_interactivelypipeline=Pipeline(id='demo',description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')pipeline.add(Task(id='ping_localhost',description='Pings localhost',commands=[RunBash('ping -c 3 localhost')]))sub_pipeline=Pipeline(id='sub_pipeline',description='Pings a number of hosts')forhostin['google','amazon','facebook']:sub_pipeline.add(Task(id=f'ping_{host}',description=f'Pings {host}',commands=[RunBash(f'ping -c 3 {host}.com')]))sub_pipeline.add_dependency('ping_amazon','ping_facebook')sub_pipeline.add(Task(id='ping_foo',description='Pings foo',commands=[RunBash('ping foo')]),['ping_amazon'])pipeline.add(sub_pipeline,['ping_localhost'])pipeline.add(Task(id='sleep',description='Sleeps for 2 seconds',commands=[RunBash('sleep 2')]),['sub_pipeline'])
任务包含执行实际工作的命令列表(在本例中,运行的是ping不同主机的bash命令)。在
在
为了运行管道,需要配置PostgreSQL数据库来存储运行时信息、运行输出和增量处理状态:
importmara_db.auto_migrationimportmara_db.configimportmara_db.dbsmara_db.config.databases \ =lambda:{'mara':mara_db.dbs.PostgreSQLDB(host='localhost',user='root',database='example_etl_mara')}mara_db.auto_migration.auto_discover_models_and_migrate()
假定postgresql正在运行并且凭据有效,那么输出如下所示(创建一个包含多个表的数据库):
Created database "postgresql+psycopg2://root@localhost/example_etl_mara"
CREATE TABLE data_integration_file_dependency (
node_path TEXT[] NOT NULL,
dependency_type VARCHAR NOT NULL,
hash VARCHAR,
timestamp TIMESTAMP WITHOUT TIME ZONE,
PRIMARY KEY (node_path, dependency_type)
);
.. more tables
CLI用户界面
这将运行一个输出到stdout的管道:
frommara_pipelines.ui.cliimportrun_pipelinerun_pipeline(pipeline)
在
这将运行管道sub_pipeline
的单个节点及其所依赖的所有节点:
run_pipeline(sub_pipeline,nodes=[sub_pipeline.nodes['ping_amazon']],with_upstreams=True)
在
最后,还有一种基于pythondialog的菜单,可以像这样导航和运行管道:
frommara_pipelines.ui.cliimportrun_interactivelyrun_interactively()
Web用户界面
更重要的是,这个包提供了一个广泛的web界面。它可以很容易地集成到任何基于Flask的应用程序中,mara example project演示了如何使用mara-app实现这一点。在
对于每个管道,都有一个页面显示
- 所有子节点及其之间的依赖关系的图
- 过去30天内管道的总体运行时间及其最昂贵节点的图表(可配置)
- 所有管道节点及其平均运行时间和结果队列优先级的表
- 管道最后一次运行的输出和时间线
每一页都有一个任务
- 管道中任务的上游和下游
- 过去30天内任务的运行时间
- 任务的所有命令
- 任务最后一次运行的输出
管道和任务可以直接从web ui运行,这可能是此包的主要功能之一:
在
入门
目前正在编制文件。请使用mara example project 1和mara example project 2作为入门参考。在
- 项目
标签: