Tornado中重任务的并发处理

1 投票
1 回答
1436 浏览
提问于 2025-04-18 14:25

我的代码:

import tornado.tcpserver
import tornado.ioloop
import itertools
import socket
import time

class Talk():
    def __init__(self, id):
        self.id = id

    @tornado.gen.coroutine
    def on_connect(self):
        try:
            while "connection alive":
                self.said = yield self.stream.read_until(b"\n") 

                response = yield tornado.gen.Task(self.task)     ### LINE 1

                yield self.stream.write(response)                   ### LINE 2

        except tornado.iostream.StreamClosedError:
            print('error: socket closed')
        return

    @tornado.gen.coroutine
    def task(self):
        if self.id == 1:
           time.sleep(3)  # sometimes request is heavy blocking
        return b"response"

    @tornado.gen.coroutine
    def on_disconnect(self):
        yield []


class Server(tornado.tcpserver.TCPServer):

    def __init__(self, io_loop=None, ssl_options=None, max_buffer_size=None):

        tornado.tcpserver.TCPServer.__init__(self,
            io_loop=io_loop,
            ssl_options=ssl_options,
            max_buffer_size=max_buffer_size)

        self.talk_id_alloc = itertools.count(1)
        return


    @tornado.gen.coroutine
    def handle_stream(self, stream, address):
        talk_id = next(self.talk_id_alloc)
        talk = Talk(talk_id)

        stream.set_close_callback(talk.on_disconnect)
        stream.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        stream.socket.setsockopt(socket.IPPROTO_TCP, socket.SO_KEEPALIVE, 1)

        talk.stream = stream

        yield talk.on_connect()

        return

Server().listen(8888)
tornado.ioloop.IOLoop.instance().start()

问题:

我需要一个像龙卷风(tornado)这样的TCP服务器——它看起来很适合处理很多请求,而且计算量小。

但是:

  99% of requests will last less than 0,05 sec, but
  1% of them can last even 3 sec (special cases).
  single response must be returned at once, not partially.

这里最好的方法是什么?怎么才能让第1行代码的执行时间不超过0.1秒呢?

 yield tornado.gen.with_timeout(
    datetime.timedelta(seconds=0.1), tornado.gen.Task(self.task))

对我来说,这个方法没用——什么都不做。

 tornado.ioloop.IOLoop.current().add_timeout(
      datetime.timedelta(seconds=0.1),
      lambda: result.set_exception(TimeoutError("Timeout"))) 

要么就是没反应。

我在寻找更好的解决方案:

  • 任务可以检测是否需要高计算量(比如API请求……)——可以用超时来判断吗?然后把它放到另一个线程甚至进程中去执行,并发送给龙卷风服务器一个异常信息——“稍后从结果队列中接收我”(生产者/消费者模式)。我不想让超时直接终止重计算任务而不保存结果,而且任务还要在特殊的包装中重新开启——所以生产者/消费者模式应该适用于所有任务吗?

  • 当当前的事件循环被阻塞时,添加一个新的事件循环——怎么检测到阻塞呢?

我在龙卷风中没有看到任何解决方案。

第1行的任务可能很简单(大约99%)或者很复杂,这可能需要:

 I/O:
 - disk/DB access
 - ram/redis access
 network:
 - API call
 CPU:
 - algorithms, regex

(最糟糕的任务会做以上所有事情)。我只有在开始执行任务时才能知道它的复杂程度(负载),所以适合在单独的线程中使用任务队列。我不想拖延简单或快速的任务。

1 个回答

0

如果你能取消那些耗时的任务,我建议你用超时的方法来取消它们,然后把这些任务放到另一个线程去执行。从性能上来说,这样做不是最理想的(因为有个叫GIL的东西),但这样可以防止tornado被阻塞,这才是你最终想要的效果。

关于如何做到这一点,有一篇不错的文章可以参考:http://lbolla.info/blog/2013/01/22/blocking-tornado

如果你想更进一步,可以使用像celery这样的工具,它可以让你把任务透明地转移到其他进程去执行,不过这样做会更复杂一些。

撰写回答