Python subprocess with timeout and large output (>64K)

11 votes
2 answers
11824 views
Asked 2025-04-16 03:19

I want to execute a process, limit its execution time with a timeout (in seconds), and capture the output the process produces. I want to do this on Windows, Linux and FreeBSD.

I have attempted three different approaches:

  1. cmd - no timeout, uses subprocess.PIPE to capture the output.

    Behavior: works as expected, but there is no timeout support, and I need a timeout...

  2. cmd_to - with a timeout, uses subprocess.PIPE to capture the output.

    Behavior: blocks execution of the subprocess when its output is 65536 bytes or more.

  3. cmd_totf - with a timeout, uses tempfile.NamedTemporaryFile to capture the output.

    Behavior: works as expected, but uses a temporary file on disk.

The implementations can be inspected in more detail below.

As can be seen from the output below, the timeout code blocks execution of the subprocess whenever subprocess.PIPE is used and the subprocess produces 65536 bytes of output or more.

The subprocess documentation states that this is expected when calling process.wait() with subprocess.PIPE, but no such warning is given for process.poll(), so what is going wrong here?

I have a working solution in cmd_totf, which uses the tempfile module, but the downside is that it writes the output to disk, something I would really like to avoid.

So my questions are:

  • What am I doing wrong in cmd_to?
  • Is there a way to accomplish what I want without using temporary files, that is, while keeping the output in memory?

Script for generating lots of output ('exp_gen.py'):

#!/usr/bin/env python
import sys
output  = "b"*int(sys.argv[1])
print output

Three different implementations wrapping subprocess.Popen (cmd, cmd_to, cmd_totf):

#!/usr/bin/env python
import subprocess, time, tempfile
bufsize = -1

def cmd(cmdline, timeout=60):
  """
  Execute cmdline.
  Uses the subprocess module and subprocess.PIPE.
  """

  p = subprocess.Popen(
    cmdline,
    bufsize = bufsize,
    shell   = False,
    stdin   = subprocess.PIPE,
    stdout  = subprocess.PIPE,
    stderr  = subprocess.PIPE
  )

  out, err    = p.communicate()
  returncode  = p.returncode

  return (returncode, err, out)

def cmd_to(cmdline, timeout=60):
  """
  Execute cmdline, limit execution time to 'timeout' seconds.
  Uses the subprocess module and subprocess.PIPE.
  """

  p = subprocess.Popen(
    cmdline,
    bufsize = bufsize,
    shell   = False,
    stdin   = subprocess.PIPE,
    stdout  = subprocess.PIPE,
    stderr  = subprocess.PIPE
  )

  t_begin         = time.time()             # Monitor execution time
  seconds_passed  = 0  

  while p.poll() is None and seconds_passed < timeout:
    seconds_passed = time.time() - t_begin
    time.sleep(0.1)

  #if seconds_passed > timeout:
  #
  #  try:
  #    p.stdout.close()  # If they are not closed the fds will hang around until
  #    p.stderr.close()  # os.fdlimit is exceeded and cause a nasty exception
  #    p.terminate()     # Important to close the fds prior to terminating the process!
  #                      # NOTE: Are there any other "non-freed" resources?
  #  except:
  #    pass
  #  
  #  raise TimeoutInterrupt

  out, err    = p.communicate()
  returncode  = p.returncode

  return (returncode, err, out)

def cmd_totf(cmdline, timeout=60):
  """
  Execute cmdline, limit execution time to 'timeout' seconds.
  Uses the subprocess module and tempfile instead of subprocess.PIPE.
  """

  output  = tempfile.NamedTemporaryFile(delete=False)
  error   = tempfile.NamedTemporaryFile(delete=False)

  p = subprocess.Popen(
    cmdline,
    bufsize = 0,
    shell   = False,
    stdin   = None,
    stdout  = output,
    stderr  = error
  )

  t_begin         = time.time()             # Monitor execution time
  seconds_passed  = 0  

  while p.poll() is None and seconds_passed < timeout:
    seconds_passed = time.time() - t_begin
    time.sleep(0.1)

  #if seconds_passed > timeout:
  #
  #  try:
  #    p.stdout.close()  # If they are not closed the fds will hang around until
  #    p.stderr.close()  # os.fdlimit is exceeded and cause a nasty exception
  #    p.terminate()     # Important to close the fds prior to terminating the process!
  #                      # NOTE: Are there any other "non-freed" resources?
  #  except:
  #    pass
  #  
  #  raise TimeoutInterrupt

  p.wait()

  returncode  = p.returncode

  fd          = open(output.name)
  out         = fd.read()
  fd.close()

  fd  = open(error.name)
  err = fd.read()
  fd.close()

  error.close()
  output.close()

  return (returncode, err, out)

if __name__ == "__main__":

  implementations = [cmd, cmd_to, cmd_totf]
  bytes     = ['65535', '65536', str(1024*1024)]
  timeouts  = [5]

  for timeout in timeouts:    
    for size in bytes:    
      for i in implementations:
        t_begin         = time.time()
        seconds_passed  = 0        
        rc, err, output = i(['exp_gen.py', size], timeout)
        seconds_passed = time.time() - t_begin
        filler = ' '*(8-len(i.func_name))
        print "[%s%s:  timeout=%d,  iosize=%s,  seconds=%f]" % (repr(i.func_name), filler, timeout, size, seconds_passed)

Output of a run:

['cmd'     :  timeout=5,  iosize=65535,  seconds=0.016447]
['cmd_to'  :  timeout=5,  iosize=65535,  seconds=0.103022]
['cmd_totf':  timeout=5,  iosize=65535,  seconds=0.107176]
['cmd'     :  timeout=5,  iosize=65536,  seconds=0.028105]
['cmd_to'  :  timeout=5,  iosize=65536,  seconds=5.116658]
['cmd_totf':  timeout=5,  iosize=65536,  seconds=0.104905]
['cmd'     :  timeout=5,  iosize=1048576,  seconds=0.025964]
['cmd_to'  :  timeout=5,  iosize=1048576,  seconds=5.128062]
['cmd_totf':  timeout=5,  iosize=1048576,  seconds=0.103183]

2 Answers

2

Disclaimer: this answer has not been tested on Windows or FreeBSD. However, the modules used should work on those systems as well. I believe this is a working answer to your question - it worked for me.

This is the code I wrote on Linux to solve the problem. It combines several Stack Overflow discussions with my own research in the Python 3 documentation.

Main characteristics of this code:

  • Uses processes rather than threads for blocking I/O, because processes can be terminated more reliably.
  • Implements a retriggerable timeout watchdog that restarts its countdown whenever the child produces some output.
  • Implements a long-term timeout watchdog to limit the overall runtime.
  • Can feed in stdin (although I only needed to feed in one-off short strings).
  • Can capture stdout and stderr in the usual way (only stdout is wired up so far, and stderr is redirected to stdout, but the two could easily be separated).
  • It is almost real time, since it checks for output every 0.2 seconds. You could easily shorten this interval or remove the wait altogether.

The only code dependency is enum, as implemented here, although the code could easily be changed to work without it. It is only used to distinguish the two timeouts - you could use separate exceptions instead if you prefer.
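
If you would rather avoid the third-party enum package, a minimal sketch of the same three-way marker built on the standard library enum module (Python 3.4+) could look like this; the member names match the ones used in the RunCmd class below:

import enum

class Timeout(enum.Enum):
    # The three states RunCmd distinguishes:
    No = 0             # no timeout was hit
    Retriggerable = 1  # no output for timeout_retriggerable seconds
    Runtime = 2        # overall runtime exceeded timeout_runtime seconds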

The code follows - feedback is welcome, as usual:

(Edit, 29 June 2012: the code now actually works)

# Python module runcmd
# Implements a class to launch shell commands which
# are killed after a timeout. Timeouts can be reset
# after each line of output
#
# Use inside other script with:
#
# import runcmd
# (return_code, out) = runcmd.RunCmd(['ls', '-l', '/etc'],
#                                    timeout_runtime,
#                                    timeout_no_output,
#                                    stdin_string).go()
#

import multiprocessing
import queue
import subprocess
import time

import enum

def timestamp():
    return time.strftime('%Y%m%d-%H%M%S')


class ErrorRunCmd(Exception): pass
class ErrorRunCmdTimeOut(ErrorRunCmd): pass

class Enqueue_output(multiprocessing.Process):
    def __init__(self, out, queue):
        multiprocessing.Process.__init__(self)
        self.out = out
        self.queue = queue
        self.daemon = True
    def run(self):
        try:
            for line in iter(self.out.readline, b''):
                #print('worker read:', line)
                self.queue.put(line)
        except ValueError: pass # Readline of closed file
        self.out.close()
class Enqueue_input(multiprocessing.Process):
    def __init__(self, inp, iterable):
        multiprocessing.Process.__init__(self)
        self.inp = inp
        self.iterable = iterable
        self.daemon = True
    def run(self):
        #print("writing stdin")
        for line in self.iterable:
            self.inp.write(bytes(line,'utf-8'))
        self.inp.close()
        #print("writing stdin DONE")

class RunCmd():
    """RunCmd - class to launch shell commands

    Captures and returns stdout. Kills child after a given
    amount (timeout_runtime) wallclock seconds. Can also
    kill after timeout_retriggerable wallclock seconds.
    This second timer is reset whenever the child does some
    output

       (return_code, out) = RunCmd(['ls', '-l', '/etc'],
                                   timeout_runtime,
                                   timeout_no_output,
                                   stdin_string).go()

    """
    Timeout = enum.Enum('No','Retriggerable','Runtime')

    def __init__(self, cmd, timeout_runtime, timeout_retriggerable, stdin=None):
        self.dbg = False
        self.cmd = cmd
        self.timeout_retriggerable = timeout_retriggerable
        self.timeout_runtime = timeout_runtime
        self.timeout_hit = self.Timeout.No
        self.stdout = '--Cmd did not yield any output--'
        self.stdin = stdin
    def read_queue(self, q):
        time_last_output = None
        try:
            bstr = q.get(False) # non-blocking
            if self.dbg: print('{} chars read'.format(len(bstr)))
            time_last_output = time.time()
            self.stdout += bstr
        except queue.Empty:
            #print('queue empty')
            pass
        return time_last_output
    def go(self):
        if self.stdin:
            pstdin = subprocess.PIPE
        else:
            pstdin = None
        p = subprocess.Popen(self.cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=pstdin)
        pin = None
        if (pstdin):
            pin = Enqueue_input(p.stdin, [self.stdin + '\n'])
            pin.start()
        q = multiprocessing.Queue()
        pout = Enqueue_output(p.stdout, q)
        pout.start()
        try:
            if self.dbg: print('Beginning subprocess with timeout {}/{} s on {}'.format(self.timeout_retriggerable, self.timeout_runtime, time.asctime()))
            time_begin = time.time()
            time_last_output = time_begin
            seconds_passed = 0
            self.stdout = b''
            once = True                 # ensure loop's executed at least once
                                        # some child cmds may exit very fast, but still produce output
            while once or p.poll() is None or not q.empty():
                once = False
                if self.dbg: print('a) {} of {}/{} secs passed and overall {} chars read'.format(seconds_passed, self.timeout_retriggerable, self.timeout_runtime, len(self.stdout)))

                tlo = self.read_queue(q)
                if tlo:
                    time_last_output = tlo

                now = time.time()
                if now - time_last_output >= self.timeout_retriggerable:
                    self.timeout_hit = self.Timeout.Retriggerable
                    raise ErrorRunCmdTimeOut(self)
                if now - time_begin >= self.timeout_runtime:
                    self.timeout_hit = self.Timeout.Runtime
                    raise ErrorRunCmdTimeOut(self)

                if q.empty():
                    time.sleep(0.1)
            # Final try to get "last-millisecond" output
            self.read_queue(q)            
        finally:
            self._close(p, [pout, pin])            
        return (self.returncode, self.stdout)               

    def _close(self, p, procs):
        if self.dbg:
            if self.timeout_hit != self.Timeout.No:
                print('{} A TIMEOUT occured: {}'.format(timestamp(), self.timeout_hit))
            else:
                print('{} No timeout occured'.format(timestamp()))
        for process in [proc for proc in procs if proc]:
            try:
                process.terminate()
            except:
                print('{} Process termination raised trouble'.format(timestamp()))
                raise
        try:
            p.stdin.close()
        except: pass
        if self.dbg: print('{} _closed stdin'.format(timestamp()))
        try:
            p.stdout.close()    # If they are not closed the fds will hang around until
        except: pass
        if self.dbg: print('{} _closed stdout'.format(timestamp()))
            #p.stderr.close()   # os.fdlimit is exceeded and cause a nasty exception
        try:
            p.terminate()       # Important to close the fds prior to terminating the process!
                                # NOTE: Are there any other "non-freed" resources?
        except: pass
        if self.dbg: print('{} _closed Popen'.format(timestamp()))
        try:
            self.stdout = self.stdout.decode('utf-8')
        except: pass
        self.returncode = p.returncode
        if self.dbg: print('{} _closed all'.format(timestamp()))

Usage:

import runcmd

cmd = ['ls', '-l', '/etc']

worker = runcmd.RunCmd(cmd,
                       40,    # limit runtime [wallclock seconds]
                       2,     # limit runtime after last output [wallclk secs]
                       ''     # stdin input string
                       )
(return_code, out) = worker.go()

if worker.timeout_hit != worker.Timeout.No:
    print('A TIMEOUT occured: {}'.format(worker.timeout_hit))
else:
    print('No timeout occured')


print("Running '{:s}' returned {:d} and {:d} chars of output".format(cmd, return_code, len(out)))
print('Output:')
print(out)

command, the first argument, should be a list containing the command and its arguments. It is used for the Popen(shell=False) call. The timeouts are given in seconds. There is currently no code to disable a timeout; setting timeout_no_output to the same value as timeout_runtime effectively disables the retriggerable timeout_no_output.

stdin_string can be any string that should be sent to the command's standard input. Set it to None if your command does not need any input. If a string is given, a final '\n' is appended automatically.
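
For example, feeding a short string to a command that reads its standard input might look like the following sketch, which assumes the module above is saved as runcmd.py and that a Unix-style 'cat' binary is available:

import runcmd

# 'cat' echoes its stdin, so the input string comes back in 'out'
(return_code, out) = runcmd.RunCmd(['cat'],
                                    10,       # limit runtime [wallclock seconds]
                                    2,        # limit runtime after last output
                                    'hello')  # sent to stdin, '\n' is appended
print(out)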

5

Contrary to all the warnings in the subprocess documentation, reading directly from process.stdout and process.stderr provides a better solution.

By better I mean that I can read the output of a process even when it exceeds 2^16 bytes, without temporarily storing the output on disk.

Here is the code:

import fcntl
import os
import subprocess
import time

class TimeoutInterrupt(Exception):
    """Raised when the child process exceeds the timeout."""
    pass

def nonBlockRead(output):
    # Switch the pipe to non-blocking mode so read() returns immediately
    # instead of stalling the timeout loop below.
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read() or ''
    except Exception:
        return ''

def cmd(cmdline, timeout=60):
    """
    Execute cmdline, limit execution time to 'timeout' seconds.
    Uses the subprocess module and subprocess.PIPE.

    Raises TimeoutInterrupt
    """

    p = subprocess.Popen(
        cmdline,
        bufsize = 0, # 0 (unbuffered) is best here
        shell   = False, # not really needed; it's disabled by default
        stdout  = subprocess.PIPE,
        stderr  = subprocess.PIPE
    )

    t_begin = time.time() # Monitor execution time
    seconds_passed = 0

    stdout = ''
    stderr = ''

    while p.poll() is None and seconds_passed < timeout: # Monitor process
        time.sleep(0.1) # Wait a little
        seconds_passed = time.time() - t_begin

        # p.std* blocks on read(), which messes up the timeout timer.
        # To fix this, we use a nonblocking read()
        # Note: Not sure if this is Windows compatible
        stdout += nonBlockRead(p.stdout)
        stderr += nonBlockRead(p.stderr)

    # One final read: the child may have produced output between the last
    # read above and the moment poll() noticed it had exited.
    stdout += nonBlockRead(p.stdout)
    stderr += nonBlockRead(p.stderr)

    if seconds_passed >= timeout:
        try:
            p.stdout.close()  # If they are not closed the fds will hang around until
            p.stderr.close()  # os.fdlimit is exceeded and cause a nasty exception
            p.terminate()     # Important to close the fds prior to terminating the process!
                              # NOTE: Are there any other "non-freed" resources?
        except:
            pass

        raise TimeoutInterrupt

    returncode  = p.returncode

    return (returncode, stdout, stderr)
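
Note that fcntl is only available on Unix-like systems, so this particular trick does not cover the Windows part of the question. For what it is worth, on Python 3.3 and later the standard library handles the same case portably: Popen.communicate() accepts a timeout and drains both pipes in memory, so large output cannot block the child. A minimal sketch, assuming Python 3.3+:

import subprocess

def cmd(cmdline, timeout=60):
    # communicate() reads stdout/stderr concurrently, so even multi-megabyte
    # output cannot fill the pipe buffer, and the timeout is enforced portably.
    p = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        out, err = p.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        p.kill()
        out, err = p.communicate()  # collect whatever arrived before the kill
        raise
    return (p.returncode, out, err)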
