使用Python脚本进行多CPU的bzip2压缩

6 投票
2 回答
1305 浏览
提问于 2025-04-16 01:54

我想在我的8核、16GB内存的工作站上快速压缩几百GB的数据,使用bzip2工具。

目前,我用一个简单的Python脚本来压缩整个目录,使用bzip2,并通过os.system和os.walk来实现。

我发现bzip2只使用了一个CPU,其他的CPU几乎没有在工作。

我对队列和线程处理还不是很熟悉,但我在想,怎样才能实现让四个bzip2的线程同时运行(其实我猜是os.system的线程),每个线程可能使用自己的CPU,从队列中取出文件进行压缩。

这是我现在的单线程脚本。

import os
import sys

for roots, dirlist , filelist in os.walk(os.curdir):
    for file in [os.path.join(roots,filegot) for filegot in filelist]:
        if "bz2" not in file:
            print "Compressing %s" % (file)
            os.system("bzip2 %s" % file)
            print ":DONE" 

2 个回答

1

可以使用 subprocess 模块同时启动多个进程。如果你有 N 个进程在运行(N 的值应该比你电脑的 CPU 数量稍微大一点,比如说,如果你有 2 个核心,就用 3;如果有 8 个核心,就用 10),那么等其中一个进程结束后,再启动另一个。

不过要注意,这样做可能帮助不大,因为会有很多磁盘操作,这些操作是无法并行处理的。拥有足够的空闲内存来做缓存会比较有帮助。

1

试试MRAB在comp.lang.python上分享的这段代码:

import os 
import sys 
from threading import Thread, Lock 
from Queue import Queue 
def report(message): 
     mutex.acquire() 
     print message 
     sys.stdout.flush() 
     mutex.release() 
class Compressor(Thread): 
     def __init__(self, in_queue, out_queue): 
         Thread.__init__(self) 
         self.in_queue = in_queue 
         self.out_queue = out_queue 
     def run(self): 
         while True: 
             path = self.in_queue.get() 
             sys.stdout.flush() 
             if path is None: 
                 break 
             report("Compressing %s" % path) 
             os.system("bzip2 %s" % path) 
             report("Done %s" %  path) 
             self.out_queue.put(path) 
in_queue = Queue() 
out_queue = Queue() 
mutex = Lock() 
THREAD_COUNT = 4 
worker_list = [] 
for i in range(THREAD_COUNT): 
     worker = Compressor(in_queue, out_queue) 
     worker.start() 
     worker_list.append(worker) 
for roots, dirlist, filelist in os.walk(os.curdir): 
     for file in [os.path.join(roots, filegot) for filegot in filelist]: 
         if "bz2" not in file: 
             in_queue.put(file) 
for i in range(THREAD_COUNT): 
     in_queue.put(None) 
for worker in worker_list: 
     worker.join() 

撰写回答