如何使用Python多进程池处理tar文件?

8 投票
1 回答
6818 浏览
提问于 2025-04-17 06:57

我正在尝试使用 multiprocessing.Pool 来处理一个 tar 文件的内容。我已经成功地在 multiprocessing 模块中使用了线程池的实现,但我想用进程来代替线程,因为这样可能会更快,并且可以避免为 Matplotlib 处理多线程环境所做的一些更改。我遇到了一个错误,我怀疑这个错误与进程之间不能共享地址空间有关,但我不太确定该如何解决:

Traceback (most recent call last):
  File "test_tarfile.py", line 32, in <module>
    test_multiproc()
  File "test_tarfile.py", line 24, in test_multiproc
    pool.map(read_file, files)
  File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 225, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 522, in get
    raise self._value
ValueError: I/O operation on closed file

实际的程序要复杂一些,但这是我正在做的一个示例,可以重现这个错误:

from multiprocessing.pool import ThreadPool, Pool
import StringIO
import tarfile

def write_tar():
    tar = tarfile.open('test.tar', 'w')
    contents = 'line1'
    info = tarfile.TarInfo('file1.txt')
    info.size = len(contents)
    tar.addfile(info, StringIO.StringIO(contents))
    tar.close()

def test_multithread():
    tar   = tarfile.open('test.tar')
    files = [tar.extractfile(member) for member in tar.getmembers()]
    pool  = ThreadPool(processes=1)
    pool.map(read_file, files)
    tar.close()

def test_multiproc():
    tar   = tarfile.open('test.tar')
    files = [tar.extractfile(member) for member in tar.getmembers()]
    pool  = Pool(processes=1)
    pool.map(read_file, files)
    tar.close()

def read_file(f):
    print f.read()

write_tar()
test_multithread()
test_multiproc()

我怀疑在将 TarInfo 对象传递到另一个进程时出现了问题,但父进程的 TarFile 没有传递过去,我不太确定在多进程的情况下该如何解决。有没有办法在不从 tar 包中提取文件并写入磁盘的情况下做到这一点?

1 个回答

7

你并不是把一个 TarInfo 对象传递给另一个进程,而是把 tar.extractfile(member) 的结果传递过去,其中 member 是一个 TarInfo 对象。extractfile(...) 方法返回的是一个类似文件的对象,这个对象有很多功能,其中之一就是 read() 方法,它可以用来读取你之前用 tar = tarfile.open('test.tar') 打开的原始 tar 文件。

不过,你不能在一个进程中使用另一个进程打开的文件,你必须重新打开这个文件。我把你的 test_multiproc() 替换成了这个:

def test_multiproc():
    tar   = tarfile.open('test.tar')
    files = [name for name in tar.getnames()]
    pool  = Pool(processes=1)
    result = pool.map(read_file2, files)
    tar.close()

然后我加上了这个:

def read_file2(name):
    t2 = tarfile.open('test.tar')
    print t2.extractfile(name).read()
    t2.close()

这样就能让你的代码正常工作了。

撰写回答