在Python中如何最佳地将大文件拆分以进行多进程处理?
我遇到很多“明显可以并行处理”的项目,想用 multiprocessing
模块来加速处理。不过,这些项目通常需要读取超大的文件(超过2GB),逐行处理这些文件,进行一些基本的计算,然后再写出结果。那么,怎么才能把一个文件分割开来,并用Python的multiprocessing模块来处理呢?我应该使用 multiprocessing
中的 Queue
还是 JoinableQueue
?或者直接用 Queue
模块?还是应该用 multiprocessing
来对文件的可迭代对象进行映射到一组进程上?我试过这些方法,但逐行分发数据的开销太大了。我最终选择了一种轻量级的管道过滤器设计,使用 cat file | process1 --out-file out1 --num-processes 2 | process2 --out-file out2
,这样可以把第一个进程的输入的一部分直接传给第二个进程(具体可以参考这篇帖子),但我希望能有一个完全用Python实现的解决方案。
令人惊讶的是,Python的文档并没有提供一个标准的方法来做到这一点(尽管在 multiprocessing
的文档中有一大段关于编程指南的内容)。
谢谢,
Vince
补充信息:每行的处理时间是不一样的。有些问题处理得很快,几乎不受I/O限制,而有些则是CPU限制的。对于那些不相互依赖的CPU限制任务,使用并行处理会有很大的好处,即使是把数据分配给处理函数的效率不高,仍然能在实际时间上节省不少。
一个典型的例子是一个脚本,它从每行中提取字段,检查各种位标志,然后把带有特定标志的行以全新的格式写入一个新文件。这个看起来像是一个I/O限制的问题,但当我用便宜的并发版本和管道运行它时,速度快了大约20%。而当我用 multiprocessing
的池和映射,或者队列来运行时,速度总是慢超过100%。
7 个回答
你没有提到你是怎么处理这些行的,这可能是最重要的信息。
每一行是独立的吗?计算是否依赖于前一行的结果?必须分块处理吗?处理每一行需要多长时间?最后是否有一个处理步骤需要把“所有”数据都考虑进去?还是说可以丢掉中间结果,只保留一个总的结果?文件能不能先通过文件大小除以线程数来分割?还是说在处理时文件会变大?
如果这些行是独立的,并且文件不会变大,你只需要协调一下“起始地址”和“长度”分配给每个工作者;他们可以独立地打开文件并定位到相应位置,然后你只需要协调他们的结果;比如说,等到N个结果返回到一个队列里。
如果这些行不是独立的,那么答案就会很大程度上依赖于文件的结构。
一种策略是给每个工作进程分配一个偏移量。如果你有八个工作进程,就给它们编号从0到7。比如,工作进程0读取第一个记录,处理完后跳过7个记录,接着处理第8个记录;工作进程1读取第二个记录,然后跳过7个,处理第9个记录,依此类推……
这种方法有很多好处。无论文件有多大,工作总是能平均分配;在同一台机器上的进程处理速度大致相同,而且使用相同的缓冲区,这样就不会产生过多的输入输出开销。只要文件没有被更新,你就可以重新运行单独的线程来恢复因故障而中断的工作。
在Linux操作系统中,有一种非常好的架构已经存在了。你不需要特别的库来实现它。
你想要的是一种“分发”的设计。
一个“主”程序会创建多个子进程,这些子进程通过管道连接在一起。
主程序读取文件,把每一行写入管道,同时进行最少的过滤,以便把这些行分配给合适的子进程。
每个子进程可能应该是一个由不同进程组成的管道,这些进程从标准输入读取和写入数据。
你不需要使用队列这种数据结构,因为内存中的管道本身就是一个队列,它在两个同时运行的进程之间传递字节。