异步应用程序的简单pubsub模式
aiopubsub的Python项目详细描述
异步应用程序的简单pubsub模式。
aiopubsub只在python 3.6中测试。目前还没有支持旧版本的计划。
importasyncioimportaiopubsubimportlogwoodasyncdefmain():logwood.basic_config()hub=aiopubsub.Hub()publisher=aiopubsub.Publisher(hub,prefix=aiopubsub.Key('a'))subscriber=aiopubsub.Subscriber(hub,'subscriber_id')sub_key=aiopubsub.Key('a','b','*')subscriber.subscribe(sub_key)pub_key=aiopubsub.Key('b','c')publisher.publish(pub_key,'Hello subscriber')awaitasyncio.sleep(0.001)# Let the callback fire.# "('a', 'b', 'c') Hello subscriber" will be printed.key,message=awaitsubscriber.consume()assertkey==aiopubsub.Key('a','b','c')assertmessage=='Hello subscriber'subscriber.remove_all_listeners()asyncio.get_event_loop().run_until_complete(main())
或者,我们可以创建一个侦听器来调用同步回调,而不是直接订阅密钥 当新消息到达时。
defprint_message(key,message):print(key,message)subscriber.add_sync_listener(sub_key,print_message)
或者,如果我们有一个协程回调,我们可以创建一个异步侦听器:
asyncdefprint_message(key,message):awaitasyncio.sleep(1)print(key,message)subscriber.add_async_listener(sub_key,print_message)
如果安装了aiopubsub,它将使用logwood,否则它将默认 到标准日志模块。请注意,运行测试需要logwood。
架构
hub接受来自publishers的消息,并将它们路由到subscribers。每条消息都由其 key-构成层次命名空间的字符串的iterable。订阅者可以订阅通配符键, 其中键的任何部分都可以替换为*(星型)。
addedSubscriber和removedSubscriber消息
添加新订户时,中心将发送此消息
{ "key": ("key", "of", "added", "subscriber"), "currentSubscriberCount": 2 }
在键('Hub', 'addedSubscriber', 'key', 'of', 'added', 'subscriber')(在^{tt3}之后的部分$ 由订阅的密钥组成)。注意currentSubscriberCount字段,该字段指示当前有多少订阅者 已订阅。
当删除订阅服务器时,将发送相同格式的消息,但在密钥下 ('Hub', 'removedSubscriber', 'key', 'of', 'added', 'subscriber')。