使用django-celery进行单元测试?

87 投票
6 回答
23045 浏览
提问于 2025-04-16 06:18

我正在为我们的 django-celery 项目想一个测试方法。我看过 文档 中的说明,但没有找到具体该怎么做的好主意。我不太担心测试实际的后台任务,只想测试的代码功能。主要我在想:

  1. 在测试时,怎么能绕过 task.delay() 这个调用(我试过设置 CELERY_ALWAYS_EAGER = True,但没有什么效果)?
  2. 如果推荐使用测试设置(如果这是最好的方法),我们怎么能在不实际更改 settings.py 的情况下使用它?
  3. 我们还可以使用 manage.py test 吗,还是必须用自定义的测试运行器?

总的来说,任何关于使用 celery 进行测试的提示或建议都非常有帮助。

6 个回答

19

这是我测试基础类的一部分,它把 apply_async 方法给“模拟”了,并记录了对它的调用(这也包括 Task.delay)。虽然这个方法有点复杂,但在我过去几个月的使用中,它满足了我的需求。

from django.test import TestCase
from celery.task.base import Task
# For recent versions, Task has been moved to celery.task.app:
# from celery.app.task import Task
# See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html

class CeleryTestCaseBase(TestCase):

    def setUp(self):
        super(CeleryTestCaseBase, self).setUp()
        self.applied_tasks = []

        self.task_apply_async_orig = Task.apply_async

        @classmethod
        def new_apply_async(task_class, args=None, kwargs=None, **options):
            self.handle_apply_async(task_class, args, kwargs, **options)

        # monkey patch the regular apply_sync with our method
        Task.apply_async = new_apply_async

    def tearDown(self):
        super(CeleryTestCaseBase, self).tearDown()

        # Reset the monkey patch to the original method
        Task.apply_async = self.task_apply_async_orig

    def handle_apply_async(self, task_class, args=None, kwargs=None, **options):
        self.applied_tasks.append((task_class, tuple(args), kwargs))

    def assert_task_sent(self, task_class, *args, **kwargs):
        was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2]
                       for task in self.applied_tasks)
        self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args))

    def assert_task_not_sent(self, task_class):
        was_sent = any(task_class == task[0] for task in self.applied_tasks)
        self.assertFalse(was_sent, 'Task was not expected to be called, but was.  Applied tasks: %s' %                 self.applied_tasks)

下面是一个“随便想出来的”例子,展示你在测试案例中如何使用它:

mymodule.py

from my_tasks import SomeTask

def run_some_task(should_run):
    if should_run:
        SomeTask.delay(1, some_kwarg=2)

test_mymodule.py

class RunSomeTaskTest(CeleryTestCaseBase):
    def test_should_run(self):
        run_some_task(should_run=True)
        self.assert_task_sent(SomeTask, 1, some_kwarg=2)

    def test_should_not_run(self):
        run_some_task(should_run=False)
        self.assert_task_not_sent(SomeTask)
77

我喜欢在需要等待celery结果完成的测试中使用override_settings装饰器。

from django.test import TestCase
from django.test.utils import override_settings
from myapp.tasks import mytask

class AddTestCase(TestCase):

    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_mytask(self):
        result = mytask.delay()
        self.assertTrue(result.successful())

如果你想把这个应用到所有测试中,可以使用celery测试运行器,具体可以参考这个链接,它基本上设置了这些相同的配置,只是把(BROKER_BACKEND = 'memory')改成了这样。

在设置中:

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

查看CeleryTestSuiteRunner的源代码,你会发现里面的逻辑很清楚。

47

试着设置:

BROKER_BACKEND = 'memory'

(感谢 asksol 的评论。)

撰写回答