Celery Worker 数据库连接池管理

61 投票
6 回答
32276 浏览
提问于 2025-04-17 13:40

我正在使用独立的Celery(不是在Django里面)。我计划在多台物理机器上运行一种工作任务。这个任务的主要步骤如下:

  1. 接收一个XML文档。
  2. 对它进行转换。
  3. 进行多次的数据库读取和写入。

我使用的是PostgreSQL,但这同样适用于其他需要连接的数据库类型。过去,我使用过数据库连接池,这样可以避免每次请求都创建新的数据库连接,或者避免连接保持打开太久。但是,由于每个Celery工作者都是在独立的进程中运行的,我不太确定它们是如何共享这个连接池的。我是不是漏掉了什么?我知道Celery允许你保存从工作者返回的结果,但这不是我想要做的。每个任务可以根据处理的数据进行多种不同的更新或插入。

在Celery工作者中,正确访问数据库的方法是什么?

是否可以在多个工作者/任务之间共享一个连接池,或者有没有其他方法可以做到这一点?

6 个回答

2

你可以在你的celery配置中改变默认的行为,让它使用线程工作者,而不是每个进程一个工作者:

CELERYD_POOL = "celery.concurrency.threads.TaskPool"

然后你可以在你的任务实例中保存这个共享的线程池,并在每次调用线程任务时引用它。

3

每个工作进程都应该有一个数据库连接。因为 Celery 本身会管理一组工作进程,所以你的数据库连接数量总是和 Celery 的工作进程数量相等。
不过,这样做有个缺点,就是会把数据库连接的管理和 Celery 的工作进程管理绑在一起。但这没关系,因为在一个进程中,GIL(全局解释器锁)只允许同时运行一个线程。

44

我喜欢tigeronk2提到的每个工作者一个连接的想法。正如他说的,Celery自己有一套工作者池,所以其实不需要单独再搞一个数据库连接池。Celery信号文档里解释了如何在创建工作者时进行自定义初始化,所以我在我的tasks.py里加了以下代码,结果效果跟我预期的一样好。我甚至能在工作者关闭时关闭连接:

from celery.signals import worker_process_init, worker_process_shutdown

db_conn = None

@worker_process_init.connect
def init_worker(**kwargs):
    global db_conn
    print('Initializing database connection for worker.')
    db_conn = db.connect(DB_CONNECT_STRING)


@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
    global db_conn
    if db_conn:
        print('Closing database connectionn for worker.')
        db_conn.close()

撰写回答