aioamqp-consumer-best
Usage
    import asyncio
    from typing import List

    from aioamqp_consumer_best import (
        ConnectionParams,
        Consumer,
        Exchange,
        Message,
        ProcessBulk,
        Queue,
        QueueBinding,
        ToBulks,
        load_json,
    )


    async def callback(messages: List[Message]) -> None:
        print(messages)


    consumer = Consumer(
        middleware=(
            load_json |
            ToBulks(max_bulk_size=10, bulk_timeout=3.0) |
            ProcessBulk(callback)
        ),
        prefetch_count=10,
        queue=Queue(
            name='test-queue',
            bindings=[
                QueueBinding(
                    exchange=Exchange('test-exchange'),
                    routing_key='test-routing-key',
                ),
            ],
        ),
        connection_params=[  # Round robin
            ConnectionParams(),
            ConnectionParams.from_string('amqp://user@rmq-host:5672/'),
        ],
    )

    asyncio.get_event_loop().run_until_complete(consumer.start())
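The `ToBulks(max_bulk_size=10, bulk_timeout=3.0)` step in the usage example suggests that messages are grouped into lists before they reach the callback. The sketch below illustrates one plausible reading of those two parameters using only the standard library. It is not the library's implementation: `to_bulks` and `demo` are hypothetical helpers, and the rule assumed here (flush when the bulk is full, or when the timeout has elapsed since the first item of the bulk arrived) is an assumption rather than documented behaviour.

```python
import asyncio
from typing import AsyncIterator, List, Optional


async def to_bulks(
    queue: "asyncio.Queue[Optional[str]]",
    max_bulk_size: int,
    bulk_timeout: float,
) -> AsyncIterator[List[str]]:
    """Group items from `queue` into lists (hypothetical helper).

    Assumed rule: a bulk is emitted when it holds `max_bulk_size` items, or
    when `bulk_timeout` seconds have passed since its first item arrived.
    A `None` item ends the stream and flushes whatever is buffered.
    """
    loop = asyncio.get_running_loop()
    bulk: List[str] = []
    deadline = 0.0
    while True:
        try:
            if bulk:
                # A bulk is open: wait only until its deadline.
                remaining = max(deadline - loop.time(), 0.0)
                item = await asyncio.wait_for(queue.get(), timeout=remaining)
            else:
                # Nothing buffered yet: wait for the first item without a timeout.
                item = await queue.get()
        except asyncio.TimeoutError:
            # Deadline reached: emit the partial bulk and start a new one.
            yield bulk
            bulk = []
            continue
        if item is None:
            if bulk:
                yield bulk
            return
        if not bulk:
            # The timeout is measured from the first item of each bulk.
            deadline = loop.time() + bulk_timeout
        bulk.append(item)
        if len(bulk) >= max_bulk_size:
            yield bulk
            bulk = []


async def demo() -> List[List[str]]:
    """Feed five items through the batcher with a bulk size of two."""
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    for item in ["a", "b", "c", "d", "e", None]:
        queue.put_nowait(item)
    return [
        bulk
        async for bulk in to_bulks(queue, max_bulk_size=2, bulk_timeout=3.0)
    ]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Under this reading, a larger `max_bulk_size` means fewer callback invocations, while `bulk_timeout` bounds how long a message can sit in a partially filled bulk. In the usage example, `prefetch_count=10` matches `max_bulk_size=10`; a prefetch limit below the bulk size would presumably leave every bulk waiting for the timeout, although the library's exact behaviour in that case is not stated in this README.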