Elkhound工作流引擎
elkhound的Python项目详细描述
elkhound是一个固执己见的、以数据为中心的工作流引擎。它对您的项目做出以下假设:
- 项目工作流可以分为几个任务,每个任务都有明确定义的输入和输出数据文件,理想情况下没有副作用。任务形成有向无环图(即无循环)。
- 每个数据文件都有明确定义的格式和模式。连续任务之间的信息主要通过文件传输。csv或gzipped csv文件的首选项。
Elkhound将通过以下方式帮助您:
- versioning通过给数据文件加上时间戳(任务读取带有最新时间戳的输入文件,写入带有当前时间戳的输出文件)。
- 支持大数据文件,提供方便的跨行模式迭代器(使用python生成器),降低内存占用。
- 管理任务的工作流,轻松运行任意任务列表。
- 由于通过数据文件和数据文件版本控制进行任务间通信,中间结果的自动检查点。
- 通过将配置文件的内容和命令行参数作为任务的上下文(减少硬编码常量的诱惑)来管理项目的参数。
- 记录工作流执行、报告上下文和执行进度。在一个地方自动收集和归档来自不同运行的日志,以便于重复性。
- 通过将可模拟的数据文件对象作为任务类的输入和输出来促进单元测试。
为了运行elkhound工作流,需要创建 引擎配置文件 并在Task子类中实现业务逻辑。
发动机配置
引擎配置文件(在我们的示例中,我们将其命名为engine.yaml) 有三个部分:
- specs,在这里您将描述数据文件(名称、格式、模式;这些文件是某些任务的输出,其他任务的输入),
- tasks,在这里您将指向业务逻辑的实现。任务定义数据文件的转换(如何在给定输入文件x和y的情况下构建输出文件z)
- workflows,在这里您将捆绑多组目标数据文件。
数据文件是埃尔克霍德的一等公民。 它们由四位数字代码(例如1230,2110,4315,5214)标识。 新工作流的设计应该从注册开始 数据文件,并可选地定义其架构(如果适用)。 以下是在引擎配置文件中定义的数据文件示例:
specs:-code:1230name:peopleextension:csv.gzflags:-gzippedschema:-name:nametype:str-name:dobtype:datetime-name:is_employeetype:bool-code:2110name:budgetextension:xlsxflags:-binary-code:4315name:reportextension:txt-code:5214name:plotsextension:dirflags:-directory
任务是python类,在输入时接受零个或多个数据文件 并在输出时生成零个或多个数据文件。 每个任务类必须实现三种方法:
- get_input_data_file_codes(self)返回输入数据文件代码列表,
- get_output_data_file_codes(self)返回输出数据文件代码列表,
- run(self, input_files, output_files, context)执行业务逻辑。
下面是在引擎配置文件中注册的任务示例:
tasks:-class:myapp.DownloadDataTask-class:myapp.GenerateReportTask-class:myapp.PlotBudgetTask
在我们的例子中,我们假设:
- DownloadDataTask在输入时不接受数据文件,在输出时生成1230和2110。
- GenerateReportTask在输入时接受1230和2110,在输出时生成4315。
- PlotBudgetTask在输入时接受2110,在输出时生成5214。
工作流是目标的命名列表,即要创建的数据文件。 下面是一个示例(引擎配置文件的摘录):
workflows:monthly_briefing:-4315-5214
业务逻辑实现
每个任务都作为elkhound.Task的子类实现。 他们的任务是读取所需的输入文件并创建 输出文件。 下面是一个简单的示例:
classGenerateReportTask(Task):defget_input_data_file_codes(self):return[1230,2110]defget_output_data_file_codes(self):return[4315]defrun(self,input_files,output_files,context=None):withoutput_files[4315].open()asf:for_,input_fileininput_files.items():f.write('Used input file {}\n'.format(input_file.get_path()))
当引擎调用方法run时, input_files和output_files参数 包含DataFile知道文件确切路径的对象 还有罐头帮助以正确的模式打开它们(读或写、文本或二进制、gzip或非gzip)。 数据文件对象具有针对特定情况的实用方法, 例如,当输入文件为csv格式时,对应的数据文件对象 有类似read_data_frame()的方法返回pandas数据帧, 以及iterate_records(),它返回一个生成记录的生成器 (在扫描无法放入内存的大文件时很有用)。
运行工作流
下面是如何运行工作流的示例:
python -m elkhound.runner --dir /workspace/foo --engine engine.yaml --targets monthly_briefing --deps