一个简单的管道创建和执行工具

sinbadflow的Python项目详细描述


Logo

管道创建和简单执行工具

Tests

Sinbadflow是一个简单的管道创建和执行工具。它的创建考虑到了Databricks笔记本的工作流程,但是通过灵活的实现选项,该工具可以定制以适应任何任务。该库以著名的动画片《辛巴达:七海传奇》命名,提供以并行或单一模式创建和运行具有特定触发器和条件函数的代理的能力。通过简单但直观的基于代码的语法,我们可以创建精细的管道来帮助任何数据工程、数据科学或软件开发任务。在

安装

要安装,请使用:

pip install sinbadflow

使用

Sinbadflow支持不同执行触发器的单次或并行运行。要构建管道,请在两个代理之间使用>>符号。Databricks笔记本管道示例(逐个执行):

fromsinbadflow.agents.databricksimportDatabricksAgentasdbrfromsinbadflowimportTriggerpipeline=dbr('/path/to/notebook1')>>dbr('path/to/another/notebook')

并行运行管道(列表中的代理以并行模式执行):

^{pr2}$

可以使用触发器来控制流。Sinbadflow支持以下触发器:

  • Trigger.DEFAULT-默认触发器,始终执行代理。在
  • Trigger.OK_PREV-如果前一个代理成功完成,将执行代理。在
  • Trigger.OK_ALL-如果到目前为止没有在管道中记录失败,则将执行代理。在
  • Trigger.FAIL_PREV-如果上一个代理运行失败,将执行代理。在
  • Trigger.FAIL_ALL-如果以前的所有运行都失败,将执行代理。在

示例工作流如下所示:

execution=dbr('/execute')parallel_handles=[dbr('/handle_ok',Trigger.OK_PREV),dbr('/handle_fail',Trigger.FAIL_PREV)]save=dbr('/save_all',Trigger.OK_ALL)fail_handling=dbr('/log_all_failed',Trigger.FAIL_ALL)pipeline=execution>>parallel_handles>>save>>fail_handling

要运行管道:

fromsinbadflowimportSinbadflowsf=Sinbadflow()sf.run(pipeline)

管道将被执行,结果将用所选方法记录(支持print/logging)。Sinbadflow将始终运行完整的管道,如果管道发生故障,则没有实施早期停止。在

条件函数

为了实现更灵活的工作流控制,Sinbadflow还支持条件函数检查。这是代理的更精细的触发器。在

fromsinbadflow.agents.databricksimportDatabricksAgentasdbrfromdatetimeimportdatedefis_monday():returndate.today().weekday()==0notebook1=dbr('/notebook1',conditional_func=is_monday)notebook2=dbr('/notebook2',conditional_func=is_monday)pipeline=notebook1>>notebook2

在上面的示例中,如果由于conditional_fuc函数,今天不是星期一,则将跳过笔记本。Sinbadflow还提供了使用apply_conditional_func方法对整个管道应用条件函数的能力。在

fromsinbadflow.utilsimportapply_conditional_funcpipeline=dbr('/notebook1')>>dbr('/notebook2')>>dbr('/notebook3')pipeline=apply_conditional_func(pipeline,is_monday)

定制代理

Sinbadflow提供创建自己代理的能力。为了做到这一点,您的代理必须从BaseAgent类继承,将data和{}参数传递给父类(如果您计划使用条件函数,也可以是**kwargs),并实现run()方法。例如DummyAgent

fromsinbadflow.agentsimportBaseAgentfromsinbadflowimportTriggerfromsinbadflowimportSinbadflowclassDummyAgent(BaseAgent):def__init__(self,data=None,trigger=Trigger.DEFAULT,**kwargs):super(DummyAgent,self).__init__(data,trigger,**kwargs)defrun(self):print(f'        Running my DummyAgent with data: {self.data}')defcondition():returnFalsesecret_data=DummyAgent('secret_data')parallel_data=[DummyAgent('simple_data',conditional_func=condition),DummyAgent('important_data',Trigger.OK_ALL)]pipeline=secret_data>>parallel_datasf=Sinbadflow()sf.run(pipeline)

DatabricksAgent-群集模式

现成的Sinbadflow附带DatabricksAgent,可用于在交互式或作业集群上运行Databricks笔记本。DatabricksAgent初始化参数:

notebook_path#Notebook location in the workspacetrigger=Trigger.DEFAULT#Triggertimeout=1800#Notebook run timeoutargs={}#Notebook argumentscluster_mode='interactive'#Cluster mode (interactive/job)job_args={)#Job cluster parameters  conditional_func=default_func()#Conditional function

作业集群创建的默认job_args参数(有关作业参数see here)的更多信息:

{'spark_version':'6.4.x-scala2.11','node_type_id':'Standard_DS3_v2','driver_node_type_id':'Standard_DS3_v2','num_workers':1}

默认情况下,笔记本将使用dbutils库在交互式集群上执行。要在单独的作业群集上运行笔记本,请使用以下代码:

fromsinbadflow.agents.databricksimportDatabricksAgentasdbr,JobSubmitterfromsinbadflow.executorimportSinbadflow#set new job_argsnew_job_args={'num_workers':10,'driver_node_type_id':'Standard_DS3_v2'}job_notebook=dbr('notebook1',job_args=new_job_args,cluster_mode='job')interactive_notebook=dbr('notebook2')pipeline=job_notebook>>interactive_notebook##Access token is used for job cluster creation and notebook submissionJobSubmitter.set_access_token('<DATABRICKS ACCESS TOKEN>')sf=Sinbadflow()sf.run(pipeline)

如上面的示例所示,您可以在交互式/作业集群上混合和匹配代理运行,以实现最佳解决方案。在

其他帮助

完整的API文档可以找到here。在

使用内置的help()方法获取有关类和方法的其他信息。在

如有任何问题,请随时与我联系。鼓励拉取请求!在

贡献者

特别感谢为项目做出贡献的所有人:

Robertas SysEmilija Lamanauskaite

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
带truezip的java拆分zip   java Spring,AppEngine:在AppEngine的数据源中添加postgresql url   java Android coverflow   java以编程方式创建复合过滤器,以在log4j 2中定义多个过滤器   java jpa eclipselink异常[eclipselink 4002]   中的java WordNet数据库目录相对路径。罐子   java无法在Spring Boot 2/3中显示登录的用户   java Onetomany:未找到联接表错误   java数据模型演化   java方法在类型列表中添加的(对象)不适用于参数(int)意味着什么?   用java打印两个数组   java SNMP4J发送从不超时   java添加/删除联系人(EditText)+类别(SpinnerBox),可以根据需要动态添加/删除多个联系人   语句和PreparedStatement之间的java差异   java在运行作为JAR归档文件分发的项目时加载图像等资源   来自应用程序或外部服务器的java Cron作业   多线程Java并发:并发添加和清除列表项   java更改单元测试的私有方法行为