Python同期期货:处理子进程中的异常

2024-04-27 22:12:29 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个非常简单的concurrent.futures.ProcessPoolExecutor实现——类似于(使用Python3.6):

files = get_files()
processor = get_processor_instance()
with concurrent.futures.ProcessPoolExecutor() as executor:
    list(executor.map(processor.process, files))

虽然processor是许多可用处理器类中任何一个的实例,但它们都共享process方法,大致如下所示:

^{pr2}$

_get_records_save_output方法的实现因类而异,但我的问题在于错误的处理。我故意测试它,以便这两个方法中的一个耗尽内存,但我希望上面的except块捕获它并移动下一个文件——这正是我在单个进程中运行代码时发生的情况。在

如果我如上所述使用ProcessPoolExecutor,则会引发BrokenProcessPool异常并终止所有执行:

Traceback (most recent call last):
  File "/vagrant/myapp/myapp.py", line 94, in _process
    list(executor.map(processor.process, files))
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

当然,我可以在调用代码中捕获BrokenProcessPool,但我更愿意在内部处理错误并继续处理下一个文件。在

我还尝试使用标准的multiprocessing.Pool对象,如下所示:

with multiprocessing.Pool() as pool:
    pool.map(processor.process, files)

在这种情况下,这种行为更为奇怪:在开始处理前两个文件(这会导致内存不足)之后,它会继续处理后面的文件,后者较小,因此可以完全处理。然而,except块显然永远不会被触发(没有日志消息,没有error_time),应用程序只是挂起,既不完成也不做任何事情,直到手动终止。在

我希望try..except块将使每个进程独立,在不影响主应用程序的情况下处理自己的错误。有什么办法吗?在


Tags: 文件inpypyenvhomegetubuntuline
1条回答
网友
1楼 · 发布于 2024-04-27 22:12:29

因此,经过大量的调试(并且由于@RomanPerekhrest建议检查executor对象),我已经找到了原因。如问题所述,测试数据由许多文件组成,其中两个文件相当大(每个文件超过100万行CSV)。这两个都导致了我的测试机器(2GB虚拟机)阻塞,但是以不同的方式,第一个更大,导致了一个常规的内存不足错误,这个错误将由except来处理,而第二个只是导致了sigkill。在没有进行过多研究的情况下,我怀疑较大的文件在读取时无法放入内存(在_get_records方法中完成),而较小的文件则可以,但随后对其进行操作(在_save_outputcaused the overflow中完成)并终止了进程。在

我的解决方案是简单地捕捉BrokenProcessPool异常并通知用户该问题;我还添加了一个选项,该选项在一个进程中运行处理任务,在这种情况下,任何太大的文件都会被简单地标记为有错误:

files = get_files()
processor = get_processor_instance()
results = []
if args.nonconcurrent:
    results = list(map(processor.process, files))
else:
    with concurrent.futures.ProcessPoolExecutor() as executor:
        try:
            results = list(executor.map(processor.process, files))
        except concurrent.futures.process.BrokenProcessPool as ex:
            raise MyCustomProcessingError(
                f"{ex} This might be caused by limited system resources. "
                "Try increasing system memory or disable concurrent processing "
                "using the  nonconcurrent option."
            )

相关问题 更多 >