python的消息传递库
alauda-kombu的Python项目详细描述
Version: | 3.0.37 |
---|
kombu是python的消息传递库。
kombu的目的是通过 为AMQ协议提供惯用的高级接口,以及 为常见的消息传递问题提供经验证和测试的解决方案。
AMQP是高级消息队列协议,一个开放的标准协议 对于消息定向、队列、路由、可靠性和安全性, 其中,RabbitMQ消息服务器是最流行的实现。
功能
允许应用程序作者支持多个邮件服务器 使用可插拔传输的解决方案。
AMQP transport using the py-amqp, librabbitmq, or qpid-python client libraries.
High performance AMQP transport written in C - when using librabbitmq
This is automatically enabled if librabbitmq is installed:
$ pip install librabbitmq
Virtual transports makes it really easy to add support for non-AMQP transports. There is already built-in support for Redis, Beanstalk, Amazon SQS, CouchDB, MongoDB, ZeroMQ, ZooKeeper, SoftLayer MQ and Pyro.
You can also use the SQLAlchemy and Django ORM transports to use a database as the broker.
In-memory transport for unit testing.
支持消息的自动编码、序列化和压缩 有效载荷。
跨传输的一致异常处理。
确保操作由 处理连接和通道错误。
已经修复了amqplib的几个烦恼,比如支持 超时和在多个频道上等待事件的能力。
已经使用carrot的项目可以通过使用 兼容层。
关于amqp的介绍,您应该阅读文章Rabbits and warrens, 以及Wikipedia article about AMQP。
传输比较
Client | Type | Direct | Topic | Fanout |
amqp | Native | Yes | Yes | Yes |
qpid | Native | Yes | Yes | Yes |
redis | Virtual | Yes | Yes | Yes (PUB/SUB) |
mongodb | Virtual | Yes | Yes | Yes |
beanstalk | Virtual | Yes | Yes [1] | No |
SQS | Virtual | Yes | Yes [1] | Yes [2] |
couchdb | Virtual | Yes | Yes [1] | No |
zookeeper | Virtual | Yes | Yes [1] | No |
in-memory | Virtual | Yes | Yes [1] | No |
django | Virtual | Yes | Yes [1] | No |
sqlalchemy | Virtual | Yes | Yes [1] | No |
SLMQ | Virtual | Yes | Yes [1] | No |
[1] | (1, 2, 3, 4, 5, 6, 7, 8) Declarations only kept in memory, so exchanges/queues must be declared by all clients that needs them. |
[2] | Fanout supported via storing routing tables in SimpleDB. Disabled by default, but can be enabled by using the ^{tt1}$ transport option. |
快速概述
from kombu import Connection, Exchange, Queue media_exchange = Exchange('media', 'direct', durable=True) video_queue = Queue('video', exchange=media_exchange, routing_key='video') def process_media(body, message): print body message.ack() # connections with Connection('amqp://guest:guest@localhost//') as conn: # produce producer = conn.Producer(serializer='json') producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013}, exchange=media_exchange, routing_key='video', declare=[video_queue]) # the declare above, makes sure the video queue is declared # so that the messages can be delivered. # It's a best practice in Kombu to have both publishers and # consumers declare the queue. You can also declare the # queue manually using: # video_queue(conn).declare() # consume with conn.Consumer(video_queue, callbacks=[process_media]) as consumer: # Process messages and handle events on all channels while True: conn.drain_events() # Consume from several queues on the same channel: video_queue = Queue('video', exchange=media_exchange, key='video') image_queue = Queue('image', exchange=media_exchange, key='image') with connection.Consumer([video_queue, image_queue], callbacks=[process_media]) as consumer: while True: connection.drain_events()
或手动处理频道:
with connection.channel() as channel: producer = Producer(channel, ...) consumer = Producer(channel)
所有对象也可以在WITH语句之外使用, 使用后请记住关闭对象:
from kombu import Connection, Consumer, Producer connection = Connection() # ... connection.release() consumer = Consumer(channel_or_connection, ...) consumer.register_callback(my_callback) consumer.consume() # .... consumer.cancel()
exchange和queue只是可以pickle的声明 用于配置文件等。
它们也支持操作,但要这样做,它们需要绑定 去一个频道。
将交换和队列绑定到连接将使其使用 连接默认频道。
>>> exchange = Exchange('tasks', 'direct') >>> connection = Connection() >>> bound_exchange = exchange(connection) >>> bound_exchange.delete() # the original exchange is not affected, and stays unbound. >>> exchange.delete() raise NotBoundError: Can't call delete on Exchange not bound to a channel.
安装
您可以通过python包索引(pypi)安装kombu 或者来源。
要使用pip安装,请执行以下操作:
$ pip install kombu
要使用“简易安装”进行安装,请执行以下操作:
$ easy_install kombu
如果你下载了一个源tarball,你可以安装它 执行以下操作:
$ python setup.py build # python setup.py install # as root
术语
在开始之前,您应该熟悉一些概念:
Producers
Producers sends messages to an exchange.
交换
Messages are sent to exchanges. Exchanges are named and can be configured to use one of several routing algorithms. The exchange routes the messages to consumers by matching the routing key in the message with the routing key the consumer provides when binding to the exchange.
消费者
Consumers declares a queue, binds it to a exchange and receives messages from it.
排队
Queues receive messages sent to exchanges. The queues are declared by consumers.
路由键
Every message has a routing key. The interpretation of the routing key depends on the exchange type. There are four default exchange types defined by the AMQP standard, and vendors can define custom types (so see your vendors manual for details).
These are the default exchange types defined by AMQP/0.8:
Direct exchange
Matches if the routing key property of the message and the routing_key attribute of the consumer are identical.
扇出式交换机
Always matches, even if the binding does not have a routing key.
主题交换
Matches the routing key property of the message by a primitive pattern matching scheme. The message routing key then consists of words separated by dots (“.”, like domain names), and two special characters are available; star (“*”) and hash (“#”). The star matches any word, and the hash matches zero or more words. For example “*.stock.#” matches the routing keys “usd.stock” and “eur.stock.db” but not “stock.nasdaq”.
错误跟踪程序
如果您有任何建议、错误报告或烦恼,请报告 我们的问题跟踪者在http://github.com/celery/kombu/issues/