gobject IO监控 + 非阻塞读取

3 投票
3 回答
2034 浏览
提问于 2025-04-15 15:10

我在使用Python中的io_add_watch监控时遇到了问题(通过gobject)。我想在每次收到通知后,非阻塞地读取整个缓冲区。以下是简化后的代码:

class SomeApp(object):

   def __init__(self):
      # some other init that does a lot of stderr debug writes
      fl = fcntl.fcntl(0, fcntl.F_GETFL, 0)
      fcntl.fcntl(0, fcntl.F_SETFL, fl | os.O_NONBLOCK)
      print "hooked", gobject.io_add_watch(0, gobject.IO_IN | gobject.IO_PRI, self.got_message, [""])
      self.app = gobject.MainLoop()

   def run(self):
      print "ready"
      self.app.run()

   def got_message(self, fd, condition, data):
      print "reading now"
      data[0] += os.read(0, 1024)
      print "got something", fd, condition, data
      return True

gobject.threads_init()
SomeApp().run()

这里有个小窍门——当我在没有开启调试输出的情况下运行程序时,got_message的调用就不会出现。如果我先往标准错误输出(stderr)写很多东西,问题就消失了。如果除了代码中可见的打印内容外,我什么都不写,标准输入(stdin)的消息信号就不会出现。还有一个有趣的事情是,当我尝试通过strace运行同样的应用程序,并开启stderr调试时(想检查是否漏掉了什么调用),问题又出现了。

简单来说:如果我在没有使用strace的情况下先往stderr写很多东西,io_watch就能正常工作。如果我使用strace写很多东西,或者根本不写,io_watch就不工作。

“其他初始化”部分需要一些时间,所以如果我在看到“hooked 2”输出之前输入一些文本,然后在“ready”之后按“ctrl+c”,get_message回调会被调用,但读取时会抛出EAGAIN错误,这样缓冲区似乎是空的。

与stdin相关的strace日志:

ioctl(0, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
ioctl(0, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
fcntl(0, F_GETFL)                       = 0xa002 (flags O_RDWR|O_ASYNC|O_LARGEFILE)
fcntl(0, F_SETFL, O_RDWR|O_NONBLOCK|O_ASYNC|O_LARGEFILE) = 0
fcntl(0, F_GETFL)                       = 0xa802 (flags O_RDWR|O_NONBLOCK|O_ASYNC|O_LARGEFILE)

有没有人对这里发生了什么有想法?


补充:又有了一个线索。我尝试重构应用程序,让读取在另一个线程中进行,并通过管道传回去。结果“有点”有效:

...
      rpipe, wpipe = os.pipe()
      stopped = threading.Event()
      self.stdreader = threading.Thread(name = "reader", target = self.std_read_loop, args = (wpipe, stopped))
      self.stdreader.start()
      new_data = ""
      print "hooked", gobject.io_add_watch(rpipe, gobject.IO_IN | gobject.IO_PRI, self.got_message, [new_data])

   def std_read_loop(self, wpipe, stop_event):
      while True:
         try:
            new_data = os.read(0, 1024)
            while len(new_data) > 0:
               l = os.write(wpipe, new_data)
               new_data = new_data[l:]
         except OSError, e:
            if stop_event.isSet():
               break
            time.sleep(0.1)
...

令人惊讶的是,如果我把相同的文本放到一个新的管道中,一切就开始正常工作了。问题是:

  • 第一行根本没有“被注意到”——我只收到了第二行及后面的内容
  • 这看起来很糟糕

也许这能给其他人一些线索,为什么会这样?

3 个回答

0

如果你先设置了回调函数,然后再输出错误信息,会发生什么呢?当你开启调试输出时,这个回调函数还会被调用吗?

另外,我想你可能需要在你的处理函数里不断调用 os.read(),直到没有数据为止,这样可以确保在调用之间如果有超过1024字节的数据准备好,也能被读取到。

你有没有尝试在后台线程中使用 select 模块来模拟 gio 的功能?这样做有效吗?你是在什么平台上操作的?你处理的文件描述符是什么类型的?(是文件?套接字?管道?)

0

文档上说,你在回调函数里应该返回 TRUE,否则这个函数就会从事件源的列表中被移除。

2

这听起来像是一个竞争条件,也就是说在设置你的回调函数时可能有一些延迟,或者环境发生了变化,影响了你是否能设置这个回调函数。

我建议你仔细看看在调用 io_add_watch() 之前发生了什么。比如,Python 的 fcntl 文档提到:

这个模块里的所有函数都需要一个文件描述符 fd 作为第一个参数。这个描述符可以是一个整数,比如通过 sys.stdin.fileno() 得到的,或者是一个文件对象,比如 sys.stdin 本身,它提供一个 fileno() 方法,返回一个真实的文件描述符。

显然,当你假设标准输入(STDIN)的文件描述符(FD)是 0 时,你并没有按照这个方式来做。我建议你先改正这一点,然后再试一次。

另外,如果文件描述符已经被阻塞,那么你的程序可能会在等待,而其他没有被阻塞的进程在运行,因此根据你先做什么,时间上会有差异。如果你把 fcntl 的相关代码重构一下,让它在程序启动后尽快执行,甚至在导入 GTK 模块之前,这样会发生什么呢?

我不太明白为什么使用 GTK 图形界面的程序会想要从标准输入读取数据。如果你实际上是想捕获另一个进程的输出,应该使用 subprocess 模块来设置一个管道,然后在这个管道上使用 io_add_watch(),像这样:

proc = subprocess.Popen(command, stdout = subprocess.PIPE)
gobject.io_add_watch(proc.stdout, glib.IO_IN, self.write_to_buffer )

再次强调,在这个例子中,我们确保在调用 io_add_watch( 之前有一个有效的打开的文件描述符。

通常,当使用 gobject.io_add_watch() 时,它是在 gobject.MainLoop() 之前调用的。例如,这里有一些使用 io_add_watch 来捕获 IO_IN 的有效代码。

撰写回答