将n个任务添加到celery队列并等待结果

29 投票
3 回答
22396 浏览
提问于 2025-04-30 23:57

我想把几个任务放到celery的队列里,然后等着结果。我有很多想法,比如用一些共享存储(像memcached、redis、数据库等等)来实现这个,但我觉得celery应该能自动处理这个问题,只是我在网上找不到相关的资料。

代码示例

def do_tasks(b):
    for a in b:
        c.delay(a)

    return c.all_results_some_how()
暂无标签

3 个回答

2

我觉得你其实并不是想要延迟,而是想要Celery的异步功能。

我想你真正想要的是一个任务集(TaskSet)

from celery.task.sets import TaskSet
from someapp.tasks import sometask

def do_tasks(b):
    job = TaskSet([sometask.subtask((a,)) for a in b])
    result = job.apply_async()
    # might want to handle result.successful() == False
    return result.join()
15

Task.delay 这个方法会返回一个叫做 AsyncResult 的东西。你可以用 AsyncResult.get 来获取每个任务的结果。

为了做到这一点,你需要保存对这些任务的引用。

def do_tasks(b):
    tasks = []
    for a in b:
        tasks.append(c.delay(a))
    return [t.get() for t in tasks]

或者你可以使用 ResultSet

更新ResultSet 已经不推荐使用了,请查看 @laffuste 的回答

def do_tasks(b):
    rs = ResultSet([])
    for a in b:
        rs.add(c.delay(a))
    return rs.get()
42

对于Celery >= 3.0版本,TaskSet这个功能已经被淘汰了,取而代之的是group

from celery import group
from tasks import add

job = group([
             add.s(2, 2),
             add.s(4, 4),
             add.s(8, 8),
             add.s(16, 16),
             add.s(32, 32),
])

在后台启动这个组:

result = job.apply_async()

等待:

result.join()

撰写回答