一个简单的管道创建和执行工具
sinbadflow的Python项目详细描述
管道创建和简单执行工具
Sinbadflow是一个简单的管道创建和执行工具。它的创建考虑到了Databricks笔记本的工作流程,但是通过灵活的实现选项,该工具可以定制以适应任何任务。该库以著名的动画片《辛巴达:七海传奇》命名,提供以并行或单一模式创建和运行具有特定触发器和条件函数的代理的能力。通过简单但直观的基于代码的语法,我们可以创建精细的管道来帮助任何数据工程、数据科学或软件开发任务。在
安装
要安装,请使用:
pip install sinbadflow
使用
Sinbadflow支持不同执行触发器的单次或并行运行。要构建管道,请在两个代理之间使用>>
符号。Databricks笔记本管道示例(逐个执行):
fromsinbadflow.agents.databricksimportDatabricksAgentasdbrfromsinbadflowimportTriggerpipeline=dbr('/path/to/notebook1')>>dbr('path/to/another/notebook')
并行运行管道(列表中的代理以并行模式执行):
^{pr2}$可以使用触发器来控制流。Sinbadflow支持以下触发器:
Trigger.DEFAULT
-默认触发器,始终执行代理。在Trigger.OK_PREV
-如果前一个代理成功完成,将执行代理。在Trigger.OK_ALL
-如果到目前为止没有在管道中记录失败,则将执行代理。在Trigger.FAIL_PREV
-如果上一个代理运行失败,将执行代理。在Trigger.FAIL_ALL
-如果以前的所有运行都失败,将执行代理。在
示例工作流如下所示:
execution=dbr('/execute')parallel_handles=[dbr('/handle_ok',Trigger.OK_PREV),dbr('/handle_fail',Trigger.FAIL_PREV)]save=dbr('/save_all',Trigger.OK_ALL)fail_handling=dbr('/log_all_failed',Trigger.FAIL_ALL)pipeline=execution>>parallel_handles>>save>>fail_handling
要运行管道:
fromsinbadflowimportSinbadflowsf=Sinbadflow()sf.run(pipeline)
管道将被执行,结果将用所选方法记录(支持print/logging
)。Sinbadflow将始终运行完整的管道,如果管道发生故障,则没有实施早期停止。在
条件函数
为了实现更灵活的工作流控制,Sinbadflow还支持条件函数检查。这是代理的更精细的触发器。在
fromsinbadflow.agents.databricksimportDatabricksAgentasdbrfromdatetimeimportdatedefis_monday():returndate.today().weekday()==0notebook1=dbr('/notebook1',conditional_func=is_monday)notebook2=dbr('/notebook2',conditional_func=is_monday)pipeline=notebook1>>notebook2
在上面的示例中,如果由于conditional_fuc
函数,今天不是星期一,则将跳过笔记本。Sinbadflow还提供了使用apply_conditional_func
方法对整个管道应用条件函数的能力。在
fromsinbadflow.utilsimportapply_conditional_funcpipeline=dbr('/notebook1')>>dbr('/notebook2')>>dbr('/notebook3')pipeline=apply_conditional_func(pipeline,is_monday)
定制代理
Sinbadflow提供创建自己代理的能力。为了做到这一点,您的代理必须从BaseAgent
类继承,将data
和{**kwargs
),并实现run()
方法。例如DummyAgent
:
fromsinbadflow.agentsimportBaseAgentfromsinbadflowimportTriggerfromsinbadflowimportSinbadflowclassDummyAgent(BaseAgent):def__init__(self,data=None,trigger=Trigger.DEFAULT,**kwargs):super(DummyAgent,self).__init__(data,trigger,**kwargs)defrun(self):print(f' Running my DummyAgent with data: {self.data}')defcondition():returnFalsesecret_data=DummyAgent('secret_data')parallel_data=[DummyAgent('simple_data',conditional_func=condition),DummyAgent('important_data',Trigger.OK_ALL)]pipeline=secret_data>>parallel_datasf=Sinbadflow()sf.run(pipeline)
DatabricksAgent-群集模式
现成的Sinbadflow附带DatabricksAgent
,可用于在交互式或作业集群上运行Databricks笔记本。DatabricksAgent
初始化参数:
notebook_path#Notebook location in the workspacetrigger=Trigger.DEFAULT#Triggertimeout=1800#Notebook run timeoutargs={}#Notebook argumentscluster_mode='interactive'#Cluster mode (interactive/job)job_args={)#Job cluster parameters conditional_func=default_func()#Conditional function
作业集群创建的默认job_args
参数(有关作业参数see here)的更多信息:
{'spark_version':'6.4.x-scala2.11','node_type_id':'Standard_DS3_v2','driver_node_type_id':'Standard_DS3_v2','num_workers':1}
默认情况下,笔记本将使用dbutils
库在交互式集群上执行。要在单独的作业群集上运行笔记本,请使用以下代码:
fromsinbadflow.agents.databricksimportDatabricksAgentasdbr,JobSubmitterfromsinbadflow.executorimportSinbadflow#set new job_argsnew_job_args={'num_workers':10,'driver_node_type_id':'Standard_DS3_v2'}job_notebook=dbr('notebook1',job_args=new_job_args,cluster_mode='job')interactive_notebook=dbr('notebook2')pipeline=job_notebook>>interactive_notebook##Access token is used for job cluster creation and notebook submissionJobSubmitter.set_access_token('<DATABRICKS ACCESS TOKEN>')sf=Sinbadflow()sf.run(pipeline)
如上面的示例所示,您可以在交互式/作业集群上混合和匹配代理运行,以实现最佳解决方案。在
其他帮助
完整的API文档可以找到here。在
使用内置的help()
方法获取有关类和方法的其他信息。在
如有任何问题,请随时与我联系。鼓励拉取请求!在
贡献者
特别感谢为项目做出贡献的所有人:
Robertas Sys,Emilija Lamanauskaite
- 项目
标签: