如何检查函数是否由celery执行?在
def notification():
# in_celery() returns True if called from celery_test(),
# False if called from not_celery_test()
if in_celery():
# Send mail directly without creation of additional celery subtask
...
else:
# Send mail with creation of celery task
...
@celery.task()
def celery_test():
notification()
def not_celery_test():
notification()
这里有一种方法可以使用
celery.current_task
。以下是任务要使用的代码:您可以运行以下代码来执行上述操作:
^{pr2}$第一个代码片段中的任务代码位于名为
core.tasks
的模块中。我把最后一段代码塞进了一个自定义的Django管理命令中。测试3种情况:直接调用
notification
。通过同步执行的任务调用
notification
。也就是说,这个任务不是通过芹菜分派给工人的。任务的代码在调用notify
的同一进程中执行。通过工作线程运行的任务调用
notification
。任务的代码在与启动该任务的进程不同的进程中执行。结果是:
任务中在
DISPATCHED
之后的输出上没有来自任务中print
的行,因为该行在工作进程日志中结束:重要提示:我最初在第一个测试中使用}),但不是{}。不确定那里发生了什么。使用
if current_task is None
,但它没有起作用。我反复检查。不知何故,Celery将current_task
设置为一个看起来像None
(如果在它上面使用repr
,则得到{if not current_task
可以。在另外,我已经在Django应用程序中测试了上面的代码,但是我没有在生产中使用它。可能有我不知道的陷阱。在
相关问题 更多 >
编程相关推荐