未提供项目说明

curv-amqp的Python项目详细描述


曲线amqp

Pika框架,在使用阻塞连接时处理重新连接,具有有用的默认值、构建块、类型提示和优先级重新排队方法

正在安装程序包

pip install curv-amqp

为本地开发安装

^{pr2}$

基本用途

fromcurv_amqp.connectionimportConnection,ConnectionParametersfromcurv_amqp.publisherimportPublisherfromcurv_amqp.consumerimportConsumer,ConsumerMessagehost='localhost'queue_name='test'connection=Connection(parameters=ConnectionParameters(host))publisher=Publisher(connection=connection)publisher.publish(routing_key=queue_name,body=b'message')consumer=Consumer(connection=connection)defon_message_callback(message:ConsumerMessage):print('message.body:',message.body)message.ack()# you don't have to stop consuming here - but you do have to stop the consumer in this thread# eventually since consumer.consume is blockingmessage.consumer.stop_consuming()consumer.consume(queue=queue_name,prefetch_count=1,on_message_callback=on_message_callback)

使用

fromargparseimportArgumentParserfromcurv_amqp.connectionimportConnection,URLParameters,ConnectionParametersfromcurv_amqp.publisherimportPublisherfromcurv_amqp.consumerimportConsumer,ConsumerMessagefromcurv_amqp.exceptionsimportChannelClosedError,ConnectionClosedError,RequeueRetryCountErrordefmain():parser=ArgumentParser()parser.add_argument('--url',type=str,default='localhost',help='amqp url or localhost - ''localhost assumes rabbitmq is installed - ''"brew install rabbitmq"')parser.add_argument('--queue',type=str,default='test-queue-name',help='amqp queue name')parser.add_argument('--body',type=str,default='your message',help='amqp message body')args=parser.parse_args()# pass in URLParameters or ConnectionParameters# its recommended that a single connection per process is used.url:str=args.urlparameters=ConnectionParameters(url)ifurlis'localhost'elseURLParameters(url)queue_name=args.queuebody=bytes(args.body,encoding='utf-8')connection=Connection(parameters=parameters)# testing auto reconnect for connectionconnection.blocking_connection.close()# its required that two different channels are used for a publisher and consumer# NOTE: will automatically declare queue for youpublisher=Publisher(connection=connection)# testing auto reconnect for channel / publisherpublisher.blocking_channel.close()publisher.publish(routing_key=queue_name,body=body)consumer=Consumer(connection=connection)# testing auto reconnect for channel / consumerconsumer.blocking_channel.close()defon_message_callback(message:ConsumerMessage):print('message.body:',message.body)print('message.properties.priority',message.properties.priority)try:message.priority_requeue(publisher)exceptRequeueRetryCountErrorasex:print(ex)message.ack()message.consumer.stop_consuming()consumer.consume(queue=queue_name,prefetch_count=1,on_message_callback=on_message_callback)publisher.publish(routing_key=queue_name,body=body)formsginconsumer.consume_generator(queue=queue_name,prefetch_count=1,auto_ack=True,inactivity_timeout=1):print('msg.body:',msg.body)try:# testing proper channel closepublisher.close()publisher.publish(routing_key=queue_name,body=body)exceptChannelClosedErrorase:print(e)try:# testing proper connection closepublisher=Publisher(connection=connection)connection.close()publisher.publish(routing_key=queue_name,body=body)exceptConnectionClosedErrorase:print(e)if__name__=='__main__':main()

正在更新包

# update __version__ in __init__.py
python setup.py sdist bdist_wheel
twine check dist/*
twine upload dist/*

或者使用这个脚本-注意它将删除build和dist目录

bash deploy.sh

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
NetBeans中的Java Swing滚动窗格   java如何与具有复合键的表建立关系?   Android中读取文件时java数据丢失   java黄瓜场景。embed在ubuntu机器上不工作?   java从spring mvc控制器操作中,我如何获得请求/响应的访问权限?   java减去两个长值   java选择下一个值firebase 安卓   用于起始和结尾连字符的java正则表达式   Java正则表达式解释   java Lifefay freemarker ADT:方法不可用?   java我怎样才能让我的开关盒作用于JFrame?   java在我的场景中使用连接池的理想方式是什么   java我如何接受jsoup的cookies?   java如何将整数数组更改为字符串数组?   java Android操作\u指针\u向上直到剩余触摸移动时才调用   java为什么gradle会出错?   io如何在java中复制/拆分输入流?   java使JButton不可见,但尊重其原始空间   java Spring提交表单获取复选框值不起作用