如何确认所有Future在Tornado中已解析?

4 投票
1 回答
2713 浏览
提问于 2025-04-18 11:44

我有一个解析应用程序,基本上做以下几件事:

  • 把一个叫 start() 的方法添加到 IOLoop 中,作为下次循环时要调用的回调函数。
  • start() 会调用另一个函数,我们叫它 get_input()
  • get_input() 是一个协程,它会从网上获取一些数据,然后安排另一个协程 process_input(),就像第一步中添加 start() 一样。
  • get_input() 还会检查一些依赖于获取数据的条件,并可能根据调整后的参数重新安排自己。

现在,当这个条件返回 False 时,我知道不会有新的输入项需要处理。
但是,我怎么知道 get_input()process_input() 还没有解决的任务呢?

我想这可以通过实现一种计数器来解决,每次调用 process_input() 时增加计数,解决后减少计数。
但是如果有一系列不同的协程呢?我怎么能跟踪它们的状态,以确保在我停止 IOLoop 时,没有任务会在解决之前就结束?

也许应该有某种层次化的计数方式……

编辑:

2 @dano
好的,我明白了……我之前没有注意到。你确实没有阻塞,因为它自己的调用在这个列表里。
但是!

  1. 这样的组织要求只能使用 yield 结构,不能用 add_callback,否则我们就失去了“等待”的概念。
  2. 递归层级会增加……嗯,不知道这是不是太糟糕了。

我今天想到的是“元未来”(metafuture)。
我创建了一个空的 Future() 对象。
我用我的装饰器装饰每个支持 @coroutine 的方法,这个装饰器会在“元未来”中增加计数,并为它们的未来任务添加一个自定义的完成回调,用于减少计数。

当计数达到零时,“元未来”通过调用 set_result(None) 来解决。
还有一个 IOLoop 的回调,正好返回这个元未来:

    @coroutine
    def wait_for_complete(self):
      yield self._input_data_collected
      yield self._all_futures_resolved
      self.complete()

所以在那之后,我们就知道没有待处理的任务了。这是一种比较复杂的方法,就像手动实现引用计数,但它也涵盖了 IOLoop.add_callback() 添加任务的方式。

1 个回答

4

你可以把你的方法写成在所有工作完成之前不返回,而不是安排回调函数。这样你就可以直接调用 IOLoop.run_sync(start),这个调用会一直等到所有处理完成后才返回:

from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def start():
    yield get_input()

@gen.coroutine
def get_input(*args, **kwargs):
    data = yield fetch_data_over_net()
    futs = []  # list of Future objects
    futs.append(process_input(data))
    if should_call_myself(data):
        futs.append(get_input(newargs, newkwargs))
    yield futs # This will wait for all Future objects in the list to complete.

@gen.coroutine
def process_input(data):
    # do stuff

if __name__ == "__main__":
    IOLoop.instance().run_sync(start)

我们利用了协程会返回 Futures 的特点,而 Tornado 也支持 同时等待多个 Futures,这样我们就可以尽可能多地并行运行,而在所有依赖的工作完成之前,实际上不会从 get_input(因此也不会从 start)返回。

撰写回答