gobject IO监控 + 非阻塞读取
我在使用Python中的io_add_watch
监控时遇到了问题(通过gobject)。我想在每次收到通知后,非阻塞地读取整个缓冲区。以下是简化后的代码:
class SomeApp(object):
def __init__(self):
# some other init that does a lot of stderr debug writes
fl = fcntl.fcntl(0, fcntl.F_GETFL, 0)
fcntl.fcntl(0, fcntl.F_SETFL, fl | os.O_NONBLOCK)
print "hooked", gobject.io_add_watch(0, gobject.IO_IN | gobject.IO_PRI, self.got_message, [""])
self.app = gobject.MainLoop()
def run(self):
print "ready"
self.app.run()
def got_message(self, fd, condition, data):
print "reading now"
data[0] += os.read(0, 1024)
print "got something", fd, condition, data
return True
gobject.threads_init()
SomeApp().run()
这里有个小窍门——当我在没有开启调试输出的情况下运行程序时,got_message
的调用就不会出现。如果我先往标准错误输出(stderr)写很多东西,问题就消失了。如果除了代码中可见的打印内容外,我什么都不写,标准输入(stdin)的消息信号就不会出现。还有一个有趣的事情是,当我尝试通过strace
运行同样的应用程序,并开启stderr调试时(想检查是否漏掉了什么调用),问题又出现了。
简单来说:如果我在没有使用strace的情况下先往stderr写很多东西,io_watch
就能正常工作。如果我使用strace写很多东西,或者根本不写,io_watch
就不工作。
“其他初始化”部分需要一些时间,所以如果我在看到“hooked 2”输出之前输入一些文本,然后在“ready”之后按“ctrl+c”,get_message
回调会被调用,但读取时会抛出EAGAIN错误,这样缓冲区似乎是空的。
与stdin相关的strace日志:
ioctl(0, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
ioctl(0, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
fcntl(0, F_GETFL) = 0xa002 (flags O_RDWR|O_ASYNC|O_LARGEFILE)
fcntl(0, F_SETFL, O_RDWR|O_NONBLOCK|O_ASYNC|O_LARGEFILE) = 0
fcntl(0, F_GETFL) = 0xa802 (flags O_RDWR|O_NONBLOCK|O_ASYNC|O_LARGEFILE)
有没有人对这里发生了什么有想法?
补充:又有了一个线索。我尝试重构应用程序,让读取在另一个线程中进行,并通过管道传回去。结果“有点”有效:
...
rpipe, wpipe = os.pipe()
stopped = threading.Event()
self.stdreader = threading.Thread(name = "reader", target = self.std_read_loop, args = (wpipe, stopped))
self.stdreader.start()
new_data = ""
print "hooked", gobject.io_add_watch(rpipe, gobject.IO_IN | gobject.IO_PRI, self.got_message, [new_data])
def std_read_loop(self, wpipe, stop_event):
while True:
try:
new_data = os.read(0, 1024)
while len(new_data) > 0:
l = os.write(wpipe, new_data)
new_data = new_data[l:]
except OSError, e:
if stop_event.isSet():
break
time.sleep(0.1)
...
令人惊讶的是,如果我把相同的文本放到一个新的管道中,一切就开始正常工作了。问题是:
- 第一行根本没有“被注意到”——我只收到了第二行及后面的内容
- 这看起来很糟糕
也许这能给其他人一些线索,为什么会这样?
3 个回答
如果你先设置了回调函数,然后再输出错误信息,会发生什么呢?当你开启调试输出时,这个回调函数还会被调用吗?
另外,我想你可能需要在你的处理函数里不断调用 os.read()
,直到没有数据为止,这样可以确保在调用之间如果有超过1024字节的数据准备好,也能被读取到。
你有没有尝试在后台线程中使用 select
模块来模拟 gio
的功能?这样做有效吗?你是在什么平台上操作的?你处理的文件描述符是什么类型的?(是文件?套接字?管道?)
文档上说,你在回调函数里应该返回 TRUE
,否则这个函数就会从事件源的列表中被移除。
这听起来像是一个竞争条件,也就是说在设置你的回调函数时可能有一些延迟,或者环境发生了变化,影响了你是否能设置这个回调函数。
我建议你仔细看看在调用 io_add_watch()
之前发生了什么。比如,Python 的 fcntl 文档提到:
这个模块里的所有函数都需要一个文件描述符 fd 作为第一个参数。这个描述符可以是一个整数,比如通过
sys.stdin.fileno()
得到的,或者是一个文件对象,比如sys.stdin
本身,它提供一个fileno()
方法,返回一个真实的文件描述符。
显然,当你假设标准输入(STDIN)的文件描述符(FD)是 0 时,你并没有按照这个方式来做。我建议你先改正这一点,然后再试一次。
另外,如果文件描述符已经被阻塞,那么你的程序可能会在等待,而其他没有被阻塞的进程在运行,因此根据你先做什么,时间上会有差异。如果你把 fcntl 的相关代码重构一下,让它在程序启动后尽快执行,甚至在导入 GTK 模块之前,这样会发生什么呢?
我不太明白为什么使用 GTK 图形界面的程序会想要从标准输入读取数据。如果你实际上是想捕获另一个进程的输出,应该使用 subprocess 模块来设置一个管道,然后在这个管道上使用 io_add_watch()
,像这样:
proc = subprocess.Popen(command, stdout = subprocess.PIPE)
gobject.io_add_watch(proc.stdout, glib.IO_IN, self.write_to_buffer )
再次强调,在这个例子中,我们确保在调用 io_add_watch(
之前有一个有效的打开的文件描述符。
通常,当使用 gobject.io_add_watch()
时,它是在 gobject.MainLoop()
之前调用的。例如,这里有一些使用 io_add_watch
来捕获 IO_IN 的有效代码。