我以前使用过Joblib和Airflow,没有遇到过这个问题。我正在尝试通过气流运行作业,该气流使用Joblib运行并行计算。当气流作业启动时,我看到以下警告
UserWarning: Loky-backed parallel loops cannot be called in multiprocessing, setting n_jobs=1
将警告追溯到源代码,我在LokyBackend类的joblib包中看到以下函数触发(MultiprocessingBackend类中也有类似的逻辑)
def effective_n_jobs(self, n_jobs):
"""Determine the number of jobs which are going to run in parallel"""
if n_jobs == 0:
raise ValueError('n_jobs == 0 in Parallel has no meaning')
elif mp is None or n_jobs is None:
# multiprocessing is not available or disabled, fallback
# to sequential mode
return 1
elif mp.current_process().daemon:
# Daemonic processes cannot have children
if n_jobs != 1:
warnings.warn(
'Loky-backed parallel loops cannot be called in a'
' multiprocessing, setting n_jobs=1',
stacklevel=3)
return 1
问题是我以前在Joblib和Airflow中运行过类似的函数,但没有触发此条件以将n_jobs
设置为1。想知道这是否是某种类型的版本控制问题(使用Airflow 2.X和Joblib 1.X),或者Airflow中是否有可以解决此问题的设置。我查看了Joblib的旧版本,甚至降级到了Joblib 0.4.0,但这并没有解决任何问题。由于API、数据库连接等方面的差异,我更不愿意降级气流
编辑:
以下是我在Airflow中运行的代码:
def test_parallel():
out=joblib.Parallel(n_jobs=-1, backend="loky")(joblib.delayed(lambda a: a+1)(i) for i in range(20))
with DAG("test", default_args=DEFAULT_ARGS, schedule_interval="0 8 * * *",) as test:
run_test = PythonOperator(
task_id="test",
python_callable=test_parallel,
)
run_test
以及气流日志中的输出:
[2021-07-27 10:41:29,890] {logging_mixin.py:104} WARNING - /data01/code/virtualenv/alpha/lib/python3.8/site-packages/joblib/parallel.py:733 UserWarning: Loky-backed parallel loops cannot be called in a multiprocessing, setting n_jobs=1
我通过{airflow tasks test run_test
我注意到您没有在代码底部调用
run_test
函数。这可能是问题的原因吗?更正版本:所以我解决了这个问题,从PythonOperator切换到BashOpertaor,并停止joblib,将CPU和线程的数量减少到1。我还遵循了来自here的指令,只是为了在代码执行后杀死守护进程,但您可以只等待300秒,这是进程终止的默认joblib超时
相关问题 更多 >
编程相关推荐