我使用Celery异步执行一组操作。这些操作有很多,而且每一个都可能需要很长时间,因此我不想将结果发送回芹菜工作者函数的返回值中,而是希望将它们作为自定义状态更新一次发送回一个。这样,调用者就可以实现一个带有更改状态回调的进度条,并且worker函数的返回值的大小可以是常量,而不是操作数的线性。你知道吗
下面是一个简单的示例,我使用芹菜worker函数add_pairs_of_numbers
添加一个成对数字的列表,为每个添加的成对数字发送一个自定义状态更新。你知道吗
#!/usr/bin/env python
"""
Run worker with:
celery -A tasks worker --loglevel=info
"""
from celery import Celery
app = Celery("tasks", broker="pyamqp://guest@localhost//", backend="rpc://")
@app.task(bind=True)
def add_pairs_of_numbers(self, pairs):
for x, y in pairs:
self.update_state(state="SUM", meta={"x":x, "y":y, "x+y":x+y})
return len(pairs)
def handle_message(message):
if message["status"] == "SUM":
x = message["result"]["x"]
y = message["result"]["y"]
print(f"Message: {x} + {y} = {x+y}")
def non_looping(*pairs):
task = add_pairs_of_numbers.delay(pairs)
result = task.get(on_message=handle_message)
print(result)
def looping(*pairs):
task = add_pairs_of_numbers.delay(pairs)
print(task)
while True:
pass
if __name__ == "__main__":
import sys
if sys.argv[1:] and sys.argv[1] == "looping":
looping((3,4), (2,7), (5,5))
else:
non_looping((3,4), (2,7), (5,5))
如果只运行./tasks
,它将执行non_looping
函数。这就是标准的芹菜:对worker函数进行延迟调用,然后使用get
等待结果。handle_message
回调函数打印每条消息,并返回添加的对数作为结果。这就是我想要的。你知道吗
$ ./task.py
Message: 3 + 4 = 7
Message: 2 + 7 = 9
Message: 5 + 5 = 10
3
尽管对于这个简单的例子来说,非循环场景已经足够了,但我尝试完成的实际任务是处理一批文件,而不是添加成对的数字。此外,客户机是Flaskrestapi,因此不能包含任何阻塞get
调用。在上面的脚本中,我用looping
函数模拟这个约束。此函数启动异步芹菜任务,但不等待响应。(后面的无限while
循环模拟web服务器继续运行和处理其他请求。)
如果使用参数“looping”运行脚本,它将运行以下代码路径。在这里,它立即打印芹菜任务ID,然后放入无限循环。你知道吗
$ ./tasks.py looping
a39c54d3-2946-4f4e-a465-4cc3adc6cbe5
芹菜工人日志显示add操作已经执行,但是调用者没有定义回调函数,所以它永远不会得到结果。你知道吗
(我意识到这个特殊的例子是令人尴尬的并行,所以我可以使用chunks将其划分为多个任务。但是,在我的非简化现实世界中,我有一些无法并行化的任务。)
我想要的是能够在looping
场景中指定回调。像这样的。你知道吗
def looping(*pairs):
task = add_pairs_of_numbers.delay(pairs, callback=handle_message) # There is no such callback.
print(task)
while True:
pass
在芹菜文档和我能在网上找到的所有示例(例如this)中,没有办法将回调函数定义为delay
调用或其apply_async
等价调用的一部分。只能将一个指定为get
回调的一部分。这让我觉得这是一个有意的设计决定。你知道吗
在我的restapi场景中,我可以通过让芹菜工人进程以HTTP post的形式将“状态更新”发送回Flask服务器来解决这个问题,但是这看起来很奇怪,因为我开始复制芹菜中已经存在的HTTP中的消息传递逻辑。你知道吗
有没有办法编写我的looping
场景,以便调用者接收回调而不进行阻塞调用,或者这在芹菜中是明确禁止的?你知道吗
这是芹菜不支持的模式,尽管您可以(某种程度上)通过向任务as described here发布自定义状态更新来解决它。你知道吗
芹菜不支持这样一种模式的原因是任务生产者(调用者)与任务消费者(工作者)强烈解耦,两者之间的唯一通信是支持生产者与消费者之间通信的代理,结果后端支持消费者与生产者之间的通信。当前最接近的方法是轮询任务状态或编写自定义结果后端,允许您通过AMP-RPC或redis订阅发布事件。你知道吗
相关问题 更多 >
编程相关推荐