Elkhound工作流引擎

elkhound的Python项目详细描述


elkhound是一个固执己见的、以数据为中心的工作流引擎。它对您的项目做出以下假设:

  • 项目工作流可以分为几个任务,每个任务都有明确定义的输入和输出数据文件,理想情况下没有副作用。任务形成有向无环图(即无循环)。
  • 每个数据文件都有明确定义的格式和模式。连续任务之间的信息主要通过文件传输。csv或gzipped csv文件的首选项。

Elkhound将通过以下方式帮助您:

  • versioning通过给数据文件加上时间戳(任务读取带有最新时间戳的输入文件,写入带有当前时间戳的输出文件)。
  • 支持大数据文件,提供方便的跨行模式迭代器(使用python生成器),降低内存占用。
  • 管理任务的工作流,轻松运行任意任务列表。
  • 由于通过数据文件和数据文件版本控制进行任务间通信,中间结果的自动检查点
  • 通过将配置文件的内容和命令行参数作为任务的上下文(减少硬编码常量的诱惑)来管理项目的参数。
  • 记录工作流执行、报告上下文和执行进度。在一个地方自动收集和归档来自不同运行的日志,以便于重复性。
  • 通过将可模拟的数据文件对象作为任务类的输入和输出来促进单元测试

为了运行elkhound工作流,需要创建 引擎配置文件 并在Task子类中实现业务逻辑。

发动机配置

引擎配置文件(在我们的示例中,我们将其命名为engine.yaml) 有三个部分:

  • specs,在这里您将描述数据文件(名称、格式、模式;这些文件是某些任务的输出,其他任务的输入),
  • tasks,在这里您将指向业务逻辑的实现。任务定义数据文件的转换(如何在给定输入文件x和y的情况下构建输出文件z)
  • workflows,在这里您将捆绑多组目标数据文件。

数据文件是埃尔克霍德的一等公民。 它们由四位数字代码(例如1230211043155214)标识。 新工作流的设计应该从注册开始 数据文件,并可选地定义其架构(如果适用)。 以下是在引擎配置文件中定义的数据文件示例:

specs:-code:1230name:peopleextension:csv.gzflags:-gzippedschema:-name:nametype:str-name:dobtype:datetime-name:is_employeetype:bool-code:2110name:budgetextension:xlsxflags:-binary-code:4315name:reportextension:txt-code:5214name:plotsextension:dirflags:-directory

任务是python类,在输入时接受零个或多个数据文件 并在输出时生成零个或多个数据文件。 每个任务类必须实现三种方法:

  • get_input_data_file_codes(self)返回输入数据文件代码列表,
  • get_output_data_file_codes(self)返回输出数据文件代码列表,
  • run(self, input_files, output_files, context)执行业务逻辑。

下面是在引擎配置文件中注册的任务示例:

tasks:-class:myapp.DownloadDataTask-class:myapp.GenerateReportTask-class:myapp.PlotBudgetTask

在我们的例子中,我们假设:

  • DownloadDataTask在输入时不接受数据文件,在输出时生成12302110
  • GenerateReportTask在输入时接受12302110,在输出时生成4315
  • PlotBudgetTask在输入时接受2110,在输出时生成5214

工作流是目标的命名列表,即要创建的数据文件。 下面是一个示例(引擎配置文件的摘录):

workflows:monthly_briefing:-4315-5214

业务逻辑实现

每个任务都作为elkhound.Task的子类实现。 他们的任务是读取所需的输入文件并创建 输出文件。 下面是一个简单的示例:

classGenerateReportTask(Task):defget_input_data_file_codes(self):return[1230,2110]defget_output_data_file_codes(self):return[4315]defrun(self,input_files,output_files,context=None):withoutput_files[4315].open()asf:for_,input_fileininput_files.items():f.write('Used input file {}\n'.format(input_file.get_path()))

当引擎调用方法run时, input_filesoutput_files参数 包含DataFile知道文件确切路径的对象 还有罐头帮助以正确的模式打开它们(读或写、文本或二进制、gzip或非gzip)。 数据文件对象具有针对特定情况的实用方法, 例如,当输入文件为csv格式时,对应的数据文件对象 有类似read_data_frame()的方法返回pandas数据帧, 以及iterate_records(),它返回一个生成记录的生成器 (在扫描无法放入内存的大文件时很有用)。

运行工作流

下面是如何运行工作流的示例:

python -m elkhound.runner --dir /workspace/foo --engine engine.yaml --targets monthly_briefing --deps

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
serversocket Java服务器socket中断如何   java通过随机类和排序为数组生成数据   @Column length大于255个字符的java实体   java在使用NIO2 API时消除检查异常   java使用用户凭据通过Microsoft Graph API上的控制台应用程序发送电子邮件   java将2个ArrayList一个接一个地显示到jTextArea中,根据输入的数据多次显示   java无法使用导入的库。安卓工作室。谷歌云端点jar   spring boot Neo4j Apoc在java中获取结果摘要时的情况   转义特殊字符javajson   无法在java中使用OpenCV读取图像,涉及unicode路径   swing Document Listener java“无法实例化DocumentListener类型”   java Piglatin,难以理解如何移动“.”一串   java映射请求基于一个可观察对象中的每个元素,使用改进?   java在可扩展字符串中搜索时,如何忽略字符的大小写?   在包中分组类的java约定   python为Java应用程序创建Ubuntu指示小程序   java Camunda:如何通过processInstanceId获取所有已完成的任务