具有任意大小的组/和弦的异步芹菜任务

2024-04-27 00:37:03 发布

您现在位置:Python中文网/ 问答频道 /正文

假设我有以下芹菜代码:

from celery import Celery, group

app = Celery()
app.conf.update(
    broker_url=some_broker,
    result_backend=some_backend,
)


@app.task
def generate_list(n):
    return list(range(n))


@app.task
def double_it(value):
    return value * 2


@app.task
def tsum(value):
    return sum(value

创建一个组或一个和弦可以很容易地产生一个异步结果,从而产生实际的结果:

^{pr2}$

然而,我不知道如何在不同步部件的情况下,对任意大小的列表执行类似的操作。我试过了:

>>> @app.task
... def make_group(items):
...     return group([double_it.s(x) for x in items])()

>>> @app.task
... def make_chord(items):
...     return chord(
...         [double_it.s(x) for x in items],
...         tsum.s(),
...     )()

>>> (generate_list.s(5) | make_group.s())().get()
['5d23a31f-6da3-4fec-8bd0-6e57a4da9c43',
 [[['7d2cf535-96d6-40fe-b76b-d118aaffbc95', None], None],
  [['5ef96f17-8ac7-4c1b-a794-c0e6e85c2b8c', None], None],
  [['ec595134-76af-40e8-a211-d2592acd250f', None], None],
  [['146b0da2-c9bf-49c3-a259-a98808c2b1af', None], None],
  [['9ad3eb4a-92e2-4194-ae9f-3aa17dbf0333', None], None]]]

>>> (generatelist.s(5) | make_chord.s())().get()
    [['045a5130-cc33-4ba7-99cc-58dd94675515',
      ['771f444a-2df5-4436-96e0-6e1774c2b437',
       [[['4b88c20c-f5ac-485e-aae8-b562c5865cd5', None], None],
        [['b5890545-48eb-49d9-8a5b-d638381ce3ab', None], None],
        [['e8f19456-5b29-4d2f-b8e2-d81cb9f1c258', None], None],
        [['aa95ba0f-04d3-4bc1-a34a-905076fe4cb7', None], None],
        [['155f2183-e673-4921-be4e-568b5011b1e9', None], None]]]],
     None]

…它似乎返回了一个奇怪的嵌套列表中的各种任务ID,该列表包含一组None。尝试生成chord会导致错误,即无法对GroupResult进行JSON序列化,这可能是因为make_group正在立即返回并转到下一个任务,而不是正在生成一个chord。显式地生成和弦(而不是隐式地使用group->;任务链)可以消除错误,但仍然可以使用任务ID立即返回

解决这个问题的唯一方法是等待链的结果,然后使用任务ID为适当的任务创建异步结果。在

>>> from celery.result import AsyncResult
>>> task_id = (generate_list.s(5) | make_chord.s())().get()[0][0]
>>> AsyncResult(task_id).get()
20

您应该如何在不依赖于获取适当的任务ID并从中创建一个AsyncResult的情况下实现这一点?在


Tags: noneidapptaskgetmakereturnvalue