在使用subprocess.Popen时将大量数据传送到stdin
我有点儿搞不懂用Python来解决这个简单问题的正确方法。
我的问题其实很简单。如果你用下面的代码,它会卡住。这在子进程模块的文档中有详细说明。
import subprocess
proc = subprocess.Popen(['cat','-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
for i in range(100000):
proc.stdin.write('%d\n' % i)
output = proc.communicate()[0]
print output
在寻找解决方案的过程中(有一个非常有见地的讨论,但我现在找不到了),我发现了这个解决方案(还有其他的),它使用了一个明确的分叉:
import os
import sys
from subprocess import Popen, PIPE
def produce(to_sed):
for i in range(100000):
to_sed.write("%d\n" % i)
to_sed.flush()
#this would happen implicitly, anyway, but is here for the example
to_sed.close()
def consume(from_sed):
while 1:
res = from_sed.readline()
if not res:
sys.exit(0)
#sys.exit(proc.poll())
print 'received: ', [res]
def main():
proc = Popen(['cat','-'],stdin=PIPE,stdout=PIPE)
to_sed = proc.stdin
from_sed = proc.stdout
pid = os.fork()
if pid == 0 :
from_sed.close()
produce(to_sed)
return
else :
to_sed.close()
consume(from_sed)
if __name__ == '__main__':
main()
虽然这个解决方案在概念上很容易理解,但它使用了一个额外的进程,而且相比于子进程模块来说,显得太底层了(子进程模块的存在就是为了隐藏这些复杂的东西……)。
我在想:有没有一种简单干净的解决方案,使用子进程模块而不会卡住,还是说我必须退一步,使用旧式的选择循环或者明确的分叉来实现这个模式?
谢谢
10 个回答
你的代码会在cat
的输出管道缓冲区满的时候发生死锁。如果你使用stdout=PIPE
,那么你必须及时处理这些输出,否则就会像你遇到的那样出现死锁。
如果你在进程运行时不需要输出,可以把它重定向到一个临时文件中:
#!/usr/bin/env python3
import subprocess
import tempfile
with tempfile.TemporaryFile('r+') as output_file:
with subprocess.Popen(['cat'],
stdin=subprocess.PIPE,
stdout=output_file,
universal_newlines=True) as process:
for i in range(100000):
print(i, file=process.stdin)
output_file.seek(0) # rewind (and sync with the disk)
print(output_file.readline(), end='') # get the first line of the output
如果输入和输出的数据量不大(可以放进内存里),你可以一次性传入所有输入,并通过.communicate()
一次性获取所有输出,这个方法会帮你同时读取和写入:
#!/usr/bin/env python3
import subprocess
cp = subprocess.run(['cat'], input='\n'.join(['%d' % i for i in range(100000)]),
stdout=subprocess.PIPE, universal_newlines=True)
print(cp.stdout.splitlines()[-1]) # print the last line
如果你想手动实现同时读取和写入,可以使用线程、异步编程、fcntl等方法。@Jed 提供了一个简单的基于线程的解决方案。这里有一个基于asyncio
的解决方案:
#!/usr/bin/env python3
import asyncio
import sys
from subprocess import PIPE
async def pump_input(writer):
try:
for i in range(100000):
writer.write(b'%d\n' % i)
await writer.drain()
finally:
writer.close()
async def run():
# start child process
# NOTE: universal_newlines parameter is not supported
process = await asyncio.create_subprocess_exec('cat', stdin=PIPE, stdout=PIPE)
asyncio.ensure_future(pump_input(process.stdin)) # write input
async for line in process.stdout: # consume output
print(int(line)**2) # print squares
return await process.wait() # wait for the child process to exit
if sys.platform.startswith('win'):
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
在Unix系统上,你可以使用基于fcntl
的解决方案:
#!/usr/bin/env python3
import sys
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK
from shutil import copyfileobj
from subprocess import Popen, PIPE, _PIPE_BUF as PIPE_BUF
def make_blocking(pipe, blocking=True):
fd = pipe.fileno()
if not blocking:
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) # set O_NONBLOCK
else:
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) # clear it
with Popen(['cat'], stdin=PIPE, stdout=PIPE) as process:
make_blocking(process.stdout, blocking=False)
with process.stdin:
for i in range(100000):
#NOTE: the mode is block-buffered (default) and therefore
# `cat` won't see it immidiately
process.stdin.write(b'%d\n' % i)
# a deadblock may happen here with a *blocking* pipe
output = process.stdout.read(PIPE_BUF)
if output is not None:
sys.stdout.buffer.write(output)
# read the rest
make_blocking(process.stdout)
copyfileobj(process.stdout, sys.stdout.buffer)
如果你不想把所有的数据都放在内存里,你就需要使用选择(select)。比如说,你可以这样做:
import subprocess
from select import select
import os
proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
i = 0;
while True:
rlist, wlist, xlist = [proc.stdout], [], []
if i < 100000:
wlist.append(proc.stdin)
rlist, wlist, xlist = select(rlist, wlist, xlist)
if proc.stdout in rlist:
out = os.read(proc.stdout.fileno(), 10)
print out,
if not out:
break
if proc.stdin in wlist:
proc.stdin.write('%d\n' % i)
i += 1
if i >= 100000:
proc.stdin.close()
如果你想要一个纯Python的解决方案,你需要把读取器或写入器放在一个单独的线程里。threading
这个包是个轻量级的选择,它让你可以方便地访问常用的对象,而且没有复杂的分叉问题。
import subprocess
import threading
import sys
proc = subprocess.Popen(['cat','-'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
def writer():
for i in range(100000):
proc.stdin.write(b'%d\n' % i)
proc.stdin.close()
thread = threading.Thread(target=writer)
thread.start()
for line in proc.stdout:
sys.stdout.write(line.decode())
thread.join()
proc.wait()
如果能把subprocess
模块更新一下,让它支持流和协程,那就太好了。这样一来,就可以更优雅地构建同时使用Python和命令行的管道。