具有可插入持久性和扩展支持的工作流库

liteflow.core的Python项目详细描述


文字流

liteflow是一个用于运行工作流的python库。思考:具有多个需要跟踪状态的任务的长时间运行的流程。它支持可插入的持久性和并发性提供程序,允许多节点群集。

安装

安装"liteflow.core"软件包

> pip install liteflow.core

基本概念

步骤

工作流由一系列连接的步骤组成。每个步骤产生一个结果值,后续步骤通过订阅前一步骤的特定结果来触发。 步骤通常是通过继承stepbody抽象类并实现run方法来定义的。

首先,我们定义一些步骤

fromliteflow.coreimport*classHello(StepBody):defrun(self,context:StepExecutionContext)->ExecutionResult:print("Hello world")classGoodbye(StepBody):defrun(self,context:StepExecutionContext)->ExecutionResult:print("Goodbye")returnExecutionResult.next()

然后,我们通过组成一系列步骤来定义工作流结构。

classMyWorkflow(Workflow):defid(self):return"MyWorkflow"defversion(self):return1defbuild(self,builder:WorkflowBuilder):builder\
            .start_with(Hello)\
            .then(Goodbye)

工作流主机使用idversion属性来标识工作流定义。

在每个步骤之间,每个正在运行的工作流都被持久化到所选的持久化提供程序,在以后的某个时间点,可以从中提取工作流以继续执行。步骤的结果可以指示工作流宿主将工作流的进一步执行推迟到将来的时间点或响应外部事件。

第一次调用工作流中的特定步骤时,上下文对象上的persistencedata属性为none。由run方法生成的执行结果可以通过提供结果值使工作流进入下一步,指示工作流在定义的时间段内休眠,或者干脆不向前移动工作流。如果没有产生结果值,则通过设置persistencedata,该步骤将成为重新进入的步骤,因此工作流主机将在将来再次调用该步骤buy将使用其先前的值弹出persistencedata。

在步骤之间传递数据

每个步骤都是一个黑盒,因此它们支持输入和输出。每个工作流实例都带有一个数据属性,用于保存"工作流范围"数据,这些步骤可以使用这些数据进行通信。

下面的示例演示如何定义步骤上的输入和输出,然后演示如何将输入和输出映射到工作流数据属性上的属性。

#Our workflow step with inputs and outputsclassAddNumbers(StepBody):def__init__(self):self.input1=0self.input2=0self.output=0defrun(self,context:StepExecutionContext)->ExecutionResult:self.output=self.input1+self.input2returnExecutionResult.next()#A class to hold workflow wide dataclassMyData:def__init__(self):self.value1=0self.value2=0self.value3=0#Our workflow definition with mapped inputs & outputsclassMyWorkflow(Workflow):defbuild(self,builder:WorkflowBuilder):builder\
            .start_with(Hello)\
            .then(AddNumbers) \
                .input('input1',lambdadata,context:data.value1) \
                .input('input2',lambdadata,context:data.value2) \
                .output('value3',lambdastep:step.output) \
            .then(Goodbye)

事件

工作流也可以在继续之前等待外部事件。在下面的示例中,工作流将等待键为"key1"的名为"event1"的事件。一旦外部源触发此事件,工作流将唤醒并继续处理。

classMyWorkflow(Workflow):defbuild(self,builder:WorkflowBuilder):builder\
            .start_with(Hello) \
            .wait_for('event1',lambdadata,context:'key1') \
            .then(Goodbye)#External events are published via the host#All workflows that have subscribed to event1, key1, will be passed "hello"host.publish_event('event1','key1','hello')#Data from the published event can be captured and mapped to the workflow data object with an output on the WaitFor stepclassMyWorkflow(Workflow):defbuild(self,builder:WorkflowBuilder):builder\
            .start_with(Hello) \
            .wait_for('event1',lambdadata,context:'key1') \
                .output('captured_value',lambdastep:step.event_data) \
            .then(Goodbye)

流量控制

