自以为是的轻量级etl管道框架

data-integration的Python项目详细描述


MARA数据集成

Build StatusPyPI - LicensePyPI versionSlack Status

这个包包含一个轻量级ETL框架,重点是透明性和复杂性降低。它有许多成熟的假设/原则:

  • 数据集成管道作为代码:管道、任务和命令是使用声明性python代码创建的。

  • PostgreSQL作为数据处理引擎。

  • 广泛的网络用户界面。web浏览器作为管道检测、运行和调试的主要工具。

  • GNU生成语义。节点依赖于上游节点的完成。没有数据依赖关系或数据流。

  • 无应用内数据处理:命令行工具是与数据库和数据交互的主要工具。

  • 基于python的multiprocessing的单机管道执行。不需要分布式任务队列。易于调试和输出日志记录。

  • 基于成本的优先级队列:首先运行成本较高的节点(基于记录的运行时间)。

安装

要直接使用库,请使用pip:

pip install data-integration

pip install git+https://github.com/mara/data-integration.git

对于集成到flask应用程序中的示例,请查看mara example project

示例

这里是一个管道“演示”,由三个相互依赖的节点组成:任务ping_localhost、管道sub_pipeline和任务sleep

fromdata_integration.commands.bashimportRunBashfromdata_integration.pipelinesimportPipeline,Taskfromdata_integration.ui.cliimportrun_pipeline,run_interactivelypipeline=Pipeline(id='demo',description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')pipeline.add(Task(id='ping_localhost',description='Pings localhost',commands=[RunBash('ping -c 3 localhost')]))sub_pipeline=Pipeline(id='sub_pipeline',description='Pings a number of hosts')forhostin['google','amazon','facebook']:sub_pipeline.add(Task(id=f'ping_{host}',description=f'Pings {host}',commands=[RunBash(f'ping -c 3 {host}.com')]))sub_pipeline.add_dependency('ping_amazon','ping_facebook')sub_pipeline.add(Task(id='ping_foo',description='Pings foo',commands=[RunBash('ping foo')]),['ping_amazon'])pipeline.add(sub_pipeline,['ping_localhost'])pipeline.add(Task(id='sleep',description='Sleeps for 2 seconds',commands=[RunBash('sleep 2')]),['sub_pipeline'])

任务包含执行实际工作的命令列表(在本例中运行ping各种主机的bash命令)。

为了运行管道,需要配置postgresql数据库来存储运行时信息、运行输出和增量处理的状态:

importmara_db.auto_migrationimportmara_db.configimportmara_db.dbsmara_db.config.databases \
    =lambda:{'mara':mara_db.dbs.PostgreSQLDB(host='localhost',user='root',database='example_etl_mara')}mara_db.auto_migration.auto_discover_models_and_migrate()

假设postgressql正在运行并且凭据正常工作,则输出如下所示(创建一个包含多个表的数据库):

Created database "postgresql+psycopg2://root@localhost/example_etl_mara"

CREATE TABLE data_integration_file_dependency (
    node_path TEXT[] NOT NULL, 
    dependency_type VARCHAR NOT NULL, 
    hash VARCHAR, 
    timestamp TIMESTAMP WITHOUT TIME ZONE, 
    PRIMARY KEY (node_path, dependency_type)
);

.. more tables

客户端用户界面

这将运行一个输出到stdout的管道:

fromdata_integration.ui.cliimportrun_pipelinerun_pipeline(pipeline)

Example run cli 1

它运行管道的单个节点sub_pipeline,以及它所依赖的所有节点:

run_pipeline(sub_pipeline,nodes=[sub_pipeline.nodes['ping_amazon']],with_upstreams=True)

Example run cli 2

最后,还有一种基于pythondialog的菜单,允许导航和运行这样的管道:

fromdata_integration.ui.cliimportrun_interactivelyrun_interactively()

Example run cli 3

网络用户界面

更重要的是,这个包提供了一个广泛的web界面。它可以很容易地集成到任何基于Flask的应用程序中,并且mara example project演示了如何使用mara-app实现这一点。

对于每个管道,都有一个显示

  • 所有子节点及其依赖关系的图
  • 管道的总体运行时间图表以及过去30天内最昂贵的节点(可配置)
  • 所有管道节点及其平均运行时间和由此产生的队列优先级的表
  • 管道最后一次运行的输出和时间线

Mara data integration web ui 1

对于每个任务,都有一个页面显示

  • 管道中任务的上游和下游
  • 过去30天内任务的运行时间
  • 任务的所有命令
  • 任务最后一次运行的输出

Mara data integration web ui 2

管道和任务可以直接从Web UI运行,这可能是此软件包的主要功能之一:

Example run web ui

入门

文档目前正在处理中。请使用mara example project作为入门参考。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
如果字段相同,java共享对象的单个实例   if语句在从参数(JAVA)获取getDiscount()时遇到问题   使用JanusGraph Java API和HBase时出现临时BackendException   java读取嵌套的yaml值而不传入根对象   java为什么我不能用这段代码从链表中删除第一个节点?   javascript为什么验证错误在旧的情况下会持续?   java如何在JSch SFTP上重新发布?   使用java将json转换为xml   java如何将EditText的值放入对话框   java Hibernate:与EmbeddedID重复的getter/setter?   java如何使用TestNG和Maven在套件中的测试类之间共享状态?   java 安卓 studio游戏开发   推荐用于Java编码的Vim插件?   java定期关闭文件   java删除字符串中每5个字符并返回新字符串   如何在Java中使用父目录中的类?   java如何在ArrayList中使用索引添加多个对象   sbt java。util。MissingResourceException:找不到com。太阳xml。内部的信息。萨杰。肥皂LocalStrings包