Chaining pipes in Python with subprocess, reading and writing line by line
I have code that seems to work for chaining several commands together through subprocess in Python, reading and writing the data line by line (rather than using communicate() up front). The code calls a Unix command (mycmd), reads its output, then writes that output to the input of another Unix command (next_cmd), redirecting the final command's output to a file.
# some unix command that uses a pipe: command "a"
# writes to stdout and "b" reads it and writes to stdout
mycmd = "a | b"
mycmd_proc = subprocess.Popen(mycmd, shell=True,
                              stdin=sys.stdin,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
# nextCmd reads from stdin, and I'm passing it mycmd's output
next_cmd = "nextCmd -stdin"
output_file = open(output_filename, "w")
next_proc = subprocess.Popen(next_cmd, shell=True,
                             stdin=subprocess.PIPE,
                             stdout=output_file)
for line in iter(mycmd_proc.stdout.readline, ''):
    # do something with line
    # ...
    # write it to next command
    next_proc.stdin.write(line)
### If I wanted to call another command here that passes next_proc output
### line by line to another command, would I need
### to call next_proc.communicate() first?
next_proc.communicate()
output_file.close()
This approach appears to work, and it only calls communicate() once the commands are done.
I want to extend this code to add one more command, so that you can do:
mycmd1 | mycmd2 | mycmd3 > some_file
meaning: read the output of mycmd1 line by line from Python, process each line, feed it to mycmd2, read mycmd2's output, process it line by line, feed it to mycmd3, and finally have mycmd3's output end up in some_file. Is this possible, or will it end in deadlock, blocking, or unflushed buffers? Note that I'm not simply calling three Unix commands chained together, since I want to intervene with Python in between and post-process each command's output line by line before feeding it to the next command.
I want to avoid calling communicate and loading all the output into memory -- I'd like to parse the output line by line. Thanks.
1 Answer
This approach works for any number of commands:
import sys
import subprocess

def processFirst(out):
    return out

def processSecond(out):
    return out

def processThird(out):
    return out

commands = [("a|b", processFirst), ("nextCmd -stdin", processSecond), ("thirdCmd", processThird)]

previous_output = None
for cmd, process_func in commands:
    if previous_output is None:
        stdin = sys.stdin
    else:
        stdin = subprocess.PIPE
    proc = subprocess.Popen(cmd, shell=True,
                            stdin=stdin,
                            stdout=subprocess.PIPE)
    if previous_output is not None:
        proc.stdin.write(previous_output)
    out, err = proc.communicate()
    out = process_func(out)
    previous_output = out
You just add any commands you want to run to the list of commands, together with a function that processes its output. The output from the last command will end up in previous_output when the loop finishes.
To avoid any deadlock or buffering issues, you use proc.communicate(), which makes sure each command runs to completion before returning its output (instead of reading it directly, as in your example). You then feed that output to the next command before letting it run, and so on.
Edit: Just noticed that you don't want to use communicate() up front and that you want to react line by line. I will edit my answer later to address that.
This answer provides an example of how to use select.select() to read from a pipe line by line without blocking.
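As a minimal, self-contained sketch of what select.select() buys you (an os.pipe() pair stands in for a subprocess's stdout here, which is an assumption for the demo):

```python
import os
import select

# Minimal sketch: select.select() returns the file descriptors that
# already have data waiting, so a read on them will not block.
r, w = os.pipe()
os.write(w, b"hello\n")
os.close(w)

# Wait up to 1 second for the read end to become readable.
ready, _, _ = select.select([r], [], [], 1.0)
data = os.read(r, 4096) if r in ready else b""  # never blocks here
os.close(r)
```

Passing a timeout (the fourth argument) means the loop can wake up periodically even when no command has produced output yet.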
Here's an example tailored to your specific case:
import sys
import subprocess
import select
import os

class LineReader(object):
    def __init__(self, fd, process_func):
        self._fd = fd
        self._buf = ''
        self._process_func = process_func
        self.next_proc = None

    def fileno(self):
        return self._fd

    def readlines(self):
        data = os.read(self._fd, 4096)
        if not data:
            # EOF
            if self.next_proc is not None:
                self.next_proc.stdin.close()
            return None
        self._buf += data
        if '\n' not in data:
            return []
        tmp = self._buf.split('\n')
        tmp_lines, self._buf = tmp[:-1], tmp[-1]
        lines = []
        for line in tmp_lines:
            lines.append(self._process_func(line))
            if self.next_proc is not None:
                self.next_proc.stdin.write("%s\n" % lines[-1])
        return lines

def processFirst(line):
    return line

def processSecond(line):
    return line

def processThird(line):
    return line

commands = [("a|b", processFirst), ("nextCmd -stdin", processSecond), ("thirdCmd", processThird)]

readers = []
previous_reader = None
for cmd, process_func in commands:
    if previous_reader is None:
        stdin = sys.stdin
    else:
        stdin = subprocess.PIPE
    proc = subprocess.Popen(cmd, shell=True,
                            stdin=stdin,
                            stdout=subprocess.PIPE)
    if previous_reader is not None:
        previous_reader.next_proc = proc
    previous_reader = LineReader(proc.stdout.fileno(), process_func)
    readers.append(previous_reader)

while readers:
    ready, _, _ = select.select(readers, [], [], 10.0)
    for stream in ready:
        lines = stream.readlines()
        if lines is None:
            readers.remove(stream)
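The loop above never writes the last command's processed lines anywhere, so the `> some_file` part of the question still needs a final write step. Here is a runnable sketch of that driver pattern reduced to a single command; `printf` stands in for a real command and upper-casing stands in for a process_func, both assumptions for the demo:

```python
import os
import select
import subprocess

# Sketch: read one command's stdout line by line without blocking,
# process each line, and write the result to a file. `printf` and the
# upper-casing step are stand-ins for a real command and process_func.
proc = subprocess.Popen("printf 'a\\nb\\n'", shell=True,
                        stdout=subprocess.PIPE)
fd = proc.stdout.fileno()
buf = b""
with open("some_file", "w") as output_file:
    while True:
        ready, _, _ = select.select([fd], [], [], 10.0)
        if not ready:
            continue
        data = os.read(fd, 4096)        # won't block: select said ready
        if not data:                    # EOF: the command has finished
            break
        buf += data
        *lines, buf = buf.split(b"\n")  # keep any partial line in buf
        for line in lines:
            output_file.write(line.decode().upper() + "\n")
proc.wait()
```

In the multi-command version, the same write would go into the final LineReader (one whose next_proc stays None), sending its processed lines to the output file instead of to another process.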