python的消息传递库

alauda-kombu的Python项目详细描述


Version:3.0.37

kombu是python的消息传递库。

kombu的目的是通过 为AMQ协议提供惯用的高级接口,以及 为常见的消息传递问题提供经验证和测试的解决方案。

AMQP是高级消息队列协议,一个开放的标准协议 对于消息定向、队列、路由、可靠性和安全性, 其中,RabbitMQ消息服务器是最流行的实现。

功能

  • 允许应用程序作者支持多个邮件服务器 使用可插拔传输的解决方案。

    • AMQP transport using the py-amqp, librabbitmq, or qpid-python client libraries.

    • High performance AMQP transport written in C - when using librabbitmq

      This is automatically enabled if librabbitmq is installed:

      $ pip install librabbitmq
      
    • Virtual transports makes it really easy to add support for non-AMQP transports. There is already built-in support for Redis, Beanstalk, Amazon SQS, CouchDB, MongoDB, ZeroMQ, ZooKeeper, SoftLayer MQ and Pyro.

    • You can also use the SQLAlchemy and Django ORM transports to use a database as the broker.

    • In-memory transport for unit testing.

  • 支持消息的自动编码、序列化和压缩 有效载荷。

  • 跨传输的一致异常处理。

  • 确保操作由 处理连接和通道错误。

  • 已经修复了amqplib的几个烦恼,比如支持 超时和在多个频道上等待事件的能力。

  • 已经使用carrot的项目可以通过使用 兼容层。

关于amqp的介绍,您应该阅读文章Rabbits and warrens, 以及Wikipedia article about AMQP

传输比较

ClientTypeDirectTopicFanout
amqpNativeYesYesYes
qpidNativeYesYesYes
redisVirtualYesYesYes (PUB/SUB)
mongodbVirtualYesYesYes
beanstalkVirtualYesYes [1]No
SQSVirtualYesYes [1]Yes [2]
couchdbVirtualYesYes [1]No
zookeeperVirtualYesYes [1]No
in-memoryVirtualYesYes [1]No
djangoVirtualYesYes [1]No
sqlalchemyVirtualYesYes [1]No
SLMQVirtualYesYes [1]No
[1](1, 2, 3, 4, 5, 6, 7, 8) Declarations only kept in memory, so exchanges/queues must be declared by all clients that needs them.
[2]Fanout supported via storing routing tables in SimpleDB. Disabled by default, but can be enabled by using the ^{tt1}$ transport option.

文件

Kombu正在使用Sphinx,最新的文档可以在这里找到:

https://kombu.readthedocs.io/

快速概述

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print body
    message.ack()

# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue.  You can also declare the
    # queue manually using:
    #     video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

# Consume from several queues on the same channel:
video_queue = Queue('video', exchange=media_exchange, key='video')
image_queue = Queue('image', exchange=media_exchange, key='image')

with connection.Consumer([video_queue, image_queue],
                         callbacks=[process_media]) as consumer:
    while True:
        connection.drain_events()

或手动处理频道:

with connection.channel() as channel:
    producer = Producer(channel, ...)
    consumer = Producer(channel)

所有对象也可以在WITH语句之外使用, 使用后请记住关闭对象:

from kombu import Connection, Consumer, Producer

connection = Connection()
    # ...
connection.release()

consumer = Consumer(channel_or_connection, ...)
consumer.register_callback(my_callback)
consumer.consume()
    # ....
consumer.cancel()

exchangequeue只是可以pickle的声明 用于配置文件等。

它们也支持操作,但要这样做,它们需要绑定 去一个频道。

将交换和队列绑定到连接将使其使用 连接默认频道。

>>> exchange = Exchange('tasks', 'direct')

>>> connection = Connection()
>>> bound_exchange = exchange(connection)
>>> bound_exchange.delete()

# the original exchange is not affected, and stays unbound.
>>> exchange.delete()
raise NotBoundError: Can't call delete on Exchange not bound to
    a channel.

安装

您可以通过python包索引(pypi)安装kombu 或者来源。

要使用pip安装,请执行以下操作:

$ pip install kombu

要使用“简易安装”进行安装,请执行以下操作:

$ easy_install kombu

如果你下载了一个源tarball,你可以安装它 执行以下操作:

$ python setup.py build
# python setup.py install # as root

术语

在开始之前,您应该熟悉一些概念:

  • Producers

    Producers sends messages to an exchange.

  • 交换

    Messages are sent to exchanges. Exchanges are named and can be configured to use one of several routing algorithms. The exchange routes the messages to consumers by matching the routing key in the message with the routing key the consumer provides when binding to the exchange.

  • 消费者

    Consumers declares a queue, binds it to a exchange and receives messages from it.

  • 排队

    Queues receive messages sent to exchanges. The queues are declared by consumers.

  • 路由键

    Every message has a routing key. The interpretation of the routing key depends on the exchange type. There are four default exchange types defined by the AMQP standard, and vendors can define custom types (so see your vendors manual for details).

    These are the default exchange types defined by AMQP/0.8:

    • Direct exchange

      Matches if the routing key property of the message and the routing_key attribute of the consumer are identical.

    • 扇出式交换机

      Always matches, even if the binding does not have a routing key.

    • 主题交换

      Matches the routing key property of the message by a primitive pattern matching scheme. The message routing key then consists of words separated by dots (“.”, like domain names), and two special characters are available; star (“*”) and hash (“#”). The star matches any word, and the hash matches zero or more words. For example “*.stock.#” matches the routing keys “usd.stock” and “eur.stock.db” but not “stock.nasdaq”.

获取帮助

邮件列表

加入carrot-users邮件列表。

错误跟踪程序

如果您有任何建议、错误报告或烦恼,请报告 我们的问题跟踪者在http://github.com/celery/kombu/issues/

贡献

在github:http://github.com/celery/kombu

开发kombu

我们非常鼓励您参与开发。如果你不知道 像github(出于某种原因)一样,欢迎您发送常规补丁。

许可证

此软件在新的bsd许可证下获得许可。查看许可证 文件位于顶级分发目录中,以获取完整的许可证文本。

Bitdeli badge

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java使用并发hashmap减少线程池的内存使用?   java为什么在提交片段后出现错误“getParentActivityIntent:badActivity name”?   vim UltiSnips扩展java包   java给出了一个名称列表,如何插入、删除、显示、搜索和退出?Java程序   java Spring集成:只从FTP服务器下载新的或更新的文件?   使用Java中的Scala:将函数作为参数传递   java线程1每秒填充一个映射,线程2每60秒保存一个条目   java从私有类访问公共类中的方法/字段   如何使用ApacheSpark流媒体和JavaAPI从所有人那里获取英语推文?   java是否可以在父标记和子标记中编写XMLSchemainstance?   java无法读取文本文件,也找不到解决方案   java在运行时加载类时无法创建bean