从Tornado的ProcessPoolExecu收集增量结果

2024-04-24 13:14:01 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个tornado应用程序,它需要在ProcessPoolExecutor上运行一个阻塞函数。此阻塞函数使用一个库,该库通过blinker事件发出增量结果。我想收集这些事件,并在它们发生时将它们发送回我的tornado应用程序。在

起初,tornado似乎是这个用例的理想选择,因为它是异步的。我想我可以简单地将tornado.queues.Queue对象传递给要在池中运行的函数,然后将put()事件作为blinker事件回调的一部分传递到这个队列中。在

但是,通过阅读tornado.queues.Queue的文档,我了解到它们不是跨multiprocessing.Queue这样的进程管理的,而且不是线程安全的。在

有没有办法在事件发生时从pool中检索这些事件?我是否应该包装multiprocessing.Queue以便它生成Futures?这似乎不太可能奏效,因为我怀疑multiprocessing的内部结构是否与{}兼容。在

[编辑] 这里有一些很好的线索:https://gist.github.com/hoffrocket/8050711


Tags: 对象函数应用程序队列queueput事件用例
2条回答

要收集传递给ProcessPoolExecutor的任务的返回值,必须使用multiprocessing.Queue(或{}库中的其他对象)。然后,由于multiprocessing.Queue只公开一个同步接口,所以必须在父进程中使用另一个线程从队列中读取数据(而不涉及实现细节)。这里可以使用一个文件描述符,但是我们暂时忽略它,因为它是未记录的并且可能会更改)。在

下面是一个未经测试的快速示例:

queue = multiprocessing.Queue()
proc_pool = concurrent.futures.ProcessPoolExecutor()
thread_pool = concurrent.futures.ThreadPoolExecutor()

async def read_events():
    while True:
        event = await thread_pool.submit(queue.get)
        print(event)

async def foo():
    IOLoop.current.spawn_callback(read_events)
    await proc_pool.submit(do_something_and_write_to_queue)

你可以做得更简单。下面是一个协程,它向子进程提交四个慢函数调用并等待它们:

from concurrent.futures import ProcessPoolExecutor
from time import sleep

from tornado import gen, ioloop

pool = ProcessPoolExecutor()


def calculate_slowly(x):
    sleep(x)
    return x


async def parallel_tasks():
    # Create futures in a randomized order.
    futures = [gen.convert_yielded(pool.submit(calculate_slowly, i))
               for i in [1, 3, 2, 4]]

    wait_iterator = gen.WaitIterator(*futures)
    while not wait_iterator.done():
        try:
            result = await wait_iterator.next()
        except Exception as e:
            print("Error {} from {}".format(e, wait_iterator.current_future))
        else:
            print("Result {} received from future number {}".format(
                result, wait_iterator.current_index))


ioloop.IOLoop.current().run_sync(parallel_tasks)

It输出:

^{pr2}$

您可以看到,协同程序接收结果的顺序是它们完成的顺序,而不是它们提交的顺序:未来的编号1在未来的编号2之后解析,因为未来的编号1睡眠时间更长。convert ium yield将ProcessPoolExecutor返回的期货转换为可在协同程序中等待的Tornado兼容期货。在

每个future解析为calculate_slowly返回的值:在本例中,它与传入calculate \u slowly的数字相同,并且与calculate \u slowly休眠的秒数相同。在

要将其包含在RequestHandler中,请尝试以下操作:

class MainHandler(web.RequestHandler):
    async def get(self):
        self.write("Starting....\n")
        self.flush()

        futures = [gen.convert_yielded(pool.submit(calculate_slowly, i))
                   for i in [1, 3, 2, 4]]

        wait_iterator = gen.WaitIterator(*futures)
        while not wait_iterator.done():
            result = await wait_iterator.next()
            self.write("Result {} received from future number {}\n".format(
                result, wait_iterator.current_index))

            self.flush()


if __name__ == "__main__":
    application = web.Application([
        (r"/", MainHandler),
    ])
    application.listen(8888)
    ioloop.IOLoop.instance().start()

您可以观察到curl localhost:8888服务器对客户机请求的响应是增量的。在

相关问题 更多 >