从工作进程访问数据库

8 投票
1 回答
3066 浏览
提问于 2025-04-18 05:52

我正在尝试用Python、Redis和PostgreSQL来处理需要访问数据库的任务。我的做法是:

  1. 使用RQ把任务放到Redis队列里:

    def queue_data(self, json_data):
        Queue().enqueue(process_data, json.dumps(json_data))
    
  2. process_data里,使用psycopg2对PostgreSQL数据库进行查询:

    def process_data(json_data):
        with psycopg2.connect("dbname=pgtest2db user=pgtest2user") as conn:
            with conn.cursor() as cursor:
                # I'm not actually doing this query, but you get the idea
                cursor.execute("SELECT * FROM some_table")
        conn.close()
    

显然,这样做并不是最优的,因为每次调用process_data时都会创建一个新的连接。有什么好的方法来解决这个问题呢?谁来负责管理数据库连接池呢?

需要注意的是,我强迫自己不使用ORM,因为我这样做是为了自学,并且我想从更纯粹的角度理解这些模式。

编辑

最后我使用了一个自定义的工作者,像这样:

import os
import node

import redis
from rq import Worker, Queue, Connection
from psycopg2.pool      import ThreadedConnectionPool

listen = ['high', 'default', 'low']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
redis_conn = redis.from_url(redis_url)
pool = ThreadedConnectionPool(minconn = 1, maxconn = 10, dsn = "dbname=pgtest2db user=pgtest2user")

def process_data(json_data):
    dbconn = pool.getconn()
    result = perform_some_db_actions(dbconn, json_data)
    pool.putconn(dbconn)
    return result

if __name__ == '__main__':
    with Connection(redis_conn):
        print "Setting up Redis"
        worker = Worker(map(Queue, listen))
        worker.work()

1 个回答

0

我知道现在说这些可能有点晚了,但对于那些想知道如何在RQ中使用psycopg2的人(特别是当你遇到SSL错误,比如解密失败或记录MAC错误时),建议你在工作进程内部打开连接(也就是在任务里)。最好是创建一个单例连接,而不是每次都打开新的连接。记得在worker.work()函数之前不要打开连接。

撰写回答