Python中的并行执行和文件写入

7 投票
3 回答
19450 浏览
提问于 2025-04-17 20:37

我有一个非常大的数据集,这些数据分布在10个大集群中。我的任务是对每个集群进行一些计算,然后把结果一行一行地写入10个文件中,每个文件对应一个集群的结果。每个集群的计算可以独立进行,我想把代码并行化,也就是让10个CPU(或者线程)同时工作,这样我就可以同时对所有集群进行计算。下面是我任务的一个简化伪代码:

for(c in range (1,10)):  #this is the loop over the clusters
    for(l in "readlines from cluster C")
         # do some computations for line l in cluster c
         # append the results in file named "cluster_c" one file for each cluter c

3 个回答

5

就像@Davidmh的回答一样,不过这是用python3写的:

from multiprocessing import Process, JoinableQueue
from joblib import Parallel, delayed

def saver(q):
    with open('out.txt', 'w') as out:
        while True:
            val = q.get()
            if val is None: break
            out.write(val + '\n')
        q.task_done()
        # Finish up
        q.task_done()

def foo(x):
    import os
    q.put(str(os.getpid()) + '-' + str(x**3+2))

q = JoinableQueue()
p = Process(target=saver, args=(q,))
p.start()
Parallel(n_jobs=-1, verbose=0)(delayed(foo)(i) for i in range(1000))
q.put(None) # Poison pill
p.join()

另外,我还在每一行输出中加上了进程ID(PID),这样可以检查一切是否正常工作;-)

11

你可以使用joblib来加速分析。如果你有一个函数叫做process_line

from joblib import Parallel, delayed
data = Parallel(n_jobs=-1)(delayed(process_line)(line)
                           for line in open('bigfile'))

你想要按顺序保存信息。根据计算时间和要保存的数据大小的比例,你可以使用不同的方法:

计算时间很长但结果很少

线程之间的通信开销非常小。在这种情况下,最简单的办法是让每个进程写入一个独立的文件,最后再把结果合并在一起。你可以通过传递一个索引来确保不会覆盖文件,并用它来创建文件。

一个更高级的解决方案是将文件处理器作为参数传递,并在获取一个多进程锁(multiprocessing.Lock)后再写入文件。唯一的问题是,如果很多进程同时尝试获取这个锁,它们会占用CPU资源,但并不会进行计算。

def process_line(line, outfile, lock)
   data = line[0]
   lock.aquire()
   print >> outfile, data
   lock.release()

计算时间较短

如果你有更多的数据,写入文件可能会带来一些额外的开销,特别是如果你之后还要把它重新加载到内存中。在这种情况下,你有两个选择:

  • 所有数据都能放进内存:那你真幸运。使用joblib,只需将结果作为函数的返回值。最后你会得到一个按顺序排列的结果列表。
  • 数据放不进内存,你需要实时处理。你需要一个生产者-消费者模式。类似于:

    from multiprocessing import Process, JoinableQueue
    from joblib import Parallel, delayed
    
    def saver(q):
        with open('out.txt', 'w') as out:
            while True:
                val = q.get()
                if val is None: break
                print >> out, val
                q.task_done()
            # Finish up
            q.task_done()
    
    def foo(x):
        q.put(x**3+2)
    
    q = JoinableQueue()
    p = Process(target=saver, args=(q,))
    p.start()
    Parallel(n_jobs=2, verbose=0)(delayed(foo)(i) for i in xrange(1000))
    q.put(None) # Poison pill
    q.join()
    p.join()
    

如果数据量相比计算时间非常大,你会发现仅仅在进程之间传输数据就会产生很多额外的开销。如果这是你的限制,那么你应该使用更高级的技术,比如OpenMP,或许还可以用Cython来解决全局解释器锁(GIL)的问题,使用线程而不是进程。

注意,我没有具体说明“少”有多小;这很大程度上取决于你集群的配置,比如通信速度、底层文件系统等。但这些都可以通过实验来轻松了解,比如,测量一个虚拟程序发送一行数据到另一个进程所需的时间。

3
#!/usr/bin/env python
from multiprocessing import Pool

def compute_cluster(c):
    """each cluster can be computed independently"""
    ... # compute a cluster here 

if __name__=="__main__":
   pool = Pool(10) # run 10 task at most in parallel
   pool.map(compute_cluster, range(10))

当然可以!请把你想要翻译的内容发给我,我会帮你用简单易懂的语言解释清楚。

撰写回答