How can I make effective use of multiprocessing for requests in a Python Tornado server?

Asked 2025-04-17 18:53

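I'm using Tornado, a non-blocking Python server. One of my handler classes serves GET requests that can take quite a long time to complete (around 5 to 10 seconds). The problem is that Tornado blocks while it handles these requests, so subsequent fast requests are held up until the slow request finishes.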

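I looked at https://github.com/facebook/tornado/wiki/Threading-and-concurrency and concluded that I want a combination of #3 (other processes) and #4 (other threads). Using #4 on its own gave me trouble: I couldn't reliably hand control back to the ioloop while another thread was doing the "heavy lifting". (I suspect this is due to the Global Interpreter Lock (GIL) combined with the heavy-lifting task's high CPU load, but that is only a guess.)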

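So I've been prototyping a solution that does the "heavy lifting" of these slow GET requests in a separate process and then places a callback back on Tornado's ioloop when the process is done, so that the request can be finished. This leaves the ioloop free to handle other requests.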

I've put together a simple example that demonstrates a possible solution, but I'm curious to get feedback from the community on it.

My question is twofold: how can this current approach be simplified, and what pitfalls might it have?

Approach

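  1. Use Tornado's built-in asynchronous decorator so that the request stays open while the ioloop keeps running.

  2. Use Python's multiprocessing module to spawn a separate process for the "heavy lifting" task. I first tried the threading module but couldn't reliably hand control back to the ioloop. multiprocessing also appears to take advantage of multiple cores.

  3. Start a "watcher" thread in the main ioloop process with the threading module. Its job is to watch a multiprocessing.Queue for the result of the "heavy lifting" task once it completes. This is needed because I need a way to know that the heavy-lifting task has finished while still being able to tell the ioloop that the request is now done.

  4. Make sure the "watcher" thread frequently relinquishes control to the main ioloop with time.sleep(0) calls so that other requests continue to be processed.

  5. When a result is in the queue, add a callback from the "watcher" thread with tornado.ioloop.IOLoop.instance().add_callback(), which the documentation describes as the only safe way to call the ioloop instance from another thread.

  6. Then be sure to call finish() in the callback to complete the request and send the reply.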
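A minimal sketch of steps 3 to 5 with the busy-wait removed, assuming Python 3 and plain asyncio rather than Tornado itself (Tornado 5+ runs on the asyncio event loop, and asyncio's loop.call_soon_threadsafe plays the role that IOLoop.add_callback plays in the steps above). The watcher thread blocks on q.get() instead of spinning on time.sleep(0), so it uses no CPU while it waits. The names heavy_lifting, run_in_process and main are illustrative. One pitfall carried over from the original design: if the worker dies without putting anything on the queue, q.get() blocks forever, so production code would want a timeout.

```python
import asyncio
import math
import multiprocessing
import threading
import time


def heavy_lifting(q):
    """Runs in a child process: do CPU-bound work, then report the elapsed time on q."""
    t0 = time.perf_counter()
    for k in range(300):
        math.factorial(k)
    q.put(time.perf_counter() - t0)


def run_in_process(loop, worker):
    """Start `worker` in a child process and return an asyncio.Future for its result."""
    q = multiprocessing.Queue()
    fut = loop.create_future()
    proc = multiprocessing.Process(target=worker, args=(q,))
    proc.start()

    def deliver(result):
        # Runs on the event-loop thread; the future may already be cancelled.
        if not fut.done():
            fut.set_result(result)

    def watcher():
        result = q.get()   # blocks only this thread, never the event loop
        proc.join()        # reap the child after draining the queue
        loop.call_soon_threadsafe(deliver, result)

    threading.Thread(target=watcher, daemon=True).start()
    return fut


async def main():
    loop = asyncio.get_running_loop()
    # Three "slow" jobs run in parallel while the loop stays free for other work.
    return await asyncio.gather(*(run_in_process(loop, heavy_lifting) for _ in range(3)))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The ordering inside watcher matters: the queue is drained before join() is called, since joining a process that still has unflushed data on a multiprocessing.Queue can deadlock.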

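Below is some example code demonstrating this approach. multi_tornado.py is the server that implements the outline above, and call_multi.py is a sample script that calls the server in two different ways to test it. Both tests send 3 slow GET requests followed by 20 fast GET requests. The results show the behavior with threading turned on and off.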

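With threading off, the 3 slow requests block (each takes a little over 1 second to complete). A few fast requests squeeze in between some of the slow ones (I'm not entirely sure how that happens; it may be because I'm running the server and the client test script on the same machine). The point is that all of the fast requests are held up to varying degrees.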

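With threading on, the 20 fast requests all complete immediately, and the 3 slow requests finish at about the same time because they run in parallel. This is the desired behavior. The 3 slow requests take 2.5 seconds to complete in parallel, whereas without threading they take about 3.5 seconds in total, so overall that's roughly a 35% speedup (I assume from sharing the work across multiple cores). More importantly, the fast requests are handled immediately instead of being stuck behind the slow ones.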
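The speedup can be checked against the test logs in this post, assuming the serial cost is the sum of the three blocking slow-request times and the parallel cost is the slowest of the three threaded ones. Depending on which ratio is reported, the gain comes out at roughly 1.4x throughput or roughly 30% less wall-clock time.

```python
# Per-request compute times copied from the test logs in this post.
blocking = [1.338, 1.169, 1.130]   # /slow: requests run one after another
parallel = [2.485, 2.491, 2.517]   # /slow_threaded: requests overlap

serial_total = sum(blocking)        # wall time if the three run back to back
parallel_total = max(parallel)      # wall time when they run side by side

speedup = serial_total / parallel_total          # throughput ratio
time_saved = 1 - parallel_total / serial_total   # fraction of wall time saved

print(f"serial {serial_total:.3f} s, parallel {parallel_total:.3f} s")
print(f"speedup {speedup:.2f}x, time saved {time_saved:.0%}")
```

Note that each threaded job reports about 2.5 s of compute rather than about 1.2 s, which suggests the three processes were competing for fewer cores than jobs.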

I don't have much experience with multithreaded programming, so while this approach seems to work, I'd like to know:

Is there a simpler way to do this? What pitfalls might be hiding in this approach?

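(Note: a future compromise might be to run more Tornado instances behind a reverse proxy such as nginx for load balancing. I'll be running multiple instances behind a load balancer regardless, but I'm wary of just throwing hardware at this problem, since it seems so directly tied to the blocking issue.)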

Example code

multi_tornado.py (example server):

import time
import threading
import multiprocessing
import math

from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop


# run in some other process - put result in q
def heavy_lifting(q):
    t0 = time.time()
    for k in range(2000):
        math.factorial(k)

    t = time.time()
    q.put(t - t0)  # report time to compute in queue


class FastHandler(RequestHandler):
    def get(self):
        res = 'fast result ' + self.get_argument('id')
        print(res)
        self.write(res)
        self.flush()


class MultiThreadedHandler(RequestHandler):
    # Note:  This handler can be called with threaded = True or False
    def initialize(self, threaded=True):
        self._threaded = threaded
        self._q = multiprocessing.Queue()

    def start_process(self, worker, callback):
        # method to start process and watcher thread
        self._callback = callback

        if self._threaded:
            # launch process
            multiprocessing.Process(target=worker, args=(self._q,)).start()

            # start watching for process to finish
            threading.Thread(target=self._watcher).start()

        else:
            # threaded = False just call directly and block
            worker(self._q)
            self._watcher()

    def _watcher(self):
        # watches the queue for process result
        while self._q.empty():
            time.sleep(0)  # relinquish control if not ready

        # put callback back into the ioloop so we can finish request
        response = self._q.get(False)
        IOLoop.instance().add_callback(lambda: self._callback(response))


class SlowHandler(MultiThreadedHandler):
    @asynchronous
    def get(self):
        # start the worker process plus a thread that watches for its result
        self.start_process(heavy_lifting, self._on_response)

    def _on_response(self, delta):
        _id = self.get_argument('id')
        res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
        print(res)
        self.write(res)
        self.flush()
        self.finish()   # be sure to finish request


application = Application([
    (r"/fast", FastHandler),
    (r"/slow", SlowHandler, dict(threaded=False)),
    (r"/slow_threaded", SlowHandler, dict(threaded=True)),
])


if __name__ == "__main__":
    application.listen(8888)
    IOLoop.instance().start()

call_multi.py (client tester):

import sys
from tornado.ioloop import IOLoop
from tornado import httpclient


def run(slow):
    def show_response(res):
        print(res.body)

    # make 3 "slow" requests on server
    requests = []
    for k in range(3):
        uri = 'http://localhost:8888/{}?id={}'
        requests.append(uri.format(slow, str(k + 1)))

    # followed by 20 "fast" requests
    for k in range(20):
        uri = 'http://localhost:8888/fast?id={}'
        requests.append(uri.format(k + 1))

    # show results as they return
    http_client = httpclient.AsyncHTTPClient()

    print('Scheduling Get Requests:')
    print('------------------------')
    for req in requests:
        print(req)
        http_client.fetch(req, show_response)

    # execute requests on server
    print('\nStart sending requests....')
    IOLoop.instance().start()

if __name__ == '__main__':
    scenario = sys.argv[1]

    if scenario == 'slow' or scenario == 'slow_threaded':
        run(scenario)

Test results

Running python call_multi.py slow (the blocking behavior):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20

Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20

Running python call_multi.py slow_threaded (the desired behavior):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20

Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s

3 Answers

1

If your GET requests are taking that long, then Tornado is the wrong framework for them.

I suggest using nginx to route the fast requests to Tornado and the slower ones to a different server.

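PeterBe has an interesting article in which he runs multiple Tornado servers and sets one of them up as the "slow server" that handles only the long-running requests: worrying-about-io-blocking. I think this approach is worth trying.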

17

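multiprocessing.Pool can be integrated into the Tornado I/O loop, but it's a bit messy. A cleaner integration can be done with concurrent.futures (see my other answer for details), but if you're stuck on Python 2.x and can't install the concurrent.futures backport, here is how you can do it strictly with multiprocessing: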

multiprocessing.Pool.apply_asyncmultiprocessing.Pool.map_async这两个方法都有一个可选的callback参数,这意味着它们都可以和tornado.gen.Task结合使用。所以在大多数情况下,异步运行子进程中的代码其实很简单:

import multiprocessing
import contextlib

from tornado import gen
from tornado.gen import Return
from tornado.ioloop import IOLoop
from functools import partial

def worker():
    print("async work here")

@gen.coroutine
def async_run(func, *args, **kwargs):
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    raise Return(result)

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    func = partial(async_run, worker)
    IOLoop().run_sync(func)
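The snippet above relies on apply_async invoking callback once the worker's return value arrives. A stdlib-only sketch of that mechanism, assuming Python 3: the callback receives the return value, and it runs in the pool's internal result-handler thread rather than in the thread that called apply_async, which matters before touching any event-loop object from inside it. report is a hypothetical helper name.

```python
import multiprocessing
import threading


def worker(x):
    return "async work on %s" % x


def report(pool, x):
    """Submit worker(x); return (result, name of the thread that ran the callback)."""
    seen = {}
    done = threading.Event()

    def on_done(result):
        # Runs in the pool's result-handler thread, not in the caller's thread.
        seen["result"] = result
        seen["thread"] = threading.current_thread().name
        done.set()

    pool.apply_async(worker, (x,), callback=on_done)
    done.wait(timeout=30)
    return seen.get("result"), seen.get("thread")


if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        result, thread_name = report(pool, 42)
        print(result, "| callback ran in:", thread_name)
        print("caller thread:", threading.current_thread().name)
```

Because that thread is shared by every task in the pool, callbacks should return quickly; a slow callback delays delivery of all other results.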

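As I mentioned, this works well in most cases. But if worker() raises an exception, callback is never called, so the gen.Task never finishes and your program hangs forever. If you know your work will never raise (for example, because you've wrapped the whole thing in a try/except), you can safely use this approach. However, if you want exceptions to propagate out of your workers, the only solution I found was to subclass some multiprocessing components so that they call callback even when the worker subprocess raises an exception: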

from multiprocessing.pool import ApplyResult, Pool, RUN
import multiprocessing
class TornadoApplyResult(ApplyResult):
    def _set(self, i, obj):
        self._success, self._value = obj 
        if self._callback:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        assert self._state == RUN
        result = TornadoApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result
# ...

if __name__ == "__main__":
    pool = TornadoPool(multiprocessing.cpu_count())
    # ...

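With these changes, the exception object is returned by the gen.Task instead of the gen.Task hanging forever. I also updated my async_run method to re-raise the exception when it is returned, and made a few other changes to provide better tracebacks for exceptions raised in the workers. Here is the full code: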

import sys
import time
import traceback
import multiprocessing
from multiprocessing.pool import Pool, ApplyResult, RUN
from functools import wraps

import tornado.web
from tornado.ioloop import IOLoop
from tornado.gen import Return
from tornado import gen

class WrapException(Exception):
    def __init__(self):
        exc_type, exc_value, exc_tb = sys.exc_info()
        self.exception = exc_value
        self.formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    def __str__(self):
        return '\n%s\nOriginal traceback:\n%s' % (Exception.__str__(self), self.formatted)

class TornadoApplyResult(ApplyResult):
    def _set(self, i, obj):
        self._success, self._value = obj 
        if self._callback:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]   

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        assert self._state == RUN
        result = TornadoApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

@gen.coroutine
def async_run(func, *args, **kwargs):
    """ Runs the given function in a subprocess.

    This wraps the given function in a gen.Task and runs it
    in a multiprocessing.Pool. It is meant to be used as a
    Tornado co-routine. Note that if func returns an Exception 
    (or an Exception sub-class), this function will raise the 
    Exception, rather than return it.

    """
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    if isinstance(result, Exception):
        raise result
    raise Return(result)

def handle_exceptions(func):
    """ Raise a WrapException so we get a more meaningful traceback"""
    @wraps(func)
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            raise WrapException()
    return inner

# Test worker functions
@handle_exceptions
def test2(x):
    raise Exception("eeee")

@handle_exceptions
def test(x):
    print(x)
    time.sleep(2)
    return "done"

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        try:
            result = yield async_run(test, "inside get")
            self.write("%s\n" % result)
            result = yield async_run(test2, "hi2")
        except Exception as e:
            print("caught exception in get")
            self.write("Caught an exception: %s" % e)
        finally:
            self.finish()

app = tornado.web.Application([
    (r"/test", TestHandler),
])

if __name__ == "__main__":
    pool = TornadoPool(4)
    app.listen(8888)
    IOLoop.instance().start()

Here is the behavior on the client side:

dan@dan:~$ curl localhost:8888/test
done
Caught an exception: 

Original traceback:
Traceback (most recent call last):
  File "./mutli.py", line 123, in inner
    return func(*args, **kwargs)
  File "./mutli.py", line 131, in test2
    raise Exception("eeee")
Exception: eeee

If I fire off two curl requests at the same time, we can see they're handled asynchronously on the server side:

dan@dan:~$ ./mutli.py 
inside get
inside get
caught exception inside get
caught exception inside get

Edit:

Note that this code gets simpler in Python 3, which adds an error_callback keyword argument to all of the asynchronous multiprocessing.Pool methods. That makes integrating with Tornado much easier:

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        return super().apply_async(func, args, kwds, callback=callback,
                                   error_callback=callback)

@gen.coroutine
def async_run(func, *args, **kwargs):
    """ Runs the given function in a subprocess.

    This wraps the given function in a gen.Task and runs it
    in a multiprocessing.Pool. It is meant to be used as a
    Tornado co-routine. Note that if func returns an Exception
    (or an Exception sub-class), this function will raise the
    Exception, rather than return it.

    """
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    if isinstance(result, Exception):
        raise result
    raise Return(result)

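All the overridden apply_async has to do is pass the error_callback keyword argument to the parent class's method in addition to callback. There's no need to override ApplyResult.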
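A minimal sketch of the same error_callback idea on plain asyncio, assuming Python 3 (Tornado 5+ runs on the asyncio loop, and gen.Task was removed in Tornado 6). One detail worth handling explicitly: Pool invokes callback and error_callback from its internal result-handler thread, so instead of resolving the future there, the sketch hops back onto the event-loop thread with loop.call_soon_threadsafe, the asyncio counterpart of IOLoop.add_callback. pool_apply, square and fail are hypothetical names.

```python
import asyncio
import multiprocessing


def pool_apply(loop, pool, func, *args):
    """Run func(*args) in `pool` and return an asyncio.Future for the outcome."""
    fut = loop.create_future()

    def set_result(value):
        if not fut.done():          # runs on the loop thread
            fut.set_result(value)

    def set_exception(exc):
        if not fut.done():
            fut.set_exception(exc)

    # Both callbacks fire in the pool's result-handler thread, so each one
    # only schedules work on the event loop instead of touching `fut`.
    pool.apply_async(
        func, args,
        callback=lambda value: loop.call_soon_threadsafe(set_result, value),
        error_callback=lambda exc: loop.call_soon_threadsafe(set_exception, exc),
    )
    return fut


def square(x):
    return x * x


def fail(x):
    raise ValueError("boom %s" % x)


async def main(pool):
    loop = asyncio.get_running_loop()
    ok = await pool_apply(loop, pool, square, 7)
    try:
        await pool_apply(loop, pool, fail, 1)
    except ValueError as exc:       # the worker's exception is re-raised here
        return ok, str(exc)
    return ok, None


if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        print(asyncio.run(main(pool)))
```

Using separate success and error paths also avoids having to tell a returned Exception object apart from a raised one, which the single shared callback cannot do.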

We can get even fancier by giving TornadoPool a metaclass so that its *_async methods can be called directly as if they were coroutines:

import time
from functools import wraps
from multiprocessing.pool import Pool

import tornado.web
from tornado import gen
from tornado.gen import Arguments, Return
from tornado import stack_context
from tornado.ioloop import IOLoop
from tornado.concurrent import Future

def _argument_adapter(callback):
    def wrapper(*args, **kwargs):
        if kwargs or len(args) > 1:
            callback(Arguments(args, kwargs))
        elif args:
            callback(args[0])
        else:
            callback(None)
    return wrapper

def PoolTask(func, *args, **kwargs):
    """ Task function for use with multiprocessing.Pool methods.

    This is very similar to tornado.gen.Task, except it sets the
    error_callback kwarg in addition to the callback kwarg. This
    way exceptions raised in pool worker methods get raised in the
    parent when the Task is yielded from.

    """
    future = Future()
    def handle_exception(typ, value, tb):
        if future.done():
            return False
        future.set_exc_info((typ, value, tb))
        return True
    def set_result(result):
        if future.done():
            return
        if isinstance(result, Exception):
            future.set_exception(result)
        else:
            future.set_result(result)
    with stack_context.ExceptionStackContext(handle_exception):
        cb = _argument_adapter(set_result)
        func(*args, callback=cb, error_callback=cb)
    return future

def coro_runner(func):
    """ Wraps the given func in a PoolTask and returns it. """
    @wraps(func)
    def wrapper(*args, **kwargs):
        return PoolTask(func, *args, **kwargs)
    return wrapper

class MetaPool(type):
    """ Wrap all *_async methods in Pool with coro_runner. """
    def __new__(cls, clsname, bases, dct):
        pdct = bases[0].__dict__
        for attr in pdct:
            if attr.endswith("async") and not attr.startswith('_'):
                setattr(bases[0], attr, coro_runner(pdct[attr]))
        return super().__new__(cls, clsname, bases, dct)

class TornadoPool(Pool, metaclass=MetaPool):
    pass

# Test worker functions
def test2(x):
    print("hi2")
    raise Exception("eeee")

def test(x):
    print(x)
    time.sleep(2)
    return "done"

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        try:
            result = yield pool.apply_async(test, ("inside get",))
            self.write("%s\n" % result)
            result = yield pool.apply_async(test2, ("hi2",))
            self.write("%s\n" % result)
        except Exception as e:
            print("caught exception in get")
            self.write("Caught an exception: %s" % e)
            raise
        finally:
            self.finish()

app = tornado.web.Application([
    (r"/test", TestHandler),
])

if __name__ == "__main__":
    pool = TornadoPool()
    app.listen(8888)
    IOLoop.instance().start()

32

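If you're willing to use concurrent.futures.ProcessPoolExecutor instead of multiprocessing, this is actually very simple. Tornado's ioloop already supports concurrent.futures.Future, so the two work together smoothly out of the box. concurrent.futures has been part of the standard library since Python 3.2 and is also available for Python 2.x through a backport.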

Here's an example:

import time
from concurrent.futures import ProcessPoolExecutor
from tornado.ioloop import IOLoop
from tornado import gen

def f(a, b, c, blah=None):
    print("got %s %s %s and %s" % (a, b, c, blah))
    time.sleep(5)
    return "hey there"

@gen.coroutine
def test_it():
    pool = ProcessPoolExecutor(max_workers=1)
    fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future
    print("running it asynchronously")
    ret = yield fut
    print("it returned %s" % ret)
    pool.shutdown()

IOLoop.instance().run_sync(test_it)

Output:

running it asynchronously
got 1 2 3 and ok
it returned hey there

ProcessPoolExecutor has a more limited API than multiprocessing.Pool, but if you don't need the more advanced features of multiprocessing.Pool, it's worth using because the integration is so much simpler.
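In a long-running server it is usually preferable to create one executor at startup and reuse it, rather than building and shutting down a ProcessPoolExecutor on every call as the test above does. A minimal sketch assuming Python 3 and plain asyncio: loop.run_in_executor forwards only positional arguments, so keyword arguments are bound with functools.partial. Inside a Tornado 5+ handler the equivalent call would be IOLoop.current().run_in_executor. The handle and main functions are illustrative names, and the sleep in f is a stand-in for real CPU-bound work.

```python
import asyncio
import functools
import time
from concurrent.futures import ProcessPoolExecutor


def f(a, b, c, blah=None):
    time.sleep(0.2)   # stand-in for CPU-bound work
    return "hey there %s %s %s and %s" % (a, b, c, blah)


async def handle(executor, n):
    loop = asyncio.get_running_loop()
    # run_in_executor takes positional args only; bind the keyword with partial.
    call = functools.partial(f, n, 2, 3, blah="ok")
    return await loop.run_in_executor(executor, call)


async def main(executor):
    # Several "requests" share one pool; the loop stays free while they run.
    return await asyncio.gather(*(handle(executor, n) for n in range(4)))


if __name__ == "__main__":
    # One pool for the lifetime of the program; the with-block shuts it down at exit.
    with ProcessPoolExecutor(max_workers=2) as executor:
        for line in asyncio.run(main(executor)):
            print(line)
```

Sizing max_workers to the number of cores keeps CPU-bound jobs from oversubscribing the machine, while extra requests simply queue inside the executor.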
