远程AMQP模拟

amqp-mock的Python项目详细描述


AMQP模拟

LicenseCodecovPyPIPython Version

安装

pip3 install amqp-mock

概述

测试发布

^{pr2}$

此处提供完整代码:^{}

测试消耗

fromamqp_mockimportcreate_amqp_mock,Message,MessageStatus# 1. Start AMQP mock serverasyncwithcreate_amqp_mock()asmock:# 2. Mock next messageawaitmock.client.publish_message("queue",Message([1,2,3]))# 3. Consume message via "system under test"consume_message("queue")# 4. Test message has been consumedhistory=awaitmock.client.get_queue_message_history("queue")asserthistory[0].status==MessageStatus.ACKED

此处提供完整代码:^{}

模拟服务器

发布消息

POST /queues/{queue}/messages

{"id":"9e342ac1-eef6-40b1-9eaf-053ee7887968","value":[1,2,3],"exchange":"","routing_key":"","properties":null}
HTTP

$ http POST localhost/queues/test_queue/messages \
    value:='[1, 2, 3]'\exchange=test_exchange

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

fromamqp_mockimportAmqpMockClientmock_client=AmqpMockClient()message=Message([1,2,3],exchange="test_exchange")awaitmock_client.publish_message("test_queue",message)

获取队列消息历史记录

GET /queues/{queue}/messages/history

HTTP

$ http GET localhost/queues/test_queue/messages/history

HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json;charset=utf-8

[{"message": {"exchange": "test_exchange",
            "id": "94459a41-9119-479a-98c9-80bc9dabb719",
            "properties": null,
            "routing_key": "",
            "value": [1, 2, 3]},
        "queue": "test_queue",
        "status": "ACKED"}]

Python

fromamqp_mockimportAmqpMockClientmock_client=AmqpMockClient()awaitmock_client.get_queue_message_history("test_queue")# [#   <QueuedMessage message=<Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>,#                  queue='test_queue',#                  status=MessageStatus.ACKED># ]

获取交换消息

GET /exchanges/{exchange}/messages

HTTP

$ http GET localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json;charset=utf-8

[{"exchange": "test_exchange",
        "id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
        "properties": {"app_id": "",
            "cluster_id": "",
            "content_encoding": "",
            "content_type": "",
            "correlation_id": "",
            "delivery_mode": 1,
            "expiration": "",
            "headers": null,
            "message_id": "5ec9024c74eca2e419fd7e29f7be846c",
            "message_type": "",
            "priority": null,
            "reply_to": "",
            "timestamp": null,
            "user_id": ""},
        "routing_key": "",
        "value": [1, 2, 3]}]

Python

fromamqp_mockimportAmqpMockClientmock_client=AmqpMockClient()messages=awaitmock_client.get_exchange_messages("test_exchange")# [#   <Message value=[1, 2, 3], exchange='test_exchange', routing_key=''># ]

删除exchange邮件

DELETE /exchanges/{exchange}/messages

HTTP

$ http DELETE localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

fromamqp_mockimportAmqpMockClientmock_client=AmqpMockClient()awaitmock_client.delete_exchange_messages("test_exchange")

重置

DELETE /

HTTP

$ http DELETE localhost/

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

fromamqp_mockimportAmqpMockClientmock_client=AmqpMockClient()awaitmock_client.reset()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java JavaFX TableView更新单元格,不更新对象值   在扫描器中使用分隔符的java   java OkHttp 4.9.2,连接无法重用,导致端口耗尽   eclipse中的c JNI:运行Java代码   java是否在出厂的所有硬件设备中都有/mnt/sdcard/Android/data文件夹(或等效文件夹)?   Java,在eclipse中访问资源文件夹中的图像   java为什么Bluemix dashDB操作抛出SqlSyntaxErrorException,SQLCODE=1667?   JavaHtmlUnitWebClient。getPage不处理javascript   Google API认证的java问题   java如何将JSON数组反序列化为Apache beam PCollection<javaObject>   ServerSocket停止接收命令,java/安卓   来自Java类的安卓 Toast消息   java如何自动重新加载应用程序引擎开发服务器?   java是否可以尝试/捕获一些东西来检查是否抛出了异常?   java如何做到这一点当我按下load game时,它不仅会加载信息,还会将您带到游戏中?   Java选项Xmx代表什么?   Java映射,它在插入时打印值   设置“ulimit c unlimited”后,java无法生成系统核心转储