在ZipArchive中覆盖文件

37 投票
6 回答
29341 浏览
提问于 2025-04-16 09:45

我有一个叫做 archive.zip 的压缩文件,里面有两个文件:hello.txt 和 world.txt

我想用新的文件来替换掉 hello.txt,我写了这个代码:

import zipfile

# Mode 'a' appends: this adds another "hello.txt" entry to the archive rather
# than replacing the existing one. The with-block closes the archive even if
# write() raises.
with zipfile.ZipFile('archive.zip', 'a') as z:
    z.write('hello.txt')

但是它并没有替换掉原来的文件,反而又创建了一个新的 hello.txt —— 看看这个 winzip 的截图:

alt text

因为没有类似 zipfile.remove() 这样的命令,处理这个问题的最好方法是什么呢?

6 个回答

1

根据这个回答,这里有一个简单粗暴的方法,可以对现有的zipfile进行修改,以支持文件删除功能(我们在等待这个功能被接受到python:main中):

from zipfile import ZipFile, ZipInfo
from operator import attrgetter
import functools

def enable_zip_remove(func):
    """Decorate *func* so that ``ZipFile.remove()`` exists while it runs.

    On the first call of the decorated function, and only if ``ZipFile`` has
    no ``remove`` attribute yet, two methods are attached to the ``ZipFile``
    class: ``remove`` (public entry point) and ``_zipfile_remove_member``
    (the worker that rewrites the archive in place).

    Returns:
        A wrapper with the same signature and return value as *func*.
    """

    def _zipfile_remove_member(self, member):
        """Drop *member*'s record from the file and shift later records down.

        Args:
            member: the ``ZipInfo`` of the entry to remove.
        """
        fp = self.fp
        removed_size = 0
        # Walk the entries in on-disk order: the directory order does not
        # have to match the order of the records in the file.
        ordered = sorted(self.filelist, key=attrgetter('header_offset'))
        last_index = len(ordered) - 1
        for index, info in enumerate(ordered):
            # Entries stored before the target stay where they are.
            if info.header_offset < member.header_offset:
                continue

            # An entry spans from its header to the next header, or to the
            # start of the central directory for the last entry.
            if index == last_index:
                entry_size = self.start_dir - info.header_offset
            else:
                entry_size = ordered[index + 1].header_offset - info.header_offset

            # The target itself: remember how many bytes are being freed.
            if member == info:
                removed_size = entry_size
                continue

            # Every entry after the target is moved down by the freed size.
            fp.seek(info.header_offset)
            entry_data = fp.read(entry_size)

            info.header_offset -= removed_size

            fp.seek(info.header_offset)
            fp.write(entry_data)
            fp.flush()

        # Update the in-memory state to match the file.
        self.start_dir -= removed_size
        self.filelist.remove(member)
        self.NameToInfo.pop(member.filename, None)
        # If another entry with the same name is still in the archive, keep
        # it reachable by name (the most recently listed one wins); otherwise
        # getinfo()/read() would fail for an entry that still exists.
        for remaining in reversed(self.filelist):
            if remaining.filename == member.filename:
                self.NameToInfo[member.filename] = remaining
                break
        self._didModify = True

        # seek to the start of the central dir
        fp.seek(self.start_dir)

    def zipfile_remove(self, member):
        """Remove a file from the archive. The archive must be open with mode 'a'.

        Args:
            member: an archive member name, or its ``ZipInfo`` object.

        Raises:
            RuntimeError: the archive was not opened with mode 'a'.
            ValueError: the archive is closed or a writing handle is open.
            KeyError: raised by ``getinfo`` when the name is not in the archive.
        """
        if self.mode != 'a':
            raise RuntimeError("remove() requires mode 'a'")
        if not self.fp:
            raise ValueError(
                "Attempt to write to ZIP archive that was already closed")
        if self._writing:
            raise ValueError(
                "Can't write to ZIP archive while an open writing handle exists."
            )

        # Make sure we work on an info object, whatever the caller passed.
        zinfo = member if isinstance(member, ZipInfo) else self.getinfo(member)

        return self._zipfile_remove_member(zinfo)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Patch lazily, and never shadow an existing ZipFile.remove.
        if not hasattr(ZipFile, "remove"):
            setattr(ZipFile, "_zipfile_remove_member", _zipfile_remove_member)
            setattr(ZipFile, "remove", zipfile_remove)
        return func(*args, **kwargs)
    return wrapper

