自以为是的轻量级ELT管道框架

mara-pipelines的Python项目详细描述


马拉管道

Build StatusPyPI - LicensePyPI versionSlack Status

这个包包含一个轻量级的数据转换框架,重点是透明性和复杂性的降低。它有许多现成的假设/原则:

  • 作为代码的数据集成管道:管道、任务和命令是使用声明性Python代码创建的。在

  • PostgreSQL作为数据处理引擎。在

  • 广泛的web用户界面。web浏览器作为检查、运行和调试管道的主要工具。在

  • GNU生成语义。节点依赖于上游节点的完成。没有数据依赖关系或数据流。在

  • 无应用内数据处理:命令行工具作为与数据库和数据交互的主要工具。在

  • 基于Python的multiprocessing的单机管道执行。不需要分布式任务队列。易于调试和输出日志记录。在

  • 基于成本的优先级队列:首先运行成本较高的节点(基于记录的运行时间)。在

安装

要直接使用库,请使用pip:

pip install mara-pipelines

或者

^{pr2}$

对于一个集成到flask应用程序的示例,请看一下mara example project 1和{a7}。在

由于大量使用分叉,Mara管道不能在Windows上运行。如果您想在Windows上运行它,请使用Docker或Windows Subsystem for Linux。在

示例

下面是一个由三个相互依赖的节点组成的管道“demo”:任务ping_localhost、管道{}和任务{}:

frommara_pipelines.commands.bashimportRunBashfrommara_pipelines.pipelinesimportPipeline,Taskfrommara_pipelines.ui.cliimportrun_pipeline,run_interactivelypipeline=Pipeline(id='demo',description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')pipeline.add(Task(id='ping_localhost',description='Pings localhost',commands=[RunBash('ping -c 3 localhost')]))sub_pipeline=Pipeline(id='sub_pipeline',description='Pings a number of hosts')forhostin['google','amazon','facebook']:sub_pipeline.add(Task(id=f'ping_{host}',description=f'Pings {host}',commands=[RunBash(f'ping -c 3 {host}.com')]))sub_pipeline.add_dependency('ping_amazon','ping_facebook')sub_pipeline.add(Task(id='ping_foo',description='Pings foo',commands=[RunBash('ping foo')]),['ping_amazon'])pipeline.add(sub_pipeline,['ping_localhost'])pipeline.add(Task(id='sleep',description='Sleeps for 2 seconds',commands=[RunBash('sleep 2')]),['sub_pipeline'])

任务包含执行实际工作的命令列表(在本例中,运行的是ping不同主机的bash命令)。在

为了运行管道,需要配置PostgreSQL数据库来存储运行时信息、运行输出和增量处理状态:

importmara_db.auto_migrationimportmara_db.configimportmara_db.dbsmara_db.config.databases \
    =lambda:{'mara':mara_db.dbs.PostgreSQLDB(host='localhost',user='root',database='example_etl_mara')}mara_db.auto_migration.auto_discover_models_and_migrate()

假定postgresql正在运行并且凭据有效,那么输出如下所示(创建一个包含多个表的数据库):

Created database "postgresql+psycopg2://root@localhost/example_etl_mara"

CREATE TABLE data_integration_file_dependency (
    node_path TEXT[] NOT NULL, 
    dependency_type VARCHAR NOT NULL, 
    hash VARCHAR, 
    timestamp TIMESTAMP WITHOUT TIME ZONE, 
    PRIMARY KEY (node_path, dependency_type)
);

.. more tables

CLI用户界面

这将运行一个输出到stdout的管道:

frommara_pipelines.ui.cliimportrun_pipelinerun_pipeline(pipeline)

Example run cli 1

这将运行管道sub_pipeline的单个节点及其所依赖的所有节点:

run_pipeline(sub_pipeline,nodes=[sub_pipeline.nodes['ping_amazon']],with_upstreams=True)

Example run cli 2

最后,还有一种基于pythondialog的菜单,可以像这样导航和运行管道:

frommara_pipelines.ui.cliimportrun_interactivelyrun_interactively()

Example run cli 3

Web用户界面

更重要的是,这个包提供了一个广泛的web界面。它可以很容易地集成到任何基于Flask的应用程序中,mara example project演示了如何使用mara-app实现这一点。在

对于每个管道,都有一个页面显示

  • 所有子节点及其之间的依赖关系的图
  • 过去30天内管道的总体运行时间及其最昂贵节点的图表(可配置)
  • 所有管道节点及其平均运行时间和结果队列优先级的表
  • 管道最后一次运行的输出和时间线

Mara pipelines web ui 1

每一页都有一个任务

  • 管道中任务的上游和下游
  • 过去30天内任务的运行时间
  • 任务的所有命令
  • 任务最后一次运行的输出

Mara pipelines web ui 2

管道和任务可以直接从web ui运行,这可能是此包的主要功能之一:

Example run web ui

入门

目前正在编制文件。请使用mara example project 1mara example project 2作为入门参考。在

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java为什么@DELETE REST不起作用?   带有JPA2的java表值参数。1和Hibernate,Sql Server   如何将Java类添加到Xamarin VS2017项目   绘制多边形时出现java空指针异常   java Apache WebClient 303状态未重定向   java如何用一组字符串数组从数据库中获取数据   java是否可以使用Google Drive API向文件中添加脚本?   java组织。阿帕奇。贾斯珀。JspC jar文件下载   java在整个JSON映射中将单个值作为JSON流   通过命令行将文件输入到java   java rs.next()总是返回false   java标记异常,通知调用方利用异常消息   java Spring YML数组属性为空