任意未来列表中的龙卷风生成器恢复

3 投票
1 回答
2226 浏览
提问于 2025-04-17 21:25

在tornado(或者asyncio)中,有没有一种方式可以等待一个列表中的任意一个Future,而不是等所有的Future都完成?

yield any_of([future1, future2, future3])

比如说,如果future2已经准备好了,那么结果应该是:

[None, <result>, None]

1 个回答

5

更新: Tornado 现在有了 tornado.gen.WaitIterator,请根据它的文档中的示例使用,而不是我下面的想法。

你可以创建一个名为 Any 的类,它继承自 Future,并且可以包装一个未来列表。这个 Any 类会等到其中一个未来完成,然后给你返回结果列表:

import time
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.concurrent import Future


@gen.coroutine
def delayed_msg(seconds, msg):
    yield gen.Task(IOLoop.current().add_timeout,
                   time.time() + seconds)
    raise gen.Return(msg)


class Any(Future):
    def __init__(self, futures):
        super(Any, self).__init__()
        self.futures = futures
        for future in futures:
            future.add_done_callback(self.done_callback)

    def done_callback(self, future):
        try:
            self.set_result(self.make_result())
        except Exception as e:
            self.set_exception(e)

    def make_result(self):
        """A list of results: None for each pending future, a result for
        each resolved future. Raises an exception for the first future
        that has an exception.
        """
        return [f.result() if f.done() else None
                for f in self.futures]

    def clear(self):
        """Break reference cycle with any pending futures."""
        self.futures = None


@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_msg(3, '3')
    future3 = delayed_msg(1, '1')
    results = yield Any([future1, future2, future3])
    end = time.time()
    print "finished in %.1f sec: %r" % (end - start, results)

    results = yield Any([future1, future2])
    end = time.time()
    print "finished in %.1f sec: %r" % (end - start, results)

IOLoop.current().run_sync(f)

如你所料,这会打印:

finished in 1.0 sec: [None, None, '1']
finished in 2.0 sec: ['2', None]

不过,你会发现这种方法有一些复杂之处。首先,如果你想在第一个未来完成后继续等待其余的未来,构建仍在等待的未来列表就变得复杂。我想你可以这样做:

results = yield Any(f for f in [future1, future2, future3] if not f.done())

这看起来不太好,而且甚至不正确!这里有一个竞争条件。如果一个未来在连续执行 yield Any(...) 之间完成,那么你将永远无法收到它的结果。第一次 yield 没有得到未来的结果,因为它还在等待,但第二次 yield 也得不到结果,因为到那时这个未来已经“完成”,而且不再包含在传给 Any 的列表中。

另一个复杂之处是 Any 引用每个未来,而每个未来又引用一个回调,这个回调又回到 Any。为了及时清理内存,你应该调用 Any.clear()。

此外,你无法区分一个等待中的未来和一个结果为 None 的未来。你需要一个特殊的标记值来表示一个等待中的未来,这个值要和 None 不同。

最后一个复杂之处是最糟糕的。如果多个未来完成,并且其中一些有异常,Any 没有明显的方法将所有这些信息传达给你。将异常和结果混合在一个列表中会很麻烦。

我认为有一个更简单的方法。我们可以让 Any 只返回第一个完成的未来,而不是结果列表:

class Any(Future):
    def __init__(self, futures):
        super(Any, self).__init__()
        for future in futures:
            future.add_done_callback(self.done_callback)

    def done_callback(self, future):
        self.set_result(future)

这样就消除了引用循环,异常处理的问题也解决了:Any 类返回整个未来给你,而不是它的结果或异常。你可以根据需要检查它。等待剩下的未来在一些完成后也变得简单:

@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_msg(3, '3')
    future3 = delayed_msg(1, '1')

    futures = set([future1, future2, future3])
    while futures:
        resolved = yield Any(futures)
        end = time.time()
        print "finished in %.1f sec: %r" % (end - start, resolved.result())
        futures.remove(resolved)

如你所愿,这会打印:

finished in 1.0 sec: '1'
finished in 2.0 sec: '2'
finished in 3.0 sec: '3'

我们可以通过添加一个新的全局函数来测试异常处理的行为:

@gen.coroutine
def delayed_exc(seconds, msg):
    yield gen.Task(IOLoop.current().add_timeout,
                   time.time() + seconds)
    raise Exception(msg)

并且用它替代 delayed_msg:

@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_exc(3, '3')  # Exception!
    future3 = delayed_msg(1, '1')

    futures = set([future1, future2, future3])
    while futures:
        resolved = yield Any(futures)
        end = time.time()
        try:
            outcome = resolved.result()
        except Exception as e:
            outcome = e

        print "finished in %.1f sec: %r" % (end - start, outcome)
        futures.remove(resolved)

现在,脚本会先打印 "1",然后 "2",最后打印 "Exception('3',)"。

撰写回答