使用Tornado和Pika进行异步队列监控
我有一个AMQP服务器(RabbitMQ),我想在一个Tornado网页服务器中同时进行发布和读取。为此,我打算使用一个异步的amqp Python库,特别是Pika(这个版本据说支持Tornado)。
我写的代码看起来可以成功从队列中读取数据,但在请求结束时,我遇到了一个异常(浏览器返回的结果是正常的):
[E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1)
HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'})
Traceback (most recent call last):
File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context
yield
File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext
yield
File "/usr/lib/python2.6/contextlib.py", line 113, in nested
yield vars
File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped
callback(*args, **kwargs)
File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events
self._handle_read()
File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read
self.on_data_available(chunk)
File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available
self.channels[frame.channel_number].frame_handler(frame)
KeyError: 1
我不太确定自己是否正确使用了这个库,可能我做错了什么。我的代码基本流程是:
- 请求到达
- 使用TornadoConnection创建与RabbitMQ的连接,并指定一个回调函数
- 在连接的回调中,创建一个通道,声明/绑定我的队列,并调用basic_consume;同时指定一个回调函数
- 在消费的回调中,关闭通道并调用Tornado的finish函数。
- 看到异常。
我有几个问题:
- 这个流程正确吗?我不太明白连接回调的作用,除了不使用它就无法工作。
- 我应该为每个网页请求创建一个AMQP连接吗?RabbitMQ的文档建议不应该,而是应该只创建通道。但我应该在哪里创建连接,如果连接短暂中断,我该如何尝试重新连接?
- 如果我为每个网页请求创建一个AMQP连接,我应该在哪里关闭它?在我的回调中调用amqp.close()似乎会让事情变得更糟。
我稍后会尝试提供一些示例代码,但我上面描述的步骤已经相当完整地展示了消费的部分。我在发布的部分也遇到了一些问题,但消费队列的事情更紧急。
2 个回答
有人在这里报告说成功地把Tornado和Pika合并在一起了。根据我的理解,这并不是简单地在Tornado中调用Pika就能解决的,因为这两个库都想要各自控制自己的事件循环。
看到一些源代码会更有帮助,但我在多个生产项目中使用这个支持tornado的pika模块,没有遇到任何问题。
你不想为每个请求都创建一个连接。应该创建一个类,把所有的AMQP操作封装起来,并在tornado应用程序层面将其作为单例实例化,这样可以在多个请求(和请求处理器)之间使用。我在一个叫'runapp()'的函数中这样做,函数里做了一些事情,然后启动主tornado的事件循环。
这里有一个叫'Events'的类。这是一个部分实现(具体来说,我没有在这里定义'self.handle_event',这个你可以自己决定)。
class Event(object):
def __init__(self, config):
self.host = 'localhost'
self.port = '5672'
self.vhost = '/'
self.user = 'foo'
self.exchange = 'myx'
self.queue = 'myq'
self.recv_routing_key = 'msgs4me'
self.passwd = 'bar'
self.connected = False
self.connect()
def connect(self):
credentials = pika.PlainCredentials(self.user, self.passwd)
parameters = pika.ConnectionParameters(host = self.host,
port = self.port,
virtual_host = self.vhost,
credentials = credentials)
srs = pika.connection.SimpleReconnectionStrategy()
logging.debug('Events: Connecting to AMQP Broker: %s:%i' % (self.host,
self.port))
self.connection = tornado_adapter.TornadoConnection(parameters,
wait_for_open = False,
reconnection_strategy = srs,
callback = self.on_connected)
def on_connected(self):
# Open the channel
logging.debug("Events: Opening a channel")
self.channel = self.connection.channel()
# Declare our exchange
logging.debug("Events: Declaring the %s exchange" % self.exchange)
self.channel.exchange_declare(exchange = self.exchange,
type = "fanout",
auto_delete = False,
durable = True)
# Declare our queue for this process
logging.debug("Events: Declaring the %s queue" % self.queue)
self.channel.queue_declare(queue = self.queue,
auto_delete = False,
exclusive = False,
durable = True)
# Bind to the exchange
self.channel.queue_bind(exchange = self.exchange,
queue = self.queue,
routing_key = self.recv_routing_key)
self.channel.basic_consume(consumer = self.handle_event, queue = self.queue, no_ack = True)
# We should be connected if we made it this far
self.connected = True
然后我把它放在一个叫'events.py'的文件里。我的请求处理器和任何后端代码都使用一个叫'common.py'的模块,这个模块封装了一些对两者都有用的代码(我的请求处理器不会直接调用任何amqp模块的方法,数据库、缓存等也是如此),所以我在common.py的模块级别定义了'events=None',然后我像这样实例化Event对象:
import events
def runapp(config):
if myapp.common.events is None:
myapp.common.events = myapp.events.Event(config)
logging.debug("MYAPP.COMMON.EVENTS: %s", myapp.common.events)
http_server = tornado.httpserver.HTTPServer(app,
xheaders=config['HTTPServer']['xheaders'],
no_keep_alive=config['HTTPServer']['no_keep_alive'])
http_server.listen(port)
main_loop = tornado.ioloop.IOLoop.instance()
logging.debug("MAIN IOLOOP: %s", main_loop)
main_loop.start()
新年快乐 :-D