装饰/委托文件对象以添加功能

14 投票
5 回答
1514 浏览
提问于 2025-04-16 10:06

我写了一个小的Python脚本,用来执行一些命令行指令,使用的是subprocess模块和一个辅助函数:

import subprocess as sp
def run(command, description):
    """Runs a command in a formatted manner. Returns its return code."""
    start=datetime.datetime.now()
    sys.stderr.write('%-65s' % description)
    s=sp.Popen(command, shell=True, stderr=sp.PIPE, stdout=sp.PIPE)
    out,err=s.communicate()
    end=datetime.datetime.now()
    duration=end-start
    status='Done' if s.returncode==0 else 'Failed'
    print '%s (%d seconds)' % (status, duration.seconds)

接下来的几行代码是用来读取标准输出和错误信息的:

    s=sp.Popen(command, shell=True, stderr=sp.PIPE, stdout=sp.PIPE)
    out,err=s.communicate()

如你所见,stdout和stderr没有被使用。假设我想把输出和错误信息写入一个日志文件,并且格式化一下,比如:

[STDOUT: 2011-01-17 14:53:55] <message>
[STDERR: 2011-01-17 14:53:56] <message>

我想问的是,最符合Python风格的做法是什么?我想到了三种选择:

  1. 继承文件对象并重写write方法。
  2. 使用一个实现了write的委托类。
  3. 以某种方式直接连接到PIPE

更新:参考测试脚本

我用这个脚本来检查结果,保存为test.py

#!/usr/bin/python
import sys

sys.stdout.write('OUT\n')
sys.stdout.flush()
sys.stderr.write('ERR\n')
sys.stderr.flush()

有什么想法吗?

5 个回答

0

这段内容使用了Adam Rosenfield的make_async和read_async。我之前的回答是用select.epoll,所以只能在Linux上用,现在改成了select.select,这样在Unix和Windows上都能用了。

这段代码会把子进程的输出实时记录到/tmp/test.log文件里:

import logging
import subprocess
import shlex
import select
import fcntl
import os
import errno

def make_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''add the O_NONBLOCK flag to a file descriptor'''
    fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)

def read_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''read some data from a file descriptor, ignoring EAGAIN errors'''
    try:
        return fd.read()
    except IOError, e:
        if e.errno != errno.EAGAIN:
            raise e
        else:
            return ''

def log_process(proc,stdout_logger,stderr_logger):
    loggers = { proc.stdout: stdout_logger, proc.stderr:  stderr_logger }
    def log_fds(fds):
        for fd in fds:
            out = read_async(fd)
            if out.strip():
                loggers[fd].info(out)
    make_async(proc.stdout)
    make_async(proc.stderr)
    while True:
        # Wait for data to become available 
        rlist, wlist, xlist = select.select([proc.stdout, proc.stderr], [], [])
        log_fds(rlist)
        if proc.poll() is not None:
            # Corner case: check if more output was created
            # between the last call to read_async and now
            log_fds([proc.stdout, proc.stderr])                
            break

if __name__=='__main__':
    formatter = logging.Formatter('[%(name)s: %(asctime)s] %(message)s')
    handler = logging.FileHandler('/tmp/test.log','w')
    handler.setFormatter(formatter)

    stdout_logger=logging.getLogger('STDOUT')
    stdout_logger.setLevel(logging.DEBUG)
    stdout_logger.addHandler(handler)

    stderr_logger=logging.getLogger('STDERR')
    stderr_logger.setLevel(logging.DEBUG)
    stderr_logger.addHandler(handler)        

    proc = subprocess.Popen(shlex.split('ls -laR /tmp'),
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    log_process(proc,stdout_logger,stderr_logger)
1

我建议使用选项3,也就是用logging这个标准库。对我来说,另外两个选项有点太复杂了。

14

方案1和方案2是合理的解决办法,但仅仅重写write()方法是不够的。

问题在于,Popen需要文件句柄来连接到进程,所以Python的文件对象是行不通的,它们必须是操作系统级别的。为了解决这个问题,你需要一个有操作系统级别文件句柄的Python对象。我能想到的解决办法就是使用管道,这样你就有一个可以写入的操作系统级别的文件句柄。但这样你还需要另一个线程来不断检查那个管道,看有没有东西可以读取并记录下来。(所以这实际上是方案2的更严格实现,因为它委托给了日志记录功能)。

说到做到:

import io
import logging
import os
import select
import subprocess
import time
import threading

LOG_FILENAME = 'output.log'
logging.basicConfig(filename=LOG_FILENAME,level=logging.DEBUG)

class StreamLogger(io.IOBase):
    def __init__(self, level):
        self.level = level
        self.pipe = os.pipe()
        self.thread = threading.Thread(target=self._flusher)
        self.thread.start()

    def _flusher(self):
        self._run = True
        buf = b''
        while self._run:
            for fh in select.select([self.pipe[0]], [], [], 0)[0]:
                buf += os.read(fh, 1024)
                while b'\n' in buf:
                    data, buf = buf.split(b'\n', 1)
                    self.write(data.decode())
            time.sleep(1)
        self._run = None

    def write(self, data):
        return logging.log(self.level, data)

    def fileno(self):
        return self.pipe[1]

    def close(self):
        if self._run:
            self._run = False
            while self._run is not None:
                time.sleep(1)
            os.close(self.pipe[0])
            os.close(self.pipe[1])

这个类启动了一个操作系统级别的管道,Popen可以将标准输入/输出/错误连接到这个子进程。它还启动了一个线程,每秒检查一次管道的另一端,看有没有东西可以记录,然后用日志模块进行记录。

这个类可能应该实现更多的功能以确保完整性,但在这种情况下它已经可以正常工作了。

示例代码:

with StreamLogger(logging.INFO) as out:
    with StreamLogger(logging.ERROR) as err:
        subprocess.Popen("ls", stdout=out, stderr=err, shell=True)

output.log的内容大致如下:

INFO:root:output.log
INFO:root:streamlogger.py
INFO:root:and
INFO:root:so
INFO:root:on

在Python 2.6、2.7和3.1上进行了测试。

我认为方案1和方案3的任何实现都需要使用类似的技术。这有点复杂,但除非你能让Popen命令自己正确记录,否则我没有更好的主意。

撰写回答