在使用subprocess.Popen时将大量数据传送到stdin

18 投票
10 回答
15235 浏览
提问于 2025-04-16 17:07

我有点儿搞不懂用Python来解决这个简单问题的正确方法。

我的问题其实很简单。如果你用下面的代码,它会卡住。这在子进程模块的文档中有详细说明。

import subprocess

proc = subprocess.Popen(['cat','-'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        )
for i in range(100000):
    proc.stdin.write('%d\n' % i)
output = proc.communicate()[0]
print output

在寻找解决方案的过程中(有一个非常有见地的讨论,但我现在找不到了),我发现了这个解决方案(还有其他的),它使用了一个明确的分叉:

import os
import sys
from subprocess import Popen, PIPE

def produce(to_sed):
    for i in range(100000):
        to_sed.write("%d\n" % i)
        to_sed.flush()
    #this would happen implicitly, anyway, but is here for the example
    to_sed.close()

def consume(from_sed):
    while 1:
        res = from_sed.readline()
        if not res:
            sys.exit(0)
            #sys.exit(proc.poll())
        print 'received: ', [res]

def main():
    proc = Popen(['cat','-'],stdin=PIPE,stdout=PIPE)
    to_sed = proc.stdin
    from_sed = proc.stdout

    pid = os.fork()
    if pid == 0 :
        from_sed.close()
        produce(to_sed)
        return
    else :
        to_sed.close()
        consume(from_sed)

if __name__ == '__main__':
    main()

虽然这个解决方案在概念上很容易理解,但它使用了一个额外的进程,而且相比于子进程模块来说,显得太底层了(子进程模块的存在就是为了隐藏这些复杂的东西……)。

我在想:有没有一种简单干净的解决方案,使用子进程模块而不会卡住,还是说我必须退一步,使用旧式的选择循环或者明确的分叉来实现这个模式?

谢谢

10 个回答

4

你的代码会在cat的输出管道缓冲区满的时候发生死锁。如果你使用stdout=PIPE,那么你必须及时处理这些输出,否则就会像你遇到的那样出现死锁。

如果你在进程运行时不需要输出,可以把它重定向到一个临时文件中:

#!/usr/bin/env python3
import subprocess
import tempfile

with tempfile.TemporaryFile('r+') as output_file:
    with subprocess.Popen(['cat'],
                          stdin=subprocess.PIPE,
                          stdout=output_file,
                          universal_newlines=True) as process:
        for i in range(100000):
            print(i, file=process.stdin)
    output_file.seek(0)  # rewind (and sync with the disk)
    print(output_file.readline(), end='')  # get  the first line of the output

如果输入和输出的数据量不大(可以放进内存里),你可以一次性传入所有输入,并通过.communicate()一次性获取所有输出,这个方法会帮你同时读取和写入:

#!/usr/bin/env python3
import subprocess

cp = subprocess.run(['cat'], input='\n'.join(['%d' % i for i in range(100000)]),
                    stdout=subprocess.PIPE, universal_newlines=True)
print(cp.stdout.splitlines()[-1]) # print the last line

如果你想手动实现同时读取和写入,可以使用线程、异步编程、fcntl等方法。@Jed 提供了一个简单的基于线程的解决方案。这里有一个基于asyncio的解决方案:

#!/usr/bin/env python3
import asyncio
import sys
from subprocess import PIPE

async def pump_input(writer):
     try:
         for i in range(100000):
             writer.write(b'%d\n' % i)
             await writer.drain()
     finally:
         writer.close()

async def run():
    # start child process
    # NOTE: universal_newlines parameter is not supported
    process = await asyncio.create_subprocess_exec('cat', stdin=PIPE, stdout=PIPE)
    asyncio.ensure_future(pump_input(process.stdin)) # write input
    async for line in process.stdout: # consume output
        print(int(line)**2) # print squares
    return await process.wait()  # wait for the child process to exit


if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()

在Unix系统上,你可以使用基于fcntl的解决方案:

#!/usr/bin/env python3
import sys
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK
from shutil import copyfileobj
from subprocess import Popen, PIPE, _PIPE_BUF as PIPE_BUF

def make_blocking(pipe, blocking=True):
    fd = pipe.fileno()
    if not blocking:
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) # set O_NONBLOCK
    else:
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) # clear it


with Popen(['cat'], stdin=PIPE, stdout=PIPE) as process:
    make_blocking(process.stdout, blocking=False)
    with process.stdin:
        for i in range(100000):
            #NOTE: the mode is block-buffered (default) and therefore
            # `cat` won't see it immidiately
            process.stdin.write(b'%d\n' % i)
            # a deadblock may happen here with a *blocking* pipe
            output = process.stdout.read(PIPE_BUF)
            if output is not None:
                sys.stdout.buffer.write(output)
    # read the rest
    make_blocking(process.stdout)
    copyfileobj(process.stdout, sys.stdout.buffer)
7

如果你不想把所有的数据都放在内存里,你就需要使用选择(select)。比如说,你可以这样做:

import subprocess
from select import select
import os

proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)

i = 0;
while True:
    rlist, wlist, xlist = [proc.stdout], [], []
    if i < 100000:
        wlist.append(proc.stdin)
    rlist, wlist, xlist = select(rlist, wlist, xlist)
    if proc.stdout in rlist:
        out = os.read(proc.stdout.fileno(), 10)
        print out,
        if not out:
            break
    if proc.stdin in wlist:
        proc.stdin.write('%d\n' % i)
        i += 1
        if i >= 100000:
            proc.stdin.close()
13

如果你想要一个纯Python的解决方案,你需要把读取器或写入器放在一个单独的线程里。threading这个包是个轻量级的选择,它让你可以方便地访问常用的对象,而且没有复杂的分叉问题。

import subprocess
import threading
import sys

proc = subprocess.Popen(['cat','-'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        )
def writer():
    for i in range(100000):
        proc.stdin.write(b'%d\n' % i)
    proc.stdin.close()
thread = threading.Thread(target=writer)
thread.start()
for line in proc.stdout:
    sys.stdout.write(line.decode())
thread.join()
proc.wait()

如果能把subprocess模块更新一下,让它支持流和协程,那就太好了。这样一来,就可以更优雅地构建同时使用Python和命令行的管道。

撰写回答