在一个工作实例中并发执行工作流和活动

0 投票
1 回答
65 浏览
提问于 2025-04-14 17:13

希望你一切都好。我现在正在做一个叫做Temporal的项目,目的是管理工作流程的执行。不过我遇到了一些困难,非常希望能得到你的建议和帮助。

问题:

我们在集群中以POD的形式运行一个工作程序(只运行一个副本,不想多于一个)。我们希望能够同时执行多个工作流程,并且在当前工作流程还没完成时就能得到输出(也就是说,我们希望在同一个工作程序实例中实现并行执行)。

我采取的步骤:

我已经在工作程序的配置中设置了工作流程和任务的最大并发数。

代码或配置:

def slp(sec):
    sleep(sec)
    return f"slept {sec} sec"

@activity.defn(name="Sleeping 1")
async def sleeping1():
    response = slp(20)
    return response

@activity.defn(name="sleeping 2")
async def sleeping2():
    response = slp(120)
    return response

@workflow.defn
class sleepingWF:
    @workflow.run
    async def run(self, body):

        s1 = await workflow.execute_activity(
            sleeping1))
        s2 = await workflow.execute_activity(
            sleeping2)) 
        
async def main():

    TEMPORAL_ENDPOINT = "localhost:7233"
    TASK_QUEUE = "mymac"

    client = await Client.connect(
        target_host=TEMPORAL_ENDPOINT
    )
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue=TASK_QUEUE,
            workflows=[sleepingWF],
            activities=[sleeping1, sleeping2],
            activity_executor=activity_executor,
            max_concurrent_workflow_tasks=100,
            max_concurrent_activities=100,
            max_concurrent_workflow_task_polls=10,
            max_concurrent_activity_task_polls=10,
        )
        await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

非常感谢你的时间和帮助。如果你有任何建议、见解或可能的解决方案,请随时分享。你的专业知识对我来说非常重要,我期待从社区中学习。

预期结果:

workflow_execution_1 : start_time -> 00:00:00 end_time -> 00:02:20
workflow_execution_2 : start_time -> 00:00:00 end_time -> 00:02:20

两个执行的总时间应该是140秒。

环境:python 11

附加信息:

当前总执行时间:

workflow_execution_1 : start_time -> 00:00:00 end_time -> 00:02:20
workflow_execution_2 : start_time -> 00:02:20 end_time -> 00:04:40

现在两个执行的总时间是280秒。

1 个回答

0

注意:我现在还不太确定发生了什么以及为什么会这样,但在此期间,我想提供一个可能的解释和解决方案。

简而言之

在第2行调用的 sleep() 似乎会阻止其他线程同时执行。

你可以尝试以下解决方案之一:

  • asyncio.sleep() 替换 sleep()(这样你就可以完全去掉 ThreadPoolExecutor);或者
  • 确保你使用的是最新版本的 Python 和 Temporal 的 Python SDK;或者
  • 使用 ProcessPoolExecutor 替代 ThreadPoolExecutor

详细信息

默认情况下,Temporal 的 Python SDK 在一个线程中执行工作流任务和活动任务,这个线程是基于 asyncio 的。正如 Python SDK 的文档 中所述:

在 Python 中阻塞异步事件循环会将你的异步程序变成一个同步程序,按顺序执行,这样就失去了使用 asyncio 的意义。这还可能导致潜在的死锁和不可预测的行为,使任务无法执行。调试这些问题可能很困难且耗时,因为找到阻塞调用的源头并不总是显而易见的。

通常,可以并且更好地设计你的活动,使其不阻塞线程。例如,如果你的活动需要执行 SQL 查询、HTTP 请求,或者从 S3 下载或上传文件,那么你可以使用与 asyncio 兼容的库。在第2行的 sleep() 的情况下,你可以使用 asyncio.sleep()

如果你确实需要在活动中执行阻塞代码,比如你的活动需要长时间进行 CPU 密集型计算,那么你可以配置你的工作者通过 ThreadPoolExecutorProcessPoolExecutor 来执行活动。

然而,你显然是做了前者,但没有成功。为什么呢?我猜是因为 sleep 函数没有释放 Python 的全局解释器锁(GIL),不过我不太确定原因。

如果你还不知道什么是 Python 的 GIL,我强烈建议你先看看 这个问题的回答,再继续阅读我的回答。

这里需要注意的关键点是,Python 的 GIL 是 按进程 而不是 按线程 的,这与一般的预期不同。这大大限制了你从 ThreadPoolExecutor 中实际期望的并发级别。换句话说,ThreadPoolExecutor 只有在你的代码调用释放 GIL 的本地代码、进行 I/O 操作,或者调用其他类似释放 GIL 的 Python 函数时,才会有帮助。否则,其他线程将没有机会继续执行。

我想你提到的 sleep() 实际上是指 time.sleep(),对吧?time.sleep() 应该是会释放 GIL 的,尽管在你的情况下看起来并没有。难道第2行的 sleep() 实际上指的是其他不同的函数吗?

如何诊断这种问题?

在你的问题中,你给出了这些时间:

workflow_execution_1 : start_time -> 00:00:00 end_time -> 00:02:20
workflow_execution_2 : start_time -> 00:02:20 end_time -> 00:04:40

我怀疑这些数字是正确的,因为这意味着你的第二个工作流执行要等到第一个完成才开始,但在评论中,你明确表示你是同时启动两个工作流的。

我更期待你的时间大致是:

workflow_execution_1 : start_time ->  00:00:00  end_time -> *00:02:40*
workflow_execution_2 : start_time -> *00:00:00* end_time ->  00:04:40

检查工作流历史记录是理解发生了什么的最佳方法。假设没有阻塞睡眠问题,你应该在 Temporal UI 中看到如下示例:

在此输入图片描述

工作流的开始时间和结束时间显示在绿色框中。你还可以看到你的睡眠活动的开始和结束时间(蓝色框)。你也可能会看到在工作流任务处理上花费了一些不可忽视的时间,甚至超时错误,这可能指向其他问题。

如果我之前的建议还不足以解决你的问题,请分享一张类似的工作流历史记录的图片,或者更好的是,分享完整的历史记录的 JSON 格式(按下载按钮)。确保在捕获历史记录之前等待工作流完成。根据你分享的工作流代码,工作流完成时历史记录中应该至少有 17 个事件,且两个历史记录中的最后一个事件应该是 WorkflowExecutionCompleted

关于 Temporal 的工作流历史记录还有很多可以说的,超出了我在这里能写的。如果你还没有这样做,我建议你参加 Temporal 的 101 和 102 课程;这两门课程都提供 Python 版本。除了其他内容,这些课程还解释了如何阅读工作流历史记录以调试各种类型的问题。

撰写回答