用于分布式处理的持久消息传递

muppet的Python项目详细描述


muppetmutual的python实现。muppet为跨进程或计算机边界的简单消息传递提供远程通道,为跨进程或计算机边界的持久消息传递提供durablechannel。remotechannel和durablechannel都使用redis进行消息存储。

远程频道

远程频道遵循pub-sub模型,在该模型中,在频道上发送的每一条消息都将广播给在该频道上侦听的所有订阅者。

用法:

frommuppetimportRemoteChannel# define the callback to receive messagesdefcallback(message):print("received:",message)# we are done with the receiverreceiver.end()# redis server detailsredis_options={"redis":{"host":"127.0.0.1","port":6379}}# create a remote channel to send messagessender=RemoteChannel("greeting",redis_options)# create a remote channel to receive messagesreceiver=RemoteChannel("greeting",redis_options)# listen for messages by passing the callbackreceiver.listen(callback)# send a messagesender.send("hello")# we are done with the sendersender.end()

耐用频道

持久通道遵循队列模型,其中在通道上发送的消息由侦听该通道的任何一个接收器接收。使用durablechannel,发送者可以发送超时的消息,因此当消息在指定的超时内未被回复时,会通知他们。每封邮件都保证在指定的超时时间内回复,否则,将通过回调通知发件人。

用法:

frommuppetimportDurableChanneldeftimeout_callback(message):print"timed out:",message# we are done with the workerworker.end()# we are done with dispatcherdispatcher.end()# redis server detailsredis_options={"redis":{"host":"127.0.0.1","port":6379}}# create a durable channel to dispatch messagesdispatcher=DurableChannel("dispatcher.1",redis_options)# create a durable channel to receive messages, note the 3rd argument which is the callback for handling timeoutsworker=DurableChannel("worker.1",redis_options,timeout_callback)# dispatch a message to worker.1dispatcher.send(content="task",to="worker.1")# receive the messagemessage=worker.receive()print"received message:",message["content"]# reply to the messageworker.reply(message=message,response="reply",timeout=5000)# receive the replyreply=dispatcher.receive()print"received reply:",reply["content"]# we are happy with the replydispatcher.close(reply)# we are done with dispatcher and workerworker.end()dispatcher.end()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java接口中的每个方法都是抽象的,但在抽象类中,我们也只能使用抽象方法   初始化Java中声明的、未初始化的变量会发生什么情况?   java BouncyCastle openPGP将字节[]数组加密为csv文件   在Java中将类A(和所有子类)映射到类B的实例的字典   RSA公钥编码,在Java和Android中,代码相同,结果不同   java在安卓中实现数字检测语音识别   java取消选择复选框   java如何在其他配置中重用Maven配置XML片段   java有没有一种有效的方法来检查HashMap是否包含映射到相同值的键?   spring处理程序调度失败;嵌套的例外是java。lang.NoClassDefFoundError:org/apache/http/client/HttpClient   带有ehcache的java多层缓存   java如何访问chromium(或任何其他浏览器)cookie   java通过将两个集合与spring data mongodb data中的条件合并来获取计数   安卓中R.java的语法错误