Tornado web服务器中协程的正确使用

1 投票
1 回答
823 浏览
提问于 2025-04-27 23:55

我正在尝试把一个简单的同步服务器转换成异步版本,这个服务器接收POST请求,并从一个外部网络服务(亚马逊SQS)获取响应。下面是同步代码:

def post(self):

    zoom_level = self.get_argument('zoom_level')
    neLat = self.get_argument('neLat')
    neLon = self.get_argument('neLon')
    swLat = self.get_argument('swLat')
    swLon = self.get_argument('swLon')
    data = self._create_request_message(zoom_level, neLat, neLon, swLat, swLon)

    self._send_parking_spots_request(data)

    #....other stuff

def _send_parking_spots_request(self, data):

    msg = Message()
    msg.set_body(json.dumps(data))
    self._sqs_send_queue.write(msg)

我阅读了Tornado的文档和一些相关讨论,最后写出了这段使用协程的代码:

def post(self):

    zoom_level = self.get_argument('zoom_level')
    neLat = self.get_argument('neLat')
    neLon = self.get_argument('neLon')
    swLat = self.get_argument('swLat')
    swLon = self.get_argument('swLon')
    data = self._create_request_message(zoom_level, neLat, neLon, swLat, swLon)
    self._send_parking_spots_request(data)
    self.finish()

@gen.coroutine
def _send_parking_spots_request(self, data):

    msg = Message()
    msg.set_body(json.dumps(data))
    yield gen.Task(write_msg, self._sqs_send_queue, msg)

def write_msg(queue, msg, callback=None):
    queue.write(msg)

通过使用siege来比较性能,我发现第二个版本的表现甚至比原来的还要差,所以可能是我对协程和Tornado的异步编程理解得不够透彻。你能帮我解决这个问题吗?

补充说明: self._sqs_send_queue 是从boto接口获取的一个队列对象,而 queue.write(msg) 则是返回已经写入队列的消息。

暂无标签

1 个回答

0

tornado 这个框架需要你把所有的输入输出操作都改成非阻塞的。如果你只是把之前的代码放进一个 gen.Task 里,性能是不会有任何提升的,因为输入输出操作还是会阻塞事件循环。此外,你还需要把你的 post 方法改成一个协程,并使用 yield 来调用 _send_parking_spots_requests,这样代码才能正常运行。所以,一个“正确”的解决方案大概是这样的:

@gen.coroutine
def post(self):
    ...
    yield self._send_parking_spots_request(data)  # wait (without blocking the event loop) until the method is done
    self.finish()

@gen.coroutine
def _send_parking_spots_request(self, data):

    msg = Message()
    msg.set_body(json.dumps(data))
    yield gen.Task(write_msg, self._sqs_send_queue, msg)

def write_msg(queue, msg, callback=None):
    yield queue.write(msg, callback=callback)  # This has to do non-blocking I/O.

在这个例子中,queue.write 需要是一个使用非阻塞输入输出发送请求的 API,并在收到响应时执行 callback。由于我不知道你原来的例子中的 queue 到底是什么,所以无法具体说明在你的情况下该如何实现。

补充:假设你在使用 boto,你可能想看看 bototornado,它实现了 我上面描述的完全相同的 API

def write(self, message, callback=None):
    """
    Add a single message to the queue.
    :type message: Message
    :param message: The message to be written to the queue
    :rtype: :class:`boto.sqs.message.Message`
    :return: The :class:`boto.sqs.message.Message` object that was written.

撰写回答