如何确认所有Future在Tornado中已解析?
我有一个解析应用程序,基本上做以下几件事:
- 把一个叫
start()
的方法添加到 IOLoop 中,作为下次循环时要调用的回调函数。 start()
会调用另一个函数,我们叫它get_input()
。get_input()
是一个协程,它会从网上获取一些数据,然后安排另一个协程process_input()
,就像第一步中添加start()
一样。get_input()
还会检查一些依赖于获取数据的条件,并可能根据调整后的参数重新安排自己。
现在,当这个条件返回 False
时,我知道不会有新的输入项需要处理。
但是,我怎么知道 get_input()
和 process_input()
还没有解决的任务呢?
我想这可以通过实现一种计数器来解决,每次调用 process_input()
时增加计数,解决后减少计数。
但是如果有一系列不同的协程呢?我怎么能跟踪它们的状态,以确保在我停止 IOLoop 时,没有任务会在解决之前就结束?
也许应该有某种层次化的计数方式……
编辑:
2 @dano
好的,我明白了……我之前没有注意到。你确实没有阻塞,因为它自己的调用在这个列表里。
但是!
- 这样的组织要求只能使用
yield
结构,不能用add_callback
,否则我们就失去了“等待”的概念。 - 递归层级会增加……嗯,不知道这是不是太糟糕了。
我今天想到的是“元未来”(metafuture)。
我创建了一个空的 Future()
对象。
我用我的装饰器装饰每个支持 @coroutine
的方法,这个装饰器会在“元未来”中增加计数,并为它们的未来任务添加一个自定义的完成回调,用于减少计数。
当计数达到零时,“元未来”通过调用 set_result(None)
来解决。
还有一个 IOLoop 的回调,正好返回这个元未来:
@coroutine
def wait_for_complete(self):
yield self._input_data_collected
yield self._all_futures_resolved
self.complete()
所以在那之后,我们就知道没有待处理的任务了。这是一种比较复杂的方法,就像手动实现引用计数,但它也涵盖了 IOLoop.add_callback()
添加任务的方式。
1 个回答
你可以把你的方法写成在所有工作完成之前不返回,而不是安排回调函数。这样你就可以直接调用 IOLoop.run_sync(start)
,这个调用会一直等到所有处理完成后才返回:
from tornado import gen
from tornado.ioloop import IOLoop
@gen.coroutine
def start():
yield get_input()
@gen.coroutine
def get_input(*args, **kwargs):
data = yield fetch_data_over_net()
futs = [] # list of Future objects
futs.append(process_input(data))
if should_call_myself(data):
futs.append(get_input(newargs, newkwargs))
yield futs # This will wait for all Future objects in the list to complete.
@gen.coroutine
def process_input(data):
# do stuff
if __name__ == "__main__":
IOLoop.instance().run_sync(start)
我们利用了协程会返回 Futures
的特点,而 Tornado 也支持 同时等待多个 Futures,这样我们就可以尽可能多地并行运行,而在所有依赖的工作完成之前,实际上不会从 get_input
(因此也不会从 start
)返回。