并行文件解析,多核CPU
我之前问过一个相关但比较笼统的问题(特别是可以看看这个回答)。
这次的问题非常具体。我关心的代码就是:
result = {}
for line in open('input.txt'):
key, value = parse(line)
result[key] = value
这个叫做 parse
的函数是完全独立的(也就是说,它不使用任何共享的资源)。
我有一台Intel i7-920的处理器(4个核心,8个线程;我觉得线程可能更重要,但我不太确定)。
我该怎么做才能让我的程序充分利用这个处理器的并行处理能力呢?
我想我可以用8个不同的线程来打开这个文件进行读取,因为相对于总时间,磁盘访问的时间是比较短的,应该不会影响性能。
6 个回答
4
这可以通过使用Ray来实现,Ray是一个用于编写并行和分布式Python程序的库。
要运行下面的代码,首先需要创建一个名为input.txt
的文件,内容如下。
printf "1\n2\n3\n4\n5\n6\n" > input.txt
然后,你可以通过给parse
函数加上@ray.remote
这个装饰器,来并行处理这个文件,并同时执行多个副本,方法如下。
import ray
import time
ray.init()
@ray.remote
def parse(line):
time.sleep(1)
return 'key' + str(line), 'value'
# Submit all of the "parse" tasks in parallel and wait for the results.
keys_and_values = ray.get([parse.remote(line) for line in open('input.txt')])
# Create a dictionary out of the results.
result = dict(keys_and_values)
需要注意的是,最佳的做法取决于运行parse
函数所需的时间。如果运行时间是1秒(如上所示),那么每个Ray任务解析一行是合理的。如果只需要1毫秒,那么每个Ray任务解析多行(比如100行)可能更合适。
你的脚本足够简单,也可以使用多进程模块,但一旦你想做更复杂的事情,或者想利用多台机器而不是仅仅一台机器,使用Ray会简单得多。
可以查看Ray的文档了解更多信息。
8
- 把文件分成8个小文件
- 启动一个单独的脚本来处理每个文件
- 把结果合并起来
为什么这是最好的方法...
- 这很简单易懂——你不需要用不同的方式来编程,只要像处理线性流程一样就行。
- 通过启动少量的长时间运行的进程,你可以获得最佳的性能。
- 操作系统会处理上下文切换和输入输出的多路复用,所以你不需要担心这些(操作系统做得很好)。
- 你可以扩展到多台机器,而不需要改动代码。
- ...
21
cPython并没有很简单地提供你想要的线程模型。不过,你可以使用multiprocessing
模块和一个进程池来实现类似的功能。
这样的解决方案可能看起来像这样:
def worker(lines):
"""Make a dict out of the parsed, supplied lines"""
result = {}
for line in lines.split('\n'):
k, v = parse(line)
result[k] = v
return result
if __name__ == '__main__':
# configurable options. different values may work better.
numthreads = 8
numlines = 100
lines = open('input.txt').readlines()
# create the process pool
pool = multiprocessing.Pool(processes=numthreads)
# map the list of lines into a list of result dicts
result_list = pool.map(worker,
(lines[line:line+numlines] for line in xrange(0,len(lines),numlines) ) )
# reduce the result dicts into a single dict
result = {}
map(result.update, result_list)