使用RabbitMQ等代理构建管道的消息交换引擎
barterdude的Python项目详细描述
调酒师
消息交换引擎使用RabbitMQ这样的代理来构建管道。这个项目是建立在伟大的async-worker之上的。在
安装
使用Python3.6+
pip install barterdude
使用
使用以下完整示例构建您的消费者:
^{pr2}$建立自己的钩子
底座挂钩(简单型)
这些钩子在消息检索、成功和失败时调用。在
frombarterdude.hooksimportBaseHookfromasyncworker.rabbitmq.messageimportRabbitMQMessageclassMyCounterHook(BaseHook):_consume=_fail=_success=0asyncdefon_success(self,message:RabbitMQMessage):self._success+=1asyncdefon_fail(self,message:RabbitMQMessage,error:Exception):self._fail+=1asyncdefbefore_consume(self,message:RabbitMQMessage):self._consume+=1
Http Hook(开放路由)
这些钩子可以完成简单钩子所做的所有事情,但对路由做出响应。在
fromaiohttpimportwebfrombarterdude.hooksimportHttpHookfromasyncworker.rabbitmq.messageimportRabbitMQMessageclassMyCounterHttpHook(HttpHook):_consume=_fail=_success=0asyncdef__call__(self,req:web.Request):returnweb.json_response(dict(consumed=self._consume,success=self._success,fail=self._fail))asyncdefon_success(self,message:RabbitMQMessage):self._success+=1asyncdefon_fail(self,message:RabbitMQMessage,error:Exception):self._fail+=1asyncdefbefore_consume(self,message:RabbitMQMessage):self._consume+=1
数据共享
遵循async-worker和aiohttp中的方法,
BarterDude
不鼓励使用全局变量,也就是单例变量。在
要在应用程序中全局共享数据状态,BarterDude
的行为类似于dict
。
例如,可以在BarterDude
实例中保存类似全局的变量:
frombarterdudeimportBarterDudebarterdude=BarterDude()baterdude["my_variable"]=data
把它带回消费者那里
asyncdefconsumer_access_storage(msg):data=baterdude["my_variable"]
模式验证
使用的消息可以通过json架构进行验证:
@barterdude.consume_amqp(["queue1","queue2"],monitor,validation_schema={"$schema":"http://json-schema.org/draft-04/schema#","$id":"http://example.com/example.json","type":"object","title":"Message Schema","description":"The root schema comprises the entire JSON document.","additionalProperties":True,"required":["key"],"properties":{"key":{"$id":"#/properties/key","type":"string","title":"The Key Schema","description":"An explanation about message.","default":""}}},requeue_on_validation_fail=False# invalid messages are removed without requeue)
您仍然可以在生成消息之前或在需要时验证消息:
frombarterdude.messageimportMessageValidationvalidator=MessageValidation(json_schema)validator.validate(message)
数据保护
Barterdude考虑了GDPR数据保护,默认情况下不记录消息体,但是您可以使用环境变量BARTERDUDE_LOG_REDACTED=0
停用它
现在消息体将被日志钩子记录。在
此配置只控制BarterDude的默认日志挂钩,对用户自定义用户日志没有影响。如果要使用此配置控制日志,请使用:
frombaterdude.confimportBARTERDUDE_LOG_REDACTED
测试
为了测试异步使用者,我们建议使用asynctest
lib
fromasynctestimportTestCaseclassTestMain(TestCase):deftest_should_pass(self):self.assertTrue(True)
我们希望你喜欢!:传情动漫:
贡献
对于开发和贡献,请遵循Contributing Guide和ALWAYS尊重Code of Conduct
- 项目
标签: