将gevent迁移到Tornado ioloop - 使用协程/生成器结构化代码

4 投票
1 回答
677 浏览
提问于 2025-04-18 18:29

我正在尝试把一些比较简单的 gevent 代码转换成使用 Tornado 的异步功能。下面的示例代码使用 ZMQ 库来进行一个非常简单的请求-响应。

import zmq.green as zmq

def fun():
    i = zmq.Context.instance()
    sock = i.socket(zmq.REQ)
    sock.connect('tcp://localhost:9005')
    sock.send('Ping')
    return sock.recv()

我可以在代码的任何地方运行这个 fun() 函数。.recv() 这个调用会在等待回复时阻塞,这时候 gevent 的中心可以安排代码的其他部分。当收到值时,函数就会返回这个值。

我读过一些关于这些隐式返回可能出现的问题的文章,想要使用 Tornado 的 IOLoop 来运行这个(也是因为我想在 IPython Notebook 中运行它)。下面是一个选项,其中 recv_future() 返回一个包含结果的 Future 对象:

@gen.coroutine
def fun():
    i = zmq.Context.instance()
    sock = i.socket(zmq.REQ)
    sock.connect('tcp://localhost:9005')
    sock.send('Ping')
    msg = yield recv_future(sock)
    print "Received {}".format(msg[0])
    raise gen.Return(msg)

def recv_future(socket):
    zmqstream = ZMQStream(socket)  # Required for ZMQ
    future = Future()
    def _finish(reply):
        future.set_result(reply)
    zmqstream.on_recv(_finish)
    return future

问题是现在 fun() 不是一个普通的函数,而是一个生成器。所以如果我需要从另一个函数调用它,就需要用 yield fun()。但这样调用的函数也变成了生成器!

使用 Python 生成器的代码应该怎么结构才合适?我是否需要把每个函数都变成生成器才能让它工作?如果我需要从 __init__() 调用其中一个函数,那这个函数也应该变成生成器吗?

1 个回答

1

如果我需要在__init__()里调用这些函数,那它们也应该变成生成器吗?

这是目前在使用yieldyield from进行显式异步编程时遇到的一个问题(适用于Python 3.3及以上版本)。魔法方法不支持这些特性。你可以在这里阅读一些Python核心开发者关于异步编程的有趣看法,里面提到了这个问题:这里

使用Python生成器的代码应该怎么结构?我必须把每个函数都变成生成器才能工作吗?并不是说每个函数都要这样做,但你想要调用的那些协程函数,确实需要变成生成器,并且在继续执行之前要等它们完成。当你切换到显式异步编程模型时,通常希望全面采用这个模型——你的整个程序都在tornado的ioloop里运行。所以,在这个简单的例子中,你只需要这样做:

from tornado.ioloop import IOLoop
from tornado.gen import coroutine
from tornado.concurrent import Future

@gen.coroutine
def fun():
    i = zmq.Context.instance()
    sock = i.socket(zmq.REQ)
    sock.connect('tcp://localhost:9005')
    sock.send('Ping')
    msg = yield recv_future(sock)
    print "Received {}".format(msg[0])
    raise gen.Return(msg)

def recv_future(socket):
    zmqstream = ZMQStream(socket)  # Required for ZMQ
    future = Future()
    def _finish(reply):
        future.set_result(reply)
    zmqstream.on_recv(_finish)
    return future

if __name__ == "__main__":
    ioloop = IOLoop.instance()
    ioloop.add_callback(fun)
    ioloop.start() # This will run fun, and then block forever.
    #ioloop.run_sync(fun) # This will start the ioloop, run fun, then stop the ioloop

看起来你可以通过IPython.kernel API访问IPython正在使用的ioloop:

In [4]: from IPython.kernel.ioloop import manager

In [5]: manager.ioloop.IOLoop.instance()
Out[5]: <zmq.eventloop.ioloop.ZMQIOLoop at 0x4249ac8>

撰写回答