并行foreach
classDoStuff(StepBody):defrun(self,context:StepExecutionContext)->ExecutionResult:print(f"doing stuff...{context.execution_pointer.context_item}")returnExecutionResult.next()classMyWorkflow(Workflow):defbuild(self,builder:WorkflowBuilder):builder\
            .start_with(Hello)\
            .for_each(lambdadata,context:["abc","def","xyz"])\
                .do(lambdax:\
                    x.start_with(DoStuff))\
            .then(Goodbye)

而条件

classMyWorkflow(Workflow):defbuild(self,builder:WorkflowBuilder):builder\
            .start_with(Hello)\
            .while_(lambdadata,context:data.value1<3)\
                .do(lambdado:\
                    do.start_with(DoStuff)\
                        .input('my_value',lambdadata,context:data.value1)\
                        .output('value1',lambdastep:step.your_value))\
            .then(Goodbye)

如果条件

classMyWorkflow(Workflow):defbuild(self,builder:WorkflowBuilder):builder\
            .start_with(Hello)\
            .if_(lambdadata,context:data.value1>3)\
                .do(lambdax:\
                    x.start_with(DoStuff))\
            .then(Goodbye)

主机

工作流主机是负责执行工作流的服务。它通过对准备运行的工作流实例的持久性提供程序进行轮询,执行这些实例,然后在下次运行时通过存储将它们传回持久性提供程序。它还负责将事件发布到可能正在等待的任何工作流一方面,

用法

当应用程序启动时,使用configure_workflow_host创建WorkflowHost服务,调用register_workflow,以便工作流宿主了解所有工作流,然后调用start启动执行工作流的事件循环。使用启动工作流方法启动特定工作流的新实例。

fromliteflow.coreimport*host=configure_workflow_host()host.register_workflow(MyWorkflow())host.start()wid=host.start_workflow("MyWorkflow",1,None)

持久性

由于工作流通常是长时间运行的流程,因此需要在步骤之间将它们持久化到存储中。 有多个持久性提供程序作为单独的包提供。

  • 内存持久性提供程序(用于演示和测试的默认提供程序)
  • mongodb
  • (即将有更多内容…)

多节点群集

默认情况下,WorkflowHost服务将使用内置队列作为单个节点运行,并为单个节点配置锁定提供程序。如果希望运行多节点集群,则需要配置外部排队机制和分布式锁管理器来协调集群。这些是当前可用的提供商。

队列提供程序

  • singlenodequeueprovider(默认内置提供程序)
  • 蔚蓝
  • RabbitMQ(即将推出…)

分布式锁管理器
  • locallockprovider(默认内置提供程序)
  • 蔚蓝
  • redis redlock(即将推出…)

样品

作者

  • daniel gerlag-初始工作

许可证

此项目是根据麻省理工学院的许可证授权的-有关详细信息,请参见license.md文件

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
如果在每次迭代后返回到原始状态,java是否可以安全地对正在迭代的数组进行变异?   linux java对'main'集合的未定义引用2:ld返回1退出状态   java如何在dynamodb上按多个字段进行排序(有解决方法吗)?   java错误处理servlet停止工作,我不知道原因:(   多线程如何在Java中暂停/恢复ExecutorService中的所有线程?   java为什么是我的jsoup。连接(url)无法使用某些url?   多线程理解Java多线程中的内存可见性   java sonar scanner可以在没有sonarqube服务器的情况下工作吗   如何从java中的页面获取不同的url?   java从文件中查找命令行中指定的单词   java类的“set”方法是否应该返回“void”或“boolean”?   如何使用java代码批量读取文件内容   二进制十进制(java.lang.NumberFormatException)   java如何在同一个句子中分别替换同一个单词但大小写不同?   java Spring `RestController`方法注释了`Transactional`,但“当前没有活动的事务”?   java OWL API:如何将域分配给OWLObjectProperty上的范围   java变量末尾的$有特殊意义吗?   java什么距离计算(经度、纬度)更精确?   卡片布局java GUI