如何通过QSocketNotifier检测管道准备好读取?

0 投票
2 回答
1363 浏览
提问于 2025-04-18 14:29

我在使用Python 3.4,操作系统是Windows和Unix。我想用QSocketNotifier来检测一个管道是否可以读取数据。这个例子有点特殊,因为这个管道是在同一个进程里使用的。不管怎样,关键是我需要在管道有数据可读的时候采取行动。

我写了一个演示程序,但QSocketNotifier从来没有发出它的activated信号。我是不是漏掉了什么明显的东西?

from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
import os


def can_read(fd):
   print('File {} can be read'.format(fd))


def start():
   print('Starting')
   reader, writer = os.pipe()
   notifier = QSocketNotifier(reader, QSocketNotifier.Read, w)
   notifier.setEnabled(True)
   notifier.activated.connect(can_read)
   os.write(writer, b'a')
   os.close(writer)


app = QApplication([])
QTimer.singleShot(0, start)
w = QMainWindow()
w.show()
app.exec_()

2 个回答

1

这可能跟让变量超出作用域有关。如果你把所有相关的部分都存储起来,比如放在一个类里面,这样就能正常工作了。下面是我的解决方案(使用的是pyqt4,抱歉),这个程序会一直运行,直到窗口被关闭:

from PyQt4.QtGui import *
from PyQt4.QtCore import *
import os

class win(QMainWindow):


    def can_read(self,fd):
       print('File {} can be read'.format(fd))


    def start(self):
       print('Starting')
       self.reader, self.writer = os.pipe()
       notifier = QSocketNotifier(self.reader, QSocketNotifier.Read, self)
       notifier.setEnabled(True)
       notifier.activated.connect(self.can_read)
       os.write(self.writer, b'a')
       os.close(self.writer)


app = QApplication([])
#QTimer.singleShot(0, start)
w = win()
w.start()
w.show()
app.exec_()
1

其实,这段话并没有直接回答如何从一个可读的管道接收通知,但它提供了一个跨平台的替代方案,也就是监控一个套接字(socket),而不是管道:

from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
import socket

def _create_sock_pair(port=0):
    """Create socket pair.

    If socket.socketpair isn't available, we emulate it.
    """
    # See if socketpair() is available.
    have_socketpair = hasattr(socket, 'socketpair')
    if have_socketpair:
        client_sock, srv_sock = socket.socketpair()
        return client_sock, srv_sock

    # Create a non-blocking temporary server socket
    temp_srv_sock = socket.socket()
    temp_srv_sock.setblocking(False)
    temp_srv_sock.bind(('', port))
    port = temp_srv_sock.getsockname()[1]
    temp_srv_sock.listen(1)

    # Create non-blocking client socket
    client_sock = socket.socket()
    client_sock.setblocking(False)
    try:
        client_sock.connect(('localhost', port))
    except socket.error as err:
        # Error 10035 (operation would block) is not an error, as we're doing this with a
        # non-blocking socket.
        if err.errno != 10035:
            raise

    # Use select to wait for connect() to succeed.
    import select
    timeout = 1
    readable = select.select([temp_srv_sock], [], [], timeout)[0]
    if temp_srv_sock not in readable:
        raise Exception('Client socket not connected in {} second(s)'.format(timeout))
    srv_sock, _ = temp_srv_sock.accept()

    return client_sock, srv_sock


def can_read():
    print('Server can read')
    app.quit()


def write():
    print('Client writing')
    client_sock.send(b'a')
    client_sock.close()


app = QApplication([])

client_sock, srv_sock = _create_sock_pair()
notifier = QSocketNotifier(srv_sock.fileno(), QSocketNotifier.Read)
notifier.activated.connect(can_read)
write()

w = QMainWindow()
w.show()
app.exec_()
srv_sock.close()

撰写回答