Python芹菜:如何无序地加入任务结果?

2024-04-20 03:08:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个简单的项目,我创建了一堆彼此不相关的工作,创建任务,将它们传递给Redis,让一些工人分散在一个Docker Swarm上,在长时间运行的任务队列中咀嚼。当worker完成时,他们将完成的工作转储到NFS共享中,并将文本值发送回芹菜客户端。你知道吗

我在用芹菜.result.ResultSet的.join()函数。join()包含一个回调(目前)只打印结果。你知道吗

我的问题是join()阻塞,直到它按照给定的顺序接收每个asyncresult值。我的swarm是由许多主机组成的,这些主机是完全不同的机器,对我来说,重要的是在它们完成时返回结果,而不是按顺序或一旦它们全部完成。你知道吗

有没有一种方法可以在任务完成时通过芹菜正确地触发回调函数?我在网上看了很多例子,似乎我唯一的选择就是试试asyncio,但Python并不是我的强项。你知道吗

用于创建任务和结果集对象的函数:

def populateQueue(encodeTasks):
r = ResultSet([])
taskHandles = {}

for task in encodeTasks:
    try:
        ret = encode.delay(task)
        r.add(ret)
        logging.debug("Task ID: " + str(ret.task_id))
        taskHandles[ret.task_id] = ret 
    except:
        logging.info("populateQueue fail: " + str(task.traceback))

logging.info("Tasks queued: " + str(len(taskHandles)))
return taskHandles, r

main()的一部分,用于等待结果:

        frameCountTotal = getFrameCount(targetFile)
        encodeTasks = buildCmdString(targetFile, frameCountTotal, clientCount)
        taskHandles, retSet = populateQueue(encodeTasks)

        logging.info("Waiting on tasks...")
        retSet.join(callback=testCallback)

提前谢谢


Tags: 函数infoidtask顺序logging芹菜join
1条回答
网友
1楼 · 发布于 2024-04-20 03:08:47

找到了我自己问题的答案:

ResultSet有另一个名为join\u native()的方法,我认为只要代理是几种已知产品(RabbitMQ、Redis等)中的一种,它就会使用对代理更具体的API调用。芹菜的文件只是说,它提供了更好的性能,如果你满足经纪人的要求。文档没有说明的是它允许无序退货(至少在Redis上,还没有尝试过RMQ)。你知道吗

相关问题 更多 >