在Tornado中如何正确处理Redis连接?(异步 - 发布/订阅)
我在用Redis和我的Tornado应用程序,使用的是异步客户端Brukva。当我查看Brukva网站上的示例应用时,发现他们在websocket的init方法中创建了新的连接。
class MessagesCatcher(tornado.websocket.WebSocketHandler):
def __init__(self, *args, **kwargs):
super(MessagesCatcher, self).__init__(*args, **kwargs)
self.client = brukva.Client()
self.client.connect()
self.client.subscribe('test_channel')
def open(self):
self.client.listen(self.on_message)
def on_message(self, result):
self.write_message(str(result.body))
def close(self):
self.client.unsubscribe('test_channel')
self.client.disconnect()
在websocket的情况下这样做没问题,但在普通的Tornado RequestHandler的post方法中,比如说长轮询操作(发布-订阅模型),该怎么处理呢?我在每个更新处理器的post方法中都创建了新的客户端连接,这样做对吗?我在Redis控制台上看到每次新的post操作后,客户端的数量在增加。
这是我的代码示例。
c = brukva.Client(host = '127.0.0.1')
c.connect()
class MessageNewHandler(BaseHandler):
@tornado.web.authenticated
def post(self):
self.listing_id = self.get_argument("listing_id")
message = {
"id": str(uuid.uuid4()),
"from": str(self.get_secure_cookie("username")),
"body": str(self.get_argument("body")),
}
message["html"] = self.render_string("message.html", message=message)
if self.get_argument("next", None):
self.redirect(self.get_argument("next"))
else:
c.publish(self.listing_id, message)
logging.info("Writing message : " + json.dumps(message))
self.write(json.dumps(message))
class MessageUpdatesHandler(BaseHandler):
@tornado.web.authenticated
@tornado.web.asynchronous
def post(self):
self.listing_id = self.get_argument("listing_id", None)
self.client = brukva.Client()
self.client.connect()
self.client.subscribe(self.listing_id)
self.client.listen(self.on_new_messages)
def on_new_messages(self, messages):
# Closed client connection
if self.request.connection.stream.closed():
return
logging.info("Getting update : " + json.dumps(messages.body))
self.finish(json.dumps(messages.body))
self.client.unsubscribe(self.listing_id)
def on_connection_close(self):
# unsubscribe user from channel
self.client.unsubscribe(self.listing_id)
self.client.disconnect()
如果你能提供一些类似情况的示例代码,我会非常感激。
2 个回答
你应该在你的应用程序中使用连接池。因为看起来 brukva 这个库并不自动支持这个功能(redis-py 是支持的,但由于它是阻塞的,所以和 tornado 不太兼容),所以你需要自己写一个连接池。
这个模式其实很简单,大致可以这样理解(这不是实际可运行的代码):
class BrukvaPool():
__conns = {}
def get(host, port,db):
''' Get a client for host, port, db '''
key = "%s:%s:%s" % (host, port, db)
conns = self.__conns.get(key, [])
if conns:
ret = conns.pop()
return ret
else:
## Init brukva client here and connect it
def release(client):
''' release a client at the end of a request '''
key = "%s:%s:%s" % (client.connection.host, client.connection.port, client.connection.db)
self.__conns.setdefault(key, []).append(client)
虽然可能会稍微复杂一些,但这就是主要的思路。
虽然有点晚,但我一直在使用 tornado-redis。它可以和tornado的ioloop以及tornado.gen
模块一起使用。
安装tornadoredis
你可以通过pip来安装它。
pip install tornadoredis
或者使用setuptools来安装。
easy_install tornadoredis
不过其实不太推荐这样做。你也可以直接克隆这个库的代码,然后解压出来。接着运行:
python setup.py build
python setup.py install
连接到redis
下面的代码需要放在你的main.py文件或者相应的文件里:
redis_conn = tornadoredis.Client('hostname', 'port')
redis_conn.connect()
redis.connect这个方法只需要调用一次。它是一个阻塞调用,所以应该在启动主ioloop之前调用。所有的处理程序都会共享同一个连接对象。
你可以把它添加到你的应用设置里,比如:
settings = {
redis = redis_conn
}
app = tornado.web.Application([('/.*', Handler),],
**settings)
使用tornadoredis
在处理程序中可以通过self.settings['redis']
来使用这个连接,或者可以把它作为BaseHandler类的一个属性来添加。你的请求处理程序会继承这个类,并访问这个属性。
class BaseHandler(tornado.web.RequestHandler):
@property
def redis():
return self.settings['redis']
要和redis进行通信,可以使用tornado.web.asynchronous
和tornado.gen.engine
这两个装饰器。
class SomeHandler(BaseHandler):
@tornado.web.asynchronous
@tornado.gen.engine
def get(self):
foo = yield gen.Task(self.redis.get, 'foo')
self.render('sometemplate.html', {'foo': foo}
额外信息
更多的例子和其他功能,比如连接池和管道,可以在github的库里找到。