PyGTK/GIO:递归监视目录变化

4 投票
2 回答
2905 浏览
提问于 2025-04-16 00:08

下面是一个示例代码(来自这个问题的GIO回答),它使用GIO的文件监视器来监控一个文件夹的变化:

import gio

def directory_changed(monitor, file1, file2, evt_type):
    print "Changed:", file1, file2, evt_type

gfile = gio.File(".")
monitor = gfile.monitor_directory(gio.FILE_MONITOR_NONE, None)
monitor.connect("changed", directory_changed) 

import glib
ml = glib.MainLoop()
ml.run()

运行这个代码后,我可以创建和修改子节点,并会收到变化的通知。不过,这个监控只对直接的子节点有效(我知道文档里没有说明其他情况)。下面的最后一个命令不会触发通知:

touch one
mkdir two
touch two/three

有没有简单的方法可以让它支持递归监控?我不想手动写代码去查找目录的创建,并添加监视器,删除时再移除等等。

这个功能的目的是为了一个版本控制系统(VCS)文件浏览器扩展,能够缓存工作副本中文件的状态,并在变化时单独更新它们。所以可能需要监控从几十到几千(甚至更多)的目录。我希望能找到工作副本的根目录,然后在那添加文件监视器。

我知道有pyinotify这个工具,但我想避免使用它,以便在非Linux内核的系统上也能运行,比如FreeBSD或其他系统。根据我的了解,GIO的文件监视器在可用的情况下是使用inotify的,我能理解不强调具体实现是为了保持一定的抽象层次,但这让我觉得应该是可以做到的。

(如果有必要的话,我最初是在PyGTK邮件列表上发布的这个内容。)

2 个回答

2

“有没有简单的方法让它变成递归的?”

我不知道有什么“简单的方法”可以做到这一点。像Linux上的inotify或BSD上的kqueue这样的底层系统,并没有提供自动添加递归监视的功能。我也不知道有没有库可以在GIO上实现你想要的功能。

所以你很可能需要自己来实现这个功能。因为在某些特殊情况下(比如 mkdir -p foo/bar/baz),这可能会有点棘手。我建议你看看pynotify是怎么实现它的 auto_add 功能的(可以在 pynotify的源代码中查找),然后把这个思路移植到GIO上。

1

我不太确定GIO是否允许同时使用多个显示器,但如果可以的话,完全可以这样做:

import gio
import os

def directory_changed(monitor, file1, file2, evt_type):
    if os.path.isdir(file2):    #maybe this needs to be file1?
        add_monitor(file2) 
    print "Changed:", file1, file2, evt_type

def add_monitor(dir):
    gfile = gio.File(dir)
    monitor = gfile.monitor_directory(gio.FILE_MONITOR_NONE, None)
    monitor.connect("changed", directory_changed) 

add_monitor('.')

import glib
ml = glib.MainLoop()
ml.run()

*我说没有理由是因为这可能会消耗很多资源,不过我对GIO几乎没有了解,所以也不能确定。其实你也可以用Python自己写一个,使用几个命令(比如os.listdir等)。它可能看起来像这样:

import time
import os

class Watcher(object):
    def __init__(self):
        self.dirs = []
        self.snapshots = {}

    def add_dir(self, dir):
        self.dirs.append(dir)

    def check_for_changes(self, dir):
        snapshot = self.snapshots.get(dir)
        curstate = os.listdir(dir)
        if not snapshot:
            self.snapshots[dir] = curstate
        else:
            if not snapshot == curstate:
                print 'Changes: ',
                for change in set(curstate).symmetric_difference(set(snapshot)):
                    if os.path.isdir(change):
                        print "isdir"
                        self.add_dir(change)
                    print change,

                self.snapshots[dir] = curstate
                print

    def mainloop(self):
        if len(self.dirs) < 1:
            print "ERROR: Please add a directory with add_dir()"
            return

        while True:
            for dir in self.dirs:
                self.check_for_changes(dir)
            time.sleep(4) # Don't want to be a resource hog

w = Watcher()
w.add_dir('.')


w.mainloop()

撰写回答