在aws数据管道上计划的任务执行

pipewelder的Python项目详细描述


A worker welding a pipe

pipewelder是一个提供命令行工具和python的框架 从平面文件管理AWS Data Pipeline作业的api。 simple将其用作类似cron的作业调度程序。

来源
https://github.com/SimpleFinance/pipewelder
文档
http://pipewelder.readthedocs.org
pypi
https://pypi.python.org/pypi/pipewelder

概述

Pipewelder旨在通过定义 简单的管道只不过是一个执行计划, 将大部分执行逻辑卸载到s3中的文件。管焊机使用 数据管道的概念data staging 在执行开始时从s3中提取输入文件并上载 在执行结束时将文件输出回s3。

如果遵循pipewelder的目录结构,则所有管道 逻辑可以存在于版本控制的平面文件中。包括在内的 命令行界面提供简单的命令来验证 管道定义,将任务定义上载到s3,并激活 管道。

安装

管道焊机可从PyPI通过 pip,与python 2.6、2.7、3.3和3.4兼容:

pip install pipewelder

最简单的方法是从github复制项目copy pipewelder测试的示例项目,然后进行修改以适应:

git clone https://github.com/SimpleFinance/pipewelder.git
cp -r pipewelder/tests/test_data my-pipewelder-project

如果您正在安装管道焊接机并需要帮助,请随时发送电子邮件至 作者。

开发

要在pipewelder上进行开发,请克隆存储库并运行make 安装依赖项并运行测试。

目录结构

要使用pipewelder,请提供模板管道定义和 对应于特定管道的一个或多个目录 实例。目录结构如下所示(请参见 test_data对于一个工作示例:

pipeline_definition.json
pipewelder.json <- optional configuration file
my_first_pipeline/
    run
    values.json
    tasks/
        task1.sh
        task2.sh
my_second_pipeline/
...

每个管道目录中的values.json文件指定参数 使用的值修改模板定义,包括s3 输入、输出和日志的路径。其中一些值被使用 也可直接由管道焊工操作。

一个 `ShellCommandActivity<;http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-shellcommandactivity.html>;。`_ 在模板定义中,只需查找名为 run并执行它。run是任何工作的入口点 你想让你的管道去做。

通常,您的run可执行文件将是执行 各种类似的任务。如果是这样,请使用tasks 保存这些定义的子目录。这些任务可以是文本文件, shell脚本、sql代码,或者您的run文件所需要的其他任何东西。 pipewelder给予tasks文件夹特殊处理,因为cli将 确保在上传文件时删除现有的任务定义。

使用命令行界面

应该始终从顶级目录调用pipewelder cli 您的定义的目录(其中pipeline_definition.json 生命)。如果你的目录结构符合Pipewelder的期望, 它不需要进一步配置就可以工作。

更改模板定义或^{tt3}时$ 文件中,可以检查aws是否考虑您的定义 有效期:

$ pipewelder validate

一旦定义了管道,就需要将文件上载到 S3:

$ pipewelder upload

最后,激活您的管道:

$ pipewelder activate

任何时候更改values.jsonpipeline_definition.json, 您需要再次运行activate子命令。因为活跃 无法修改管道,activate命令将删除 现有管道,并在它的位置创建一个新的管道。的运行历史 以前的管道将被丢弃。

致谢

管焊机的封装结构基于 python-project-template

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
正则表达式使用Java从服务器截断文本   micronaut微服务的java内存消耗   如果私有函数需要相同的输入,java应该在公共函数中显式执行异常检查   为什么我们在java中使用抽象类和抽象方法   Java中接受外来字母的字符串?   cordova Android:ClassNotFoundException,包括ZXing   通过LiveData observer向特定索引添加项时出现java IndexOutOfBoundsException   jsp Java从两个源调用一个servlet   java如何设置网格布局中按钮的位置?   java HashMap返回方法   java JDK错误版本   java如何将现有类集成到新的Swing项目中   java如何在扫描程序位于输入端时使for循环停止   java正则表达式匹配空白表   java组织。格拉德尔。工具。BuildException:设置的代码长度无效   JList中的swing Java格式化字符串   javabeans如何将JavaBean属性映射到另一个名称以进行输出?   ajax请求后的java Rerender RichFaces错误消息