非阻塞的Redis发布/订阅可能吗?

35 投票
11 回答
46670 浏览
提问于 2025-04-17 04:52

我想用Redis的发布/订阅功能来传递一些消息,但不想像下面的代码那样被阻塞:

import redis
rc = redis.Redis()

ps = rc.pubsub()
ps.subscribe(['foo', 'bar'])

rc.publish('foo', 'hello world')

for item in ps.listen():
    if item['type'] == 'message':
        print item['channel']
        print item['data']

最后的for部分会让程序停下来。我只是想检查某个频道有没有数据,我该怎么做呢?有没有类似check的方法?

11 个回答

28

被接受的答案已经过时,因为redis-py建议你使用非阻塞的get_message()。不过,它也提供了一种简单使用线程的方法。

https://pypi.python.org/pypi/redis

读取消息有三种不同的策略。

在后台,get_message()使用系统的‘select’模块快速检查连接的套接字。如果有数据可以读取,get_message()就会读取这些数据,格式化消息并返回,或者把它交给消息处理器。如果没有数据可读,get_message()会立即返回None。这使得它很容易集成到你应用程序的现有事件循环中。

 while True:
     message = p.get_message()
     if message:
         # do something with the message
     time.sleep(0.001)  # be nice to the system :)

旧版本的redis-py只能通过pubsub.listen()来读取消息。listen()是一个生成器,它会阻塞,直到有消息可用。如果你的应用程序只需要接收来自redis的消息并进行处理,listen()是一个简单的启动方式。

 for message in p.listen():
     # do something with the message

第三种选择是在一个单独的线程中运行事件循环。pubsub.run_in_thread()会创建一个新线程并启动事件循环。这个线程对象会返回给调用run_in_thread()的地方。调用者可以使用thread.stop()方法来关闭事件循环和线程。在后台,这实际上是对get_message()的一个包装,它在一个单独的线程中运行,基本上为你创建了一个小型的非阻塞事件循环。run_in_thread()接受一个可选的sleep_time参数。如果指定了这个参数,事件循环会在每次循环中调用time.sleep(),使用这个值。

注意:因为我们是在一个单独的线程中运行,所以无法处理那些没有注册消息处理器的消息。因此,如果你订阅了没有消息处理器的模式或频道,redis-py会阻止你调用run_in_thread()。

p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it's time to shut it down...
thread.stop()

所以,回答你的问题,只需检查get_message,看看消息是否已经到达。

50

如果你在考虑非阻塞的异步处理,那你可能正在使用(或者应该使用)异步框架或服务器。

更新: 自从最初的回答已经过去5年,这期间Python增加了 原生的异步IO支持。现在有了 AIORedis,一个异步IO的Redis客户端

8

我觉得这可能不太行。一个频道没有“当前数据”,你是订阅了这个频道,然后开始接收其他客户端在这个频道上推送的消息,所以这是一种阻塞的API。如果你查看Redis的命令文档,关于发布/订阅的内容会更清楚一些。

撰写回答