如何通过QSocketNotifier检测管道准备好读取?
我在使用Python 3.4,操作系统是Windows和Unix。我想用QSocketNotifier来检测一个管道是否可以读取数据。这个例子有点特殊,因为这个管道是在同一个进程里使用的。不管怎样,关键是我需要在管道有数据可读的时候采取行动。
我写了一个演示程序,但QSocketNotifier从来没有发出它的activated信号。我是不是漏掉了什么明显的东西?
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
import os
def can_read(fd):
print('File {} can be read'.format(fd))
def start():
print('Starting')
reader, writer = os.pipe()
notifier = QSocketNotifier(reader, QSocketNotifier.Read, w)
notifier.setEnabled(True)
notifier.activated.connect(can_read)
os.write(writer, b'a')
os.close(writer)
app = QApplication([])
QTimer.singleShot(0, start)
w = QMainWindow()
w.show()
app.exec_()
2 个回答
1
这可能跟让变量超出作用域有关。如果你把所有相关的部分都存储起来,比如放在一个类里面,这样就能正常工作了。下面是我的解决方案(使用的是pyqt4,抱歉),这个程序会一直运行,直到窗口被关闭:
from PyQt4.QtGui import *
from PyQt4.QtCore import *
import os
class win(QMainWindow):
def can_read(self,fd):
print('File {} can be read'.format(fd))
def start(self):
print('Starting')
self.reader, self.writer = os.pipe()
notifier = QSocketNotifier(self.reader, QSocketNotifier.Read, self)
notifier.setEnabled(True)
notifier.activated.connect(self.can_read)
os.write(self.writer, b'a')
os.close(self.writer)
app = QApplication([])
#QTimer.singleShot(0, start)
w = win()
w.start()
w.show()
app.exec_()
1
其实,这段话并没有直接回答如何从一个可读的管道接收通知,但它提供了一个跨平台的替代方案,也就是监控一个套接字(socket),而不是管道:
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
import socket
def _create_sock_pair(port=0):
"""Create socket pair.
If socket.socketpair isn't available, we emulate it.
"""
# See if socketpair() is available.
have_socketpair = hasattr(socket, 'socketpair')
if have_socketpair:
client_sock, srv_sock = socket.socketpair()
return client_sock, srv_sock
# Create a non-blocking temporary server socket
temp_srv_sock = socket.socket()
temp_srv_sock.setblocking(False)
temp_srv_sock.bind(('', port))
port = temp_srv_sock.getsockname()[1]
temp_srv_sock.listen(1)
# Create non-blocking client socket
client_sock = socket.socket()
client_sock.setblocking(False)
try:
client_sock.connect(('localhost', port))
except socket.error as err:
# Error 10035 (operation would block) is not an error, as we're doing this with a
# non-blocking socket.
if err.errno != 10035:
raise
# Use select to wait for connect() to succeed.
import select
timeout = 1
readable = select.select([temp_srv_sock], [], [], timeout)[0]
if temp_srv_sock not in readable:
raise Exception('Client socket not connected in {} second(s)'.format(timeout))
srv_sock, _ = temp_srv_sock.accept()
return client_sock, srv_sock
def can_read():
print('Server can read')
app.quit()
def write():
print('Client writing')
client_sock.send(b'a')
client_sock.close()
app = QApplication([])
client_sock, srv_sock = _create_sock_pair()
notifier = QSocketNotifier(srv_sock.fileno(), QSocketNotifier.Read)
notifier.activated.connect(can_read)
write()
w = QMainWindow()
w.show()
app.exec_()
srv_sock.close()