用于异步和人的aiormq的包装器。

aio-pika的Python项目详细描述


爱奥皮卡

ReadTheDocsCoverallsDrone CILatest Versionhttps://img.shields.io/pypi/wheel/aio-pika.svghttps://img.shields.io/pypi/pyversions/aio-pika.svghttps://img.shields.io/pypi/l/aio-pika.svg

异步和人类的aiormq包装器。

注意

由于版本5.0.0,此库不使用pika作为amqp连接器。 低于5.0.0的版本包含或需要pika的源代码。

请参阅documentation中的示例和教程。

如果您是rabbitmq的新手,让我们开始adopted official RabbitMQ tutorial

功能

  • 完全异步的api。
  • 面向对象的api。
  • 透明的自动重新连接,具有完全的状态恢复功能,连接功能强大 (例如,已声明的队列或交换、使用状态和绑定)。
  • python 3.5+兼容(包括3.7)。
  • 对于python 3.4用户可用aio pika<;4
  • 透明publisher confirms支持
  • Transactions支持

安装

pip install aio-pika

使用示例

简单消费者:

importasyncioimportaio_pikaasyncdefmain(loop):connection=awaitaio_pika.connect_robust("amqp://guest:guest@127.0.0.1/",loop=loop)asyncwithconnection:queue_name="test_queue"# Creating channelchannel=awaitconnection.channel()# type: aio_pika.Channel# Declaring queuequeue=awaitchannel.declare_queue(queue_name,auto_delete=True)# type: aio_pika.Queueasyncwithqueue.iterator()asqueue_iter:# Cancel consuming after __aexit__asyncformessageinqueue_iter:asyncwithmessage.process():print(message.body)ifqueue.nameinmessage.body.decode():breakif__name__=="__main__":loop=asyncio.get_event_loop()loop.run_until_complete(main(loop))loop.close()

简单发布者:

importasyncioimportaio_pikaasyncdefmain(loop):connection=awaitaio_pika.connect_robust("amqp://guest:guest@127.0.0.1/",loop=loop)routing_key="test_queue"channel=awaitconnection.channel()# type: aio_pika.Channelawaitchannel.default_exchange.publish(aio_pika.Message(body='Hello {}'.format(routing_key).encode()),routing_key=routing_key)awaitconnection.close()if__name__=="__main__":loop=asyncio.get_event_loop()loop.run_until_complete(main(loop))loop.close()

获取单个消息示例:

importasynciofromaio_pikaimportconnect_robust,Messageasyncdefmain(loop):connection=awaitconnect_robust("amqp://guest:guest@127.0.0.1/",loop=loop)queue_name="test_queue"routing_key="test_queue"# Creating channelchannel=awaitconnection.channel()# Declaring exchangeexchange=awaitchannel.declare_exchange('direct',auto_delete=True)# Declaring queuequeue=awaitchannel.declare_queue(queue_name,auto_delete=True)# Binding queueawaitqueue.bind(exchange,routing_key)awaitexchange.publish(Message(bytes('Hello','utf-8'),content_type='text/plain',headers={'foo':'bar'}),routing_key)# Receiving messageincoming_message=awaitqueue.get(timeout=5)# Confirm messageawaitincoming_message.ack()awaitqueue.unbind(exchange,routing_key)awaitqueue.delete()awaitconnection.close()if__name__=="__main__":loop=asyncio.get_event_loop()loop.run_until_complete(main(loop))

请参阅documentation中的其他示例和教程。

版本控制

这个软件遵循Semantic Versioning

对于贡献者

设置开发环境

克隆项目:

git clone https://github.com/mosquito/aio-pika.git
cd aio-pika

aio_pika

virtualenv -p python3.5 env

安装aio_pika

的所有要求
env/bin/pip install -e '.[develop]'

运行测试

注意:要在本地运行测试,需要使用默认用户/密码(guest/guest)和端口(5672)运行rabbitmq实例。

  • protip:使用Docker进行此操作:
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management

要测试,请运行:

make test

创建拉取请求

您可以随意创建pull请求,但您应该描述您的案例并添加一些示例。

更改应遵循简单的规则:

  • 当您的更改破坏公共api时,必须增加主版本。
  • 当您的更改对公共api是安全的时(例如,添加了一个具有默认值的参数)
  • 您必须添加测试用例(请参见tests/文件夹)
  • 必须添加docstrings
  • 你可以随意加入“thank’s to” section

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
如何在Java中超时Future而不使用Future。get()是阻塞操作吗?   使用CXF 2.7.17部署到WebSphere 8.5.5.6的java问题   为什么是布尔。类是java。lang.课堂和字符串。类是java。串?   java是验收测试SOAP Web服务的最佳方法?   java如何替换kotlin中的::new?   字符串LastIndexOf和java。lang.IndexOutOfBoundsException   java移动游戏窗口在其他窗口前面LibGDX   java如何构建真正本地的ApacheSpark“胖”jar。JRE内存问题?   JavaGSON更新json文件   java如何替换字符串中的所有#{key}?   java应用程序没有在Android Pie中获取MySQL数据库   java是否可以在JavaDoc中重用@param描述?   java在MangedBean clsass中的任何@Autowired注入都不起作用,它总是等于NULL吗?   java当我为登录用户创建会话时,如何在struts2中维护hibernate会话?   按反向日期排列的java排序适配器ArrayList   Mockito中带doNothing()的java非类方法?