并行文件解析,多核CPU

13 投票
6 回答
24942 浏览
提问于 2025-04-16 06:15

我之前问过一个相关但比较笼统的问题(特别是可以看看这个回答)。

这次的问题非常具体。我关心的代码就是:

result = {}
for line in open('input.txt'):
  key, value = parse(line)
  result[key] = value

这个叫做 parse 的函数是完全独立的(也就是说,它不使用任何共享的资源)。

我有一台Intel i7-920的处理器(4个核心,8个线程;我觉得线程可能更重要,但我不太确定)。

我该怎么做才能让我的程序充分利用这个处理器的并行处理能力呢?

我想我可以用8个不同的线程来打开这个文件进行读取,因为相对于总时间,磁盘访问的时间是比较短的,应该不会影响性能。

6 个回答

4

这可以通过使用Ray来实现,Ray是一个用于编写并行和分布式Python程序的库。

要运行下面的代码,首先需要创建一个名为input.txt的文件,内容如下。

printf "1\n2\n3\n4\n5\n6\n" > input.txt

然后,你可以通过给parse函数加上@ray.remote这个装饰器,来并行处理这个文件,并同时执行多个副本,方法如下。

import ray
import time

ray.init()

@ray.remote
def parse(line):
    time.sleep(1)
    return 'key' + str(line), 'value'

# Submit all of the "parse" tasks in parallel and wait for the results.
keys_and_values = ray.get([parse.remote(line) for line in open('input.txt')])
# Create a dictionary out of the results.
result = dict(keys_and_values)

需要注意的是,最佳的做法取决于运行parse函数所需的时间。如果运行时间是1秒(如上所示),那么每个Ray任务解析一行是合理的。如果只需要1毫秒,那么每个Ray任务解析多行(比如100行)可能更合适。

你的脚本足够简单,也可以使用多进程模块,但一旦你想做更复杂的事情,或者想利用多台机器而不是仅仅一台机器,使用Ray会简单得多。

可以查看Ray的文档了解更多信息。

8
  1. 把文件分成8个小文件
  2. 启动一个单独的脚本来处理每个文件
  3. 把结果合并起来

为什么这是最好的方法...

  • 这很简单易懂——你不需要用不同的方式来编程,只要像处理线性流程一样就行。
  • 通过启动少量的长时间运行的进程,你可以获得最佳的性能。
  • 操作系统会处理上下文切换和输入输出的多路复用,所以你不需要担心这些(操作系统做得很好)。
  • 你可以扩展到多台机器,而不需要改动代码。
  • ...
21

cPython并没有很简单地提供你想要的线程模型。不过,你可以使用multiprocessing模块和一个进程池来实现类似的功能。

这样的解决方案可能看起来像这样:

def worker(lines):
    """Make a dict out of the parsed, supplied lines"""
    result = {}
    for line in lines.split('\n'):
        k, v = parse(line)
        result[k] = v
    return result

if __name__ == '__main__':
    # configurable options.  different values may work better.
    numthreads = 8
    numlines = 100

    lines = open('input.txt').readlines()

    # create the process pool
    pool = multiprocessing.Pool(processes=numthreads)

    # map the list of lines into a list of result dicts
    result_list = pool.map(worker, 
        (lines[line:line+numlines] for line in xrange(0,len(lines),numlines) ) )

    # reduce the result dicts into a single dict
    result = {}
    map(result.update, result_list)

撰写回答