在订阅过程中向Redis pub/sub添加额外频道

4 投票
2 回答
5647 浏览
提问于 2025-04-16 13:41

可以在Redis连接中添加额外的订阅吗?我有一个监听线程,但似乎没有受到新的SUBSCRIBE命令的影响。

如果这是预期的行为,那么如果用户添加股票行情订阅或者加入聊天室,应该使用什么样的模式呢?

我想实现一个类似于下面的Python类:

import threading
import redis

class RedisPubSub(object):
    def __init__(self):
        self._redis_pub = redis.Redis(host='localhost', port=6379, db=0)        
        self._redis_sub = redis.Redis(host='localhost', port=6379, db=0)        
        self._sub_thread = threading.Thread(target=self._listen)
        self._sub_thread.setDaemon(True)
        self._sub_thread.start()

    def publish(self, channel, message):
        self._redis_pub.publish(channel, message)

    def subscribe(self, channel):
        self._redis_sub.subscribe(channel)

    def _listen(self):
        for message in self._redis_sub.listen():
            print message

2 个回答

1

异步方式:

Twisted框架和插件txredisapi

示例代码(订阅):

import txredisapi as redis

from twisted.application import internet
from twisted.application import service


class myProtocol(redis.SubscriberProtocol):
    def connectionMade(self):
        print "waiting for messages..."
        print "use the redis client to send messages:"
        print "$ redis-cli publish chat test"
        print "$ redis-cli publish foo.bar hello world"
        self.subscribe("chat")
        self.psubscribe("foo.*")


        reactor.callLater(10, self.unsubscribe, "chat")
        reactor.callLater(15, self.punsubscribe, "foo.*")

        # self.continueTrying = False
        # self.transport.loseConnection()

    def messageReceived(self, pattern, channel, message):
        print "pattern=%s, channel=%s message=%s" % (pattern, channel, message)

    def connectionLost(self, reason):
        print "lost connection:", reason


class myFactory(redis.SubscriberFactory):
    # SubscriberFactory is a wapper for the ReconnectingClientFactory
    maxDelay = 120
    continueTrying = True
    protocol = myProtocol


application = service.Application("subscriber")
srv = internet.TCPClient("127.0.0.1", 6379, myFactory())
srv.setServiceParent(application)

只用一个线程,没烦恼 :)

当然,这要看你在写什么样的应用。如果是网络相关的,推荐用Twisted。

6

python-redis中的 RedisConnectionPool 类是从 threading.local 这个类继承而来的,这就是你看到的“神奇”效果的原因。

总结:你的主线程和工作线程的 self._redis_sub 客户端使用了两个不同的连接到服务器,但只有主线程的连接执行了订阅命令。

详细信息:因为主线程创建了 self._redis_sub,所以这个客户端被放到了主线程的本地存储中。接下来,我猜主线程会调用 client.subscribe(channel)。现在主线程的客户端在连接1上已经订阅了。然后你启动了 self._sub_thread 工作线程,这个线程有自己的 self._redis_sub 属性,并且被设置为一个新的 redis.Client 实例,这样就建立了一个新的连接池,并与 redis 服务器建立了新的连接。

这个新的连接还没有订阅你的频道,所以 listen() 立刻返回了。因此,在 python-redis 中,你不能在不同的线程之间传递已经建立的连接和未完成的订阅(或其他有状态的命令)。

根据你打算如何实现你的应用程序,你可能需要换一个不同的客户端,或者想出其他方法来将订阅状态传递给工作线程,比如通过队列发送订阅命令。

还有一个问题是 python-redis 使用的是阻塞套接字,这会导致你的监听线程在等待消息时无法做其他工作,并且它不能在接收到消息后立即表示希望取消订阅。

撰写回答