Python多进程/多线程加速文件复制

2024-06-08 12:56:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个程序可以把大量的文件从一个地方复制到另一个地方——我说的是100000多个文件(我现在在图像序列中复制314g)。他们都在一个巨大的,非常快的网络存储阵列上。我正在使用shutil按顺序复制文件,这需要一些时间,所以我试图找到最好的方法来实现这一点。我注意到我使用的一些软件可以有效地多线程地从网络上读取文件,从而大大提高了加载时间,所以我想尝试用python来实现这一点。在

我没有编程多线程/多处理的经验-这看起来是正确的领域继续吗?如果是这样,最好的方法是什么?我看过其他一些关于在python中复制线程文件的SO帖子,它们似乎都说没有速度上的提高,但我认为考虑到我的硬件,情况不会这样。目前我的IO容量还远远不够,资源大约占1%(我在本地有40个内核和64g RAM)。在

  • 斯宾塞

Tags: 文件方法图像程序网络软件顺序编程
2条回答

这可以通过在Python中使用gevent进行并行化。在

我建议使用以下逻辑来实现加速100k+文件复制

  1. 把所有需要复制到csv文件中的100K+文件的名称,例如:输入.csv'.

  2. 然后从csv文件创建块。块的数量应根据编号计算机中的处理器/核心。

  3. 将这些块传递给单独的线程。

  4. 每个线程按顺序读取该块中的文件名并将其从一个位置复制到另一个位置。

下面是python代码片段:

import sys
import os
import multiprocessing

from gevent import monkey
monkey.patch_all()

from gevent.pool import Pool

def _copyFile(file):
    # over here, you can put your own logic of copying a file from source to destination

def _worker(csv_file, chunk):
    f = open(csv_file)
    f.seek(chunk[0])
    for file in f.read(chunk[1]).splitlines():
        _copyFile(file)


def _getChunks(file, size):
    f = open(file)
    while 1:
        start = f.tell()
        f.seek(size, 1)
        s = f.readline()
        yield start, f.tell() - start
        if not s:
            f.close()
            break

if __name__ == "__main__":
    if(len(sys.argv) > 1):
        csv_file_name = sys.argv[1]
    else:
        print "Please provide a csv file as an argument."
        sys.exit()

    no_of_procs = multiprocessing.cpu_count() * 4

    file_size = os.stat(csv_file_name).st_size

    file_size_per_chunk = file_size/no_of_procs

    pool = Pool(no_of_procs)

    for chunk in _getChunks(csv_file_name, file_size_per_chunk):
        pool.apply_async(_worker, (csv_file_name, chunk))

    pool.join()

将文件另存为文件_复印机.py. 打开终端并运行:

^{pr2}$

更新:

我从来没有让Gevent工作(第一个答案),因为我无法安装没有互联网连接的模块,我没有在我的工作站。不过,我只需使用python的内置线程,就可以将文件复制时间减少8次(从那时起我就学会了如何使用它),我想把它作为一个额外的答案发布给感兴趣的人!下面是我的代码,可能需要注意的是,由于硬件/网络设置的不同,我的8x复制时间很可能因环境而异。在

import Queue, threading, os, time
import shutil

fileQueue = Queue.Queue()
destPath = 'path/to/cop'

class ThreadedCopy:
    totalFiles = 0
    copyCount = 0
    lock = threading.Lock()

    def __init__(self):
        with open("filelist.txt", "r") as txt: #txt with a file per line
            fileList = txt.read().splitlines()

        if not os.path.exists(destPath):
            os.mkdir(destPath)

        self.totalFiles = len(fileList)

        print str(self.totalFiles) + " files to copy."
        self.threadWorkerCopy(fileList)


    def CopyWorker(self):
        while True:
            fileName = fileQueue.get()
            shutil.copy(fileName, destPath)
            fileQueue.task_done()
            with self.lock:
                self.copyCount += 1
                percent = (self.copyCount * 100) / self.totalFiles
                print str(percent) + " percent copied."

    def threadWorkerCopy(self, fileNameList):
        for i in range(16):
            t = threading.Thread(target=self.CopyWorker)
            t.daemon = True
            t.start()
        for fileName in fileNameList:
            fileQueue.put(fileName)
        fileQueue.join()

ThreadedCopy()

相关问题 更多 >