检查是否有芹菜助教

2024-04-24 10:15:40 发布

您现在位置:Python中文网/ 问答频道 /正文

如何检查函数是否由celery执行?在

def notification():
   # in_celery() returns True if called from celery_test(), 
   #                     False if called from not_celery_test()
   if in_celery():
      # Send mail directly without creation of additional celery subtask
      ...
   else:
      # Send mail with creation of celery task
      ...

@celery.task()
def celery_test():
    notification()

def not_celery_test():
    notification()

Tags: of函数infromtestsendtaskif
1条回答
网友
1楼 · 发布于 2024-04-24 10:15:40

这里有一种方法可以使用celery.current_task。以下是任务要使用的代码:

def notification():
    from celery import current_task
    if not current_task:
        print "directly called"
    elif current_task.request.id is None:
        print "called synchronously"
    else:
        print "dispatched"

@app.task
def notify():
    notification()

您可以运行以下代码来执行上述操作:

^{pr2}$

第一个代码片段中的任务代码位于名为core.tasks的模块中。我把最后一段代码塞进了一个自定义的Django管理命令中。测试3种情况:

  • 直接调用notification

  • 通过同步执行的任务调用notification。也就是说,这个任务不是通过芹菜分派给工人的。任务的代码在调用notify的同一进程中执行。

  • 通过工作线程运行的任务调用notification。任务的代码在与启动该任务的进程不同的进程中执行。

结果是:

NOT DISPATCHED
called synchronously
DISPATCHED
DIRECT
directly called

任务中在DISPATCHED之后的输出上没有来自任务中print的行,因为该行在工作进程日志中结束:

[2015-12-17 07:23:57,527: WARNING/Worker-4] dispatched

重要提示:我最初在第一个测试中使用if current_task is None,但它没有起作用。我反复检查。不知何故,Celery将current_task设置为一个看起来像None(如果在它上面使用repr,则得到{}),但不是{}。不确定那里发生了什么。使用if not current_task可以。在

另外,我已经在Django应用程序中测试了上面的代码,但是我没有在生产中使用它。可能有我不知道的陷阱。在

相关问题 更多 >