Python multiprocessing error when running a process pool inside a child process

Asked 2025-04-12 13:32

I have a project that needs to start 4 processes, each querying a different database.

self.counter_puller.start() 
self.counter_puller.join()

These calls are made in the file that contains the main program. Inside the counter_puller process, I call calculate_kpis, a method of a class named Calculator(object).

class CountersPuller(Process):
    _db = None
    _logger = None
    _sites_info = None

    def __init__(self, config, output_dir, area, level):
        Process.__init__(self)
        ...

    def run(self):
        ...
        calculator.calculate_kpis(self._pulled_counters)

Inside this method, I use a process pool:

with Pool(globals.NUM_KPI_WORKERS) as exe:
    results = exe.starmap(worker_calcuiating_kpi, zip(self._kpis, counters))
for res in results:
    calculated_kpis[res.columns[0]] = res.values

When I test calculate_kpis() with unit tests, everything works fine. But when I run the whole program, I get an error while the process pool is being initialized:

Process NrPmDataCountersPuller-1:
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/var/www/html/kpi-analyzer/src/dbAggregator/counters_puller.py", line 135, in run
    calculator.calculate_kpis(self._pulled_counters)
  File "/var/www/html/kpi-analyzer/src/dbAggregator/kpi_calculator.py", line 268, in calculate_kpis
    self._generate_and_save_kpi_carrier_level(input_data)
  File "/var/www/html/kpi-analyzer/src/dbAggregator/kpi_calculator.py", line 280, in _generate_and_save_kpi_carrier_level
    calculated_kpis = self.__calculate_carrier_level(input_data)
  File "/var/www/html/kpi-analyzer/src/dbAggregator/kpi_calculator.py", line 349, in __calculate_carrier_level
    with Pool(globals.NUM_KPI_WORKERS) as exe:
  File "/usr/lib/python3.10/multiprocessing/context.py", line 119, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild,
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 191, in __init__
    self._setup_queues()
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 346, in _setup_queues
    self._inqueue = self._ctx.SimpleQueue()
  File "/usr/lib/python3.10/multiprocessing/context.py", line 113, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 341, in __init__
    self._rlock = ctx.Lock()
  File "/usr/lib/python3.10/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
  File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 162, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 58, in __init__
    kind, value, maxvalue, self._make_name(),
  File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 116, in _make_name
    return '%s-%s' % (process.current_process()._config['semprefix'],
TypeError: 'Config' object is not subscriptable

I would be very grateful for any hints that help me solve this.

1 Answer


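Your CountersPuller class has a member named _config, and that name clashes with the _config member of Python's Process class. Process keeps a dict in _config, and when a Pool creates its locks, _make_name() reads process.current_process()._config['semprefix'] (the last frame of your traceback). Inside the child process, current_process() is your CountersPuller instance, so the lookup hits your Config object instead of that dict. In your unit tests, current_process() is the untouched main process, which is why the same code works there.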
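The clash can be observed without starting any process. This is a minimal sketch that relies on CPython internals rather than a public API: in current CPython, Process.__init__ copies the parent's _config dict, which contains the 'semprefix' key read in the traceback. Config, ClashingPuller, SafePuller and has_semprefix are hypothetical names used only for illustration.

```python
import multiprocessing


class Config:
    """Hypothetical stand-in for the asker's configuration object."""

    def __init__(self, db_url):
        self.db_url = db_url


class ClashingPuller(multiprocessing.Process):
    def __init__(self, config):
        super().__init__()
        # Replaces the dict that Process.__init__ stored in self._config
        self._config = config


class SafePuller(multiprocessing.Process):
    def __init__(self, config):
        super().__init__()
        # A distinct attribute name leaves Process._config untouched
        self._app_config = config


def has_semprefix(proc):
    """Return True if proc._config can still be indexed like the traceback does."""
    try:
        proc._config["semprefix"]
    except TypeError:  # e.g. "'Config' object is not subscriptable"
        return False
    return True


if __name__ == "__main__":
    cfg = Config("db")
    print(has_semprefix(multiprocessing.current_process()))  # True (the unit-test case)
    print(has_semprefix(ClashingPuller(cfg)))  # False
    print(has_semprefix(SafePuller(cfg)))  # True
```

Because these attributes are private, their exact contents may change between Python versions; the point is only that a subclass assignment to _config silently replaces the dict multiprocessing expects.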

To avoid the clash, don't use the name _config in this class, or drop inheritance altogether and pass the configuration to your function as an argument.
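A minimal sketch of the second option, assuming a plain function replaces the Process subclass. pull_counters, square, run_puller and the {"values": [...]} config dict are hypothetical stand-ins for the real database and KPI code; the configuration travels as an ordinary argument, so nothing touches Process._config.

```python
import multiprocessing


def square(x):
    """Hypothetical stand-in for the per-KPI worker function."""
    return x * x


def pull_counters(config, results):
    """Hypothetical stand-in for the body of CountersPuller.run().

    config is a plain argument, so the Process object running this
    function keeps its own _config dict intact and Pool() works.
    """
    with multiprocessing.Pool(2) as pool:
        results.put(pool.map(square, config["values"]))


def run_puller(config):
    """Start pull_counters in a child process and return what it computed."""
    results = multiprocessing.Queue()
    proc = multiprocessing.Process(target=pull_counters, args=(config, results))
    proc.start()
    value = results.get()  # read before join() so the child can flush the queue
    proc.join()
    return value


if __name__ == "__main__":
    print(run_puller({"values": [1, 2, 3]}))  # [1, 4, 9]
```

The queue is drained before join() because a child that has put data on a queue may not exit until that data has been consumed.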

You can easily reproduce the problem with the following code:

import multiprocessing

class MyProcess(multiprocessing.Process):

    def __init__(self):
        super().__init__()
        self._config = None

    def run(self):
        with multiprocessing.Pool() as p:
            pass

if __name__ == "__main__":
    proc = MyProcess()
    proc.start()
    proc.join()
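For the first option, renaming the attribute is enough. Below is a sketch of the reproduction above with the attribute renamed to _app_config (a hypothetical name; any name that does not collide with Process internals should do) and a queue added so the parent can check the pool's result. square and run_fixed are also hypothetical helpers.

```python
import multiprocessing


def square(x):
    return x * x


class MyProcess(multiprocessing.Process):

    def __init__(self, values, results):
        super().__init__()
        # Renamed from _config, so Process._config (the dict holding
        # 'semprefix') is left alone and Pool() can create its locks.
        self._app_config = {"values": values}
        self._results = results

    def run(self):
        with multiprocessing.Pool(2) as p:
            self._results.put(p.map(square, self._app_config["values"]))


def run_fixed(values):
    results = multiprocessing.Queue()
    proc = MyProcess(values, results)
    proc.start()
    value = results.get()  # read before join() so the child can flush the queue
    proc.join()
    return value


if __name__ == "__main__":
    print(run_fixed([2, 3]))  # [4, 9]
```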

This is one of the reasons composition is generally preferred over inheritance. Here is how to do the same thing without inheritance:

import multiprocessing

class MyProcess:

    def __init__(self):
        self._config = "hello"

    def do_something(self):
        print(self._config)
        with multiprocessing.Pool() as p:
            pass

if __name__ == "__main__":
    proc_config = MyProcess()
    proc = multiprocessing.Process(target=proc_config.do_something)
    proc.start()
    proc.join()
