Python multiprocessing error when running a process pool inside a child process
I have a project that starts 4 processes, each querying a different database:
self.counter_puller.start()
self.counter_puller.join()
These calls are made in the same file as the main program. Inside the counter_puller process, I call calculate_kpis, a method of a class named Calculator(object):
class CountersPuller(Process):
    _db = None
    _logger = None
    _sites_info = None

    def __init__(self, config, output_dir, area, level):
        Process.__init__(self)
        ...
        calculator.calculate_kpis(self._pulled_counters)
Inside that method I use a process pool:
with Pool(globals.NUM_KPI_WORKERS) as exe:
    results = exe.starmap(worker_calcuiating_kpi, zip(self._kpis, counters))
for res in results:
    calculated_kpis[res.columns[0]] = res.values
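For anyone who wants to try the same pattern outside the project, here is a minimal self-contained sketch of this Pool.starmap call, run from the main process as a unit test would. compute_kpi, NUM_KPI_WORKERS and the sample data are hypothetical stand-ins for the elided worker_calcuiating_kpi, globals.NUM_KPI_WORKERS and counters; each "KPI" here is just the sum of its counters, and plain tuples replace the DataFrame results.

```python
import multiprocessing

NUM_KPI_WORKERS = 2  # hypothetical stand-in for globals.NUM_KPI_WORKERS


def compute_kpi(kpi_name, counters):
    """Hypothetical worker: one KPI is just the sum of its counters here."""
    return kpi_name, sum(counters)


def calculate_kpis(kpi_names, counters_per_kpi):
    calculated_kpis = {}
    # starmap unpacks each (kpi_name, counters) pair produced by zip() into
    # compute_kpi's two parameters and returns results in input order.
    with multiprocessing.Pool(NUM_KPI_WORKERS) as exe:
        results = exe.starmap(compute_kpi, zip(kpi_names, counters_per_kpi))
    for name, value in results:
        calculated_kpis[name] = value
    return calculated_kpis


if __name__ == "__main__":
    # Called from the main process, as in a unit test.
    print(calculate_kpis(["kpi_a", "kpi_b"], [[1, 2, 3], [10, 20]]))
    # prints {'kpi_a': 6, 'kpi_b': 30}
```

Run this way, the pool is created by the main process, which matches the situation in which the code works.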
When I test calculate_kpis() on its own with unit tests, everything works fine. But when I run the whole program, I get an error while the pool is being initialized:
Process NrPmDataCountersPuller-1:
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/var/www/html/kpi-analyzer/src/dbAggregator/counters_puller.py", line 135, in run
    calculator.calculate_kpis(self._pulled_counters)
  File "/var/www/html/kpi-analyzer/src/dbAggregator/kpi_calculator.py", line 268, in calculate_kpis
    self._generate_and_save_kpi_carrier_level(input_data)
  File "/var/www/html/kpi-analyzer/src/dbAggregator/kpi_calculator.py", line 280, in _generate_and_save_kpi_carrier_level
    calculated_kpis = self.__calculate_carrier_level(input_data)
  File "/var/www/html/kpi-analyzer/src/dbAggregator/kpi_calculator.py", line 349, in __calculate_carrier_level
    with Pool(globals.NUM_KPI_WORKERS) as exe:
  File "/usr/lib/python3.10/multiprocessing/context.py", line 119, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild,
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 191, in __init__
    self._setup_queues()
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 346, in _setup_queues
    self._inqueue = self._ctx.SimpleQueue()
  File "/usr/lib/python3.10/multiprocessing/context.py", line 113, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 341, in __init__
    self._rlock = ctx.Lock()
  File "/usr/lib/python3.10/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
  File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 162, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 58, in __init__
    kind, value, maxvalue, self._make_name(),
  File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 116, in _make_name
    return '%s-%s' % (process.current_process()._config['semprefix'],
TypeError: 'Config' object is not subscriptable
Any hints on how to fix this would be greatly appreciated.
1 Answer
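Your CountersPuller class has a member called _config, and that name clashes with the _config attribute of Python's Process class. Process.__init__ stores a dict in self._config (it holds, among other things, the 'semprefix' key), and inside the child process multiprocessing.current_process() returns your CountersPuller object. So when the Pool creates its lock, SemLock._make_name() evaluates current_process()._config['semprefix'] against your Config object instead of that dict, which raises the TypeError at the bottom of your traceback. Presumably your unit tests call calculate_kpis() directly in the main process, whose _config is untouched, which is why the error only shows up when you run the whole program.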
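To see what the attribute normally contains, a minimal diagnostic sketch can inspect _config on a freshly constructed (never started) Process. Keep in mind that _config is a private, undocumented CPython attribute (the traceback above comes from Python 3.10), so this is only for diagnosis; default_process_config is a hypothetical helper name.

```python
import multiprocessing


def default_process_config():
    """Return the private _config dict that Process.__init__ creates.

    _config is an undocumented CPython implementation detail; it is read
    here only to show what a subclass attribute of the same name replaces.
    """
    proc = multiprocessing.Process()  # constructed but never started
    return proc._config


if __name__ == "__main__":
    config = default_process_config()
    print(type(config).__name__)   # dict
    print("semprefix" in config)   # True: SemLock._make_name() reads this key
```

Assigning anything else to self._config in a subclass replaces this dict, and the next lock created inside that process fails the same way.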
To avoid the clash, either don't use the name _config in this class, or drop inheritance altogether and pass the configuration to your function as an argument.
You can easily reproduce the problem with the following code:
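import multiprocessing

class MyProcess(multiprocessing.Process):
    def __init__(self):
        super().__init__()
        # Overwrites the dict that Process.__init__ stored in self._config
        self._config = None

    def run(self):
        # Creating the Pool's lock reads current_process()._config['semprefix'],
        # so this raises TypeError: 'NoneType' object is not subscriptable
        with multiprocessing.Pool() as p:
            pass

if __name__ == "__main__":
    proc = MyProcess()
    proc.start()
    proc.join()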
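The first fix, renaming the attribute, is enough to make the reproducer run. This is a minimal sketch assuming a hypothetical attribute name _app_config (any name that Process does not use itself would do) and an arbitrary pool of 2 workers.

```python
import multiprocessing


class MyProcess(multiprocessing.Process):
    def __init__(self, config):
        super().__init__()
        # Store application settings under a name Process does not use;
        # "_app_config" is a hypothetical choice, any non-clashing name works.
        self._app_config = config

    def run(self):
        # Process._config is still the dict set up by Process.__init__,
        # so the Pool's SemLock can read its 'semprefix' entry.
        with multiprocessing.Pool(2) as pool:
            print(self._app_config, pool.map(abs, [-1, -2]))


if __name__ == "__main__":
    proc = MyProcess(config="hello")
    proc.start()
    proc.join()
    print(proc.exitcode)  # 0 when run() finished without an exception
```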
Name clashes like this are one of the reasons to prefer composition over inheritance. Here is how to do the same thing without subclassing Process:
import multiprocessing

class MyProcess:
    def __init__(self):
        # No Process base class, so this _config cannot shadow Process._config
        self._config = "hello"

    def do_something(self):
        print(self._config)
        with multiprocessing.Pool() as p:
            pass

if __name__ == "__main__":
    proc_config = MyProcess()
    # Composition: a plain Process runs the bound method as its target
    proc = multiprocessing.Process(target=proc_config.do_something)
    proc.start()
    proc.join()
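The other option mentioned above, passing the configuration to a plain function as an argument, could look like the following sketch. pull_and_calculate, scale and the config keys num_workers and factor are hypothetical names chosen for illustration, and the configuration is an ordinary dict rather than your Config class.

```python
import multiprocessing


def scale(value, factor):
    """Hypothetical pool task."""
    return value * factor


def pull_and_calculate(config, values):
    """Hypothetical process target: settings arrive as a plain argument
    instead of being stored on a Process subclass."""
    with multiprocessing.Pool(config["num_workers"]) as pool:
        results = pool.starmap(scale, [(v, config["factor"]) for v in values])
    print(results)
    return results  # ignored when the function is used as a Process target


if __name__ == "__main__":
    config = {"num_workers": 2, "factor": 10}  # hypothetical settings
    proc = multiprocessing.Process(
        target=pull_and_calculate, args=(config, [1, 2, 3])
    )
    proc.start()
    proc.join()  # the child prints [10, 20, 30]
```

Because nothing subclasses Process here, there is no attribute of yours for Process internals to collide with.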