正确模拟芹菜任务,这是被调用的另一个芹菜助教

2024-05-21 02:48:02 发布

您现在位置:Python中文网/ 问答频道 /正文

如何正确模拟在另一个芹菜任务中调用的芹菜任务?(下面是虚拟代码)

@app.task
def task1(smthg):
    do_so_basic_stuff_1
    do_so_basic_stuff_2
    other_thing(smthg)

@app.task
def task2(smthg):
    if condition:
        task1.delay(smthg[1])
    else:
        task1.delay(smthg)

我有完全相同的代码结构在我的模块。项目/项目/我的_模块.py 我正试着在proj/tests/cel_test中编写test/测试.py在

测试功能:

^{pr2}$

Tags: 模块项目代码pyapptasksobasic
2条回答

您没有调用task1()task2(),而是调用它们的方法:delay()和{}-因此您需要测试这些方法是否被调用。在

下面是我根据您的代码编写的一个工作示例:

任务.py

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def task1():
    return 'task1'

@app.task
def task2():
    task1.delay()

测试.py

^{pr2}$

测试结果:

$ pytest -vvv test.py
============================= test session starts ==============================
platform linux   Python 3.5.2, pytest-3.2.1, py-1.4.34, pluggy-0.4.0   /home/kris/.virtualenvs/3/bin/python3
cachedir: .cache
rootdir: /home/kris/projects/tmp, inifile:
plugins: mock-1.6.2, celery-4.1.0
collected 1 item                                                                

test.py::test_task2 PASSED

=========================== 1 passed in 0.02 seconds ===========================

首先,测试芹菜任务可能非常困难。我通常把我所有的逻辑放到一个不是任务的函数中,然后创建一个只调用该函数的任务,这样就可以正确地测试逻辑。在

第二,我不认为你想在任务内部调用任务(不确定,但我认为这通常不被推荐)。相反,根据您的需要,您可能应该链接或分组:

http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives

最后,为了回答您的实际问题,您需要将delay方法精确地修补在代码中出现的位置,如this post所述。在

相关问题 更多 >