怎样在Python中把输出复用到OS文件描述符?

4 投票
1 回答
1066 浏览
提问于 2025-04-16 13:07

subprocess.Popen这个机制使用的是一个底层的文件描述符,而不是像文件那样的对象,来写入它的stdout/stderr。我需要同时捕获stdoutstderr,同时还要在控制台上显示它们。

我该如何创建一个文件描述符,让Popen可以使用,从而实现这个需求呢?

1 个回答

3

先给你点背景知识:subprocess 这个模块会使用你指定的 stdinstdoutstderr 的原始文件描述符,因为它会把这些信息传递给 POSIX 系统。如果你使用 subprocess.PIPE,那么它会用 os.pipe() 创建一个新的管道。此外,Popen.communicate 会一直读取数据直到流结束,如果你想把数据传到其他地方,这样做可能就不太合适了。

既然你想把输出打印到 stdout,我猜测你是想处理文本输出。你需要在 Popen 中使用 encodingerrorsuniversal_newlines,这样 subprocess 才会把文件当作文本来处理(具体可以参考 文档)。

import subprocess

p = subprocess.Popen(
    '/usr/bin/whoami',
    stdout=subprocess.PIPE,  # Control stdout
    universal_newlines=True  # Files opened in text mode
)

# Pipe the data somewhere else too, e.g.: a log file
with open('subprocess.log', 'w') as logfile:
    # p.poll() returns the return code when `p` exits
    while p.poll() is None:
        line = p.stdout.readline()
        # one to our stdout (readline includes the \n)
        print(line, end='')
        # one to the logfile
        logfile.write(line)

同样的技巧也可以用来处理 stderr,比如通过在 print 中传入 file=sys.stderr。注意,你也可以直接从自己的 stdin 读取数据,只需直接传入即可:

subprocess.Popen('/usr/bin/whoami', stdin=sys.stdin, stdout=subprocess.PIPE, ...)

毕竟,标准流只是对文件描述符的封装。如果读取到行末不适合你期待的输出类型,你可以选择 read 一个非常短的缓冲区。

同时处理 stderrstdout

如果你需要同时处理 stdoutstderr,你会遇到一个问题:你只能一次读取一个流。
一种方法是使用 os.set_blocking 将管道设置为非阻塞,这样如果没有数据,任何 read 方法都会立即返回。这让你可以在两个流之间交替读取。
另一种方法是使用两个独立的线程来处理 stdoutstderr;不过有一种更简单的方法可以通过 asyncio 模块 来实现:

import asyncio
import sys

PROCESS_PATH = '/bin/mixed_output'

class MultiplexProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future

    def pipe_data_received(self, fd, data):
        if fd == sys.stdout.fileno():
            print(data.decode('utf-8'), file=sys.stdout, end='')
        elif fd == sys.stderr.fileno():
            print(data.decode('utf-8'), file=sys.stderr, end='')

    def process_exited(self):
        self.exit_future.set_result(True)


async def launch_subprocess(loop):
    # Future marking the end of the process
    exit_future = asyncio.Future(loop=loop)
    # Use asyncio's subprocess
    create_subp = loop.subprocess_exec(
        lambda: MultiplexProtocol(exit_future),
        PROCESS_PATH,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=None
    )
    transport, protocol = await create_subp
    await exit_future
    # Close the pipes
    transport.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(launch_subprocess(loop))

这样做比在主进程中不断循环以将数据传递到其他流要省 CPU,因为 MultiplexProtocol.pipe_data_received 只在需要时被调用。

撰写回答