手动返回celery任务的错误结果和状态失败?

5 投票
2 回答
10982 浏览
提问于 2025-04-17 22:37

我创建了一些celery任务,用来运行一些用nodejs写的javascript工作。这些任务基本上是通过subprocess.popen来调用nodejs。

当nodejs的工作结束时,如果出现错误,它会返回一个非零的状态,同时在错误输出中写入错误信息。

当这种情况发生时,我想把错误输出的内容拿来,作为“结果”返回给celery,并且带上一个FAILURE的状态,这样我的工作监控工具就能显示这个工作失败了。

我该怎么做呢?

这是我的任务

@app.task
def badcommand():
    try:
       output = subprocess.check_output('ls foobar',stderr=subprocess.STDOUT,shell=True)
       return output
    except subprocess.CalledProcessError as er:
       #What do I do here to return er.output, and set the status to fail?

如果我不捕捉这个子进程的异常,任务会正常失败,但结果是空的,反而会得到一个错误追踪的堆栈信息。

如果我捕捉了这个异常,并返回er.output,那么任务就会被标记为成功完成。

2 个回答

5

你可以使用一个基础类,并指定在失败时该怎么处理。

class YourBase(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print "Failure"

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print "Success"

@app.task(base=YourBase)
def badcommand():
   output = subprocess.check_output('ls foobar', stderr=subprocess.STDOUT, shell=True)
   return output

这些是你的基础类可以使用的处理器:http://celery.readthedocs.org/en/latest/userguide/tasks.html#handlers

8

你可以使用 celery.app.task.Task.update_state 这个方法来更新当前任务的状态。

@app.task(bind=True)
def badcommand(self):
    try:
       output = subprocess.check_output('ls foobar',stderr=subprocess.STDOUT,shell=True)
       return output
    except subprocess.CalledProcessError as er:
        self.update_state(state='FAILURE', meta={'exc': er})

需要注意的是,bind 参数是在 Celery 3.1 版本中引入的。如果你还在使用旧版本,我认为你可以这样调用 update_state 这个任务方法:

@app.task
def badcommand():
    ...
    except subprocess.CalledProcessError as er:
        badcommand.update_state(state='FAILURE', meta={'exc': er})    

撰写回答