在订阅过程中向Redis pub/sub添加额外频道
可以在Redis连接中添加额外的订阅吗?我有一个监听线程,但似乎没有受到新的SUBSCRIBE命令的影响。
如果这是预期的行为,那么如果用户添加股票行情订阅或者加入聊天室,应该使用什么样的模式呢?
我想实现一个类似于下面的Python类:
import threading
import redis
class RedisPubSub(object):
def __init__(self):
self._redis_pub = redis.Redis(host='localhost', port=6379, db=0)
self._redis_sub = redis.Redis(host='localhost', port=6379, db=0)
self._sub_thread = threading.Thread(target=self._listen)
self._sub_thread.setDaemon(True)
self._sub_thread.start()
def publish(self, channel, message):
self._redis_pub.publish(channel, message)
def subscribe(self, channel):
self._redis_sub.subscribe(channel)
def _listen(self):
for message in self._redis_sub.listen():
print message
2 个回答
异步方式:
示例代码(订阅):
import txredisapi as redis
from twisted.application import internet
from twisted.application import service
class myProtocol(redis.SubscriberProtocol):
def connectionMade(self):
print "waiting for messages..."
print "use the redis client to send messages:"
print "$ redis-cli publish chat test"
print "$ redis-cli publish foo.bar hello world"
self.subscribe("chat")
self.psubscribe("foo.*")
reactor.callLater(10, self.unsubscribe, "chat")
reactor.callLater(15, self.punsubscribe, "foo.*")
# self.continueTrying = False
# self.transport.loseConnection()
def messageReceived(self, pattern, channel, message):
print "pattern=%s, channel=%s message=%s" % (pattern, channel, message)
def connectionLost(self, reason):
print "lost connection:", reason
class myFactory(redis.SubscriberFactory):
# SubscriberFactory is a wapper for the ReconnectingClientFactory
maxDelay = 120
continueTrying = True
protocol = myProtocol
application = service.Application("subscriber")
srv = internet.TCPClient("127.0.0.1", 6379, myFactory())
srv.setServiceParent(application)
只用一个线程,没烦恼 :)
当然,这要看你在写什么样的应用。如果是网络相关的,推荐用Twisted。
python-redis
中的 Redis
和 ConnectionPool
类是从 threading.local
这个类继承而来的,这就是你看到的“神奇”效果的原因。
总结:你的主线程和工作线程的 self._redis_sub
客户端使用了两个不同的连接到服务器,但只有主线程的连接执行了订阅命令。
详细信息:因为主线程创建了 self._redis_sub
,所以这个客户端被放到了主线程的本地存储中。接下来,我猜主线程会调用 client.subscribe(channel)
。现在主线程的客户端在连接1上已经订阅了。然后你启动了 self._sub_thread
工作线程,这个线程有自己的 self._redis_sub
属性,并且被设置为一个新的 redis.Client 实例,这样就建立了一个新的连接池,并与 redis 服务器建立了新的连接。
这个新的连接还没有订阅你的频道,所以 listen()
立刻返回了。因此,在 python-redis
中,你不能在不同的线程之间传递已经建立的连接和未完成的订阅(或其他有状态的命令)。
根据你打算如何实现你的应用程序,你可能需要换一个不同的客户端,或者想出其他方法来将订阅状态传递给工作线程,比如通过队列发送订阅命令。
还有一个问题是 python-redis
使用的是阻塞套接字,这会导致你的监听线程在等待消息时无法做其他工作,并且它不能在接收到消息后立即表示希望取消订阅。