eventlet能否管理AMQP连接,实现消息的异步进出?
实际设计:
对于那些回到这个问题的人,下面的有用回答让我找到了一个可行的设计,现在运行得很好。三个关键点是:
- Eventlet是一个非常安全的环境——如果两个绿色线程同时尝试从同一个套接字进行
recv()
或send()
,Eventlet会优雅地通过抛出异常来终止第二个绿色线程。这太棒了,这意味着如果amqplib
处理得不好,只会出现简单的异常,而不是难以重现的数据交错错误。 amqplib
的方法大致分为两类:一类是recv()
内部的wait()
循环,直到一个AMQP消息组装完成;另一类是send()
消息的,不会尝试自己的recv()
。这真是太幸运了,因为amqplib
的作者根本不知道会有人尝试“绿色化”他们的库!这意味着消息发送不仅安全于wait()
调用的回调,而且可以安全地从完全不受wait()
循环控制的其他绿色线程发送消息。这些安全的方法——可以从任何绿色线程调用,而不仅仅是从wait()
回调中调用——包括:basic_ack
basic_consume
,并且nowait=True
basic_publish
basic_recover
basic_reject
exchange_declare
,并且nowait=True
exchange_delete
,并且nowait=True
queue_bind
,并且nowait=True
queue_unbind
,并且nowait=True
queue_declare
,并且nowait=True
queue_delete
,并且nowait=True
queue_purge
,并且nowait=True
- 信号量可以用作锁:用计数
1
初始化信号量,然后使用acquire()
和release()
来锁定和解锁。所有想要写消息的异步绿色线程都可以使用这样的锁,以避免它们的send()
调用交错,从而破坏AMQP协议。
所以我的代码大致如下:
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
class Processor(object):
def __init__(self):
write_lock = eventlet.semaphore.Semaphore(1)
def listening_greenlet(channel):
# start this using eventlet.spawn_n()
# create Connection and self.channel
self.channel.basic_consume(queue, callback=self.consume)
while True:
self.channel.wait()
def safe_publish(channel, *args, **kw):
with write_lock: # yes, Eventlet supports this!
channel.basic_publish(*args, **kw)
def consume(message):
# Returning immediately frees the wait() loop
eventlet.spawn_n(self.process, message)
def process(message):
# do whatever I want
# whenever I am done, I can async reply:
self.safe_publish(...)
祝你愉快!
原始问题:
想象一下,每分钟有数百条AMQP消息到达一个小型的Python Eventlet应用程序,这些消息都需要被处理和回复——处理的CPU开销会很小,但可能需要等待来自其他服务和套接字的回复。
为了允许一次处理100条消息,我当然可以启动100个单独的TCP连接到RabbitMQ,每个连接都有一个工作线程来接收、处理和逐条回复消息。但为了节省TCP连接,我更希望只创建一个AMQP连接,让RabbitMQ以全速将消息流向我,将这些任务交给工作线程,并在每个工作线程完成时发送回复:
+--------+
+------| worker | <-+
| +--------+ |
| +--------+ |
| +----| worker | <-+
| | +--------+ |
| | +--------+ |
| | +--| worker | <-+
| | | +--------+ |
v v v |
+------------+ |
RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
+------------+
请注意:
- 一个Eventlet队列可以优雅地将传入的工作分配给可用的工作线程。
- 来自RabbitMQ的流量控制甚至可能是可行的:我可以在所有工作线程都忙碌时只确认消息,然后在队列开始清空之前等待再发送进一步的确认。
- 工作几乎肯定会以无序的方式完成:一个请求可能很快完成,而另一个更早到达的事件可能需要更长时间;有些请求可能根本不会完成;因此,工作线程将以不可预测和异步的顺序返回响应。
我原本打算在看到关于如何轻松将AMQP库引入Eventlet处理模型的吸引人博客文章后,使用Eventlet和py-amqplib来编写这个:
http://blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/
我的问题是,在阅读了这两个库的文档、amqplib源代码和Eventlet源代码的大部分内容后,我无法弄清楚如何让拥有AMQP连接的事件线程——在博客文章中称为connect_to_host()
的事件线程——在工作线程完成工作并生成答案时也能“醒来”。wait()
方法只能通过AMQP套接字的活动来唤醒。虽然我觉得我应该能够让工作线程将它们的答案写入一个队列,并让connect_to_host()
事件线程在新消息到达时或工作线程准备好发送答案时醒来,但我找不到任何方法让事件线程说“当这两件事情中的任何一件发生时叫我醒来。”
我想到工作线程可以尝试控制AMQP连接对象——甚至是原始套接字——并通过TCP写回自己的消息;但似乎需要锁来防止工作线程的输出消息与主监听事件线程写入的确认消息交错,而我也找不到Eventlet中可用的锁。
所有这些让我几乎可以肯定,我正在以某种方式反向处理这个问题。像这样的问题——让单个连接在监听-调度器和多个工作线程之间安全共享——是否根本不适合协程模型,而需要一个完整的异步库?(如果是这样:你会推荐哪个库来解决这个问题,如何在传入消息和传出工作线程响应之间进行多路复用?今天早些时候我尝试了像Pika + ioloop这样的组合,但没有找到干净的解决方案——不过我刚刚看到另一个库stormed_amqp,可能比Pika做得更好。)或者如果我想要干净且可维护的代码来实现这个模型,我是否真的需要回退到真正的Python线程?我对所有选项持开放态度。
感谢任何帮助或想法!我一直觉得我对Python中的并发有了很好的理解,但又一次发现我并没有。:) 希望你喜欢上面的ASCII艺术。
1 个回答
在阅读了你的帖子并且使用了gevent这个和eventlet类似的库后,我明白了一些事情,因为我刚刚解决了一个类似的问题。
一般来说,因为同一时间只有一个eventlet或greenlet在运行,所以不需要加锁。只要没有哪个在阻塞,所有的东西看起来都能同时运行。但是,如果你在一个绿色线程(greenlet)里发送数据,而另一个绿色线程也在发送数据,那你是对的,这种情况下确实需要加锁。
如果我遇到像这样的疑问,光看文档是不够的……去看看源代码吧!反正它是开源的,看看别人的代码你能学到很多东西。
这里有一些简化的示例代码,可能会让事情变得更清楚。
在你的调度器里,准备两个队列。
self.worker_queue = Queue() # queue for messages to workers
self.server_queue = Queue() # queue for messages to ampq server
让工作线程把它们的结果放到服务器队列里。
发送和接收的代码
def send_into_ampq():
while True:
message = dispatcher.get_workger_msg()
try:
connection.send(self.encode(message))
except:
connection.kill()
def read_from_ampq():
while True:
message = connection.wait()
dispatcher.put_ampq_msg(self.decode(message))
在你连接代码的发送函数里
self._writelock = Semaphore(1)
# this is a gevent locking thing. eventlet must have something like this too..
# just counts - 1 for locks and +1 for releases blocks otherwise blocks until
# 0 agian.... why not google it i though.. and yes im right:
# eventlet.semaphore.Semaphore(value=1)
def send(self, message):
"""
you need a write lock to prevent more greenlets
sending more messages when previous sent is not done yet.
"""
with self._writelock:
self.socket.sendall(message)