从未知任务中获取Celery的'task_id'结果

15 投票
2 回答
14072 浏览
提问于 2025-04-18 10:54

我该如何获取一个任务的结果,如果我事先不知道是哪个任务被执行了呢?

这是我的设置:

假设有一个名为 'tasks.py' 的文件:

from celery import Celery

app = Celery('tasks', backend="db+mysql://u:p@localhost/db", broker = 'amqp://guest:guest@localhost:5672//')

@app.task
def add(x,y):
   return x + y


@app.task
def mul(x,y):
   return x * y

在本地运行着 RabbitMQ 3.3.2:

marcs-mbp:sbin marcstreeter$ ./rabbitmq-server

              RabbitMQ 3.3.2. Copyright (C) 2007-2014 GoPivotal, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker... completed with 10 plugins.

在本地运行着 Celery 3.1.12:

 -------------- celery@Marcs-MacBook-Pro.local v3.1.12 (Cipater)
---- **** -----
--- * ***  * -- Darwin-13.2.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x105dea3d0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

我可以导入这个方法,并通过 'task_id' 来获取结果:

from tasks import add, mul
from celery.result import AsyncResult

result = add.delay(2,2)
task_id = result.task_id
result.get() # 4

result = AsyncResult(id=task_id)
result.get() # 4

result = add.AsyncResult(id=task_id)
result.get() # 4

# and the same for the 'mul' task. Just imagine I put it here

在接下来的例子中,我把这些步骤分开在不同的进程中。在一个进程里,我这样获取 'task_id':

from tasks import add

result = add.delay(5,5)
task_id = result.task_id

然后在另一个进程中,如果我使用相同的 'task_id'(复制并粘贴到另一个 REPL,或者在不同的 HTTP 请求中),我可以这样做:

from celery.result import AsyncResult

result = AsyncResult(id="copied_task_id", backend="db+mysql://u:p@localhost/db")
result.get() # AttributeError: 'str' object has no attribute 'get_task_meta'
result.state # AttributeError: 'str' object has no attribute 'get_task_meta'
result.status # AttributeError: 'str' object has no attribute 'get_task_meta'

在另一个进程中,如果我这样做:

from task import add # in this instance I know that an add task was performed

result = add.AsyncResult(id="copied_task_id")
result.status # "SUCCESSFUL"
result.state # "SUCCESSFUL"
result.get() # 10

我希望能够在不知道具体是哪个任务生成结果的情况下获取结果。在我的实际环境中,我打算把这个 task_id 返回给客户端,让他们通过 HTTP 请求查询他们工作的状态。

2 个回答

0

如果这对某些人有帮助,实际上backend这个参数并不是期待一个字符串,而是需要一个Backend对象:如何为celery任务重写后端

对我有效的是:

from celery.backends.rpc import RPCBackend
from myapp.workers.main import app as worker

@worker.task(backend=RPCBackend(app=worker))
def status_check():
    return "OK"
26

好的,我找了很久的解决方案,现在终于正式发帖了,并且查看了文档,我发现了这个宝藏

class celery.result.AsyncResult(id, backend=None, task_name=None, app=None, parent=None)

查询任务状态。

参数

id – 参见 id

backend – 参见 backend

异常 TimeoutError

超时时引发的错误。

AsyncResult.app = None

所以,我没有提供backend参数,而是提供了“app”参数,像这样:

from celery.result import AsyncResult
from task import app

# Assuming add.delay(10,10) was called in another process
# and that I'm using a 'task_id' I retrieved from that process

result = AsyncResult(id='copied_task_id', app=app)
result.state # 'SUCCESSFUL'
result.get() # 20

这对很多人来说可能很明显,但我当时并没有意识到。现在我只能说这个解决方案“就是好用”,但如果我知道这是官方推荐的做法,我会更安心。如果你知道文档中有哪个部分能更清楚地说明这一点,请在评论中或作为答案发布,我会尽量选择它作为答案。

撰写回答