如何将多个文件连接为Popen的标准输入

9 投票
4 回答
3929 浏览
提问于 2025-04-16 17:52

我正在把一个bash脚本移植到python 2.6,想要替换一些代码:

cat $( ls -tr xyz_`date +%F`_*.log ) | filter args > bzip2

我想要的东西类似于在http://docs.python.org/release/2.6/library/subprocess.html上提到的“替换shell管道”的例子,像这样...

p1 = Popen(["filter", "args"], stdin=*?WHAT?*, stdout=PIPE)
p2 = Popen(["bzip2"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]

但是,我不太确定怎么最好地提供p1stdin值,以便它可以把输入文件连接起来。看起来我可以添加...

p0 = Popen(["cat", "file1", "file2"...], stdout=PIPE)
p1 = ... stdin=p0.stdout ...

...但这似乎超出了使用(慢且效率低下的)管道来调用外部程序的范围,这些程序有很强的功能。(任何一个好的shell都会在内部执行cat。)

所以,我可以想象一个自定义类,满足文件对象的API要求,因此可以用于p1的stdin,连接任意其他文件对象。(编辑:已有的回答解释了为什么这不可能

python 2.6有没有什么机制来满足这个需求,或者在python圈子里,另一个Popen去调用cat是否被认为是完全可以接受的呢?

谢谢。

4 个回答

1

这应该很简单。首先,使用 os.pipe 创建一个 管道,然后用管道的读端作为标准输入来启动 filter。接着,对于目录中每个符合模式的文件,只需将其内容传递给管道的写端。这和命令行中的 cat ..._*.log | filter args 完全一样。

更新:抱歉,其实不需要使用 os.pipe 创建管道,我忘了 subprocess.Popen(..., stdin=subprocess.PIPE) 实际上会为你创建一个管道。此外,管道不能存放太多数据,只有在之前的数据被读取后,才能继续写入更多数据。

所以解决方案(比如使用 wc -l)可以是:

import glob
import subprocess

p = subprocess.Popen(["wc", "-l"], stdin=subprocess.PIPE)

processDate = "2011-05-18"  # or time.strftime("%F")
for name in glob.glob("xyz_%s_*.log" % processDate):
    f = open(name, "rb")
    # copy all data from f to p.stdin
    while True:
        data = f.read(8192)
        if not data:
            break  # reached end of file
        p.stdin.write(data)
    f.close()

p.stdin.close()
p.wait()

使用示例:

$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_a.log 
$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_b.log 
$ hexdump /dev/urandom | head -n 10000 >xyz_2011-05-18_c.log 
$ ./example.py 
   30000
2

如果你查看一下 subprocess 模块的实现,你会发现 std{in,out,err} 需要是支持 fileno() 方法的文件对象。所以,简单地把一个像文件一样的对象(比如 Python 接口的对象,或者甚至是 StringIO 对象)拼接在一起是不合适的。

如果使用的是迭代器,而不是文件对象,你可以用 itertools.chain 来处理。

当然,如果不在乎内存的使用,你可以这样做:

import itertools, os

# ...

files = [f for f in os.listdir(".") if os.path.isfile(f)]
input = ''.join(itertools.chain(open(file) for file in files))
p2.communicate(input)
6

你可以用Python代码替代你正在做的所有事情,除了外部工具。这样,只要你的外部工具是可移植的,你的程序也会保持可移植性。你还可以考虑把C++程序变成一个库,然后用Cython来和它对接。正如Messa所展示的,date可以用time.strftime来替代,文件匹配可以用glob.glob来完成,而cat则可以通过读取文件列表中的所有文件并将它们写入你程序的输入来替代。调用bzip2可以用bz2模块来替代,但这样会让你的程序变得复杂,因为你需要同时进行读写。为了做到这一点,你需要使用p.communicate或者如果数据量很大,可以使用线程(select.select是更好的选择,但在Windows上不适用)。

import sys
import bz2
import glob
import time
import threading
import subprocess

output_filename = '../whatever.bz2'
input_filenames = glob.glob(time.strftime("xyz_%F_*.log"))
p = subprocess.Popen(['filter', 'args'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
output = open(output_filename, 'wb')
output_compressor = bz2.BZ2Compressor()

def data_reader():
    for filename in input_filenames:
        f = open(filename, 'rb')
        p.stdin.writelines(iter(lambda: f.read(8192), ''))
    p.stdin.close()

input_thread = threading.Thread(target=data_reader)
input_thread.start()

with output:
    for chunk in iter(lambda: p.stdout.read(8192), ''):
        output.write(output_compressor.compress(chunk))

    output.write(output_compressor.flush())

input_thread.join()
p.wait()

补充:如何检测文件输入类型

你可以通过文件扩展名或者使用Python的libmagic库来检测文件是如何压缩的。这里有一个代码示例,它同时做了这两件事,并且如果可用的话会自动选择magic。你可以根据自己的需求取用合适的部分并进行调整。open_autodecompress应该能够检测mime编码,并在可用时用合适的解压缩器打开文件。

import os
import gzip
import bz2
try:
    import magic
except ImportError:
    has_magic = False
else:
    has_magic = True


mime_openers = {
    'application/x-bzip2': bz2.BZ2File,
    'application/x-gzip': gzip.GzipFile,
}

ext_openers = {
    '.bz2': bz2.BZ2File,
    '.gz': gzip.GzipFile,
}


def open_autodecompress(filename, mode='r'):
    if has_magic:
        ms = magic.open(magic.MAGIC_MIME_TYPE)
        ms.load()
        mimetype = ms.file(filename)
        opener = mime_openers.get(mimetype, open)
    else:
        basepart, ext = os.path.splitext(filename)
        opener = ext_openers.get(ext, open)
    return opener(filename, mode)

撰写回答