使用Tornado Python的简单异步示例

10 投票
4 回答
22977 浏览
提问于 2025-04-18 01:45

我想找一个简单的异步服务器示例。
我有一些函数,其中有很多等待、数据库操作等等:

def blocking_task(n):
    for i in xrange(n):
        print i
        sleep(1)
    return i

我需要在一个单独的进程中运行这个函数,而不让其他操作被阻塞。这可能吗?

4 个回答

2

Python 3.5 引入了 asyncawait 这两个关键词(使用这些关键词的函数也叫“原生协程”)。为了兼容旧版本的 Python,你可以使用“装饰器”或“基于 yield 的”协程,这需要用到 tornado.gen.coroutine 装饰器。

在可能的情况下,推荐使用原生协程。只有在需要兼容旧版本 Python 时,才使用装饰器协程。Tornado 文档中的示例通常会使用原生形式。

这两种形式之间的转换通常很简单:

# Decorated:                    # Native:

# Normal function declaration
# with decorator                # "async def" keywords
@gen.coroutine
def a():                        async def a():
    # "yield" all async funcs       # "await" all async funcs
    b = yield c()                   b = await c()
    # "return" and "yield"
    # cannot be mixed in
    # Python 2, so raise a
    # special exception.            # Return normally
    raise gen.Return(b)             return b

下面列出了这两种协程形式的其他区别。

  • 原生协程:

    • 通常运行得更快。
    • 可以使用 async forasync with 语句,这样可以让某些模式变得更简单。
    • 除非你使用 awaityield 来调用它们,否则它们根本不会运行。装饰器协程在被调用时可以“在后台”开始运行。需要注意的是,对于这两种协程,使用 awaityield 是很重要的,这样任何异常才有地方去处理。
  • 装饰器协程:

    • concurrent.futures 包有额外的集成,允许直接返回 executor.submit 的结果。对于原生协程,可以使用 IOLoop.run_in_executor 来实现。
    • 支持通过返回一个列表或字典来简化等待多个对象的操作。要在原生协程中实现这一点,可以使用 tornado.gen.multi
    • 可以与其他包(包括 Twisted)集成,通过转换函数的注册表来实现。要在原生协程中访问这个功能,可以使用 tornado.gen.convert_yielded
    • 总是返回一个 Future 对象。原生协程返回的是一个可以等待的对象,但不是 Future。在 Tornado 中,这两者大多数情况下可以互换使用。

值得一看:

3

在这里,我更新了关于Tornado 5.0的信息。Tornado 5.0新增了一个方法IOLoop.run_in_executor。在协程模式章节的“调用阻塞函数”部分提到:

从协程中调用阻塞函数的最简单方法是使用IOLoop.run_in_executor,这个方法会返回与协程兼容的Future对象:

@gen.coroutine def call_blocking(): yield IOLoop.current().run_in_executor(blocking_func, args)

另外,在run_on_executor的文档中提到:

这个装饰器不应该和名字相似的IOLoop.run_in_executor混淆。一般来说,调用阻塞方法时,推荐使用run_in_executor,而不是在定义方法时使用这个装饰器。如果需要与旧版本的Tornado兼容,可以考虑定义一个执行器并在调用时使用executor.submit()。

在5.0版本中,推荐在调用阻塞函数的情况下使用IOLoop.run_in_executor。

7
import tornado.web
from tornado.ioloop import IOLoop
from tornado import gen

from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor   # `pip install futures` for python2

MAX_WORKERS = 16

class TestHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

    """
    In below function goes your time consuming task
    """

    @run_on_executor
    def background_task(self):
        sm = 0
        for i in range(10 ** 8):
            sm = sm + 1

        return sm

    @tornado.gen.coroutine
    def get(self):
        """ Request that asynchronously calls background task. """
        res = yield self.background_task()
        self.write(str(res))

class TestHandler2(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        self.write('Response from server')
        self.finish()


application = tornado.web.Application([
    (r"/A", TestHandler),
    (r"/B", TestHandler2),
    ])

application.listen(5000)
IOLoop.instance().start()

当你运行上面的代码时,你可以在 http://127.0.0.1:5000/A 这个地址进行一些计算量大的操作,而这个操作不会阻塞其他事情的执行。你可以在访问完 http://127.0.0.1:5000/A 后,立刻去访问 http://127.0.0.1:5000/B,这说明其他操作仍然可以继续进行。

18

Tornado是为了让所有操作在一个线程中运行而设计的,但它使用异步输入输出(I/O)来尽量避免阻塞。如果你使用的数据库有异步的Python接口(最好是专门为Tornado设计的,比如MongoDB的Motor或Postgres的momoko),那么你就可以在不阻塞服务器的情况下运行数据库查询,不需要额外的进程或线程。

针对你提到的具体例子,比如调用了time.sleep(1),你可以通过Tornado的协程以异步的方式来实现:

#!/usr/bin/python

import tornado.web
from tornado.ioloop import IOLoop
from tornado import gen 
import time

@gen.coroutine
def async_sleep(seconds):
    yield gen.Task(IOLoop.instance().add_timeout, time.time() + seconds)

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        for i in xrange(100):
            print i
            yield async_sleep(1)
        self.write(str(i))
        self.finish()


application = tornado.web.Application([
    (r"/test", TestHandler),
    ])  

application.listen(9999)
IOLoop.instance().start()

这里面有个有趣的部分是async_sleep。这个方法创建了一个异步任务,它调用了ioloop.add_timeout方法。add_timeout会在指定的秒数后运行一个回调,而在等待超时的过程中不会阻塞ioloop。它需要两个参数:

add_timeout(deadline, callback) # deadline is the number of seconds to wait, callback is the method to call after deadline.

在上面的例子中,我们实际上只给add_timeout提供了一个参数,这意味着我们最终得到了这个:

add_timeout(time.time() + seconds, ???)

我们没有提供预期的回调参数。实际上,当gen.Task执行add_timeout时,它会在显式提供的参数后面添加一个callback关键字参数。所以这段代码:

yield gen.Task(loop.add_timeout, time.time() + seconds)

会在gen.Task()内部执行:

loop.add_timeout(time.time() + seconds, callback=gen.Callback(some_unique_key))

当超时后执行gen.Callback时,它会表示gen.Task已经完成,程序的执行会继续到下一行。这种流程一开始可能不太容易完全理解(我第一次看到的时候也觉得很难)。多读几遍Tornado gen模块的文档可能会有帮助。

撰写回答