在后台运行pika ioloop或使用自定义ioloop

7 投票
3 回答
7319 浏览
提问于 2025-04-16 13:30

我觉得这件事其实不应该那么复杂,但到目前为止我还没成功。

假设我有一个叫做 PikaClass 的类,它是用来封装 pika 的,并且提供一些业务方法。

def PikaClass(object):
  def __init__(self):
     # connect to the broker
     self.connection = pika.SelectConnection(<connection parameters>, self.on_connect)
     # ..other init stuff..

  def on_connect(self, connection):
     # called when the connection has been established 
     # ..open a channel, declare some queues, etc.

  def start(self):
     # start the polling loop 
     self.connection.ioloop.start()

  def foo(self, **kwargs):
     # do some business logic, e.g., send messages to particular queues

直观来说,我想要实现的是:用户创建一个 PikaClass 的实例,让后台的循环开始运行,然后通过调用一些业务方法来与这个对象互动。

p = PikaClass()
p.start()
bar = p.foo(..)

问题是,调用 p.start() 后会阻塞,导致主代码无法在调用 start() 后与这个对象互动。我最开始的想法是把这个调用放在一个线程里:

Thread(target=p.start()).start()
bar = p.foo(..)

但这样还是会阻塞,你根本无法调用到 p.foo(..)。文档提到不应该在多个线程之间共享连接,这可能会引发一些问题。

我还尝试过用 AsyncoreConnection 代替 SelectConnection,并直接调用 _connect()(而不是使用 ioloop),但这样也没有任何效果(什么都没发生)。

那么,我该如何在后台运行 ioloop,或者至少运行我自己的 ioloop 呢?

注意:这是在 win64(xp)上使用 Python 2.6 和最新的 pika 0.9.4。

3 个回答

0

你可能需要一个第二个进程。我对pika了解不多,但如果它是纯Python写的,那么它会想要保持全局解释器锁(GIL)。记住,无论你有多少个核心,每个进程一次只能执行一个线程,这主要是因为Python的垃圾回收机制使用的引用计数有一些限制。

如果你在if __name__ == "__main__":这个代码块中,在做任何其他事情之前就启动一个新进程,并让它在一个循环中等待事件,你就可以给这个第二个进程发送指令,让它去执行某项工作,然后等着它把结果发回给你。你可能还想实现一个索引系统,这样你就可以把工作发给另一个进程,然后不再管它,直到你需要答案的时候再去找。

如果你选择这个方法,我唯一的建议就是要避免副作用,否则你就得考虑竞争条件、死锁以及其他一些全局解释器锁可以保护你免受的麻烦事。在进程开始之前,把它需要的信息发送给它,或者确保在它请求那部分数据之前不要去修改它。

这是少数几次Python会把“枪”递给你,并只给你一个礼貌的提示,告诉你不要把它对准自己脚的情况之一。

10

你现在是直接调用了 'p.start',而不是把它当作参数传进去。正确的写法应该是:

Thread(target=p.start).start()

当执行 Thread.start 的时候,线程会调用 p.start。

我不确定这样做是否能解决你的问题,但这可能会帮助你找到解决办法。

6

这里的全局解释锁(GIL)不是问题,因为 ioloop 几乎所有的时间都在执行 select(2) 系统调用。在这个过程中,GIL 会被释放,其他的 Python 线程可以继续运行,去做其他的事情。

最简单的方法是为每个请求建立和拆除一个队列连接。你可能会觉得这样做太费劲了——因为每次连接都需要重新认证,还可能需要重复进行 SSL 协商——但这应该是最简单、最稳健、最容易编写的方法。除非你知道建立和拆除连接会对你应用的整体性能造成影响(这最好通过测试来衡量),否则这些因素应该是你考虑的重点。

另一种方法是只在有消息要发送时才 start() ioloop,然后让接收回复的方法停止 ioloop,这样你的程序就能重新获得控制权。你可以通过以下方式让 ioloop 提前返回:

    connection.ioloop.poller.open = False

然后记得在再次调用 start() 等待另一个回复之前,把它设置回 True

撰写回答