在Python中同时查看多个日志文件

4 投票
3 回答
3193 浏览
提问于 2025-04-16 16:05

这可能是我一个有点傻的练习,但它引发了一些有趣的问题。我有一个聊天客户端的日志文件夹,每当其中一个文件发生变化时,我想通过notify-osd收到通知。

我写的脚本基本上是用os.popen来运行Linux的tail命令,查看每个文件的最后一行,然后把每一行和上次运行时的记录进行对比。如果有变化,就用pynotify给我发个通知。

这个脚本实际上运行得很好,只是它占用了大量的CPU资源(可能是因为每次循环都要对大约16个文件运行tail命令,而这些文件是通过sshfs挂载的)。

看起来像这个方法会是个很好的解决方案,但我不知道怎么把它应用到多个文件上。

这里是我写的脚本。请原谅我缺乏注释和糟糕的风格。

编辑:为了澄清,这一切都是在桌面Linux上进行的。

3 个回答

1

这是一个简单的纯Python解决方案(虽然不是最好的,但它不会创建新的进程,在空闲时会输出4行空白,并且每次块的来源如果有变化都会标记出来):

#!/usr/bin/env python

from __future__ import with_statement

'''
Implement multi-file tail
'''

import os
import sys
import time


def print_file_from(filename, pos):
    with open(filename, 'rb') as fh:
        fh.seek(pos)
        while True:
            chunk = fh.read(8192)
            if not chunk:
                break
            sys.stdout.write(chunk)


def _fstat(filename):
    st_results = os.stat(filename)
    return (st_results[6], st_results[8])


def _print_if_needed(filename, last_stats, no_fn, last_fn):
    changed = False
    #Find the size of the file and move to  the end
    tup = _fstat(filename)
    # print tup
    if last_stats[filename] != tup:
        changed = True
        if not no_fn and last_fn != filename:
            print '\n<%s>' % filename
        print_file_from(filename, last_stats[filename][0])
        last_stats[filename] = tup
    return changed


def multi_tail(filenames, stdout=sys.stdout, interval=1, idle=10, no_fn=False):
    S = lambda (st_size, st_mtime): (max(0, st_size - 124), st_mtime)
    last_stats = dict((fn, S(_fstat(fn))) for fn in filenames)
    last_fn = None
    last_print = 0
    while 1:
        # print last_stats
        changed = False
        for filename in filenames:
            if _print_if_needed(filename, last_stats, no_fn, last_fn):
                changed = True
                last_fn = filename
        if changed:
            if idle > 0:
                last_print = time.time()
        else:
            if idle > 0 and last_print is not None:
                if time.time() - last_print >= idle:
                    last_print = None
                    print '\n' * 4
            time.sleep(interval)

if '__main__' == __name__:
    from optparse import OptionParser
    op = OptionParser()
    op.add_option('-F', '--no-fn', help="don't print filename when changes",
        default=False, action='store_true')
    op.add_option('-i', '--idle', help='idle time, in seconds (0 turns off)',
        type='int', default=10)
    op.add_option('--interval', help='check interval, in seconds', type='int',
        default=1)
    opts, args = op.parse_args()
    try:
        multi_tail(args, interval=opts.interval, idle=opts.idle,
            no_fn=opts.no_fn)
    except KeyboardInterrupt:
        pass
5

如果你已经在使用pyinotify这个模块,那么用纯Python来实现这个功能就很简单了(也就是说,不需要为每个文件单独启动一个进程来监控)。

下面是一个通过inotify驱动的示例,它应该会占用很少的CPU资源。当某个路径发生IN_MODIFY事件时,我们会从文件句柄中读取所有可用的数据,并输出找到的完整行,同时将不完整的行缓存起来,等到有更多数据时再处理:

import os
import select
import sys
import pynotify
import pyinotify

class Watcher(pyinotify.ProcessEvent):

    def __init__(self, paths):
        self._manager = pyinotify.WatchManager()
        self._notify = pyinotify.Notifier(self._manager, self)
        self._paths = {}
        for path in paths:
            self._manager.add_watch(path, pyinotify.IN_MODIFY)
            fh = open(path, 'rb')
            fh.seek(0, os.SEEK_END)
            self._paths[os.path.realpath(path)] = [fh, '']

    def run(self):
        while True:
            self._notify.process_events()
            if self._notify.check_events():
                self._notify.read_events()

    def process_default(self, evt):
        path = evt.pathname
        fh, buf = self._paths[path]
        data = fh.read()
        lines = data.split('\n')
        # output previous incomplete line.
        if buf:
            lines[0] = buf + lines[0]
        # only output the last line if it was complete.
        if lines[-1]:
            buf = lines[-1]
        lines.pop()

        # display a notification
        notice = pynotify.Notification('%s changed' % path, '\n'.join(lines))
        notice.show()

        # and output to stdout
        for line in lines:
            sys.stdout.write(path + ': ' + line + '\n')
        sys.stdout.flush()
        self._paths[path][1] = buf

pynotify.init('watcher')
paths = sys.argv[1:]
Watcher(paths).run()

使用方法:

% python watcher.py [path1 path2 ... pathN]
9

不看你的源代码,我可以告诉你有两种简单的方法,可以更高效地处理多个文件。

  1. 如果不是必要,别费劲去运行 tail。你可以用 os.stat 来查看所有文件的状态,记录下最后修改的时间。如果最后修改的时间不一样,就发出通知。

  2. 使用 pyinotify 来调用 Linux 的 inotify 功能;这样可以让内核为你处理第一种方法,当你目录里的文件发生变化时,它会通知你。然后把这个通知转换成你的 osd 通知。

不过,根据你想要的通知数量,以及你是否在意错过某条消息的通知,可能会有一些复杂的地方。

如果你想继续使用 tail,可以试试 tail -f。用 tail -f 打开所有文件,然后使用 select 模块,让操作系统告诉你哪个文件有新输入。你的主循环会调用 select,然后遍历每个可读的描述符来生成通知。(其实你也可以不使用 tail,直接在可读时调用 readline()。)

你脚本中还有其他可以改进的地方:

  • os.listdir 和原生的 Python 过滤(比如使用列表推导式)来代替用 popen 和一堆 grep 过滤。
  • 定期更新要扫描的缓冲区列表,而不是只在程序启动时做一次。
  • 使用 subprocess.popen 代替 os.popen

撰写回答