Celery Worker 数据库连接池管理
我正在使用独立的Celery(不是在Django里面)。我计划在多台物理机器上运行一种工作任务。这个任务的主要步骤如下:
- 接收一个XML文档。
- 对它进行转换。
- 进行多次的数据库读取和写入。
我使用的是PostgreSQL,但这同样适用于其他需要连接的数据库类型。过去,我使用过数据库连接池,这样可以避免每次请求都创建新的数据库连接,或者避免连接保持打开太久。但是,由于每个Celery工作者都是在独立的进程中运行的,我不太确定它们是如何共享这个连接池的。我是不是漏掉了什么?我知道Celery允许你保存从工作者返回的结果,但这不是我想要做的。每个任务可以根据处理的数据进行多种不同的更新或插入。
在Celery工作者中,正确访问数据库的方法是什么?
是否可以在多个工作者/任务之间共享一个连接池,或者有没有其他方法可以做到这一点?
6 个回答
你可以在你的celery配置中改变默认的行为,让它使用线程工作者,而不是每个进程一个工作者:
CELERYD_POOL = "celery.concurrency.threads.TaskPool"
然后你可以在你的任务实例中保存这个共享的线程池,并在每次调用线程任务时引用它。
每个工作进程都应该有一个数据库连接。因为 Celery 本身会管理一组工作进程,所以你的数据库连接数量总是和 Celery 的工作进程数量相等。
不过,这样做有个缺点,就是会把数据库连接的管理和 Celery 的工作进程管理绑在一起。但这没关系,因为在一个进程中,GIL(全局解释器锁)只允许同时运行一个线程。
我喜欢tigeronk2提到的每个工作者一个连接的想法。正如他说的,Celery自己有一套工作者池,所以其实不需要单独再搞一个数据库连接池。Celery信号文档里解释了如何在创建工作者时进行自定义初始化,所以我在我的tasks.py里加了以下代码,结果效果跟我预期的一样好。我甚至能在工作者关闭时关闭连接:
from celery.signals import worker_process_init, worker_process_shutdown
db_conn = None
@worker_process_init.connect
def init_worker(**kwargs):
global db_conn
print('Initializing database connection for worker.')
db_conn = db.connect(DB_CONNECT_STRING)
@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
global db_conn
if db_conn:
print('Closing database connectionn for worker.')
db_conn.close()