Elkhound工作流引擎

elkhound的Python项目详细描述


elkhound是一个固执己见的、以数据为中心的工作流引擎。它对您的项目做出以下假设:

  • 项目工作流可以分为几个任务,每个任务都有明确定义的输入和输出数据文件,理想情况下没有副作用。任务形成有向无环图(即无循环)。
  • 每个数据文件都有明确定义的格式和模式。连续任务之间的信息主要通过文件传输。csv或gzipped csv文件的首选项。

Elkhound将通过以下方式帮助您:

  • versioning通过给数据文件加上时间戳(任务读取带有最新时间戳的输入文件,写入带有当前时间戳的输出文件)。
  • 支持大数据文件,提供方便的跨行模式迭代器(使用python生成器),降低内存占用。
  • 管理任务的工作流,轻松运行任意任务列表。
  • 由于通过数据文件和数据文件版本控制进行任务间通信,中间结果的自动检查点
  • 通过将配置文件的内容和命令行参数作为任务的上下文(减少硬编码常量的诱惑)来管理项目的参数。
  • 记录工作流执行、报告上下文和执行进度。在一个地方自动收集和归档来自不同运行的日志,以便于重复性。
  • 通过将可模拟的数据文件对象作为任务类的输入和输出来促进单元测试

为了运行elkhound工作流,需要创建 引擎配置文件 并在Task子类中实现业务逻辑。

发动机配置

引擎配置文件(在我们的示例中,我们将其命名为engine.yaml) 有三个部分:

  • specs,在这里您将描述数据文件(名称、格式、模式;这些文件是某些任务的输出,其他任务的输入),
  • tasks,在这里您将指向业务逻辑的实现。任务定义数据文件的转换(如何在给定输入文件x和y的情况下构建输出文件z)
  • workflows,在这里您将捆绑多组目标数据文件。

数据文件是埃尔克霍德的一等公民。 它们由四位数字代码(例如1230211043155214)标识。 新工作流的设计应该从注册开始 数据文件,并可选地定义其架构(如果适用)。 以下是在引擎配置文件中定义的数据文件示例:

specs:-code:1230name:peopleextension:csv.gzflags:-gzippedschema:-name:nametype:str-name:dobtype:datetime-name:is_employeetype:bool-code:2110name:budgetextension:xlsxflags:-binary-code:4315name:reportextension:txt-code:5214name:plotsextension:dirflags:-directory

任务是python类,在输入时接受零个或多个数据文件 并在输出时生成零个或多个数据文件。 每个任务类必须实现三种方法:

  • get_input_data_file_codes(self)返回输入数据文件代码列表,
  • get_output_data_file_codes(self)返回输出数据文件代码列表,
  • run(self, input_files, output_files, context)执行业务逻辑。

下面是在引擎配置文件中注册的任务示例:

tasks:-class:myapp.DownloadDataTask-class:myapp.GenerateReportTask-class:myapp.PlotBudgetTask

在我们的例子中,我们假设:

  • DownloadDataTask在输入时不接受数据文件,在输出时生成12302110
  • GenerateReportTask在输入时接受12302110,在输出时生成4315
  • PlotBudgetTask在输入时接受2110,在输出时生成5214

工作流是目标的命名列表,即要创建的数据文件。 下面是一个示例(引擎配置文件的摘录):

workflows:monthly_briefing:-4315-5214

业务逻辑实现

每个任务都作为elkhound.Task的子类实现。 他们的任务是读取所需的输入文件并创建 输出文件。 下面是一个简单的示例:

classGenerateReportTask(Task):defget_input_data_file_codes(self):return[1230,2110]defget_output_data_file_codes(self):return[4315]defrun(self,input_files,output_files,context=None):withoutput_files[4315].open()asf:for_,input_fileininput_files.items():f.write('Used input file {}\n'.format(input_file.get_path()))

当引擎调用方法run时, input_filesoutput_files参数 包含DataFile知道文件确切路径的对象 还有罐头帮助以正确的模式打开它们(读或写、文本或二进制、gzip或非gzip)。 数据文件对象具有针对特定情况的实用方法, 例如,当输入文件为csv格式时,对应的数据文件对象 有类似read_data_frame()的方法返回pandas数据帧, 以及iterate_records(),它返回一个生成记录的生成器 (在扫描无法放入内存的大文件时很有用)。

运行工作流

下面是如何运行工作流的示例:

python -m elkhound.runner --dir /workspace/foo --engine engine.yaml --targets monthly_briefing --deps

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java什么会导致程序在它似乎拥有的监视器上被阻止?   java Android studio设置视图的背景色   java我可以保存一个文本文件而不给用户修改它的能力吗?   pdfbox PDFBOX2。0:java堆堆栈错误   java是维护和操作AllowList的有效方法   JAVAsql。SQLException:找不到适合jdbc的驱动程序:mysql://localhost:3306/asd性爱   如何使用java。lang.NullPointerException:void 安卓。支持v7。应用程序。ActionBar。setElevation(float)“”在空对象引用上'   java调试空指针异常   java正则表达式,以按令牌的特定匹配项拆分,同时忽略其他匹配项   java为JPanel设置边框上的笔划   并发@Schedule方法的java行为   如何在Java中使用泛型与语言运算符和泛型类扩展数   java Rhino Javascript如何为异常堆栈跟踪标记字符串源   运行可执行jar时发生java错误,无法找到或加载主类