芹菜能将状态更新传递给非阻塞调用方吗?

2024-06-16 18:50:02 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用Celery异步执行一组操作。这些操作有很多,而且每一个都可能需要很长时间,因此我不想将结果发送回芹菜工作者函数的返回值中,而是希望将它们作为自定义状态更新一次发送回一个。这样,调用者就可以实现一个带有更改状态回调的进度条,并且worker函数的返回值的大小可以是常量,而不是操作数的线性。你知道吗

下面是一个简单的示例,我使用芹菜worker函数add_pairs_of_numbers添加一个成对数字的列表,为每个添加的成对数字发送一个自定义状态更新。你知道吗

#!/usr/bin/env python

"""
Run worker with:

    celery -A tasks worker --loglevel=info
"""
from celery import Celery

app = Celery("tasks", broker="pyamqp://guest@localhost//", backend="rpc://")

@app.task(bind=True)
def add_pairs_of_numbers(self, pairs):
    for x, y in pairs:
        self.update_state(state="SUM", meta={"x":x, "y":y, "x+y":x+y})
    return len(pairs)

def handle_message(message):
    if message["status"] == "SUM":
        x = message["result"]["x"]
        y = message["result"]["y"]
        print(f"Message: {x} + {y} = {x+y}")

def non_looping(*pairs):
    task = add_pairs_of_numbers.delay(pairs)
    result = task.get(on_message=handle_message)
    print(result)

def looping(*pairs):
    task = add_pairs_of_numbers.delay(pairs)
    print(task)
    while True:
        pass

if __name__ == "__main__":
    import sys

    if sys.argv[1:] and sys.argv[1] == "looping":
        looping((3,4), (2,7), (5,5))
    else:
        non_looping((3,4), (2,7), (5,5))

如果只运行./tasks,它将执行non_looping函数。这就是标准的芹菜:对worker函数进行延迟调用,然后使用get等待结果。handle_message回调函数打印每条消息,并返回添加的对数作为结果。这就是我想要的。你知道吗

$ ./task.py
Message: 3 + 4 = 7
Message: 2 + 7 = 9
Message: 5 + 5 = 10
3

尽管对于这个简单的例子来说,非循环场景已经足够了,但我尝试完成的实际任务是处理一批文件,而不是添加成对的数字。此外,客户机是Flaskrestapi,因此不能包含任何阻塞get调用。在上面的脚本中,我用looping函数模拟这个约束。此函数启动异步芹菜任务,但不等待响应。(后面的无限while循环模拟web服务器继续运行和处理其他请求。)

如果使用参数“looping”运行脚本,它将运行以下代码路径。在这里,它立即打印芹菜任务ID,然后放入无限循环。你知道吗

$ ./tasks.py looping
a39c54d3-2946-4f4e-a465-4cc3adc6cbe5

芹菜工人日志显示add操作已经执行,但是调用者没有定义回调函数,所以它永远不会得到结果。你知道吗

(我意识到这个特殊的例子是令人尴尬的并行,所以我可以使用chunks将其划分为多个任务。但是,在我的非简化现实世界中,我有一些无法并行化的任务。)

我想要的是能够在looping场景中指定回调。像这样的。你知道吗

def looping(*pairs):
    task = add_pairs_of_numbers.delay(pairs, callback=handle_message) # There is no such callback.
    print(task)
    while True:
        pass

在芹菜文档和我能在网上找到的所有示例(例如this)中,没有办法将回调函数定义为delay调用或其apply_async等价调用的一部分。只能将一个指定为get回调的一部分。这让我觉得这是一个有意的设计决定。你知道吗

在我的restapi场景中,我可以通过让芹菜工人进程以HTTP post的形式将“状态更新”发送回Flask服务器来解决这个问题,但是这看起来很奇怪,因为我开始复制芹菜中已经存在的HTTP中的消息传递逻辑。你知道吗

有没有办法编写我的looping场景,以便调用者接收回调而不进行阻塞调用,或者这在芹菜中是明确禁止的?你知道吗


Tags: of函数addmessagetask状态defresult
1条回答
网友
1楼 · 发布于 2024-06-16 18:50:02

这是芹菜不支持的模式,尽管您可以(某种程度上)通过向任务as described here发布自定义状态更新来解决它。你知道吗

Use update_state() to update a task’s state:.

def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})```

芹菜不支持这样一种模式的原因是任务生产者(调用者)与任务消费者(工作者)强烈解耦,两者之间的唯一通信是支持生产者与消费者之间通信的代理,结果后端支持消费者与生产者之间的通信。当前最接近的方法是轮询任务状态或编写自定义结果后端,允许您通过AMP-RPC或redis订阅发布事件。你知道吗

相关问题 更多 >