将n个任务添加到celery队列并等待结果
我想把几个任务放到celery的队列里,然后等着结果。我有很多想法,比如用一些共享存储(像memcached、redis、数据库等等)来实现这个,但我觉得celery应该能自动处理这个问题,只是我在网上找不到相关的资料。
代码示例
def do_tasks(b):
for a in b:
c.delay(a)
return c.all_results_some_how()
3 个回答
2
我觉得你其实并不是想要延迟,而是想要Celery的异步功能。
我想你真正想要的是一个任务集(TaskSet):
from celery.task.sets import TaskSet
from someapp.tasks import sometask
def do_tasks(b):
job = TaskSet([sometask.subtask((a,)) for a in b])
result = job.apply_async()
# might want to handle result.successful() == False
return result.join()
15
Task.delay
这个方法会返回一个叫做 AsyncResult
的东西。你可以用 AsyncResult.get
来获取每个任务的结果。
为了做到这一点,你需要保存对这些任务的引用。
def do_tasks(b):
tasks = []
for a in b:
tasks.append(c.delay(a))
return [t.get() for t in tasks]
或者你可以使用 ResultSet
:
更新:ResultSet
已经不推荐使用了,请查看 @laffuste 的回答。
def do_tasks(b):
rs = ResultSet([])
for a in b:
rs.add(c.delay(a))
return rs.get()
42
对于Celery >= 3.0版本,TaskSet这个功能已经被淘汰了,取而代之的是group。
from celery import group
from tasks import add
job = group([
add.s(2, 2),
add.s(4, 4),
add.s(8, 8),
add.s(16, 16),
add.s(32, 32),
])
在后台启动这个组:
result = job.apply_async()
等待:
result.join()