用于分布式处理的持久消息传递
muppet的Python项目详细描述
muppet是mutual的python实现。muppet为跨进程或计算机边界的简单消息传递提供远程通道,为跨进程或计算机边界的持久消息传递提供durablechannel。remotechannel和durablechannel都使用redis进行消息存储。
远程频道
远程频道遵循pub-sub模型,在该模型中,在频道上发送的每一条消息都将广播给在该频道上侦听的所有订阅者。
用法:
frommuppetimportRemoteChannel# define the callback to receive messagesdefcallback(message):print("received:",message)# we are done with the receiverreceiver.end()# redis server detailsredis_options={"redis":{"host":"127.0.0.1","port":6379}}# create a remote channel to send messagessender=RemoteChannel("greeting",redis_options)# create a remote channel to receive messagesreceiver=RemoteChannel("greeting",redis_options)# listen for messages by passing the callbackreceiver.listen(callback)# send a messagesender.send("hello")# we are done with the sendersender.end()
耐用频道
持久通道遵循队列模型,其中在通道上发送的消息由侦听该通道的任何一个接收器接收。使用durablechannel,发送者可以发送超时的消息,因此当消息在指定的超时内未被回复时,会通知他们。每封邮件都保证在指定的超时时间内回复,否则,将通过回调通知发件人。
用法:
frommuppetimportDurableChanneldeftimeout_callback(message):print"timed out:",message# we are done with the workerworker.end()# we are done with dispatcherdispatcher.end()# redis server detailsredis_options={"redis":{"host":"127.0.0.1","port":6379}}# create a durable channel to dispatch messagesdispatcher=DurableChannel("dispatcher.1",redis_options)# create a durable channel to receive messages, note the 3rd argument which is the callback for handling timeoutsworker=DurableChannel("worker.1",redis_options,timeout_callback)# dispatch a message to worker.1dispatcher.send(content="task",to="worker.1")# receive the messagemessage=worker.receive()print"received message:",message["content"]# reply to the messageworker.reply(message=message,response="reply",timeout=5000)# receive the replyreply=dispatcher.receive()print"received reply:",reply["content"]# we are happy with the replydispatcher.close(reply)# we are done with dispatcher and workerworker.end()dispatcher.end()