非阻塞的Redis发布/订阅可能吗?
我想用Redis的发布/订阅功能来传递一些消息,但不想像下面的代码那样被阻塞:
import redis
rc = redis.Redis()
ps = rc.pubsub()
ps.subscribe(['foo', 'bar'])
rc.publish('foo', 'hello world')
for item in ps.listen():
if item['type'] == 'message':
print item['channel']
print item['data']
最后的for
部分会让程序停下来。我只是想检查某个频道有没有数据,我该怎么做呢?有没有类似check
的方法?
11 个回答
被接受的答案已经过时,因为redis-py建议你使用非阻塞的get_message()
。不过,它也提供了一种简单使用线程的方法。
https://pypi.python.org/pypi/redis
读取消息有三种不同的策略。
在后台,get_message()使用系统的‘select’模块快速检查连接的套接字。如果有数据可以读取,get_message()就会读取这些数据,格式化消息并返回,或者把它交给消息处理器。如果没有数据可读,get_message()会立即返回None。这使得它很容易集成到你应用程序的现有事件循环中。
while True:
message = p.get_message()
if message:
# do something with the message
time.sleep(0.001) # be nice to the system :)
旧版本的redis-py只能通过pubsub.listen()来读取消息。listen()是一个生成器,它会阻塞,直到有消息可用。如果你的应用程序只需要接收来自redis的消息并进行处理,listen()是一个简单的启动方式。
for message in p.listen():
# do something with the message
第三种选择是在一个单独的线程中运行事件循环。pubsub.run_in_thread()会创建一个新线程并启动事件循环。这个线程对象会返回给调用run_in_thread()的地方。调用者可以使用thread.stop()方法来关闭事件循环和线程。在后台,这实际上是对get_message()的一个包装,它在一个单独的线程中运行,基本上为你创建了一个小型的非阻塞事件循环。run_in_thread()接受一个可选的sleep_time参数。如果指定了这个参数,事件循环会在每次循环中调用time.sleep(),使用这个值。
注意:因为我们是在一个单独的线程中运行,所以无法处理那些没有注册消息处理器的消息。因此,如果你订阅了没有消息处理器的模式或频道,redis-py会阻止你调用run_in_thread()。
p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it's time to shut it down...
thread.stop()
所以,回答你的问题,只需检查get_message,看看消息是否已经到达。
如果你在考虑非阻塞的异步处理,那你可能正在使用(或者应该使用)异步框架或服务器。
如果你在用 Tornado,那么可以看看 Tornado-Redis。它使用了Tornado的原生生成器调用。它的 Websocket示例 展示了如何和发布/订阅功能一起使用。
看起来你也可以把Redis-py和 Gevent 结合使用,没问题,只需要使用 Gevent的猴子补丁(
gevent.monkey.patch_all()
)。
更新: 自从最初的回答已经过去5年,这期间Python增加了 原生的异步IO支持。现在有了 AIORedis,一个异步IO的Redis客户端。
我觉得这可能不太行。一个频道没有“当前数据”,你是订阅了这个频道,然后开始接收其他客户端在这个频道上推送的消息,所以这是一种阻塞的API。如果你查看Redis的命令文档,关于发布/订阅的内容会更清楚一些。