AttributeError when converting a Python multiprocessing job into a Celery task

3 votes
2 answers
3,964 views
Asked 2025-04-18 00:04

I wrote a function that runs a Pandas apply across multiple processes; the code is as follows:

import multiprocessing
import pandas as pd
import numpy as np

def _apply_df(args):
    # Unpack the (DataFrame chunk, function, kwargs) tuple and apply the function.
    df, func, kwargs = args
    return df.apply(func, **kwargs)

def apply_by_multiprocessing(df, func, **kwargs):
    workers = kwargs.pop('workers')
    pool = multiprocessing.Pool(processes=workers)
    # Split the DataFrame into one chunk per worker and apply the function in parallel.
    result = pool.map(_apply_df, [(d, func, kwargs)
                                  for d in np.array_split(df, workers)])
    pool.close()
    pool.join()
    return pd.concat(result)

def square(x):
    return x**x

if __name__ == '__main__':
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    apply_by_multiprocessing(df, square, axis=1, workers=4)  # run with 4 worker processes

The `apply_by_multiprocessing` function above lets a Pandas apply run across several processes at once. But when I turn it into a Celery task, it fails with: AttributeError: 'Worker' object has no attribute '_config'.

import pandas as pd
from celery import shared_task

@shared_task
def my_multiple_job():
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    apply_by_multiprocessing(df, square, axis=1, workers=4)
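
For what it's worth, the task is kicked off in the usual way (this call is my assumption about how it is invoked; the traceback below then appears in the Celery worker's log, not in the caller):

my_multiple_job.delay()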

The detailed traceback is:

  File "/Users/yong27/work/goldstar/kinmatch/utils.py", line 14, in apply_by_multiprocessing
    pool = multiprocessing.Pool(processes=workers)
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 118, in Pool
    context=self.get_context())
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 146, in __init__
    self._setup_queues()
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 238, in _setup_queues
    self._inqueue = self._ctx.SimpleQueue()
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 111, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 336, in __init__
    self._rlock = ctx.Lock()
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 66, in Lock
    return Lock(ctx=self.get_context())
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 164, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 60, in __init__
    kind, value, maxvalue, self._make_name(),
  File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 118, in _make_name
    return '%s-%s' % (process.current_process()._config['semprefix'],
AttributeError: 'Worker' object has no attribute '_config'

It looks like the Celery worker process is not an ordinary process. How can I work around this? I'm using Python 3.4, Django 1.6.2, celery 3.1.10, django-celery 3.1.9, and pandas 0.12.0.

2 Answers

2 votes

I don't know why multiprocessing doesn't work here, but I'd suggest using a Celery group instead.

from celery import task, group

def feeds_fetch(feeds):
    # Fan out one sub-task per feed and dispatch them all as a group.
    g = group(fetch_one.s(feed) for feed in feeds)
    g.apply_async()

@task()
def fetch_one(feed):
    return feed.fetch()
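
Applied to the original DataFrame problem, the same group pattern might look roughly like the sketch below. This is only an illustration under my own assumptions: the names apply_square_to_chunk and apply_square_by_celery are hypothetical, the chunk payloads have to survive the broker's serialization (Celery 3.1's default pickle serializer handles this; JSON would need extra care), and calling .get() is only appropriate from ordinary code, not from inside another task.

import numpy as np
import pandas as pd
from celery import shared_task, group

@shared_task
def apply_square_to_chunk(records):
    # Rebuild the chunk from a plain dict, apply the row-wise function,
    # and hand back a plain dict so the payload stays serializable.
    chunk = pd.DataFrame(records)
    applied = chunk.apply(lambda row: row ** row, axis=1)
    return applied.to_dict()

def apply_square_by_celery(df, workers=4):
    # Split the DataFrame into chunks and fan out one Celery task per chunk,
    # mirroring what apply_by_multiprocessing did with a local process pool.
    job = group(apply_square_to_chunk.s(chunk.to_dict())
                for chunk in np.array_split(df, workers))
    results = job.apply_async().get()   # blocks until every chunk is done
    return pd.concat([pd.DataFrame(r) for r in results])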

4 votes

There is a good answer to this in another question.

Basically, this is a known Celery issue, and a workaround is provided there. It worked for me; I just added the following code to the same module where my tasks are defined:

from celery.signals import worker_process_init
from multiprocessing import current_process

@worker_process_init.connect
def fix_multiprocessing(**kwargs):
    # Celery's worker process is missing the _config attribute that the stdlib
    # multiprocessing module expects, so patch it in when each worker process starts.
    try:
        current_process()._config
    except AttributeError:
        current_process()._config = {'semprefix': '/mp'}
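
A follow-up note: another workaround sometimes suggested (this is my own addition, not part of the linked answer) is to create the pool with billiard, Celery's maintained fork of multiprocessing, which is designed to run inside a Celery worker. A minimal sketch of the original helper with only the pool swapped out:

# Sketch only: billiard ships as a Celery dependency and mirrors the
# multiprocessing API; _apply_df is the helper from the question above.
from billiard import Pool
import numpy as np
import pandas as pd

def apply_by_multiprocessing(df, func, **kwargs):
    workers = kwargs.pop('workers')
    pool = Pool(processes=workers)
    result = pool.map(_apply_df, [(d, func, kwargs)
                                  for d in np.array_split(df, workers)])
    pool.close()
    pool.join()
    return pd.concat(result)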
