Python中多进程的内存错误

14 投票
1 回答
19936 浏览
提问于 2025-04-18 18:05

我正在用Python进行一些复杂的科学计算。我需要读取存储在csv文件中的一堆数据并进行处理。因为每个处理过程都很耗时,而我有8个处理器可以使用,所以我尝试使用Multiprocessing里的Pool方法。

这是我设置多进程调用的方式:

    pool = Pool()
    vector_components = []
    for sample in range(samples):
        vector_field_x_i = vector_field_samples_x[sample]
        vector_field_y_i = vector_field_samples_y[sample]
        vector_component = pool.apply_async(vector_field_decomposer, args=(x_dim, y_dim, x_steps, y_steps,
                                                                           vector_field_x_i, vector_field_y_i))
        vector_components.append(vector_component)
    pool.close()
    pool.join()

    vector_components = map(lambda k: k.get(), vector_components)

    for vector_component in vector_components:
        CsvH.write_vector_field(vector_component, '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')

我运行的数据集包含500个样本,每个样本的大小是100(x_dim)乘以100(y_dim)。

到目前为止,一切都运行得很好。

然后我收到了一个包含500个样本的数据集,每个样本的大小是400乘以400。

在运行这个数据集时,我在调用get时遇到了错误。

我还尝试运行一个单独的400乘以400的样本,也遇到了同样的错误。

Traceback (most recent call last):
  File "__init__.py", line 33, in <module>
    VfD.samples_vector_field_decomposer(samples, x_dim, y_dim, x_steps, y_steps, vector_field_samples_x, vector_field_samples_y)
  File "/export/home/pceccon/VectorFieldDecomposer/Sources/Controllers/VectorFieldDecomposerController.py", line 43, in samples_vector_field_decomposer
    vector_components = map(lambda k: k.get(), vector_components)
  File "/export/home/pceccon/VectorFieldDecomposer/Sources/Controllers/VectorFieldDecomposerController.py", line 43, in <lambda>
    vector_components = map(lambda k: k.get(), vector_components)
  File "/export/home/pceccon/.pyenv/versions/2.7.5/lib/python2.7/multiprocessing/pool.py", line 554, in get
    raise self._value
MemoryError

我该怎么办呢?

提前谢谢你。

1 个回答

11

现在你在内存中保存了几个列表——vector_field_xvector_field_yvector_components,还有在调用map时的一个单独的vector_components副本(这就是你内存不足的原因)。你可以通过使用pool.imap来避免需要这个vector_components的副本,而不是使用pool.apply_async和手动创建的列表。imap返回的是一个迭代器,而不是一个完整的列表,这样你就不会把所有结果都放在内存里。

通常情况下,pool.map会把传给它的可迭代对象分成几块,然后把这些块发送给子进程,而不是一个一个地发送。这有助于提高性能。因为imap使用的是迭代器而不是列表,它不知道你传给它的可迭代对象的完整大小。没有这个大小,它就不知道每一块应该有多大,所以默认的chunksize是1,这样虽然可以工作,但性能可能不太好。为了避免这种情况,你可以给它提供一个合适的chunksize参数,因为你知道可迭代对象的长度是sample个元素。对于你500个元素的列表,这可能影响不大,但值得尝试一下。

下面是一些示例代码,演示了这一切:

import multiprocessing
from functools import partial


def vector_field_decomposer(x_dim, y_dim, x_steps, y_steps, vector_fields):
    vector_field_x_i = vector_fields[0]
    vector_field_y_i = vector_fields[1]
    # Do whatever is normally done here.


if __name__ == "__main__":
    num_workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(num_workers)
    # Calculate a good chunksize (based on implementation of pool.map)
    chunksize, extra = divmod(samples // 4 * num_workers)
    if extra:
        chunksize += 1

    # Use partial so many arguments can be passed to vector_field_decomposer
    func = partial(vector_field_decomposer, x_dim, y_dim, x_steps, y_steps)
    # We use a generator expression as an iterable, so we don't create a full list.
    results = pool.imap(func, 
                        ((vector_field_samples_x[s], vector_field_samples_y[s]) for s in xrange(samples)),
                        chunksize=chunksize)
    for vector in results:
        CsvH.write_vector_field(vector_component, 
                                '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')
    pool.close()
    pool.join()

应该能让你避免MemoryError的问题,但如果不行,你可以尝试对你的总样本进行更小块的imap处理,进行多次处理。不过我觉得你应该不会有问题,因为除了最开始的vector_field_*列表外,你没有创建其他额外的列表。

撰写回答