AMQPConnectionError使用带有Asyncore的Pika和RabbitMQ-为什么?

2024-05-15 10:18:46 发布

您现在位置:Python中文网/ 问答频道 /正文

为什么我得到的AMQPConnectionError是Asyncore而不是BlockingConnection?

如果只是“Asyncore在Windows中不起作用”,那么就这样吧,尽管我还没有找到任何禁止使用它的东西。(这个问题与平台无关)为了便于迁移,我想使用Python2.7和Python3.4上都可用的异步库,Asyncore应该在这里起作用。

我将RabbitMQ 3.2.4与Python 2.7.6和pika 0.9.13一起使用。用户和管理运行级别没有什么区别。记录程序在代码中的存在或不存在与错误无关,但上面更新的警告消息除外。Linux(Ubuntu14.04)和Windows7中也出现了同样的错误,所以这不是平台问题。

因为使用BlockingConnection时pika的性能相当差,所以我想尝试使用Asyncore适配器。对于一个测试台的设置来说似乎很简单(我试着给它证书,但这不重要,如果不给的话,回调就会被终止。。。无论哪种方式都失败了。):

根据教程使用BlockingConnection-它可以工作,但吞吐量较低:

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

使用异步连接-我尝试过的所有变体都会立即失败:

connection = pika.AsyncoreConnection(pika.ConnectionParameters(host='localhost'))

错误:

WARNING:pika.connection:Could not connect, 0 attempts left
Traceback (most recent call last):
  File "C:\workspace\send.py", line 8, in <module>
    connection = pika.AsyncoreConnection(pika.ConnectionParameters(host='localhost'))
  File "C:\Python27\lib\site-packages\pika\adapters\asyncore_connection.py", line 135, in __init__
    stop_ioloop_on_close)
  File "C:\Python27\lib\site-packages\pika\adapters\base_connection.py", line 62, in __init__
    on_close_callback)
  File "C:\Python27\lib\site-packages\pika\connection.py", line 590, in __init__
    self.connect()
  File "C:\Python27\lib\site-packages\pika\connection.py", line 707, in connect
    self.callbacks.process(0, self.ON_CONNECTION_ERROR, self, self)
  File "C:\Python27\lib\site-packages\pika\callback.py", line 61, in wrapper
    return function(*tuple(args), **kwargs)
  File "C:\Python27\lib\site-packages\pika\callback.py", line 92, in wrapper
    return function(*args, **kwargs)
  File "C:\Python27\lib\site-packages\pika\callback.py", line 232, in process
    callback(*args, **keywords)
  File "C:\Python27\lib\site-packages\pika\connection.py", line 1192, in _on_connection_error
    raise exceptions.AMQPConnectionError(self.params.connection_attempts)
pika.exceptions.AMQPConnectionError: 1

Tags: inpyselflibpackageslinecallbacksite
4条回答

阅读这篇文章:No handlers could be found for logger "pika.adapters.blocking_connection"

通过添加修复:

import logging
logging.basicConfig()

编辑

已报告此问题https://github.com/pika/pika/issues/468

试试下面提到的步骤。我在我的centos机器上也面临同样的问题。

  1. sudo yum安装rabbitmq服务器
  2. sudo服务rabbitmq服务器重启

在我看来,它实际上像一只披萨上的虫子。下面是最终引发异常的connection.connect()代码:

def connect(self):
    """Invoke if trying to reconnect to a RabbitMQ server. Constructing the
    Connection object should connect on its own.

    """
    self._set_connection_state(self.CONNECTION_INIT)
    if self._adapter_connect():
        return self._on_connected()
    self.remaining_connection_attempts -= 1
    LOGGER.warning('Could not connect, %i attempts left',
                   self.remaining_connection_attempts)
    if self.remaining_connection_attempts:
        LOGGER.info('Retrying in %i seconds', self.params.retry_delay)
        self.add_timeout(self.params.retry_delay, self.connect)
    else:
        self.callbacks.process(0, self.ON_CONNECTION_ERROR, self, self)
        self.remaining_connection_attempts = self.params.connection_attempts
        self._set_connection_state(self.CONNECTION_CLOSED)

因此,self._adapter_connect()显然没有返回True,这表明连接失败。这里是AsyncoreConnection._adapter_connect代码:

