Python多进程处理海量数据

3 投票
1 回答
2311 浏览
提问于 2025-04-17 23:00

我在这个网站上搜索过,但不太确定用什么词能找到相关的答案,如果这个问题重复了,我先说声抱歉。

我需要处理一个非常非常大的矩阵(1400万 * 25万),想利用Python的多进程模块来加快处理速度。对于矩阵中的每一对列,我需要应用一个函数,然后把结果存储在一个专有的类里。

我会实现一个双重循环,以便得到所需的列组合。

我不想一次性给250,000个任务,这样会占用太多内存。理想情况下,我想先处理一列,然后把任务分配给进程池。比如说,进程1处理列A和列B,函数F会处理A和B,然后把结果存储在类G的G[A,B]中;进程2处理列A和列C,过程类似。

这些进程不会访问G的同一个元素。

所以我想在每处理N个任务时暂停一下循环。G的设置和获取方法会被重写,以执行一些后台任务。

我不太明白的是,暂停循环是否真的有必要?也就是说,Python是否足够聪明,只会处理它能处理的任务?还是说它会填充大量的任务?

最后,我不太清楚结果是怎么工作的。我只想把结果放在G里,不需要返回任何东西。我不想担心.get()之类的,但据我了解,进程池方法会返回一个结果对象。我可以忽略这个吗?

有没有更好的方法?我是不是完全迷路了?

1 个回答

2

首先,你需要创建一个多进程池的类。你可以设置想要的工作进程数量,然后使用 map 来启动任务。我相信你已经知道这些了,但这里有个关于 Python 多进程的文档链接。

你提到不想返回数据,因为你觉得没必要,但你打算怎么查看结果呢?每个任务会把数据写到磁盘上吗?如果你想在进程之间传递数据,可以使用类似于 多进程队列 的东西。

这里有个链接里的示例代码,展示了如何使用进程和队列:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()

这是使用池的一个示例:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

补充一下:@goncalopp 提到一个很重要的观点,就是在 Python 中进行复杂的数值计算可能不太合适,因为它比较慢。Numpy 是一个很棒的库,可以用来进行数值计算。

如果你的程序因为每个进程都要写入磁盘而受到很大影响,建议你考虑运行大约 4 倍于处理器数量的进程,这样总是能保持有任务可做。同时,你也要确保你的磁盘速度很快哦 :)

撰写回答