Python: select()未能信号管道中的所有输入

10 投票
1 回答
4412 浏览
提问于 2025-04-16 14:44

我正在尝试用Python加载一个外部的命令行程序,并通过管道与它进行通信。这个程序通过标准输入(stdin)接收文本输入,并将文本输出以行的形式发送到标准输出(stdout)。我希望使用select()来实现异步通信。

问题是,并不是程序的所有输出都能在select()中被检测到。通常最后一到两行输出不会被信号通知。如果select()超时返回后,我尝试从管道读取数据,readline()会立即返回程序发送的那一行。下面是代码示例。

这个程序并没有对输出进行缓冲,所有的输出都是以文本行的形式发送的。在其他很多语言和环境中,通过管道连接到这个程序一直都很顺利。

我在Mac OSX 10.6上尝试了Python 3.1和3.2。

import subprocess
import select

engine = subprocess.Popen("Engine", bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
engine.stdin.write(b"go\n")
engine.stdin.flush()

while True:
    inputready,outputready,exceptready = select.select( [engine.stdout.fileno()] , [], [], 10.0)

    if (inputready, outputready, exceptready) == ([], [], []):
        print("trying to read from engine anyway...")
        line = engine.stdout.readline()
        print(line)

     for s in inputready:
        line = engine.stdout.readline()
        print(line)

1 个回答

17

需要注意的是,内部的 file.readlines([size]) 会循环调用 read() 系统调用多次,试图填满一个大小为 size 的内部缓冲区。第一次调用 read() 会立即返回,因为 select() 表示文件描述符是可读的。但是第二次调用会被阻塞,直到有数据可用,这样就失去了使用 select 的意义。总之,在异步应用中使用 file.readlines([size]) 是比较棘手的。

你应该在每次通过 select 时,对每个文件描述符调用一次 os.read(fd, size)。这样可以进行非阻塞读取,让你可以缓冲部分行,直到数据可用,并且可以明确检测到文件结束(EOF)。

我修改了你的代码,演示如何使用 os.read。它还从进程的 stderr 中读取数据:

import os
import select
import subprocess
from cStringIO import StringIO

target = 'Engine'
PIPE = subprocess.PIPE
engine = subprocess.Popen(target, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE)
engine.stdin.write(b"go\n")
engine.stdin.flush()

class LineReader(object):

    def __init__(self, fd):
        self._fd = fd
        self._buf = ''

    def fileno(self):
        return self._fd

    def readlines(self):
        data = os.read(self._fd, 4096)
        if not data:
            # EOF
            return None
        self._buf += data
        if '\n' not in data:
            return []
        tmp = self._buf.split('\n')
        lines, self._buf = tmp[:-1], tmp[-1]
        return lines

proc_stdout = LineReader(engine.stdout.fileno())
proc_stderr = LineReader(engine.stderr.fileno())
readable = [proc_stdout, proc_stderr]

while readable:
    ready = select.select(readable, [], [], 10.0)[0]
    if not ready:
        continue
    for stream in ready:
        lines = stream.readlines()
        if lines is None:
            # got EOF on this stream
            readable.remove(stream)
            continue
        for line in lines:
            print line

撰写回答