集成测试多个Celery工作进程和数据库支持的Django API

9 投票
2 回答
1327 浏览
提问于 2025-04-18 05:08

我正在使用一种软件架构,这种架构有多个 celery 工作者(我们称它们为 worker1worker2worker3)。这三个工作者都是独立的实体(也就是说,它们有各自的代码、各自的代码库、各自的仓库、各自的 celery 实例和各自的机器),而且它们都没有连接到 Django 应用。

与这三个工作者进行通信的是一个基于 Django 的、使用 MySQL 的 RESTful API。

在开发过程中,这些服务都在一个 vagrant 虚拟环境中,每个服务都像独立的机器一样,通过不同的端口运行。我们为所有的 Celery 任务使用一个 RabbitMQ 代理。

这些服务之间的典型工作流程可能是这样的:worker1 从一个设备接收到消息,进行一些处理,然后把一个任务放到 worker2 的队列中,worker2 进行进一步处理,并向 API 发送一个 POST 请求,API 将数据写入 MySQL 数据库,并触发 worker3 的任务,worker3 进行其他处理,再次向 API 发送一个 POST 请求,最终导致 MySQL 的写入。

这些服务之间的通信很顺畅,但每次我们对任何服务进行更改时,测试这个流程都非常麻烦。我真的想要一些完整的集成测试(也就是说,从发送消息到 worker1 开始,经过整个流程),但我不知道从哪里开始。现在我面临的主要问题是:

如果我在 worker1 上放入一个任务,我怎么知道整个流程什么时候结束?当我不知道结果是否已经到达时,我又怎么能对结果做出合理的判断呢?

我该如何处理数据库的设置和清理?我希望在每个测试结束时删除测试期间创建的所有条目,但如果我是在 Django 应用之外开始测试,我不太确定如何高效地清理这些数据。每次测试后手动删除并重新创建数据库似乎会带来太大的开销。

2 个回答

0

要使用完整的设置,你可以配置一个Celery的结果后端。想了解基础知识,可以查看Celery的'下一步'文档。

worker1可以报告它传递给worker2的任务处理情况。worker2返回的结果会是它传递给worker3的任务ID。而worker3返回的结果则意味着整个过程已经完成,你可以查看结果。结果也可以立即报告一些有趣的信息,这样检查起来会更方便。

在Celery中,这可能看起来有点像这样:

worker1_result = mytask.delay(someargs)  # executed by worker1
worker2_result = worker1_result.get()  # waits for worker1 to finish
worker3_result = worker2_result.get()  # waits for worker2 to finish
outcome = worker3_result.get()  # waits for worker3 to finish

(具体细节可能需要不同;我自己还没有使用过这个。我不确定任务结果是否可以序列化,因此是否适合作为任务函数的返回值。)

3

Celery可以让任务同步运行,所以第一步是:把整个流程分成几个独立的任务,模拟请求并检查结果:

原始流程:

device --- worker1 --- worker2 --- django --- worker3 --- django

第一层集成测试:

1.      |- worker1 -|
2.                  |- worker2 -|
3.                              |- django -|
4.                                         |- worker3 -|
5.                                                     |- django -|

对于每个测试,创建一个模拟请求或者同步调用,并检查结果。把这些测试放在对应的代码库里。例如,在worker1的测试中,你可以模拟worker2,检查它是否被正确的参数调用。然后,在另一个测试中,你会调用worker2,并模拟请求来确认它是否调用了正确的API。依此类推。

测试整个流程会比较困难,因为所有任务都是独立的实体。我现在想到的唯一方法是对worker1进行一次模拟调用,设置一个合理的超时时间,然后等待数据库中的最终结果。这种测试只能告诉你它是否工作,而不能告诉你问题出在哪里。

撰写回答