Python中多个进程从中央源快速读取的方法?

3 投票
1 回答
2671 浏览
提问于 2025-04-16 14:11

我在寻找一种快速的方法,让多个进程(在multiprocessing.Pool()中)能够从一个中央数据源读取数据。目前,我有一个文件,这个文件被读入一个队列(使用multiprocessing.Manager().Queue()),然后启动一个工作池,让这些进程从这个队列中读取数据。这种方法运行得不错,但当我处理几个GB大小的文件时,就出现了问题,因为这个管理队列的速度大约是普通Python队列的7倍慢。

我觉得这可能是因为管理器是在一个单独的进程中,它需要通过一个套接字进行通信,而不是直接与内存交互。

这是我使用的代码(get_records函数只是从文件中读取每条记录的字节流并返回)

from multiprocessing import Manager
manager = Manager()
mgr_q = manager.Queue()
map(mgr_q.put, get_records(f))

所以,也许有更好的方法来处理这个问题?


以下是关于将我的一个数据文件(约3GB)读取到不同数据类型中的速度统计:

读取到普通的Python列表中。速度为229.377 MB/秒

l = []
map(l.append, get_records(f))

读取到普通队列中。速度为74.035 MB/秒

import Queue
q = Queue.Queue()
map(q.put, get_records(f))

读取到multiprocessing.queues队列中。速度为67.718 MB/秒

from multiprocessing import Queue
mq = Queue()
map(mq.put, get_records(f))

最后,读取到管理队列中。速度为9.568 MB/秒

from multiprocessing import Manager
manager = Manager()
mgr_q = manager.Queue()
map(mgr_q.put, get_records(f))

速度是通过rate = duration / filesize / 1024 / 1024计算得出的

1 个回答

3

如果你只是从文件中读取数据,那么多个进程同时读取是安全的。与其在队列中传递数据,不如直接传递文件的位置(偏移量)和要读取的字节数。然后在工作进程中可以这样做:

f.seek(offset)
get_records(f, count)

撰写回答