集成测试多个Celery工作进程和数据库支持的Django API
我正在使用一种软件架构,这种架构有多个 celery 工作者(我们称它们为 worker1
、worker2
和 worker3
)。这三个工作者都是独立的实体(也就是说,它们有各自的代码、各自的代码库、各自的仓库、各自的 celery 实例和各自的机器),而且它们都没有连接到 Django 应用。
与这三个工作者进行通信的是一个基于 Django 的、使用 MySQL 的 RESTful API。
在开发过程中,这些服务都在一个 vagrant 虚拟环境中,每个服务都像独立的机器一样,通过不同的端口运行。我们为所有的 Celery 任务使用一个 RabbitMQ 代理。
这些服务之间的典型工作流程可能是这样的:worker1
从一个设备接收到消息,进行一些处理,然后把一个任务放到 worker2
的队列中,worker2
进行进一步处理,并向 API
发送一个 POST 请求,API
将数据写入 MySQL 数据库,并触发 worker3
的任务,worker3
进行其他处理,再次向 API
发送一个 POST 请求,最终导致 MySQL 的写入。
这些服务之间的通信很顺畅,但每次我们对任何服务进行更改时,测试这个流程都非常麻烦。我真的想要一些完整的集成测试(也就是说,从发送消息到 worker1
开始,经过整个流程),但我不知道从哪里开始。现在我面临的主要问题是:
如果我在 worker1
上放入一个任务,我怎么知道整个流程什么时候结束?当我不知道结果是否已经到达时,我又怎么能对结果做出合理的判断呢?
我该如何处理数据库的设置和清理?我希望在每个测试结束时删除测试期间创建的所有条目,但如果我是在 Django 应用之外开始测试,我不太确定如何高效地清理这些数据。每次测试后手动删除并重新创建数据库似乎会带来太大的开销。
2 个回答
要使用完整的设置,你可以配置一个Celery的结果后端。想了解基础知识,可以查看Celery的'下一步'文档。
worker1
可以报告它传递给worker2
的任务处理情况。worker2
返回的结果会是它传递给worker3
的任务ID。而worker3
返回的结果则意味着整个过程已经完成,你可以查看结果。结果也可以立即报告一些有趣的信息,这样检查起来会更方便。
在Celery中,这可能看起来有点像这样:
worker1_result = mytask.delay(someargs) # executed by worker1
worker2_result = worker1_result.get() # waits for worker1 to finish
worker3_result = worker2_result.get() # waits for worker2 to finish
outcome = worker3_result.get() # waits for worker3 to finish
(具体细节可能需要不同;我自己还没有使用过这个。我不确定任务结果是否可以序列化,因此是否适合作为任务函数的返回值。)
Celery可以让任务同步运行,所以第一步是:把整个流程分成几个独立的任务,模拟请求并检查结果:
原始流程:
device --- worker1 --- worker2 --- django --- worker3 --- django
第一层集成测试:
1. |- worker1 -|
2. |- worker2 -|
3. |- django -|
4. |- worker3 -|
5. |- django -|
对于每个测试,创建一个模拟请求或者同步调用,并检查结果。把这些测试放在对应的代码库里。例如,在worker1的测试中,你可以模拟worker2,检查它是否被正确的参数调用。然后,在另一个测试中,你会调用worker2,并模拟请求来确认它是否调用了正确的API。依此类推。
测试整个流程会比较困难,因为所有任务都是独立的实体。我现在想到的唯一方法是对worker1进行一次模拟调用,设置一个合理的超时时间,然后等待数据库中的最终结果。这种测试只能告诉你它是否工作,而不能告诉你问题出在哪里。