Pika python 异步发布者:如何通过控制台发送用户数据?

5 投票
1 回答
4074 浏览
提问于 2025-04-17 21:18

我正在使用标准的异步发布者示例,发现发布者会不停地循环发布相同的消息,像是永远都停不下来。

所以我把publish_message中的schedule_next_message这一行注释掉了,以停止这个循环。

但我真正想要的是,当用户提供“message_body”和“Key”时,发布者才开始发布消息。

简单来说,就是希望发布者能根据用户的输入来发布消息。

我找不到任何示例或提示,说明如何让发布者实时接收用户输入。

我对rabbitmq、pika、python等都很陌生。

下面是我提到的代码片段:

def publish_message(self):
    """If the class is not stopping, publish a message to RabbitMQ,
    appending a list of deliveries with the message number that was sent.
    This list will be used to check for delivery confirmations in the
    on_delivery_confirmations method.

    Once the message has been sent, schedule another message to be sent.
    The main reason I put scheduling in was just so you can get a good idea
    of how the process is flowing by slowing down and speeding up the
    delivery intervals by changing the PUBLISH_INTERVAL constant in the
    class.

    """
    if self._stopping:
        return

    message = {"service":"sendgrid", "sender": "nutshi@gmail.com", "receiver": "nutshi@gmail.com", "subject": "test notification", "text":"sample email"}
    routing_key = "email"
    properties = pika.BasicProperties(app_id='example-publisher',
                                      content_type='application/json',
                                      headers=message)

    self._channel.basic_publish(self.EXCHANGE, routing_key,
                                json.dumps(message, ensure_ascii=False),
                                properties)
    self._message_number += 1
    self._deliveries.append(self._message_number)
    LOGGER.info('Published message # %i', self._message_number)
    #self.schedule_next_message()
    #self.stop()

def schedule_next_message(self):
    """If we are not closing our connection to RabbitMQ, schedule another
    message to be delivered in PUBLISH_INTERVAL seconds.

    """
    if self._stopping:
        return
    LOGGER.info('Scheduling next message for %0.1f seconds',
                self.PUBLISH_INTERVAL)
    self._connection.add_timeout(self.PUBLISH_INTERVAL,
                                 self.publish_message)

def start_publishing(self):
    """This method will enable delivery confirmations and schedule the
    first message to be sent to RabbitMQ

    """
    LOGGER.info('Issuing consumer related RPC commands')
    self.enable_delivery_confirmations()
    self.schedule_next_message()

这个网站不让我添加解决方案……我通过使用raw_input()解决了我的问题。

谢谢!

1 个回答

1

我知道我回答这个问题有点晚,但你有没有看过这个链接

这个似乎跟你需要的内容更相关,而不是使用一个完整的异步发布者。通常情况下,你会用这些和Python的队列一起,来在不同的线程之间传递消息。

撰写回答