并行文件解析,多CPU核

2024-04-19 04:33:53 发布

您现在位置:Python中文网/ 问答频道 /正文

我之前问了一个相关但非常一般的问题(特别是this response)。

这个问题很具体。这就是我关心的所有代码:

result = {}
for line in open('input.txt'):
  key, value = parse(line)
  result[key] = value

函数parse是完全独立的(即,不使用任何共享资源)。

我有Intel i7-920 CPU(4核,8线程;我认为线程更相关,但我不确定)。

我该怎么做才能使我的程序使用这个CPU的所有并行功能?

我假设我可以在8个不同的线程中打开这个文件进行读取,而不会有太大的性能损失,因为磁盘访问时间相对于总时间来说很小。


Tags: key代码inforparsevalueresponseline
3条回答
  1. 将文件拆分为8个较小的文件
  2. 启动单独的脚本来处理每个文件
  3. 加入结果

为什么这是最好的方法。。。

  • 这很简单-你不必以任何不同于线性处理的方式编程。
  • 启动少量长时间运行的进程可以获得最佳性能。
  • 操作系统将处理上下文切换和IO多路复用,因此您不必担心这些事情(操作系统做得很好)。
  • 您可以扩展到多台计算机,而不必更改代码
  • 。。。

这可以使用Ray来完成,这是一个用于编写并行和分布式Python的库。

要运行下面的代码,首先按如下方式创建input.txt

printf "1\n2\n3\n4\n5\n6\n" > input.txt

然后,您可以通过将@ray.remotedecorator添加到parse函数并并行执行多个副本来并行处理该文件,如下所示

import ray
import time

ray.init()

@ray.remote
def parse(line):
    time.sleep(1)
    return 'key' + str(line), 'value'

# Submit all of the "parse" tasks in parallel and wait for the results.
keys_and_values = ray.get([parse.remote(line) for line in open('input.txt')])
# Create a dictionary out of the results.
result = dict(keys_and_values)

注意,最佳的方法取决于运行parse函数需要多长时间。如果需要一秒钟(如上所述),那么解析每个Ray任务一行是有意义的。如果需要1毫秒,那么解析每个Ray任务的一堆行(例如100行)可能是有意义的。

您的脚本非常简单,因此也可以使用多处理模块,但是只要您想做任何更复杂的事情,或者想利用多台机器而不是一台机器,那么使用Ray就容易多了。

请参阅Ray documentation

cPython不提供您所寻找的线程模型。使用multiprocessing模块和process pool可以得到类似的结果

这样的解决方案可能看起来像这样:

def worker(lines):
    """Make a dict out of the parsed, supplied lines"""
    result = {}
    for line in lines.split('\n'):
        k, v = parse(line)
        result[k] = v
    return result

if __name__ == '__main__':
    # configurable options.  different values may work better.
    numthreads = 8
    numlines = 100

    lines = open('input.txt').readlines()

    # create the process pool
    pool = multiprocessing.Pool(processes=numthreads)

    # map the list of lines into a list of result dicts
    result_list = pool.map(worker, 
        (lines[line:line+numlines] for line in xrange(0,len(lines),numlines) ) )

    # reduce the result dicts into a single dict
    result = {}
    map(result.update, result_list)

相关问题 更多 >