一个运行更多任务的Celery任务
我正在使用celerybeat来启动一个主要任务,这个主要任务又会启动一些次要任务。我已经写好了这两个任务。
有没有简单的方法可以做到这一点?Celery允许在任务内部运行其他任务吗?
我的例子是:
@task
def compute(users=None):
if users is None:
users = User.objects.all()
tasks = []
for user in users:
tasks.append(compute_for_user.subtask((user.id,)))
job = TaskSet(tasks)
job.apply_async() # raises a IOError: Socket closed
@task
def compute_for_user(user_id):
#do some stuff
compute
是从celerybeat调用的,但当它尝试运行apply_async
时出现了IOError。有什么想法吗?
4 个回答
9
从3.0版本开始,“TaskSet”这个词就不再使用了……现在有了新的概念,叫做组(Groups)、链(Chains)和和弦(Chords),它们是一种特殊的子任务。具体可以查看这个链接:http://docs.celeryproject.org/en/3.1/whatsnew-3.0.html#group-chord-chain-are-now-subtasks
12
你可以使用类似这样的东西(在3.0版本中支持)
g = group(compute_for_user.s(user.id) for user in users)
g.apply_async()
38
来回答你最开始的问题:从2.0版本开始,Celery提供了一种简单的方法,可以从一个任务启动另一个任务。你所说的“次级任务”其实就是Celery所称的“子任务”。你可以查看这份文档,了解更多关于任务集、子任务和回调的信息,@Paperino已经很贴心地给你提供了链接。
到了3.0版本,Celery改用了组来处理这类情况和其他一些行为。
你的代码显示你已经对这个接口有一定了解。你真正想问的似乎是:“为什么我在尝试运行我的子任务集时会收到‘Socket Closed’的IOError
错误?”我觉得这个问题可能没人能回答,因为你没有提供足够的信息来描述你的程序。你的代码片段不能直接运行,所以我们无法自己检查你遇到的问题。请把IOError
的堆栈跟踪信息发上来,希望能有能帮你解决问题的人看到。