用于分布式处理的持久消息传递

muppet的Python项目详细描述


muppetmutual的python实现。muppet为跨进程或计算机边界的简单消息传递提供远程通道,为跨进程或计算机边界的持久消息传递提供durablechannel。remotechannel和durablechannel都使用redis进行消息存储。

远程频道

远程频道遵循pub-sub模型,在该模型中,在频道上发送的每一条消息都将广播给在该频道上侦听的所有订阅者。

用法:

frommuppetimportRemoteChannel# define the callback to receive messagesdefcallback(message):print("received:",message)# we are done with the receiverreceiver.end()# redis server detailsredis_options={"redis":{"host":"127.0.0.1","port":6379}}# create a remote channel to send messagessender=RemoteChannel("greeting",redis_options)# create a remote channel to receive messagesreceiver=RemoteChannel("greeting",redis_options)# listen for messages by passing the callbackreceiver.listen(callback)# send a messagesender.send("hello")# we are done with the sendersender.end()

耐用频道

持久通道遵循队列模型,其中在通道上发送的消息由侦听该通道的任何一个接收器接收。使用durablechannel,发送者可以发送超时的消息,因此当消息在指定的超时内未被回复时,会通知他们。每封邮件都保证在指定的超时时间内回复,否则,将通过回调通知发件人。

用法:

frommuppetimportDurableChanneldeftimeout_callback(message):print"timed out:",message# we are done with the workerworker.end()# we are done with dispatcherdispatcher.end()# redis server detailsredis_options={"redis":{"host":"127.0.0.1","port":6379}}# create a durable channel to dispatch messagesdispatcher=DurableChannel("dispatcher.1",redis_options)# create a durable channel to receive messages, note the 3rd argument which is the callback for handling timeoutsworker=DurableChannel("worker.1",redis_options,timeout_callback)# dispatch a message to worker.1dispatcher.send(content="task",to="worker.1")# receive the messagemessage=worker.receive()print"received message:",message["content"]# reply to the messageworker.reply(message=message,response="reply",timeout=5000)# receive the replyreply=dispatcher.receive()print"received reply:",reply["content"]# we are happy with the replydispatcher.close(reply)# we are done with dispatcher and workerworker.end()dispatcher.end()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
显示图像的RGB编号的java   java JavaFX画布2D游戏:背景变换vs.绘画   在到达maxElementsInMemory之前创建的java DiskMarker   a4j:ajax可用事件的java详尽列表?   java从批处理文件运行jar文件,如果出现错误,则显示meessage   音频Java在背景音乐之上播放声音   用于在FTP中上载文件的java更改目录   尽管设置了必要的属性,java列表项仍不会保持选中状态   java Stanford Core NLP解析与CSV   java使用缓冲区合并热态和冷态   java无法初始化类javax。加密。JCE安全   对这个Java循环如此困惑的输入   java Spring RabbitMQ SimpleRabbitListenerContainerFactory用法   java如何使用jGrowl创建JSF消息   安装jRebel插件后,Netbeans项目中的java源文件夹不可见?   如何在Java中解析复杂的json字符串   java Spark KafkaUtils CreateRDD在键上应用过滤器   try块中的java代码被忽略,为什么?