How do I copy a file in Python and show a progress bar?

13 votes
8 answers
28340 views
Asked 2025-04-11 09:35

When you copy a large file with shutil.copy(), you get no indication of how far along the operation is.

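I put together something that works: it uses a simple progress bar class (which simply returns an ASCII progress bar as a string) and a loop that does the actual copying with open().read().write(). It displays the bar with sys.stdout.write("\r%s\r" % (the_progress_bar)), which is a bit of a hack, but it works.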

You can see the code (in context) here on github.
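
For readers without access to the linked code, here is a minimal sketch of the display technique described above: a function that returns the bar as a string, plus a writer that redraws it in place with a carriage return. The names render_bar and draw are hypothetical and are not taken from the linked repository.

```python
import sys


def render_bar(done, total, width=40):
    """Return an ASCII bar such as '[#####-----]  50%' (shown for width=10)."""
    # Clamp to the 0..1 range so an over- or under-shoot never breaks the layout.
    fraction = 1.0 if total <= 0 else min(max(done / total, 0.0), 1.0)
    filled = int(fraction * width)
    percent = int(fraction * 100)
    return "[{}{}] {:3d}%".format("#" * filled, "-" * (width - filled), percent)


def draw(bar, out=sys.stdout):
    # "\r" moves the cursor back to column 0, so each call overwrites the previous bar.
    out.write("\r" + bar)
    out.flush()
```

Calling draw(render_bar(done, total)) after each block written by the copy loop keeps the bar on a single terminal line; writing "\n" once the copy finishes moves the cursor to a fresh line.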

Is there a built-in module that does this better? Are there any improvements that could be made to this code?

8 Answers

4

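I have a simple implementation of shutil.copy() with a progress bar that uses only built-in modules. If you use utf-8 encoding, you can get a progress bar like the second example in the gif image.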

Read the comments to change the style and colors. The first and last examples don't need utf-8 encoding. You can call CPprogress(SOURCE, DESTINATION) anywhere you would otherwise use shutil.copy(src, dst):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
CPprogress(SOURCE, DESTINATION)

I made this to give shutil.copy() [or shutil.copy2() in this case] a progress bar.

You can use CPprogress(SOURCE, DESTINATION) just like shutil.copy(src, dst). SOURCE must be a file path and DESTINATION a file or folder path.

It will give you a progress bar for each file copied. Just copy this code above the place where you want to use CPprogress(SOURCE, DESTINATION) in your code.

You can easily change the look of the progress bar:
    - To keep the style and just change the colors, replace the colors values of progressCOLOR and finalCOLOR (orange code at the end of the lines).
    - To use a solid block progress bar, # -*- coding: utf-8 -*- is required. Otherwise, you will get an encoding error. Some basic terminals, like xterm, may not show the progress bar because of the utf-8 characters.
      To use this style, remove the comments #STYLE# in lines ###COLORS### - BlueCOLOR and endBLOCK.
      In def getPERCECENTprogress() remove the comments  #STYLE# AND COMMENT THE PREVIOUS line. Do the same in def CPprogress()
      If you don't want the utf-8 encoding, delete the four lines beginning with #STYLE#.

NOTE: If you want to copy lots of small files, the copy process for each file is so fast
      that all you will see is a lot of lines scrolling in your terminal window - not enough time for a 'progress'.
      In that case, I use an overall progress that shows only one progress bar for the complete job.   nzX
'''
import os
import shutil
import sys
import threading
import time

######## COLORS ######
progressCOLOR = '\033[38;5;33;48;5;236m' #\033[38;5;33;48;5;236m# copy inside '' for colored progressbar| orange:#\033[38;5;208;48;5;235m
finalCOLOR =  '\033[38;5;33;48;5;33m' #\033[38;5;33;48;5;33m# copy inside '' for colored progressbar| orange:#\033[38;5;208;48;5;208m
#STYLE#BlueCOLOR = '\033[38;5;33m'#\033[38;5;33m# copy inside '' for colored progressbar Orange#'\033[38;5;208m'# # BG progress# #STYLE# 
#STYLE#endBLOCK = '' # ▌ copy OR '' for none # BG progress# #STYLE# requires utf8 coding header
########

BOLD    = '\033[1m'
UNDERLINE = '\033[4m'
CEND    = '\033[0m'

def getPERCECENTprogress(source_path, destination_path):
    time.sleep(.24)
    if os.path.exists(destination_path):
        while os.path.getsize(source_path) != os.path.getsize(destination_path):
            sys.stdout.write('\r')
            percentagem = int((float(os.path.getsize(destination_path))/float(os.path.getsize(source_path))) * 100)
            steps = int(percentagem/5)
            copiado = int(os.path.getsize(destination_path)/1000000)# 1 MB = 1000000 bytes, matching what the Thunar file manager reports (Linux - Xfce); a MiB would be 1048576
            sizzz = int(os.path.getsize(source_path)/1000000)
            sys.stdout.write(("         {:d} / {:d} Mb   ".format(copiado, sizzz)) +  (BOLD + progressCOLOR + "{:20s}".format('|'*steps) + CEND) + ("   {:d}% ".format(percentagem))) # BG progress
            #STYLE#sys.stdout.write(("         {:d} / {:d} Mb   ".format(copiado, sizzz)) +  (BOLD + BlueCOLOR + "▐" + "{:s}".format('█'*steps) + CEND) + ("{:s}".format(' '*(20-steps))+ BOLD + BlueCOLOR + endBLOCK+ CEND) +("   {:d}% ".format(percentagem))) #STYLE# # BG progress# closer to GUI but less compatible (no block bar with xterm) # requires utf8 coding header
            sys.stdout.flush()
            time.sleep(.01)

def CPprogress(SOURCE, DESTINATION):
    if os.path.isdir(DESTINATION):
        dst_file = os.path.join(DESTINATION, os.path.basename(SOURCE))
    else: dst_file = DESTINATION
    print(" ")
    print(BOLD + UNDERLINE + "FROM:" + CEND + "    " + SOURCE)
    print(BOLD + UNDERLINE + "TO:" + CEND + "      " + dst_file)
    print(" ")
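    progress_thread = threading.Thread(name='progresso', target=getPERCECENTprogress, args=(SOURCE, dst_file))
    progress_thread.start()
    shutil.copy2(SOURCE, DESTINATION)
    progress_thread.join()  # wait for the polling thread's last redraw before printing the 100% line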
    sys.stdout.write('\r')
    sys.stdout.write(("         {:d} / {:d} Mb   ".format((int(os.path.getsize(dst_file)/1000000)), (int(os.path.getsize(SOURCE)/1000000)))) +  (BOLD + finalCOLOR + "{:20s}".format('|'*20) + CEND) + ("   {:d}% ".format(100))) # BG progress 100%
    #STYLE#sys.stdout.write(("         {:d} / {:d} Mb   ".format((int(os.path.getsize(dst_file)/1000000)), (int(os.path.getsize(SOURCE)/1000000)))) +  (BOLD + BlueCOLOR + "▐" + "{:s}{:s}".format(('█'*20), endBLOCK) + CEND) + ("   {:d}% ".format(100))) #STYLE# # BG progress 100%# closer to GUI but less compatible (no block bar with xterm) # requires utf8 coding header
    sys.stdout.flush()
    print(" ")
    print(" ")

'''
#Ex. Copy all files from root of the source dir to destination dir

folderA = '/path/to/SOURCE' # SOURCE
folderB = '/path/to/DESTINATION' # DESTINATION
for FILE in os.listdir(folderA):
    if not os.path.isdir(os.path.join(folderA, FILE)):
        if os.path.exists(os.path.join(folderB, FILE)): continue # as we are using shutil.copy2() that overwrites destination, this skips existing files
        CPprogress(os.path.join(folderA, FILE), folderB) # use the command as if it was shutil.copy2() but with progress


         75 / 150 Mb  ||||||||||         |  50%
'''
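
The NOTE in the docstring above mentions using one overall progress bar when copying lots of small files, but that code is not shown. A minimal sketch of the idea, assuming progress is weighted by file size; copy_files_with_overall_progress is a hypothetical helper and not part of the answer's code:

```python
import os
import shutil
import sys


def copy_files_with_overall_progress(sources, dest_dir, out=sys.stdout):
    """Copy every file in sources into dest_dir, drawing a single bar for the whole job."""
    sizes = [os.path.getsize(path) for path in sources]
    total = sum(sizes)
    copied = 0
    for index, (path, size) in enumerate(zip(sources, sizes), start=1):
        shutil.copy2(path, dest_dir)
        copied += size
        # Weight progress by bytes; fall back to the file count if every file is empty.
        fraction = copied / total if total else index / len(sources)
        percent = int(fraction * 100)
        bar = "|" * (percent // 5)  # 20-character bar, like CPprogress
        out.write("\r{:20s} {:3d}%  {}/{} files".format(bar, percent, index, len(sources)))
        out.flush()
    out.write("\n")
    return copied
```

Because the bar is only redrawn after each file completes, this suits many small files; a single huge file would still need per-block or polled progress as in CPprogress.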
5

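Overkill? Perhaps. But on almost any system (Linux, Mac, or Windows after a quick wxWidgets install) you can have the real deal: a GUI with pause and cancel buttons. Macs ship with wxWidgets these days, and it's a common package on Linux.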

A single file is very quick (it will finish immediately and look broken), so you might consider creating a file-set job that ticks along once per file instead of once per block. Enjoy!

-Jim Carroll

"""
Threaded Jobs.

Any class that does a long running process can inherit
from ThreadedJob.  This enables running as a background
thread, progress notification, pause and cancel.  The
time remaining is also calculated by the ThreadedJob class.
"""
import wx.lib.newevent
import threading
import time

(RunEvent, EVT_RUN) = wx.lib.newevent.NewEvent()
(CancelEvent, EVT_CANCEL) = wx.lib.newevent.NewEvent()
(DoneEvent, EVT_DONE) = wx.lib.newevent.NewEvent()
(ProgressStartEvent, EVT_PROGRESS_START) = wx.lib.newevent.NewEvent()
(ProgressEvent, EVT_PROGRESS) = wx.lib.newevent.NewEvent()

class InterruptedException(Exception):
    """Raised inside a job when the user cancels it."""
    pass
#

class ThreadedJob:
    def __init__(self):
        # tell them ten seconds at first
        self.secondsRemaining = 10.0
        self.lastTick = 0

        # not running yet
        self.isPaused = False
        self.isRunning = False
        self.keepGoing = True

    def Start(self):
        self.keepGoing = self.isRunning = True
        # run the job in a daemon thread so it cannot keep the process alive
        worker = threading.Thread(target=self.Run)
        worker.daemon = True
        worker.start()

        self.isPaused = False
    #

    def Stop(self):
        self.keepGoing = False
    #

    def WaitUntilStopped(self):
        while self.isRunning:
            time.sleep(0.1)
            wx.SafeYield()
        #
    #

    def IsRunning(self):
        return self.isRunning
    #

    def Run(self):
        # this is overridden by the
        # concrete ThreadedJob
        print("Run was not overridden")
        self.JobFinished()

        pass
    #

    def Pause(self):
        self.isPaused = True
        pass
    #

    def Continue(self):
        self.isPaused = False
        pass
    #

    def PossibleStoppingPoint(self):
        if not self.keepGoing:
            raise InterruptedException("process interrupted.")
        wx.SafeYield()

        # allow cancel while paused
        while self.isPaused:
            if not self.keepGoing:
                raise InterruptedException("process interrupted.")

            # don't hog the CPU
            time.sleep(0.1)
        #
    #

    def SetProgressMessageWindow(self, win):
        self.win = win
    #

    def JobBeginning(self, totalTicks):

        self.lastIterationTime = time.time()
        self.totalTicks = totalTicks

        if hasattr(self, "win") and self.win:
            wx.PostEvent(self.win, ProgressStartEvent(total=totalTicks))
        #
    #

    def JobProgress(self, currentTick):
        dt = time.time() - self.lastIterationTime
        self.lastIterationTime = time.time()
        dtick = currentTick - self.lastTick
        self.lastTick = currentTick

        alpha = 0.92
        if currentTick > 1:
            self.secondsPerTick = dt * (1.0 - alpha) + (self.secondsPerTick * alpha)
        else:
            self.secondsPerTick = dt
        #

        if dtick > 0:
            self.secondsPerTick /= dtick

        self.secondsRemaining = self.secondsPerTick * (self.totalTicks - 1 - currentTick) + 1

        if hasattr(self, "win") and self.win:
            wx.PostEvent(self.win, ProgressEvent(count=currentTick))
        #
    #

    def SecondsRemaining(self):
        return self.secondsRemaining
    #

    def TimeRemaining(self):

        if 1: #self.secondsRemaining > 3:
            minutes = self.secondsRemaining // 60
            seconds = int(self.secondsRemaining % 60.0)
            return "%i:%02i" % (minutes, seconds)
        else:
            return "a few"
    #

    def JobFinished(self):
        # flag we're done before we post the all done message
        self.isRunning = False

        if hasattr(self, "win") and self.win:
            wx.PostEvent(self.win, DoneEvent())
        #
    #
#

class EggTimerJob(ThreadedJob):
    """ A sample Job that demonstrates the mechanisms and features of the Threaded Job"""
    def __init__(self, duration):
        self.duration = duration
        ThreadedJob.__init__(self)
    #

    def Run(self):
        """ This can either be run directly for synchronous use of the job,
        or started as a thread when ThreadedJob.Start() is called.

        It is responsible for calling JobBeginning, JobProgress, and JobFinished.
        And as often as possible, calling PossibleStoppingPoint() which will 
        sleep if the user pauses, and raise an exception if the user cancels.
        """
        self.time0 = time.time()  # time.clock() was removed in Python 3.8
        self.JobBeginning(self.duration)

        try:
            for count in range(0, self.duration):
                time.sleep(1.0)
                self.JobProgress(count)
                self.PossibleStoppingPoint()
            #
        except InterruptedException:
            # clean up if user stops the Job early
            print("canceled prematurely!")
        #

        # always signal the end of the job
        self.JobFinished()
        #
    #

    def __str__(self):
        """ The job progress dialog expects the job to describe its current state."""
        response = []
        if self.isPaused:
            response.append("Paused Counting")
        elif not self.isRunning:
            response.append("Will Count the seconds")
        else:
            response.append("Counting")
        #
        return " ".join(response)
    #
#

class FileCopyJob(ThreadedJob):
    """ A common file copy Job. """

    def __init__(self, orig_filename, copy_filename, block_size=32*1024):

        self.src = orig_filename
        self.dest = copy_filename
        self.block_size = block_size
        ThreadedJob.__init__(self)
    #

    def Run(self):
        """ This can either be run directly for synchronous use of the job,
        or started as a thread when ThreadedJob.Start() is called.

        It is responsible for calling JobBeginning, JobProgress, and JobFinished.
        And as often as possible, calling PossibleStoppingPoint() which will 
        sleep if the user pauses, and raise an exception if the user cancels.
        """
        self.time0 = time.time()  # time.clock() was removed in Python 3.8

        try:
            source = open(self.src, 'rb')

            # how many blocks? round up so a partial last block is counted
            import os
            st_size = os.stat(self.src).st_size
            num_blocks = max(1, (st_size + self.block_size - 1) // self.block_size)
            current_block = 0

            self.JobBeginning(num_blocks)

            dest = open(self.dest, 'wb')

            while 1:
                copy_buffer = source.read(self.block_size)
                if copy_buffer:
                    dest.write(copy_buffer)
                    current_block += 1
                    self.JobProgress(current_block)
                    self.PossibleStoppingPoint()
                else:
                    break

            source.close()
            dest.close()

        except InterruptedException:
            # clean up if user stops the Job early
            source.close()
            dest.close()
            # unlink / delete the file that is partially copied
            os.unlink(self.dest)
            print("canceled, dest deleted!")
        #

        # always signal the end of the job
        self.JobFinished()
        #
    #

    def __str__(self):
        """ The job progress dialog expects the job to describe its current state."""
        response = []
        if self.isPaused:
            response.append("Paused Copy")
        elif not self.isRunning:
            response.append("Will Copy a file")
        else:
            response.append("Copying")
        #
        return " ".join(response)
    #
#

class JobProgress(wx.Dialog):
    """ This dialog shows the progress of any ThreadedJob.

    It can be shown Modally if the main application needs to suspend
    operation, or it can be shown Modelessly for background progress
    reporting.

    app = wx.App(False)
    job = EggTimerJob(duration = 10)
    dlg = JobProgress(None, job)
    job.SetProgressMessageWindow(dlg)
    job.Start()
    dlg.ShowModal()


    """
    def __init__(self, parent, job):
        self.job = job

        wx.Dialog.__init__(self, parent, -1, "Progress", size=(350,200))

        # vertical box sizer
        sizeAll = wx.BoxSizer(wx.VERTICAL)

        # Job status text
        self.JobStatusText = wx.StaticText(self, -1, "Starting...")
        sizeAll.Add(self.JobStatusText, 0, wx.EXPAND|wx.ALL, 8)

        # wx.Gauge
        self.ProgressBar = wx.Gauge(self, -1, 10, wx.DefaultPosition, (250, 15))
        sizeAll.Add(self.ProgressBar, 0, wx.EXPAND|wx.ALL, 8)

        # horiz box sizer, and spacer to right-justify
        sizeRemaining = wx.BoxSizer(wx.HORIZONTAL)
        sizeRemaining.Add((2,2), 1, wx.EXPAND)

        # time remaining read-only edit
        # putting wide default text gets a reasonable initial layout.
        self.remainingText = wx.StaticText(self, -1, "???:??")
        sizeRemaining.Add(self.remainingText, 0, wx.LEFT|wx.RIGHT|wx.ALIGN_CENTER_VERTICAL, 8)

        # static text: remaining
        self.remainingLabel = wx.StaticText(self, -1, "remaining")
        sizeRemaining.Add(self.remainingLabel, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 8)

        # add that row to the mix
        sizeAll.Add(sizeRemaining, 1, wx.EXPAND)

        # horiz box sizer & spacer
        sizeButtons = wx.BoxSizer(wx.HORIZONTAL)
        sizeButtons.Add((2,2), 1, wx.EXPAND)

        # Pause Button
        self.PauseButton = wx.Button(self, -1, "Pause")
        sizeButtons.Add(self.PauseButton, 0, wx.ALL, 4)
        self.Bind(wx.EVT_BUTTON, self.OnPauseButton, self.PauseButton)

        # Cancel button
        self.CancelButton = wx.Button(self, wx.ID_CANCEL, "Cancel")
        sizeButtons.Add(self.CancelButton, 0, wx.ALL, 4)
        self.Bind(wx.EVT_BUTTON, self.OnCancel, self.CancelButton)

        # Add all the buttons on the bottom row to the dialog
        sizeAll.Add(sizeButtons, 0, wx.EXPAND|wx.ALL, 4)

        self.SetSizer(sizeAll)
        #sizeAll.Fit(self)
        sizeAll.SetSizeHints(self)

        # jobs tell us how they are doing
        self.Bind(EVT_PROGRESS_START, self.OnProgressStart)
        self.Bind(EVT_PROGRESS, self.OnProgress)
        self.Bind(EVT_DONE, self.OnDone)

        self.Layout()
    #

    def OnPauseButton(self, event):
        if self.job.isPaused:
            self.job.Continue()
            self.PauseButton.SetLabel("Pause")
            self.Layout()
        else:
            self.job.Pause()
            self.PauseButton.SetLabel("Resume")
            self.Layout()
        #
    #

    def OnCancel(self, event):
        self.job.Stop()
    #

    def OnProgressStart(self, event):
        self.ProgressBar.SetRange(event.total)
        self.statusUpdateTime = time.time()
    #

    def OnProgress(self, event):
        # update the progress bar
        self.ProgressBar.SetValue(event.count)

        self.remainingText.SetLabel(self.job.TimeRemaining())

        # update the text a max of 20 times a second
        if time.time() - self.statusUpdateTime > 0.05:
            self.JobStatusText.SetLabel(str(self.job))
            self.statusUpdateTime = time.time()
            self.Layout()
        #
    #

    # when a job is done
    def OnDone(self, event):
        self.ProgressBar.SetValue(0)
        self.JobStatusText.SetLabel("Finished")
        self.Destroy()
    #
#

if __name__ == "__main__":
    app = wx.App(False)
    #job = EggTimerJob(duration = 10)
    job = FileCopyJob("VeryBigFile.mp4", "/tmp/test_junk.mp4", 1024*1024*10)
    dlg = JobProgress(None, job)
    job.SetProgressMessageWindow(dlg)
    job.Start()
    dlg.ShowModal()
#
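
The time-remaining figure in ThreadedJob.JobProgress comes from an exponentially smoothed seconds-per-tick value (alpha = 0.92). Below is a stand-alone sketch of that smoothing idea with no wx dependency. EtaEstimator and its parameters are hypothetical names, and the arithmetic is a simplified variant, not a line-for-line copy of the method above.

```python
import time


class EtaEstimator:
    """Smoothed seconds-per-tick estimate, used to predict the time remaining."""

    def __init__(self, total_ticks, alpha=0.92, start_time=None):
        self.total_ticks = total_ticks
        self.alpha = alpha  # weight kept from the previous estimate
        self.seconds_per_tick = None
        self.last_tick = 0
        self.last_time = time.time() if start_time is None else start_time

    def update(self, current_tick, now=None):
        """Record that current_tick ticks are done; return estimated seconds left."""
        now = time.time() if now is None else now
        dticks = current_tick - self.last_tick
        if dticks > 0:
            sample = (now - self.last_time) / dticks
            if self.seconds_per_tick is None:
                self.seconds_per_tick = sample  # first measurement
            else:
                # Blend the new sample into the running estimate.
                self.seconds_per_tick = (self.alpha * self.seconds_per_tick
                                         + (1.0 - self.alpha) * sample)
            self.last_tick = current_tick
            self.last_time = now
        if self.seconds_per_tick is None:
            return None  # nothing measured yet
        return self.seconds_per_tick * (self.total_ticks - current_tick)
```

A high alpha keeps the estimate steady when individual blocks vary in speed, at the cost of reacting slowly when throughput really changes.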
17

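Two things:

  • I would make the default block size a lot larger than 512. I would start with 16384, and perhaps go larger.
  • For modularity, it might be better to have the copy_with_prog function not output the progress bar itself, but call a callback function so the caller can decide how to display the progress.

Perhaps something like this: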

def copy_with_prog(src, dest, callback = None):
    while True:
        # copy loop stuff
        if callback:
            callback(pos, total)

prog = ProgressBar(...)
copy_with_prog(src, dest, lambda pos, total: prog.update(pos, total))
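
For reference, one way the "copy loop stuff" placeholder could be filled in. This is a sketch under assumptions: the block_size parameter, the return value, and the print_percent callback are illustrative additions that the outline above does not specify.

```python
import os


def copy_with_prog(src, dest, callback=None, block_size=16384):
    """Copy src to dest in blocks, reporting (pos, total) after each block."""
    total = os.path.getsize(src)
    pos = 0
    with open(src, "rb") as fin, open(dest, "wb") as fout:
        while True:
            block = fin.read(block_size)
            if not block:
                break  # end of file
            fout.write(block)
            pos += len(block)
            if callback:
                callback(pos, total)
    return pos


def print_percent(pos, total):
    """Hypothetical callback: redraws a plain percentage on one line."""
    percent = 100 if total == 0 else pos * 100 // total
    print("\r{:3d}%".format(percent), end="", flush=True)
```

A call such as copy_with_prog(src, dest, print_percent) then leaves the display entirely to the caller, and passing no callback copies silently.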
