我有一个tornado
应用程序,它需要在ProcessPoolExecutor
上运行一个阻塞函数。此阻塞函数使用一个库,该库通过blinker
事件发出增量结果。我想收集这些事件,并在它们发生时将它们发送回我的tornado
应用程序。在
起初,tornado
似乎是这个用例的理想选择,因为它是异步的。我想我可以简单地将tornado.queues.Queue
对象传递给要在池中运行的函数,然后将put()
事件作为blinker
事件回调的一部分传递到这个队列中。在
但是,通过阅读tornado.queues.Queue
的文档,我了解到它们不是跨multiprocessing.Queue
这样的进程管理的,而且不是线程安全的。在
有没有办法在事件发生时从pool
中检索这些事件?我是否应该包装multiprocessing.Queue
以便它生成Futures
?这似乎不太可能奏效,因为我怀疑multiprocessing
的内部结构是否与{
[编辑] 这里有一些很好的线索:https://gist.github.com/hoffrocket/8050711
要收集传递给}库中的其他对象)。然后,由于
ProcessPoolExecutor
的任务的返回值,必须使用multiprocessing.Queue
(或{multiprocessing.Queue
只公开一个同步接口,所以必须在父进程中使用另一个线程从队列中读取数据(而不涉及实现细节)。这里可以使用一个文件描述符,但是我们暂时忽略它,因为它是未记录的并且可能会更改)。在下面是一个未经测试的快速示例:
你可以做得更简单。下面是一个协程,它向子进程提交四个慢函数调用并等待它们:
It输出:
^{pr2}$您可以看到,协同程序接收结果的顺序是它们完成的顺序,而不是它们提交的顺序:未来的编号1在未来的编号2之后解析,因为未来的编号1睡眠时间更长。convert ium yield将ProcessPoolExecutor返回的期货转换为可在协同程序中等待的Tornado兼容期货。在
每个future解析为calculate_slowly返回的值:在本例中,它与传入calculate \u slowly的数字相同,并且与calculate \u slowly休眠的秒数相同。在
要将其包含在RequestHandler中,请尝试以下操作:
您可以观察到
curl localhost:8888
服务器对客户机请求的响应是增量的。在相关问题 更多 >
编程相关推荐