Python中多个进程从中央源快速读取的方法?
我在寻找一种快速的方法,让多个进程(在multiprocessing.Pool()中)能够从一个中央数据源读取数据。目前,我有一个文件,这个文件被读入一个队列(使用multiprocessing.Manager().Queue()),然后启动一个工作池,让这些进程从这个队列中读取数据。这种方法运行得不错,但当我处理几个GB大小的文件时,就出现了问题,因为这个管理队列的速度大约是普通Python队列的7倍慢。
我觉得这可能是因为管理器是在一个单独的进程中,它需要通过一个套接字进行通信,而不是直接与内存交互。
这是我使用的代码(get_records函数只是从文件中读取每条记录的字节流并返回)
from multiprocessing import Manager
manager = Manager()
mgr_q = manager.Queue()
map(mgr_q.put, get_records(f))
所以,也许有更好的方法来处理这个问题?
以下是关于将我的一个数据文件(约3GB)读取到不同数据类型中的速度统计:
读取到普通的Python列表中。速度为229.377 MB/秒
l = []
map(l.append, get_records(f))
读取到普通队列中。速度为74.035 MB/秒
import Queue
q = Queue.Queue()
map(q.put, get_records(f))
读取到multiprocessing.queues队列中。速度为67.718 MB/秒
from multiprocessing import Queue
mq = Queue()
map(mq.put, get_records(f))
最后,读取到管理队列中。速度为9.568 MB/秒
from multiprocessing import Manager
manager = Manager()
mgr_q = manager.Queue()
map(mgr_q.put, get_records(f))
速度是通过rate = duration / filesize / 1024 / 1024
计算得出的
1 个回答
3
如果你只是从文件中读取数据,那么多个进程同时读取是安全的。与其在队列中传递数据,不如直接传递文件的位置(偏移量)和要读取的字节数。然后在工作进程中可以这样做:
f.seek(offset)
get_records(f, count)