从工作进程访问数据库
我正在尝试用Python、Redis和PostgreSQL来处理需要访问数据库的任务。我的做法是:
使用RQ把任务放到Redis队列里:
def queue_data(self, json_data): Queue().enqueue(process_data, json.dumps(json_data))
在
process_data
里,使用psycopg2对PostgreSQL数据库进行查询:def process_data(json_data): with psycopg2.connect("dbname=pgtest2db user=pgtest2user") as conn: with conn.cursor() as cursor: # I'm not actually doing this query, but you get the idea cursor.execute("SELECT * FROM some_table") conn.close()
显然,这样做并不是最优的,因为每次调用process_data
时都会创建一个新的连接。有什么好的方法来解决这个问题呢?谁来负责管理数据库连接池呢?
需要注意的是,我强迫自己不使用ORM,因为我这样做是为了自学,并且我想从更纯粹的角度理解这些模式。
编辑
最后我使用了一个自定义的工作者,像这样:
import os
import node
import redis
from rq import Worker, Queue, Connection
from psycopg2.pool import ThreadedConnectionPool
listen = ['high', 'default', 'low']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
redis_conn = redis.from_url(redis_url)
pool = ThreadedConnectionPool(minconn = 1, maxconn = 10, dsn = "dbname=pgtest2db user=pgtest2user")
def process_data(json_data):
dbconn = pool.getconn()
result = perform_some_db_actions(dbconn, json_data)
pool.putconn(dbconn)
return result
if __name__ == '__main__':
with Connection(redis_conn):
print "Setting up Redis"
worker = Worker(map(Queue, listen))
worker.work()
1 个回答
0
我知道现在说这些可能有点晚了,但对于那些想知道如何在RQ中使用psycopg2的人(特别是当你遇到SSL错误,比如解密失败或记录MAC错误时),建议你在工作进程内部打开连接(也就是在任务里)。最好是创建一个单例连接,而不是每次都打开新的连接。记得在worker.work()函数之前不要打开连接。