def _adapter_connect(self):
    """Connect to our RabbitMQ broker using AsyncoreDispatcher, then setting
    Pika's suggested buffer size for socket reading and writing. We pass
    the handle to self so that the AsyncoreDispatcher object can call back
    into our various state methods.

    """
    if super(AsyncoreConnection, self)._adapter_connect():
        self.socket = PikaDispatcher(self.socket, None, self._handle_events)
        self.ioloop = self.socket
        self._on_connected()

它什么也不回!所以connect中的if语句永远不会是真的。如果我更改方法以反映所有其他适配器使用的模式:

def _adapter_connect(self):
    """Connect to our RabbitMQ broker using AsyncoreDispatcher, then setting
    Pika's suggested buffer size for socket reading and writing. We pass
    the handle to self so that the AsyncoreDispatcher object can call back
    into our various state methods.

    """
    if super(AsyncoreConnection, self)._adapter_connect():
        self.socket = PikaDispatcher(self.socket, None, self._handle_events)
        self.ioloop = self.socket
        return True
    return False

它工作得很好。我一定会把那个臭虫归档的!

编辑:

此错误似乎已在最新版本中修复(从github):

    def _adapter_connect(self):
        """Connect to our RabbitMQ broker using AsyncoreDispatcher, then setting Pika's suggested buffer size for socket reading and writing. We pass the handle to self so that the AsyncoreDispatcher object can call back into our various state methods.

        """
        error = super(AsyncoreConnection, self)._adapter_connect()
        if not error:
            self.socket = PikaDispatcher(self.socket, None,
                                         self._handle_events)
            self.ioloop = self.socket
            self._on_connected()
        return error

在我看来,它实际上像一只披萨上的虫子。下面是最终引发异常的connection.connect()代码:

def connect(self):
    """Invoke if trying to reconnect to a RabbitMQ server. Constructing the
    Connection object should connect on its own.

    """
    self._set_connection_state(self.CONNECTION_INIT)
    if self._adapter_connect():
        return self._on_connected()
    self.remaining_connection_attempts -= 1
    LOGGER.warning('Could not connect, %i attempts left',
                   self.remaining_connection_attempts)
    if self.remaining_connection_attempts:
        LOGGER.info('Retrying in %i seconds', self.params.retry_delay)
        self.add_timeout(self.params.retry_delay, self.connect)
    else:
        self.callbacks.process(0, self.ON_CONNECTION_ERROR, self, self)
        self.remaining_connection_attempts = self.params.connection_attempts
        self._set_connection_state(self.CONNECTION_CLOSED)

因此,self._adapter_connect()显然没有返回True,这表明连接失败。这里是AsyncoreConnection._adapter_connect代码:

def _adapter_connect(self):
    """Connect to our RabbitMQ broker using AsyncoreDispatcher, then setting
    Pika's suggested buffer size for socket reading and writing. We pass
    the handle to self so that the AsyncoreDispatcher object can call back
    into our various state methods.

    """
    if super(AsyncoreConnection, self)._adapter_connect():
        self.socket = PikaDispatcher(self.socket, None, self._handle_events)
        self.ioloop = self.socket
        self._on_connected()

它什么也不回!所以connect中的if语句永远不会是真的。如果我更改方法以反映所有其他适配器使用的模式:

def _adapter_connect(self):
    """Connect to our RabbitMQ broker using AsyncoreDispatcher, then setting
    Pika's suggested buffer size for socket reading and writing. We pass
    the handle to self so that the AsyncoreDispatcher object can call back
    into our various state methods.

    """
    if super(AsyncoreConnection, self)._adapter_connect():
        self.socket = PikaDispatcher(self.socket, None, self._handle_events)
        self.ioloop = self.socket
        return True
    return False

它工作得很好。我一定会把那个臭虫归档的!

编辑:

此错误似乎已在最新版本(从github)中修复:

    def _adapter_connect(self):
        """Connect to our RabbitMQ broker using AsyncoreDispatcher, then setting Pika's suggested buffer size for socket reading and writing. We pass the handle to self so that the AsyncoreDispatcher object can call back into our various state methods.

        """
        error = super(AsyncoreConnection, self)._adapter_connect()
        if not error:
            self.socket = PikaDispatcher(self.socket, None,
                                         self._handle_events)
            self.ioloop = self.socket
            self._on_connected()
        return error

相关问题 更多 >