使用方法:

@enable_zip_remove
def replace_zip_file():
    """Replace the archived hello.txt with the hello.txt found on disk."""
    with ZipFile("archive.zip", mode="a") as archive:
        # Drop the stale entry first so the archive does not end up with
        # two entries of the same name.
        archive.remove("hello.txt")
        archive.write("hello.txt")

附注:这只是一个取巧的临时方案,不建议在正式的工作环境中使用

30

这是对nosklo回答的进一步说明。

UpdateableZipFile是一个类,它是从ZipFile这个类派生出来的。它保持了和ZipFile一样的使用方式,但增加了可以覆盖文件的功能(通过writestr或write方法)以及删除文件的功能。

import os
import shutil
import tempfile
from zipfile import ZipFile, ZIP_STORED, ZipInfo


class UpdateableZipFile(ZipFile):
    """
    ZipFile that can also overwrite and delete members it already contains.

    Overwrites (via ``writestr`` / ``write`` on an existing name) and
    deletions (via ``remove_file``) are only recorded while the object is used
    in a ``with`` statement. Upon ``__exit__``, if any were recorded, a new
    zip file is built with the changes applied and moved over the existing
    archive.
    """

    class DeleteMarker(object):
        """Sentinel stored in ``_replace`` for members that must be dropped."""

    def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=False):
        """Open the archive; the arguments are forwarded to ``ZipFile``."""
        # Init base
        super(UpdateableZipFile, self).__init__(file, mode=mode,
                                                compression=compression,
                                                allowZip64=allowZip64)
        # Pending changes: member name -> temp file holding the new content,
        # or a DeleteMarker instance
        self._replace = {}
        # Whether the with statement was entered
        self._allow_updates = False

    def _new_replacement(self, name):
        """Register and return an empty temp file as the new content of *name*."""
        previous = self._replace.get(name)
        # A later overwrite supersedes an earlier one instead of appending to
        # it. DeleteMarker has no close() and is simply replaced.
        if hasattr(previous, 'close'):
            previous.close()
        temp_file = self._replace[name] = tempfile.TemporaryFile()
        return temp_file

    def writestr(self, zinfo_or_arcname, bytes, compress_type=None):
        """Write *bytes* as a member, overwriting an existing member if needed.

        Args:
            zinfo_or_arcname: member name or ``ZipInfo`` describing the member.
            bytes: the content; text is encoded as UTF-8 when it is buffered
                for an overwrite.
            compress_type: forwarded to ``ZipFile.writestr`` for new members.
        """
        if isinstance(zinfo_or_arcname, ZipInfo):
            name = zinfo_or_arcname.filename
        else:
            name = zinfo_or_arcname
        # If the member exists and needs to be overridden, buffer the new
        # content in a temp file; only allowed inside the with statement.
        if self._allow_updates and name in self.namelist():
            data = bytes
            # The temp file is binary. type(u"") is the text type on both
            # Python 2 and Python 3.
            if isinstance(data, type(u"")):
                data = data.encode("utf-8")
            self._new_replacement(name).write(data)
        # Otherwise just act normally
        else:
            super(UpdateableZipFile, self).writestr(zinfo_or_arcname,
                                                    bytes, compress_type=compress_type)

    def write(self, filename, arcname=None, compress_type=None):
        """Add the file *filename*, overwriting member *arcname* if it exists.

        Args:
            filename: path of the file on disk.
            arcname: member name inside the archive; defaults to *filename*.
            compress_type: forwarded to ``ZipFile.write`` for new members.
        """
        arcname = arcname or filename
        # If the member exists and needs to be overridden, buffer the new
        # content in a temp file; only allowed inside the with statement.
        if self._allow_updates and arcname in self.namelist():
            temp_file = self._new_replacement(arcname)
            with open(filename, "rb") as source:
                shutil.copyfileobj(source, temp_file)
        # Otherwise just act normally
        else:
            super(UpdateableZipFile, self).write(filename,
                                                 arcname=arcname, compress_type=compress_type)

    def __enter__(self):
        """Enable overwrite/delete tracking and return the archive object."""
        self._allow_updates = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the archive, then rebuild it if any changes were recorded."""
        try:
            # Call base to close the zip file first, so that everything
            # written normally is on disk before the rebuild reads it.
            super(UpdateableZipFile, self).__exit__(exc_type, exc_val, exc_tb)
            if self._replace:
                self._rebuild_zip()
        finally:
            # In case the rebuild failed, still release all the temp files
            # and forget the pending changes.
            self._close_all_temp_files()
            self._replace.clear()
            self._allow_updates = False

    def _close_all_temp_files(self):
        """Close every temp file still held in ``_replace``."""
        # values() exists on Python 2 and 3; itervalues() is Python 2 only.
        for temp_file in list(self._replace.values()):
            if hasattr(temp_file, 'close'):
                temp_file.close()

    def remove_file(self, path):
        """Mark member *path* for deletion when the with block exits."""
        pending = self._replace.get(path)
        # Release the buffer of an overwrite that is now moot.
        if hasattr(pending, 'close'):
            pending.close()
        self._replace[path] = self.DeleteMarker()

    def _rebuild_zip(self):
        """Build a new archive with the pending changes and move it in place."""
        tempdir = tempfile.mkdtemp()
        try:
            temp_zip_path = os.path.join(tempdir, 'new.zip')
            with ZipFile(self.filename, 'r') as zip_read:
                # Create the new zip with the same properties
                with ZipFile(temp_zip_path, 'w', compression=self.compression,
                             allowZip64=self._allowZip64) as zip_write:
                    # Keep the archive-level comment of the original
                    zip_write.comment = zip_read.comment
                    for item in zip_read.infolist():
                        # Check if the member should be replaced or deleted
                        replacement = self._replace.pop(item.filename, None)
                        # Marked for deletion: do not copy it
                        if isinstance(replacement, self.DeleteMarker):
                            continue
                        # Marked for replacement: take the buffered content
                        # and close the temp file (which deletes it)
                        if replacement is not None:
                            replacement.seek(0)
                            data = replacement.read()
                            replacement.close()
                        else:
                            # Read through the ZipInfo, not the name, so each
                            # of several same-named entries keeps its own data
                            data = zip_read.read(item)
                        zip_write.writestr(item, data)
            # Override the archive with the updated one
            shutil.move(temp_zip_path, self.filename)
        finally:
            shutil.rmtree(tempdir)

使用示例:

# Raw strings keep the Windows backslashes literal ("\T" is not a valid
# escape sequence).
with UpdateableZipFile(r"C:\Temp\Test2.docx", "a") as o:
    # Overwrite an existing member with a string
    o.writestr("word/document.xml", "Some data")
    # Exclude an existing member from the zip
    o.remove_file("word/fontTable.xml")
    # Write a new member (no name conflict) to the zip
    o.writestr("new_file", "more data")
    # Overwrite an existing member with the contents of a file on disk
    o.write(r"C:\Temp\example.png", "word/settings.xml")
48

用Python的zipfile模块是无法直接做到这一点的。你需要创建一个新的压缩文件,把原压缩文件里的所有内容重新压缩进去,再加上修改后的新文件。

下面有一些代码可以实现这个功能。不过要注意,这种方法效率不高,因为它会先解压所有数据,然后再重新压缩。

import tempfile
import zipfile
import shutil
import os

def remove_from_zip(zipfname, *filenames):
    """Rewrite the archive *zipfname* without the members named in *filenames*.

    Every remaining member is read from the original archive and written to a
    new archive in a temporary directory, which then replaces the original.

    Args:
        zipfname: path of the zip archive to modify.
        *filenames: member names to leave out of the rewritten archive.
    """
    unwanted = frozenset(filenames)
    tempdir = tempfile.mkdtemp()
    try:
        tempname = os.path.join(tempdir, 'new.zip')
        with zipfile.ZipFile(zipfname, 'r') as zipread:
            with zipfile.ZipFile(tempname, 'w') as zipwrite:
                # Keep the archive-level comment of the original
                zipwrite.comment = zipread.comment
                for item in zipread.infolist():
                    if item.filename in unwanted:
                        continue
                    # Read through the ZipInfo, not the name, so each of
                    # several same-named entries keeps its own data
                    zipwrite.writestr(item, zipread.read(item))
        shutil.move(tempname, zipfname)
    finally:
        shutil.rmtree(tempdir)

使用方法:

# Drop the old entry first, then append the new copy of the file.
remove_from_zip('archive.zip', 'hello.txt')
with zipfile.ZipFile('archive.zip', mode='a') as archive:
    archive.write('hello.txt')

撰写回答