使用Python的select模块检查文件描述符是否有更多数据可读
我有一个程序,它在一个线程里创建了一个子进程,这样这个线程就可以不断检查特定的输出情况(来自标准输出或标准错误),并在合适的时候调用相应的回调函数,而程序的其他部分可以继续运行。下面是这个代码的简化版本:
import select
import subprocess
import threading
def run_task():
command = ['python', 'a-script-that-outputs-lines.py']
proc = subprocess.Popen(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
while True:
ready, _, _ = select.select((proc.stdout, proc.stderr), (), (), .1)
if proc.stdout in ready:
next_line_to_process = proc.stdout.readline()
# process the output
if proc.stderr in ready:
next_line_to_process = proc.stderr.readline()
# process the output
if not ready and proc.poll() is not None:
break
thread = threading.Thread(target = run_task)
thread.run()
这个方法运行得还不错,但我希望线程在满足两个条件后能够退出:子进程已经结束,并且标准输出和标准错误中的所有数据都已经处理完。
我遇到的困难是,如果我最后的条件像上面那样写(if not ready and proc.poll() is not None
),那么线程就永远不会退出,因为一旦标准输出和标准错误的文件描述符被标记为准备好,它们就不会再变为不准备好(即使所有数据都已经从中读取,read()
会卡住,或者 readline()
会返回一个空字符串)。
如果我把条件改成只检查 if proc.poll() is not None
,那么程序退出时循环会结束,但我不能保证它已经看完了所有需要处理的数据。
这是错误的方法吗?还是说有办法可靠地判断你是否已经读取了所有会写入到文件描述符的数据?或者这是特定于尝试读取子进程的标准错误/标准输出的问题?
我在 Python 2.5(在 OS X 上运行)上尝试过这个方法,也在 Python 2.6(在 Debian 的 2.6 内核上运行)上尝试过 select.poll()
和 select.epoll()
的变体。
3 个回答
你可以直接用 os.read(fd, size)
来读取管道的文件描述符,而不是使用 readline()
。这个操作是非阻塞的,也就是说它不会让程序停下来等数据到来,而且它还能检测到文件结束(如果遇到文件结束,它会返回一个空字符串或字节对象)。不过,你需要自己来实现行的分割和缓冲。可以参考下面的例子:
class NonblockingReader():
def __init__(self, pipe):
self.fd = pipe.fileno()
self.buffer = ""
def readlines(self):
data = os.read(self.fd, 2048)
if not data:
return None
self.buffer += data
if os.linesep in self.buffer:
lines = self.buffer.split(os.linesep)
self.buffer = lines[-1]
return lines[:-1]
else:
return []
我最终找到的解决办法,如上所述,是这样的,如果对谁有帮助的话。我觉得这个方法是对的,因为我现在有97.2%的把握,光靠 select()
、poll()
和 read()
是无法做到这一点的:
import select
import subprocess
import threading
def run_task():
command = ['python', 'a-script-that-outputs-lines.py']
proc = subprocess.Popen(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
while True:
ready, _, _ = select.select((proc.stdout, proc.stderr), (), (), .1)
if proc.stdout in ready:
next_line_to_process = proc.stdout.readline()
if next_line_to_process:
# process the output
elif proc.returncode is not None:
# The program has exited, and we have read everything written to stdout
ready = filter(lambda x: x is not proc.stdout, ready)
if proc.stderr in ready:
next_line_to_process = proc.stderr.readline()
if next_line_to_process:
# process the output
elif proc.returncode is not None:
# The program has exited, and we have read everything written to stderr
ready = filter(lambda x: x is not proc.stderr, ready)
if proc.poll() is not None and not ready:
break
thread = threading.Thread(target = run_task)
thread.run()
如果你想知道能否在不阻塞的情况下从管道中读取数据,select
模块是合适的选择。
为了确保你已经读取了所有数据,可以用一个更简单的条件:if proc.poll() is not None: break
,然后在循环结束后调用 rest = [pipe.read() for pipe in [p.stdout, p.stderr]]
。
子进程在关闭之前不太可能会关闭它的标准输出或标准错误,所以为了简单起见,你可以跳过处理结束标志(EOF)的逻辑。
不要直接调用 Thread.run()
,应该使用 Thread.start()
。其实在这里你可能根本不需要单独的线程。
在调用 select()
后,不要调用 p.stdout.readline()
,因为这可能会阻塞,应该使用 os.read(p.stdout.fileno(), limit)
。空的字节串表示对应的管道已经到达结束。
作为替代方案,或者作为补充,你可以使用 fcntl
模块将管道设置为非阻塞模式:
import os
from fcntl import fcntl, F_GETFL, F_SETFL
def make_nonblocking(fd):
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | os.O_NONBLOCK)
并在读取时处理输入输出/操作系统错误。