未提供项目说明
curv-amqp的Python项目详细描述
曲线amqp
Pika框架,在使用阻塞连接时处理重新连接,具有有用的默认值、构建块、类型提示和优先级重新排队方法
正在安装程序包
pip install curv-amqp
为本地开发安装
^{pr2}$基本用途
fromcurv_amqp.connectionimportConnection,ConnectionParametersfromcurv_amqp.publisherimportPublisherfromcurv_amqp.consumerimportConsumer,ConsumerMessagehost='localhost'queue_name='test'connection=Connection(parameters=ConnectionParameters(host))publisher=Publisher(connection=connection)publisher.publish(routing_key=queue_name,body=b'message')consumer=Consumer(connection=connection)defon_message_callback(message:ConsumerMessage):print('message.body:',message.body)message.ack()# you don't have to stop consuming here - but you do have to stop the consumer in this thread# eventually since consumer.consume is blockingmessage.consumer.stop_consuming()consumer.consume(queue=queue_name,prefetch_count=1,on_message_callback=on_message_callback)
使用
fromargparseimportArgumentParserfromcurv_amqp.connectionimportConnection,URLParameters,ConnectionParametersfromcurv_amqp.publisherimportPublisherfromcurv_amqp.consumerimportConsumer,ConsumerMessagefromcurv_amqp.exceptionsimportChannelClosedError,ConnectionClosedError,RequeueRetryCountErrordefmain():parser=ArgumentParser()parser.add_argument('--url',type=str,default='localhost',help='amqp url or localhost - ''localhost assumes rabbitmq is installed - ''"brew install rabbitmq"')parser.add_argument('--queue',type=str,default='test-queue-name',help='amqp queue name')parser.add_argument('--body',type=str,default='your message',help='amqp message body')args=parser.parse_args()# pass in URLParameters or ConnectionParameters# its recommended that a single connection per process is used.url:str=args.urlparameters=ConnectionParameters(url)ifurlis'localhost'elseURLParameters(url)queue_name=args.queuebody=bytes(args.body,encoding='utf-8')connection=Connection(parameters=parameters)# testing auto reconnect for connectionconnection.blocking_connection.close()# its required that two different channels are used for a publisher and consumer# NOTE: will automatically declare queue for youpublisher=Publisher(connection=connection)# testing auto reconnect for channel / publisherpublisher.blocking_channel.close()publisher.publish(routing_key=queue_name,body=body)consumer=Consumer(connection=connection)# testing auto reconnect for channel / consumerconsumer.blocking_channel.close()defon_message_callback(message:ConsumerMessage):print('message.body:',message.body)print('message.properties.priority',message.properties.priority)try:message.priority_requeue(publisher)exceptRequeueRetryCountErrorasex:print(ex)message.ack()message.consumer.stop_consuming()consumer.consume(queue=queue_name,prefetch_count=1,on_message_callback=on_message_callback)publisher.publish(routing_key=queue_name,body=body)formsginconsumer.consume_generator(queue=queue_name,prefetch_count=1,auto_ack=True,inactivity_timeout=1):print('msg.body:',msg.body)try:# testing proper channel closepublisher.close()publisher.publish(routing_key=queue_name,body=body)exceptChannelClosedErrorase:print(e)try:# testing proper connection closepublisher=Publisher(connection=connection)connection.close()publisher.publish(routing_key=queue_name,body=body)exceptConnectionClosedErrorase:print(e)if__name__=='__main__':main()
正在更新包
# update __version__ in __init__.py
python setup.py sdist bdist_wheel
twine check dist/*
twine upload dist/*
或者使用这个脚本-注意它将删除build和dist目录
bash deploy.sh
- 项目
标签: