Remote AMQP mock
AMQP Mock
Installation
pip3 install amqp-mock
Overview
Test Publishing
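A publishing test follows the same shape as the consumption test: start the mock, let the system under test publish, then ask the mock what arrived on the exchange. The sketch below is a minimal illustration under stated assumptions, not the project's own example. It takes the mock factory and the publishing function as parameters so the flow can be exercised without a broker. `check_publishing` and the `publish` callable are hypothetical names, and reading the payload from a `value` attribute on each returned message is an assumption based on the `Message` representation shown later in this document.

```python
import asyncio


async def check_publishing(create_mock, publish):
    """Verify that the system under test publishes [1, 2, 3] to "exchange".

    create_mock: factory returning an async context manager, for example
                 amqp_mock.create_amqp_mock (passed in, not imported here).
    publish:     hypothetical "system under test" function taking
                 (value, exchange).
    """
    # 1. Start AMQP mock server
    async with create_mock() as mock:
        # 2. Publish message via the system under test
        publish([1, 2, 3], "exchange")

        # 3. Test that the message has been published
        messages = await mock.client.get_exchange_messages("exchange")
        # Assumption: each returned message exposes its payload as `.value`
        assert messages[0].value == [1, 2, 3]
        return messages
```

With the real library this would be driven as `asyncio.run(check_publishing(create_amqp_mock, publish_message))`, where `publish_message` stands for whatever function the application uses to publish.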
Test Consuming
from amqp_mock import create_amqp_mock, Message, MessageStatus

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Mock next message
    await mock.client.publish_message("queue", Message([1, 2, 3]))

    # 3. Consume message via "system under test"
    consume_message("queue")

    # 4. Test message has been consumed
    history = await mock.client.get_queue_message_history("queue")
    assert history[0].status == MessageStatus.ACKED
Mock server
Publish message
POST /queues/{queue}/messages
{"id":"9e342ac1-eef6-40b1-9eaf-053ee7887968","value":[1,2,3],"exchange":"","routing_key":"","properties":null}
HTTP
$ http POST localhost/queues/test_queue/messages \
    value:='[1, 2, 3]' \
    exchange=test_exchange

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
Python
from amqp_mock import AmqpMockClient, Message

mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)
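When neither HTTPie nor the Python client is available, the same request can be assembled with the standard library. The following is a minimal sketch, assuming the server accepts the JSON body shown for `POST /queues/{queue}/messages`; `build_publish_request` is a hypothetical helper, and generating the `id` on the caller's side is an assumption (the HTTPie example above omits it and still receives `200 OK`).

```python
import json
import uuid
from urllib import request


def build_publish_request(base_url, queue, value, exchange="", routing_key=""):
    """Build (but do not send) a POST /queues/{queue}/messages request."""
    payload = {
        "id": str(uuid.uuid4()),  # assumption: the caller may supply its own id
        "value": value,
        "exchange": exchange,
        "routing_key": routing_key,
        "properties": None,  # serialized as JSON null
    }
    return request.Request(
        url=f"{base_url}/queues/{queue}/messages",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_publish_request(
    "http://localhost", "test_queue", [1, 2, 3], exchange="test_exchange"
)
# Sending requires a running mock server:
# request.urlopen(req)
```

Separating construction from sending keeps the payload easy to inspect in a unit test before any network traffic happens.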
Get queue message history
GET /queues/{queue}/messages/history
HTTP
$ http GET localhost/queues/test_queue/messages/history

HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json;charset=utf-8

[
    {
        "message": {
            "exchange": "test_exchange",
            "id": "94459a41-9119-479a-98c9-80bc9dabb719",
            "properties": null,
            "routing_key": "",
            "value": [1, 2, 3]
        },
        "queue": "test_queue",
        "status": "ACKED"
    }
]
Python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# [
#   <QueuedMessage message=<Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>,
#                  queue='test_queue',
#                  status=MessageStatus.ACKED>
# ]
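When the history is fetched over plain HTTP, the response is an ordinary JSON array, so a test can filter it by status without the Python client. The sketch below parses the sample response body from the HTTP example above; `values_with_status` is a hypothetical helper name, and only the `ACKED` status that appears in this document is used.

```python
import json

# Response body copied from the HTTP example above.
HISTORY_BODY = """
[{"message": {"exchange": "test_exchange",
              "id": "94459a41-9119-479a-98c9-80bc9dabb719",
              "properties": null, "routing_key": "", "value": [1, 2, 3]},
  "queue": "test_queue", "status": "ACKED"}]
"""


def values_with_status(body, status):
    """Return the payloads of all history entries that have the given status."""
    entries = json.loads(body)
    return [
        entry["message"]["value"]
        for entry in entries
        if entry["status"] == status
    ]


acked = values_with_status(HISTORY_BODY, "ACKED")
print(acked)  # [[1, 2, 3]]
```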
Get exchange messages
GET /exchanges/{exchange}/messages
HTTP
$ http GET localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json;charset=utf-8

[
    {
        "exchange": "test_exchange",
        "id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
        "properties": {
            "app_id": "",
            "cluster_id": "",
            "content_encoding": "",
            "content_type": "",
            "correlation_id": "",
            "delivery_mode": 1,
            "expiration": "",
            "headers": null,
            "message_id": "5ec9024c74eca2e419fd7e29f7be846c",
            "message_type": "",
            "priority": null,
            "reply_to": "",
            "timestamp": null,
            "user_id": ""
        },
        "routing_key": "",
        "value": [1, 2, 3]
    }
]
Python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# [
#   <Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>
# ]
Delete exchange messages
DELETE /exchanges/{exchange}/messages
HTTP
$ http DELETE localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
Python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")
Reset
DELETE /
HTTP
$ http DELETE localhost/

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json
Python
from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.reset()
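When several tests share one long-running mock server, a reset after each test keeps state from leaking between them. One way to arrange this is a small async context manager around the client; the sketch below is an illustration rather than part of the library. `clean_mock` is a hypothetical name, and it relies only on the client having the awaitable `reset()` method shown above.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def clean_mock(client):
    """Yield the client, then reset the mock server even if the test fails."""
    try:
        yield client
    finally:
        # Clears mock state so the next test starts from scratch.
        await client.reset()
```

A test would then use it as `async with clean_mock(AmqpMockClient()) as mock_client: ...`, and the `finally` clause guarantees the reset runs whether or not an assertion inside the block raises.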