Celery:每个进程正确运行耗时初始化函数的方法

15 投票
2 回答
6685 浏览
提问于 2025-04-18 09:39

简要说明:

如果你想在celery生成的每个进程中运行一个初始化函数,可以使用worker_process_init这个信号。根据文档,这个信号的处理函数不能阻塞超过4秒钟。但是,如果我需要运行一个执行时间超过4秒的初始化函数,该怎么办呢?

问题:

我使用一个C语言扩展模块在celery任务中执行某些操作。这个模块的初始化可能需要几秒钟(大约4到10秒)。我希望这个初始化函数只在每个进程启动时运行,而不是每个任务都运行,所以我用了worker_process_init信号:

#lib.py 
import isclient #c extension module
client = None
def init():
    global client
    client = isclient.Client() #this might take a while

def create_ne_list(text):
    return client.ne_receiventities4datachunk(text)

#celery.py
from celery import Celery
from celery.signals import worker_process_init
from lib import init

celery = Celery(include=[
    'isc.ne.tasks'
])

celery.config_from_object('celeryconfig')

@worker_process_init.connect
def process_init(sender=None, conf=None, **kwargs):
    init()

if __name__ == '__main__':
    celery.start()

#tasks.py
from celery import celery
from lib import create_ne_list as cnl

@celery.task(time_limit=1200)
def create_ne_list(text):
    return cnl(text)

当我运行这段代码时,发生的情况和我之前提到的问题类似(Celery: 卡在无限重复的超时中(等待UP消息超时))。简单来说,由于我的初始化函数超过4秒,有时会导致一个工作进程被杀掉并重启,而在重启过程中又被杀掉,因为在4秒没有响应后会自动发生这种情况。这最终导致了一个无限重复的杀掉和重启的过程。

另一个选择是使用worker_init信号,只在每个工作进程中运行一次初始化函数。如果这样做,我又遇到了另一个问题:排队的进程出于某种原因卡住了。当我以3个并发启动工作进程,并发送几个任务时,前面三个会完成,但后面的任务就没有被处理。(我猜这可能和client对象需要在多个进程之间共享有关,而C扩展出于某种原因不支持这一点。不过说实话,我对多进程还比较陌生,所以只能猜测。)

问题:

所以,问题是:如何在每个进程中运行一个超过4秒的初始化函数?有没有正确的方法可以做到这一点?

2 个回答

8

@changhwan回答 在 celery 4.4.0 版本之后不再是唯一的方法了。这里有一个 拉取请求,它为这个功能增加了配置选项。

使用配置选项

在 celery ^4.4.0 版本中,这个值是可以配置的。你可以使用 celery 应用的配置选项 worker_proc_alive_timeout。来自 稳定版本文档 的说明:

worker_proc_alive_timeout

默认值:4.0。

这是在等待新工作进程启动时的超时时间,单位是秒(可以是整数或小数)。

示例:

from celery import Celery
from celery.signals import worker_process_init

app = Celery('app')
app.conf.worker_proc_alive_timeout = 10

@worker_process_init.connect
def long_init_function(*args, **kwargs):
   import time
   time.sleep(8)
13

Celery限制了进程初始化的超时时间为4秒。

你可以查看这个源代码来了解更多。

如果你想绕过这个限制,可以在创建celery应用之前考虑更改这个设置。

from celery.concurrency import asynpool
asynpool.PROC_ALIVE_TIMEOUT = 10.0 #set this long enough

需要注意的是,没有任何配置或设置可以直接更改这个值。

撰写回答