Celery:每个进程正确运行耗时初始化函数的方法
简要说明:
如果你想在celery生成的每个进程中运行一个初始化函数,可以使用worker_process_init
这个信号。根据文档,这个信号的处理函数不能阻塞超过4秒钟。但是,如果我需要运行一个执行时间超过4秒的初始化函数,该怎么办呢?
问题:
我使用一个C语言扩展模块在celery任务中执行某些操作。这个模块的初始化可能需要几秒钟(大约4到10秒)。我希望这个初始化函数只在每个进程启动时运行,而不是每个任务都运行,所以我用了worker_process_init
信号:
#lib.py
import isclient #c extension module
client = None
def init():
global client
client = isclient.Client() #this might take a while
def create_ne_list(text):
return client.ne_receiventities4datachunk(text)
#celery.py
from celery import Celery
from celery.signals import worker_process_init
from lib import init
celery = Celery(include=[
'isc.ne.tasks'
])
celery.config_from_object('celeryconfig')
@worker_process_init.connect
def process_init(sender=None, conf=None, **kwargs):
init()
if __name__ == '__main__':
celery.start()
#tasks.py
from celery import celery
from lib import create_ne_list as cnl
@celery.task(time_limit=1200)
def create_ne_list(text):
return cnl(text)
当我运行这段代码时,发生的情况和我之前提到的问题类似(Celery: 卡在无限重复的超时中(等待UP消息超时))。简单来说,由于我的初始化函数超过4秒,有时会导致一个工作进程被杀掉并重启,而在重启过程中又被杀掉,因为在4秒没有响应后会自动发生这种情况。这最终导致了一个无限重复的杀掉和重启的过程。
另一个选择是使用worker_init
信号,只在每个工作进程中运行一次初始化函数。如果这样做,我又遇到了另一个问题:排队的进程出于某种原因卡住了。当我以3个并发启动工作进程,并发送几个任务时,前面三个会完成,但后面的任务就没有被处理。(我猜这可能和client
对象需要在多个进程之间共享有关,而C扩展出于某种原因不支持这一点。不过说实话,我对多进程还比较陌生,所以只能猜测。)
问题:
所以,问题是:如何在每个进程中运行一个超过4秒的初始化函数?有没有正确的方法可以做到这一点?
2 个回答
@changhwan 的 回答 在 celery 4.4.0 版本之后不再是唯一的方法了。这里有一个 拉取请求,它为这个功能增加了配置选项。
使用配置选项
在 celery ^4.4.0
版本中,这个值是可以配置的。你可以使用 celery 应用的配置选项 worker_proc_alive_timeout
。来自 稳定版本文档 的说明:
worker_proc_alive_timeout
默认值:4.0。
这是在等待新工作进程启动时的超时时间,单位是秒(可以是整数或小数)。
示例:
from celery import Celery
from celery.signals import worker_process_init
app = Celery('app')
app.conf.worker_proc_alive_timeout = 10
@worker_process_init.connect
def long_init_function(*args, **kwargs):
import time
time.sleep(8)
Celery限制了进程初始化的超时时间为4秒。
你可以查看这个源代码来了解更多。
如果你想绕过这个限制,可以在创建celery应用之前考虑更改这个设置。
from celery.concurrency import asynpool
asynpool.PROC_ALIVE_TIMEOUT = 10.0 #set this long enough
需要注意的是,没有任何配置或设置可以直接更改这个值。