在一个工作实例中并发执行工作流和活动
希望你一切都好。我现在正在做一个叫做Temporal的项目,目的是管理工作流程的执行。不过我遇到了一些困难,非常希望能得到你的建议和帮助。
问题:
我们在集群中以POD的形式运行一个工作程序(只运行一个副本,不想多于一个)。我们希望能够同时执行多个工作流程,并且在当前工作流程还没完成时就能得到输出(也就是说,我们希望在同一个工作程序实例中实现并行执行)。
我采取的步骤:
我已经在工作程序的配置中设置了工作流程和任务的最大并发数。
代码或配置:
def slp(sec):
sleep(sec)
return f"slept {sec} sec"
@activity.defn(name="Sleeping 1")
async def sleeping1():
response = slp(20)
return response
@activity.defn(name="sleeping 2")
async def sleeping2():
response = slp(120)
return response
@workflow.defn
class sleepingWF:
@workflow.run
async def run(self, body):
s1 = await workflow.execute_activity(
sleeping1))
s2 = await workflow.execute_activity(
sleeping2))
async def main():
TEMPORAL_ENDPOINT = "localhost:7233"
TASK_QUEUE = "mymac"
client = await Client.connect(
target_host=TEMPORAL_ENDPOINT
)
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
worker = Worker(
client,
task_queue=TASK_QUEUE,
workflows=[sleepingWF],
activities=[sleeping1, sleeping2],
activity_executor=activity_executor,
max_concurrent_workflow_tasks=100,
max_concurrent_activities=100,
max_concurrent_workflow_task_polls=10,
max_concurrent_activity_task_polls=10,
)
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
非常感谢你的时间和帮助。如果你有任何建议、见解或可能的解决方案,请随时分享。你的专业知识对我来说非常重要,我期待从社区中学习。
预期结果:
workflow_execution_1 : start_time -> 00:00:00 end_time -> 00:02:20
workflow_execution_2 : start_time -> 00:00:00 end_time -> 00:02:20
两个执行的总时间应该是140秒。
环境:python 11
附加信息:
当前总执行时间:
workflow_execution_1 : start_time -> 00:00:00 end_time -> 00:02:20
workflow_execution_2 : start_time -> 00:02:20 end_time -> 00:04:40
现在两个执行的总时间是280秒。
1 个回答
注意:我现在还不太确定发生了什么以及为什么会这样,但在此期间,我想提供一个可能的解释和解决方案。
简而言之
在第2行调用的 sleep()
似乎会阻止其他线程同时执行。
你可以尝试以下解决方案之一:
- 用
asyncio.sleep()
替换sleep()
(这样你就可以完全去掉ThreadPoolExecutor
);或者 - 确保你使用的是最新版本的 Python 和 Temporal 的 Python SDK;或者
- 使用
ProcessPoolExecutor
替代ThreadPoolExecutor
。
详细信息
默认情况下,Temporal 的 Python SDK 在一个线程中执行工作流任务和活动任务,这个线程是基于 asyncio
的。正如 Python SDK 的文档 中所述:
在 Python 中阻塞异步事件循环会将你的异步程序变成一个同步程序,按顺序执行,这样就失去了使用 asyncio 的意义。这还可能导致潜在的死锁和不可预测的行为,使任务无法执行。调试这些问题可能很困难且耗时,因为找到阻塞调用的源头并不总是显而易见的。
通常,可以并且更好地设计你的活动,使其不阻塞线程。例如,如果你的活动需要执行 SQL 查询、HTTP 请求,或者从 S3 下载或上传文件,那么你可以使用与 asyncio 兼容的库。在第2行的 sleep()
的情况下,你可以使用 asyncio.sleep()
。
如果你确实需要在活动中执行阻塞代码,比如你的活动需要长时间进行 CPU 密集型计算,那么你可以配置你的工作者通过 ThreadPoolExecutor
或 ProcessPoolExecutor
来执行活动。
然而,你显然是做了前者,但没有成功。为什么呢?我猜是因为 sleep
函数没有释放 Python 的全局解释器锁(GIL),不过我不太确定原因。
如果你还不知道什么是 Python 的 GIL,我强烈建议你先看看 这个问题的回答,再继续阅读我的回答。
这里需要注意的关键点是,Python 的 GIL 是 按进程 而不是 按线程 的,这与一般的预期不同。这大大限制了你从 ThreadPoolExecutor
中实际期望的并发级别。换句话说,ThreadPoolExecutor
只有在你的代码调用释放 GIL 的本地代码、进行 I/O 操作,或者调用其他类似释放 GIL 的 Python 函数时,才会有帮助。否则,其他线程将没有机会继续执行。
我想你提到的 sleep()
实际上是指 time.sleep()
,对吧?time.sleep()
应该是会释放 GIL 的,尽管在你的情况下看起来并没有。难道第2行的 sleep()
实际上指的是其他不同的函数吗?
如何诊断这种问题?
在你的问题中,你给出了这些时间:
workflow_execution_1 : start_time -> 00:00:00 end_time -> 00:02:20
workflow_execution_2 : start_time -> 00:02:20 end_time -> 00:04:40
我怀疑这些数字是正确的,因为这意味着你的第二个工作流执行要等到第一个完成才开始,但在评论中,你明确表示你是同时启动两个工作流的。
我更期待你的时间大致是:
workflow_execution_1 : start_time -> 00:00:00 end_time -> *00:02:40*
workflow_execution_2 : start_time -> *00:00:00* end_time -> 00:04:40
检查工作流历史记录是理解发生了什么的最佳方法。假设没有阻塞睡眠问题,你应该在 Temporal UI 中看到如下示例:
工作流的开始时间和结束时间显示在绿色框中。你还可以看到你的睡眠活动的开始和结束时间(蓝色框)。你也可能会看到在工作流任务处理上花费了一些不可忽视的时间,甚至超时错误,这可能指向其他问题。
如果我之前的建议还不足以解决你的问题,请分享一张类似的工作流历史记录的图片,或者更好的是,分享完整的历史记录的 JSON 格式(按下载按钮)。确保在捕获历史记录之前等待工作流完成。根据你分享的工作流代码,工作流完成时历史记录中应该至少有 17 个事件,且两个历史记录中的最后一个事件应该是 WorkflowExecutionCompleted
。
关于 Temporal 的工作流历史记录还有很多可以说的,超出了我在这里能写的。如果你还没有这样做,我建议你参加 Temporal 的 101 和 102 课程;这两门课程都提供 Python 版本。除了其他内容,这些课程还解释了如何阅读工作流历史记录以调试各种类型的问题。