远程AMQP模拟

amqp-mock的Python项目详细描述


AMQP模拟

LicenseCodecovPyPIPython Version

安装

pip3 install amqp-mock

概述

测试发布

^{pr2}$

此处提供完整代码:^{}

测试消耗

fromamqp_mockimportcreate_amqp_mock,Message,MessageStatus# 1. Start AMQP mock serverasyncwithcreate_amqp_mock()asmock:# 2. Mock next messageawaitmock.client.publish_message("queue",Message([1,2,3]))# 3. Consume message via "system under test"consume_message("queue")# 4. Test message has been consumedhistory=awaitmock.client.get_queue_message_history("queue")asserthistory[0].status==MessageStatus.ACKED

此处提供完整代码:^{}

模拟服务器

发布消息

POST /queues/{queue}/messages

{"id":"9e342ac1-eef6-40b1-9eaf-053ee7887968","value":[1,2,3],"exchange":"","routing_key":"","properties":null}
HTTP

$ http POST localhost/queues/test_queue/messages \
    value:='[1, 2, 3]'\exchange=test_exchange

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

fromamqp_mockimportAmqpMockClientmock_client=AmqpMockClient()message=Message([1,2,3],exchange="test_exchange")awaitmock_client.publish_message("test_queue",message)

获取队列消息历史记录

GET /queues/{queue}/messages/history

HTTP

$ http GET localhost/queues/test_queue/messages/history

HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json;charset=utf-8

[{"message": {"exchange": "test_exchange",
            "id": "94459a41-9119-479a-98c9-80bc9dabb719",
            "properties": null,
            "routing_key": "",
            "value": [1, 2, 3]},
        "queue": "test_queue",
        "status": "ACKED"}]

Python

fromamqp_mockimportAmqpMockClientmock_client=AmqpMockClient()awaitmock_client.get_queue_message_history("test_queue")# [#   <QueuedMessage message=<Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>,#                  queue='test_queue',#                  status=MessageStatus.ACKED># ]

获取交换消息

GET /exchanges/{exchange}/messages

HTTP

$ http GET localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json;charset=utf-8

[{"exchange": "test_exchange",
        "id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
        "properties": {"app_id": "",
            "cluster_id": "",
            "content_encoding": "",
            "content_type": "",
            "correlation_id": "",
            "delivery_mode": 1,
            "expiration": "",
            "headers": null,
            "message_id": "5ec9024c74eca2e419fd7e29f7be846c",
            "message_type": "",
            "priority": null,
            "reply_to": "",
            "timestamp": null,
            "user_id": ""},
        "routing_key": "",
        "value": [1, 2, 3]}]

Python

fromamqp_mockimportAmqpMockClientmock_client=AmqpMockClient()messages=awaitmock_client.get_exchange_messages("test_exchange")# [#   <Message value=[1, 2, 3], exchange='test_exchange', routing_key=''># ]

删除exchange邮件

DELETE /exchanges/{exchange}/messages

HTTP

$ http DELETE localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

fromamqp_mockimportAmqpMockClientmock_client=AmqpMockClient()awaitmock_client.delete_exchange_messages("test_exchange")

重置

DELETE /

HTTP

$ http DELETE localhost/

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

fromamqp_mockimportAmqpMockClientmock_client=AmqpMockClient()awaitmock_client.reset()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java根据选择随机设置图标   java如何将Android Studio与本地服务器数据库SQL server 2008连接?   java在点击鼠标后绘制一个椭圆形   java选项窗格相对于其父项的位置   java如何在Android中的switch case中使用String[]输入?   安卓无法从“15.0.1”确定java版本   如果满足特定条件,java是否重置计时器?   java是一个实体类,可以在OOAD中返回其他实体实例吗?   bash将对jar文件| cut | awk和java程序的调用集成到一个统一进程中   Spring安全更新身份验证成功时的最后登录日期   数据库身份验证中基于Java控制台的客户端服务器登录应用程序错误   java Selenium junit测试失败,驱动程序过早获取URL   java LibGdx多点触摸事件混淆   带扫描仪的java打印空间   来自旧版应用程序的java springcloudsleuth头   java如何实现模板设计模式?   java如何比较具有多个值的键的两个映射?   读取时R中的java MemoryError。xlsx