一个基于pika的、更可爱的、更人性化的rabbitmq队列实用程序('`)
PI-KA-CHU的Python项目详细描述
皮卡丘
一个基于pika的、更可爱的、更人性化的rabbitmq队列实用程序(’u12445;`)
快速查看
将消息放入队列(独立于命名空间):
importPIKACHUPIKACHU.SimpleProducor("amqp://localhost").put(dict(data="some message"))
从队列中获取一些消息:
forenvelopeinPIKACHU.SimpleConsumer("amqp://localhost").get():print("get message:",envelope.message)envelope.message_read()
使用Listener持续监听消息到达:
defcallback(envelope):print("get message:",envelope.message)envelope.message_read()consumer=PIKACHU.SimpleAsyncConsumer(settings.amqp)ioloop=consumer.start_listen(callback)ioloop.start()
安装
pip install git+https://github.com/smilefufu/PIKACHU@master
或者来自pypi
pip install PI-KA-CHU
主旨
Pikachu关注消息队列的业务场景,因此用户不必了解RabbitMQ的详细概念,如交换、交换类型、绑定密钥……他们只需要知道他们需要什么样的排队模式来完成他们的业务,然后从Pikachu中选择。这就是全部。
因此pikachu计划以人工方式提供一些常用的队列模式:)比如基本模式put/listen和put/get。还有发布/订阅模式。其他模式也在计划中。
皮卡丘制作人
所有生产者的基类。所有生产商使用两个基本参数共享相同的实例化方法:
- url:要连接到rabbitmq的amqp字符串
- 命名空间:不同业务的命名空间。不需要参数,默认值为“pikachu”。
pikachu.asyncConsumer
所有异步使用者的基类。考虑到几乎所有的业务场景都需要一个异步使用者,所以Pikachu中的所有使用者都是异步使用者,除了提供get方法的SimpleConsumer。血压计:
- url:要连接到rabbitmq的amqp字符串
- 命名空间:不同业务的命名空间。不需要参数,默认值为“pikachu”。
- Tornado_模式:如果为true,则消费者将使用pika.tornado连接,该连接使用与Tornado相同的ioloop。默认值为false,使用pika.selectconnection,因此pikachu有自己的ioloop,如果用户有ioloop,则必须将其ioloop与pikachu的ioloop合并。
皮卡丘.简单消费者
与pikachu.producer的参数相同
文档
托多:完成该死的文档
示例
放置/获取模式
生产代码:
importPIKACHUproducor=PIKACHU.SimpleProducor("amqp://localhost",namespace="my_biz")message=dict(content="some message")producor.put(message)
consmer代码(每次运行最多可获取10条消息):
importPIKACHUconsumer=PIKACHU.SimpleConsumer("amqp://localhost")forenvelopeinconsumer.get(max_len=10):print("get message with content:",envelope.message["conent"])envelope.message_read()# mark message as read, it's necessary in put/get or put/listen pattern. If you miss it, all unmarked message will be delivered again next time you start your consumer.
放置/收听模式
producor代码(与put/get模式相同)
消费者代码(等待并收听信息):
importPIKACHUdefcallback(envelope):print("get message with content:",envelope.message["content"])envelope.message_read()consumer=PIKACHU.SimpleAsyncConsumer(settings.amqp)ioloop=consumer.start_listen(callback)ioloop.start()# start the loop to keep the process running
发布/订阅模式
发布者代码:
importPIKACHUpublisher=PIKACHU.BroadCaster("amqp://localhost",namespace="CCTV")message=dict(news="today's news")publisher.publish(message,to_hub="ten")
用户代码:
importPIKACHUsubscriber=PIKACHU.Receiver("amqp://localhost",namespace="CCTV")defcb(envelope):print("get news:",envelope.message["news"])ioloop=subscriber.subscribe(cb,from_hub="ten")ioloop.start()# start the loop to keep the process running