How can I effectively use multiprocessing for requests in a Python Tornado server?
I am using the non-blocking Python server Tornado. I have a class of GET requests that may take a significant amount of time to complete (think in the range of 5-10 seconds). The problem is that Tornado blocks on these requests, so that subsequent fast requests are held up until the slow request completes.
I looked at https://github.com/facebook/tornado/wiki/Threading-and-concurrency and concluded that I want some combination of #3 (other processes) and #4 (other threads). #4 on its own had problems: I was unable to reliably hand control back to the ioloop while another thread was doing the "heavy lifting". (I assume this is due to the GIL and the heavy-lifting task being CPU-bound, pulling control away from the main ioloop, but that is just a guess.)
So I have been prototyping a solution that does the "heavy lifting" for these slow GET requests in a separate process, and then places a callback back into the Tornado ioloop when the process is done, to finish the request. This frees up the ioloop to handle other requests.
I have created a simple example demonstrating a possible solution, but I am curious to get feedback from the community on it.
My question is two-fold: how can this current approach be simplified, and what pitfalls potentially exist with it?
The approach
1. Utilize Tornado's built-in asynchronous decorator, which allows a request to stay open while the ioloop continues running.
2. Spawn a separate process for the "heavy lifting" task using Python's multiprocessing module. I first attempted to use the threading module, but was unable to reliably relinquish control back to the ioloop. It also appears that multiprocessing can take advantage of multiple cores.
3. Start a "watcher" thread in the main ioloop process, using the threading module, whose job is to watch a multiprocessing.Queue for the result of the "heavy lifting" task when it completes. This is needed because I need a way to know that the heavy-lifting task has finished while still being able to notify the ioloop that this request is now done.
4. Make sure the "watcher" thread relinquishes control to the main ioloop often, with time.sleep(0) calls, so that other requests keep getting processed promptly.
5. When there is a result in the queue, add a callback from the "watcher" thread using tornado.ioloop.IOLoop.instance().add_callback(), which is documented as the only safe way to call the ioloop instance from another thread.
6. Be sure to then call finish() in the callback, to complete the request and send the reply.
Below is some sample code showing this approach. multi_tornado.py is the server implementing the outline above, and call_multi.py is a sample script that calls the server in two different ways to test it. Both tests call the server with 3 slow GET requests followed by 20 fast GET requests. The results are shown both with and without the threading turned on.
In the case of running with no threading, the 3 slow requests block (each taking a little over a second to complete). A few of the fast requests squeeze in between some of the slow ones (I am not totally sure how that happens - it could be an artifact of running both the server and the client test script on the same machine). The point here is that all of the fast requests are held up to varying degrees.
In the case of running with threading enabled, the 20 fast requests all complete immediately, and the three slow requests complete at about the same time afterwards, since they run in parallel. This is the desired behavior. The three slow requests take 2.5 seconds to complete in parallel, whereas in the non-threaded case they take about 3.5 seconds in total. So there is about a 35% speedup overall (I assume due to multicore sharing). But more importantly, the fast requests are handled immediately instead of being held up by the slow ones.
I do not have a lot of experience with multithreaded programming - so while this seemingly works, I am curious to learn: is there a simpler way to accomplish this, and what problems may lurk within this approach?
(Note: a future tradeoff may be to just run more instances of Tornado with a reverse proxy like nginx doing load balancing. No matter what, I will be running multiple instances behind a load balancer - but I am wary of just throwing hardware at this problem, since the hardware seems so directly coupled to the blocking issue.)
Sample code
multi_tornado.py (the sample server):
import time
import threading
import multiprocessing
import math
from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop
# run in some other process - put result in q
def heavy_lifting(q):
t0 = time.time()
for k in range(2000):
math.factorial(k)
t = time.time()
q.put(t - t0) # report time to compute in queue
class FastHandler(RequestHandler):
def get(self):
res = 'fast result ' + self.get_argument('id')
print res
self.write(res)
self.flush()
class MultiThreadedHandler(RequestHandler):
# Note: This handler can be called with threaded = True or False
def initialize(self, threaded=True):
self._threaded = threaded
self._q = multiprocessing.Queue()
def start_process(self, worker, callback):
# method to start process and watcher thread
self._callback = callback
if self._threaded:
# launch process
multiprocessing.Process(target=worker, args=(self._q,)).start()
# start watching for process to finish
threading.Thread(target=self._watcher).start()
else:
# threaded = False just call directly and block
worker(self._q)
self._watcher()
def _watcher(self):
# watches the queue for process result
while self._q.empty():
time.sleep(0) # relinquish control if not ready
# put callback back into the ioloop so we can finish request
response = self._q.get(False)
IOLoop.instance().add_callback(lambda: self._callback(response))
class SlowHandler(MultiThreadedHandler):
@asynchronous
def get(self):
        # start the heavy-lifting process and a thread to watch for its result
self.start_process(heavy_lifting, self._on_response)
def _on_response(self, delta):
_id = self.get_argument('id')
res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
print res
self.write(res)
self.flush()
self.finish() # be sure to finish request
application = Application([
(r"/fast", FastHandler),
(r"/slow", SlowHandler, dict(threaded=False)),
(r"/slow_threaded", SlowHandler, dict(threaded=True)),
])
if __name__ == "__main__":
application.listen(8888)
IOLoop.instance().start()
call_multi.py (the client tester):
import sys
from tornado.ioloop import IOLoop
from tornado import httpclient
def run(slow):
def show_response(res):
print res.body
# make 3 "slow" requests on server
requests = []
for k in xrange(3):
uri = 'http://localhost:8888/{}?id={}'
requests.append(uri.format(slow, str(k + 1)))
# followed by 20 "fast" requests
for k in xrange(20):
uri = 'http://localhost:8888/fast?id={}'
requests.append(uri.format(k + 1))
# show results as they return
http_client = httpclient.AsyncHTTPClient()
print 'Scheduling Get Requests:'
print '------------------------'
for req in requests:
print req
http_client.fetch(req, show_response)
# execute requests on server
print '\nStart sending requests....'
IOLoop.instance().start()
if __name__ == '__main__':
scenario = sys.argv[1]
if scenario == 'slow' or scenario == 'slow_threaded':
run(scenario)
Test results
Running python call_multi.py slow (the blocking behavior):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20
Running python call_multi.py slow_threaded (the desired behavior):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s
3 Answers
If your GET requests are taking that long, then Tornado is the wrong framework for the job.
I suggest you use nginx to route the fast GETs to Tornado and the slower ones to a different server.
PeterBe has an interesting article where he runs multiple Tornado servers and sets one of them up as a "slow server" for the long-running requests. See worrying-about-io-blocking - I would try this approach.
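For illustration only, here is a minimal sketch of the multiple-instances idea using Tornado's built-in pre-fork support. It assumes the application object from the question is importable; the port and the one-process-per-core choice are arbitrary, and nginx would still do routing or balancing in front:
import tornado.httpserver
from tornado.ioloop import IOLoop

from multi_tornado import application  # the Application from the question (assumed importable)

if __name__ == "__main__":
    server = tornado.httpserver.HTTPServer(application)
    server.bind(8888)
    server.start(0)  # 0 means: fork one worker process per CPU core
    IOLoop.instance().start()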
multiprocessing.Pool can be used with the tornado I/O loop, but it's a bit messy. It would be cleaner to use concurrent.futures (see my other answer for details), but if you're stuck on Python 2.x and can't install the concurrent.futures backport, here is how you can do it using multiprocessing only:
The multiprocessing.Pool.apply_async and multiprocessing.Pool.map_async methods both have an optional callback parameter, which means that both can potentially be used with tornado.gen.Task. So in most cases, running code asynchronously in a sub-process is as simple as this:
import multiprocessing
import contextlib
from tornado import gen
from tornado.gen import Return
from tornado.ioloop import IOLoop
from functools import partial
def worker():
print "async work here"
@gen.coroutine
def async_run(func, *args, **kwargs):
result = yield gen.Task(pool.apply_async, func, args, kwargs)
raise Return(result)
if __name__ == "__main__":
pool = multiprocessing.Pool(multiprocessing.cpu_count())
func = partial(async_run, worker)
IOLoop().run_sync(func)
This works well in most cases, as I mentioned. But if worker() throws an exception, callback never gets called, which means the gen.Task never finishes and your program hangs forever. If you know your work will absolutely never throw an exception (for example, because you wrapped the whole thing in a try/except), you can happily use this approach. However, if you want to let exceptions escape from your worker, the only solution I found is to subclass some of the multiprocessing components and make them call callback even if the worker sub-process raised an exception:
from multiprocessing.pool import ApplyResult, Pool, RUN
import multiprocessing
class TornadoApplyResult(ApplyResult):
def _set(self, i, obj):
self._success, self._value = obj
if self._callback:
self._callback(self._value)
self._cond.acquire()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
del self._cache[self._job]
class TornadoPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
''' Asynchronous equivalent of `apply()` builtin
This version will call `callback` even if an exception is
raised by `func`.
'''
assert self._state == RUN
result = TornadoApplyResult(self._cache, callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
...
if __name__ == "__main__":
pool = TornadoPool(multiprocessing.cpu_count())
...
With these changes, the exception object will be returned by gen.Task instead of gen.Task hanging indefinitely. I also updated my async_run method to re-raise the exception when it is returned, and made some other changes to provide better tracebacks for exceptions thrown in the worker sub-processes. Here's the full code:
import sys
import time
import traceback
import multiprocessing
from multiprocessing.pool import Pool, ApplyResult, RUN
from functools import wraps
import tornado.web
from tornado.ioloop import IOLoop
from tornado.gen import Return
from tornado import gen
class WrapException(Exception):
def __init__(self):
exc_type, exc_value, exc_tb = sys.exc_info()
self.exception = exc_value
self.formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
def __str__(self):
return '\n%s\nOriginal traceback:\n%s' % (Exception.__str__(self), self.formatted)
class TornadoApplyResult(ApplyResult):
def _set(self, i, obj):
self._success, self._value = obj
if self._callback:
self._callback(self._value)
self._cond.acquire()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
del self._cache[self._job]
class TornadoPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
''' Asynchronous equivalent of `apply()` builtin
This version will call `callback` even if an exception is
raised by `func`.
'''
assert self._state == RUN
result = TornadoApplyResult(self._cache, callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
@gen.coroutine
def async_run(func, *args, **kwargs):
""" Runs the given function in a subprocess.
This wraps the given function in a gen.Task and runs it
in a multiprocessing.Pool. It is meant to be used as a
Tornado co-routine. Note that if func returns an Exception
(or an Exception sub-class), this function will raise the
Exception, rather than return it.
"""
result = yield gen.Task(pool.apply_async, func, args, kwargs)
if isinstance(result, Exception):
raise result
raise Return(result)
def handle_exceptions(func):
""" Raise a WrapException so we get a more meaningful traceback"""
@wraps(func)
def inner(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception:
raise WrapException()
return inner
# Test worker functions
@handle_exceptions
def test2(x):
raise Exception("eeee")
@handle_exceptions
def test(x):
print x
time.sleep(2)
return "done"
class TestHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
try:
result = yield async_run(test, "inside get")
self.write("%s\n" % result)
result = yield async_run(test2, "hi2")
except Exception as e:
print("caught exception in get")
self.write("Caught an exception: %s" % e)
finally:
self.finish()
app = tornado.web.Application([
(r"/test", TestHandler),
])
if __name__ == "__main__":
pool = TornadoPool(4)
app.listen(8888)
IOLoop.instance().start()
Here's how it behaves for the client:
dan@dan:~$ curl localhost:8888/test
done
Caught an exception:
Original traceback:
Traceback (most recent call last):
File "./mutli.py", line 123, in inner
return func(*args, **kwargs)
File "./mutli.py", line 131, in test2
raise Exception("eeee")
Exception: eeee
And if I send two curl requests at the same time, we can see that they are handled asynchronously on the server side:
dan@dan:~$ ./mutli.py
inside get
inside get
caught exception inside get
caught exception inside get
Edit:
Keep in mind that this code becomes simpler with Python 3, because it introduces an error_callback keyword argument for all of the asynchronous multiprocessing.Pool methods. This makes integrating with Tornado much easier:
class TornadoPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
''' Asynchronous equivalent of `apply()` builtin
This version will call `callback` even if an exception is
raised by `func`.
'''
super().apply_async(func, args, kwds, callback=callback,
error_callback=callback)
@gen.coroutine
def async_run(func, *args, **kwargs):
""" Runs the given function in a subprocess.
This wraps the given function in a gen.Task and runs it
in a multiprocessing.Pool. It is meant to be used as a
Tornado co-routine. Note that if func returns an Exception
(or an Exception sub-class), this function will raise the
Exception, rather than return it.
"""
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    if isinstance(result, Exception):
        raise result
    raise Return(result)
All we need to do in our overridden apply_async is call the parent with the error_callback keyword argument in addition to the callback argument. There is no need to override ApplyResult at all.
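For completeness, here is a minimal driver for this Python 3 version, analogous to the __main__ block in the Python 2 example above. The worker function is just a placeholder, and IOLoop and async_run are reused from the snippets above:
import multiprocessing
from functools import partial

# Placeholder worker; any picklable module-level function will do,
# and it may raise - the exception will be re-raised by async_run.
def worker():
    return "done"

if __name__ == "__main__":
    pool = TornadoPool(multiprocessing.cpu_count())
    print(IOLoop.instance().run_sync(partial(async_run, worker)))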
We can get even fancier by using a metaclass on TornadoPool, to allow its *_async methods to be called directly as if they were coroutines:
import time
from functools import wraps
from multiprocessing.pool import Pool
import tornado.web
from tornado import gen
from tornado.gen import Return, Arguments  # Arguments: the namedtuple tornado uses for multi-value callbacks
from tornado import stack_context
from tornado.ioloop import IOLoop
from tornado.concurrent import Future
def _argument_adapter(callback):
def wrapper(*args, **kwargs):
if kwargs or len(args) > 1:
callback(Arguments(args, kwargs))
elif args:
callback(args[0])
else:
callback(None)
return wrapper
def PoolTask(func, *args, **kwargs):
""" Task function for use with multiprocessing.Pool methods.
This is very similar to tornado.gen.Task, except it sets the
error_callback kwarg in addition to the callback kwarg. This
way exceptions raised in pool worker methods get raised in the
parent when the Task is yielded from.
"""
future = Future()
def handle_exception(typ, value, tb):
if future.done():
return False
future.set_exc_info((typ, value, tb))
return True
def set_result(result):
if future.done():
return
if isinstance(result, Exception):
future.set_exception(result)
else:
future.set_result(result)
with stack_context.ExceptionStackContext(handle_exception):
cb = _argument_adapter(set_result)
func(*args, callback=cb, error_callback=cb)
return future
def coro_runner(func):
""" Wraps the given func in a PoolTask and returns it. """
@wraps(func)
def wrapper(*args, **kwargs):
return PoolTask(func, *args, **kwargs)
return wrapper
class MetaPool(type):
""" Wrap all *_async methods in Pool with coro_runner. """
def __new__(cls, clsname, bases, dct):
pdct = bases[0].__dict__
for attr in pdct:
if attr.endswith("async") and not attr.startswith('_'):
setattr(bases[0], attr, coro_runner(pdct[attr]))
return super().__new__(cls, clsname, bases, dct)
class TornadoPool(Pool, metaclass=MetaPool):
pass
# Test worker functions
def test2(x):
print("hi2")
raise Exception("eeee")
def test(x):
print(x)
time.sleep(2)
return "done"
class TestHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
try:
result = yield pool.apply_async(test, ("inside get",))
self.write("%s\n" % result)
result = yield pool.apply_async(test2, ("hi2",))
self.write("%s\n" % result)
except Exception as e:
print("caught exception in get")
self.write("Caught an exception: %s" % e)
raise
finally:
self.finish()
app = tornado.web.Application([
(r"/test", TestHandler),
])
if __name__ == "__main__":
pool = TornadoPool()
app.listen(8888)
IOLoop.instance().start()
If you're willing to use concurrent.futures.ProcessPoolExecutor instead of multiprocessing, this is actually very simple. Tornado's ioloop already supports concurrent.futures.Future, so they play nicely together out of the box. concurrent.futures is included with Python 3.2+, and has also been backported to Python 2.x (as the futures package on PyPI).
Here's an example:
import time
from concurrent.futures import ProcessPoolExecutor
from tornado.ioloop import IOLoop
from tornado import gen
def f(a, b, c, blah=None):
print "got %s %s %s and %s" % (a, b, c, blah)
time.sleep(5)
return "hey there"
@gen.coroutine
def test_it():
pool = ProcessPoolExecutor(max_workers=1)
fut = pool.submit(f, 1, 2, 3, blah="ok") # This returns a concurrent.futures.Future
print("running it asynchronously")
ret = yield fut
print("it returned %s" % ret)
pool.shutdown()
IOLoop.instance().run_sync(test_it)
Output:
running it asynchronously
got 1 2 3 and ok
it returned hey there
ProcessPoolExecutor has a more limited API than multiprocessing.Pool, but if you don't need the more advanced features of multiprocessing.Pool, it is worth using because the integration is that much simpler.
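To connect this back to the original question, the same pattern can drive a request handler directly. The following is only a rough sketch under a few assumptions: a module-level heavy_lifting function standing in for the real work, a shared module-level pool, and a Tornado version whose coroutines accept a concurrent.futures.Future yielded directly (as the example above relies on):
import math
from concurrent.futures import ProcessPoolExecutor

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.web import RequestHandler, Application

# Must be module-level so it can be pickled and sent to a worker process.
def heavy_lifting(n):
    return math.factorial(n)

pool = ProcessPoolExecutor(max_workers=4)

class SlowHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        # pool.submit() returns a concurrent.futures.Future; yielding it
        # suspends this handler while the ioloop keeps serving other
        # requests, then resumes with the worker's return value.
        result = yield pool.submit(heavy_lifting, 2000)
        self.write("result has %d digits" % len(str(result)))

if __name__ == "__main__":
    Application([(r"/slow", SlowHandler)]).listen(8888)
    IOLoop.instance().start()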