使用Pika和RabbitMQ的Asyncore时出现AMQPConnectionError - 为什么?
为什么我在使用Asyncore时会遇到AMQPConnectionError,而使用BlockingConnection却没有?
如果只是因为“Asyncore在Windows上不工作”,那就这样吧,不过我还没找到任何禁止它使用的东西。(这个问题与平台无关。)为了方便迁移,我想使用在Python 2.7和Python 3.4上都能用的异步库,而Asyncore应该可以在这里工作。
我在使用RabbitMQ 3.2.4,Python 2.7.6和pika 0.9.13。用户和管理员的运行级别没有影响。代码中有无日志记录器对错误没有影响,除了上面更新的警告信息。在Linux(Ubuntu 14.04)和Windows 7上都出现同样的错误,所以这不是平台问题。
因为使用BlockingConnection时pika的性能比较差,我想尝试Asyncore适配器。对于测试环境来说,这看起来相当简单(我试着提供凭据,虽然这应该无关紧要,如果没有提供回调函数,它们会被忽略……无论如何都会失败):
根据教程,使用BlockingConnection - 它可以工作,但吞吐量很低:
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
使用AsyncoreConnection - 我尝试的所有变体都立即失败:
connection = pika.AsyncoreConnection(pika.ConnectionParameters(host='localhost'))
错误信息:
WARNING:pika.connection:Could not connect, 0 attempts left
Traceback (most recent call last):
File "C:\workspace\send.py", line 8, in <module>
connection = pika.AsyncoreConnection(pika.ConnectionParameters(host='localhost'))
File "C:\Python27\lib\site-packages\pika\adapters\asyncore_connection.py", line 135, in __init__
stop_ioloop_on_close)
File "C:\Python27\lib\site-packages\pika\adapters\base_connection.py", line 62, in __init__
on_close_callback)
File "C:\Python27\lib\site-packages\pika\connection.py", line 590, in __init__
self.connect()
File "C:\Python27\lib\site-packages\pika\connection.py", line 707, in connect
self.callbacks.process(0, self.ON_CONNECTION_ERROR, self, self)
File "C:\Python27\lib\site-packages\pika\callback.py", line 61, in wrapper
return function(*tuple(args), **kwargs)
File "C:\Python27\lib\site-packages\pika\callback.py", line 92, in wrapper
return function(*args, **kwargs)
File "C:\Python27\lib\site-packages\pika\callback.py", line 232, in process
callback(*args, **keywords)
File "C:\Python27\lib\site-packages\pika\connection.py", line 1192, in _on_connection_error
raise exceptions.AMQPConnectionError(self.params.connection_attempts)
pika.exceptions.AMQPConnectionError: 1
4 个回答
你需要通过运行下面的命令来启动rabbitmq-server,这样它才能正常工作。
sudo systemctl start rabbitmq-server
阅读这篇帖子: 找不到“pika.adapters.blocking_connection”的日志处理程序
通过添加以下内容解决了问题:
import logging
logging.basicConfig()
编辑
这个问题已经被报告过了,链接在这里:https://github.com/pika/pika/issues/468
试试下面提到的步骤。我在我的CentOS机器上也遇到了同样的问题。
- 用命令安装RabbitMQ服务器:sudo yum install rabbitmq-server
- 重启RabbitMQ服务器:sudo service rabbitmq-server restart
看起来这其实是pika里的一个bug。这里是引发异常的connection.connect()代码:
def connect(self):
"""Invoke if trying to reconnect to a RabbitMQ server. Constructing the
Connection object should connect on its own.
"""
self._set_connection_state(self.CONNECTION_INIT)
if self._adapter_connect():
return self._on_connected()
self.remaining_connection_attempts -= 1
LOGGER.warning('Could not connect, %i attempts left',
self.remaining_connection_attempts)
if self.remaining_connection_attempts:
LOGGER.info('Retrying in %i seconds', self.params.retry_delay)
self.add_timeout(self.params.retry_delay, self.connect)
else:
self.callbacks.process(0, self.ON_CONNECTION_ERROR, self, self)
self.remaining_connection_attempts = self.params.connection_attempts
self._set_connection_state(self.CONNECTION_CLOSED)
所以,self._adapter_connect()
显然没有返回True,这说明连接失败了。下面是AsyncoreConnection._adapter_connect
的代码:
def _adapter_connect(self):
"""Connect to our RabbitMQ broker using AsyncoreDispatcher, then setting
Pika's suggested buffer size for socket reading and writing. We pass
the handle to self so that the AsyncoreDispatcher object can call back
into our various state methods.
"""
if super(AsyncoreConnection, self)._adapter_connect():
self.socket = PikaDispatcher(self.socket, None, self._handle_events)
self.ioloop = self.socket
self._on_connected()
它什么都没有返回!所以在connect
里的那个if语句永远不会为True。如果我把这个方法改成其他适配器使用的那种模式:
def _adapter_connect(self):
"""Connect to our RabbitMQ broker using AsyncoreDispatcher, then setting
Pika's suggested buffer size for socket reading and writing. We pass
the handle to self so that the AsyncoreDispatcher object can call back
into our various state methods.
"""
if super(AsyncoreConnection, self)._adapter_connect():
self.socket = PikaDispatcher(self.socket, None, self._handle_events)
self.ioloop = self.socket
return True
return False
那就能正常工作了。我肯定会把这个bug上报的!
补充:
这个bug在最新版本中似乎已经修复了(来自github):
def _adapter_connect(self):
"""Connect to our RabbitMQ broker using AsyncoreDispatcher, then setting Pika's suggested buffer size for socket reading and writing. We pass the handle to self so that the AsyncoreDispatcher object can call back into our various state methods.
"""
error = super(AsyncoreConnection, self)._adapter_connect()
if not error:
self.socket = PikaDispatcher(self.socket, None,
self._handle_events)
self.ioloop = self.socket
self._on_connected()
return error