How do I feed a subprocess's standard input from a Python iterator?

10 votes
4 answers
3118 views
Asked 2025-04-16 22:37

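I am trying to use Python's subprocess module to communicate with a process that reads standard input and writes standard output in a streaming fashion. I want the subprocess to read lines from an iterator that produces the input, and then I want to read the output lines back from the subprocess. Input and output lines may not correspond one-to-one. How can I feed a subprocess from an arbitrary iterator that returns strings?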

Here is some example code showing a simple test case, along with the approaches I have tried that failed for one reason or another:

#!/usr/bin/python
from subprocess import *
# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))

# I thought that stdin could be any iterable, but it actually wants a
# filehandle, so this fails with an error.
subproc = Popen("cat", stdin=input_iterator, stdout=PIPE)

# This works, but it first sends *all* the input at once, then returns
# *all* the output as a string, rather than giving me an iterator over
# the output. This uses up all my memory, because the input is several
# hundred million lines.
subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
output, error = subproc.communicate("".join(input_iterator))
output_lines = output.split("\n")

So how can I have my subprocess read from the iterator line by line while I read its standard output line by line?

4 Answers

0

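See this link. It is an extension to the subprocess module that supports asynchronous I/O. However, it still requires your subprocess to respond to each input line, or group of input lines, with a portion of its output.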
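To make that limitation concrete, here is a minimal lock-step sketch using only the standard subprocess module, not the linked extension. The names `CHILD_SOURCE`, `CHILD_CMD`, and `lockstep` are hypothetical, and the child is assumed to answer every input line with exactly one flushed output line. If the child buffers its output, or emits zero lines for some input, `readline()` blocks forever, which is why this pattern does not fit the case in the question.

```python
import subprocess
import sys

# Hypothetical child: replies to each input line with one upper-cased line
CHILD_SOURCE = """
import sys
for line in sys.stdin:
    sys.stdout.write(line.upper())
    sys.stdout.flush()
"""
CHILD_CMD = [sys.executable, "-c", CHILD_SOURCE]


def lockstep(cmd, lines):
    """Send one line, then wait for exactly one reply line (assumed 1:1)."""
    with subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          text=True, bufsize=1) as proc:
        for line in lines:
            proc.stdin.write(line)
            proc.stdin.flush()            # make sure the child sees the line now
            yield proc.stdout.readline()  # blocks until the child replies
        proc.stdin.close()                # signal EOF so the child can exit


if __name__ == "__main__":
    for reply in lockstep(CHILD_CMD, ["hello\n", "world\n"]):
        sys.stdout.write(reply)
```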

4

To feed a subprocess's standard input from a Python iterator:

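#!/usr/bin/env python3
from subprocess import Popen, PIPE

# "sink" is a placeholder for any command that consumes its stdin.
# input_iterator must yield bytes, because the pipe is opened in binary mode.
with Popen("sink", stdin=PIPE, bufsize=-1) as process:
    for chunk in input_iterator:
        process.stdin.write(chunk)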

If you want to read the output at the same time, you need threads or asyncio:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def writelines(writer, lines):
    # NOTE: can't use writer.writelines(lines) here because it tries to write
    # all at once
    with closing(writer):
        for line in lines:
            writer.write(line)
            await writer.drain()

async def main():
    input_iterator = (b"hello %d\n" % x for x in range(100000000))
    process = await asyncio.create_subprocess_exec("cat", stdin=PIPE, stdout=PIPE)
    # Keep a reference to the writer task so it cannot be garbage-collected early
    writer_task = asyncio.ensure_future(writelines(process.stdin, input_iterator))
    async for line in process.stdout:
        sys.stdout.buffer.write(line)
    await writer_task  # surface any exception raised while writing
    return await process.wait()

# asyncio.run() creates and closes the event loop (Python 3.7+). On Windows,
# the default loop has supported subprocess pipes since Python 3.8.
sys.exit(asyncio.run(main()))
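
The thread-based option mentioned above is not shown in this answer, so here is a minimal sketch of one way it could look. The names `feed`, `stream_through`, and `ECHO_CMD` are hypothetical, and `ECHO_CMD` is only a portable stand-in for `cat`. A background thread writes the iterator to the child's stdin while the calling thread iterates over stdout, so neither pipe can fill up and stall the other.

```python
import subprocess
import sys
import threading


def feed(stdin, lines):
    """Write each line to the child's stdin, then close it to signal EOF."""
    try:
        for line in lines:
            stdin.write(line)
    except BrokenPipeError:
        pass  # the child exited before consuming all of the input
    finally:
        try:
            stdin.close()
        except BrokenPipeError:
            pass


def stream_through(cmd, lines):
    """Yield the child's output lines while a thread feeds its stdin."""
    with subprocess.Popen(cmd, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE) as proc:
        writer = threading.Thread(target=feed, args=(proc.stdin, lines),
                                  daemon=True)
        writer.start()
        for out_line in proc.stdout:
            yield out_line
        writer.join()


# Hypothetical stand-in for "cat", so the sketch also runs where cat is absent
ECHO_CMD = [sys.executable, "-c",
            "import sys; sys.stdout.buffer.writelines(sys.stdin.buffer)"]

if __name__ == "__main__":
    numbers = (b"hello %d\n" % x for x in range(100000))
    total = sum(1 for _ in stream_through(ECHO_CMD, numbers))
    print(total, "lines echoed")
```

Because `stream_through` is a generator, the caller sees output lazily and memory use stays bounded by the pipe buffers rather than by the size of the input.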

5

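The easy way seems to be to fork and feed the input handle from the child process. Can anyone elaborate on the possible downsides of doing this? Or is there a Python module that makes this simpler and safer?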

#!/usr/bin/python
from subprocess import *
import os

def fork_and_input(input, handle):
    """Send input to handle in a child process."""
    # Make sure input is iterable before forking
    input = iter(input)
    if os.fork():
        # Parent
        handle.close()
    else:
        # Child
        try:
            handle.writelines(input)
            handle.close()
        # An IOError here means some *other* part of the program
        # crashed, so don't complain here.
        except IOError:
            pass
        # os._exit() requires an exit status argument
        os._exit(0)

# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))

subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
fork_and_input(input_iterator, subproc.stdin)

for line in subproc.stdout:
    print line,
