PyQt - 如何检测并关闭已在运行的UI?

9 投票
3 回答
8736 浏览
提问于 2025-04-17 09:50

我在Maya里启动一个用户界面(UI)。如果这个界面没有被关闭,再次运行这个界面就会让Maya完全卡住,还会出现“事件循环已经在运行”的错误。

在重新运行脚本之前手动关闭这个界面可以避免卡住,但我觉得这样做不太方便。

有没有办法检测我想要运行的界面是否已经存在?还有可能强制关闭它吗?

3 个回答

1

我的解决方案是这样的:

import sys

from PyQt5.QtCore import QLockFile
from PyQt5.QtWidgets import QApplication
from PyQt5.QtWidgets import QMessageBox

from window import MainWindow


if __name__ == "__main__":
    try:
        app_object = QApplication(sys.argv)
        lock_file = QLockFile("app.lock")

        if lock_file.tryLock():
            window = MainWindow()
            window.show()

            app_object.exec()
        else:
            error_message = QMessageBox()
            error_message.setIcon(QMessageBox.Warning)
            error_message.setWindowTitle("Error")
            error_message.setText("The application is already running!")
            error_message.setStandardButtons(QMessageBox.Ok)
            error_message.exec()
    finally:
        lock_file.unlock()
9

如果有人想用 @ekhumoro 的方法在 python3 上运行,需要对字符串操作做一些调整。我会分享我修改过的版本,它在python 3上可以正常工作。

import sys

from PyQt4 import QtGui, QtCore, QtNetwork

class SingleApplication(QtGui.QApplication):
    def __init__(self, argv, key):
        QtGui.QApplication.__init__(self, argv)
        self._memory = QtCore.QSharedMemory(self)
        self._memory.setKey(key)
        if self._memory.attach():
            self._running = True
        else:
            self._running = False
            if not self._memory.create(1):
                raise RuntimeError( self._memory.errorString() )

    def isRunning(self):
        return self._running

class SingleApplicationWithMessaging(SingleApplication):
    def __init__(self, argv, key):
        SingleApplication.__init__(self, argv, key)
        self._key = key
        self._timeout = 1000
        self._server = QtNetwork.QLocalServer(self)

        if not self.isRunning():
            self._server.newConnection.connect(self.handleMessage)
            self._server.listen(self._key)

    def handleMessage(self):
        socket = self._server.nextPendingConnection()
        if socket.waitForReadyRead(self._timeout):
            self.emit(QtCore.SIGNAL('messageAvailable'), bytes(socket.readAll().data()).decode('utf-8') )
            socket.disconnectFromServer()
        else:
            QtCore.qDebug(socket.errorString())

    def sendMessage(self, message):
        if self.isRunning():
            socket = QtNetwork.QLocalSocket(self)
            socket.connectToServer(self._key, QtCore.QIODevice.WriteOnly)
            if not socket.waitForConnected(self._timeout):
                print(socket.errorString())
                return False
            socket.write(str(message).encode('utf-8'))
            if not socket.waitForBytesWritten(self._timeout):
                print(socket.errorString())
                return False
            socket.disconnectFromServer()
            return True
        return False

class Window(QtGui.QWidget):
    def __init__(self):
        QtGui.QWidget.__init__(self)
        self.edit = QtGui.QLineEdit(self)
        self.edit.setMinimumWidth(300)
        layout = QtGui.QVBoxLayout(self)
        layout.addWidget(self.edit)

    def handleMessage(self, message):
        self.edit.setText(message)

if __name__ == '__main__':

    key = 'foobar'

    # if parameter no. 1 was set then we'll use messaging between app instances
    if len(sys.argv) > 1:
        app = SingleApplicationWithMessaging(sys.argv, key)
        if app.isRunning():
            msg = ''
            # checking if custom message was passed as cli argument
            if len(sys.argv) > 2:
                msg = sys.argv[2]
            else:
                msg = 'APP ALREADY RUNNING'
            app.sendMessage( msg )
            print( "app is already running, sent following message: \n\"{0}\"".format( msg ) )
            sys.exit(1)
    else:
        app = SingleApplication(sys.argv, key)
        if app.isRunning():
            print('app is already running, no message has been sent')
            sys.exit(1)

    window = Window()
    app.connect(app, QtCore.SIGNAL('messageAvailable'), window.handleMessage)
    window.show()

    sys.exit(app.exec_())

下面是一些命令行调用的例子,假设你的脚本名字是 "SingleInstanceApp.py":

python SingleInstanceApp.py 1
python SingleInstanceApp.py 1 "test"
python SingleInstanceApp.py 1 "foo bar baz"
python SingleInstanceApp.py 1 "utf8 test FOO ßÄÖÜ ßäöü łąćźżóń ŁĄĆŹŻÓŃ etc"

(这里是没有第一个参数的调用,所以消息就不会发送)

python SingleInstanceApp.py

希望这能帮助到某些人。

21

这里有一个非常简单的 PyQt5 解决方案,使用了 QLockFile

from PyQt5 import QtCore, QtWidgets

lockfile = QtCore.QLockFile(QtCore.QDir.tempPath() + '/my_app_name.lock')

if lockfile.tryLock(100):
    app = QtWidgets.QApplication([])
    win = QtWidgets.QWidget()
    win.setGeometry(50, 50, 100, 100)
    win.show()
    app.exec()
else:
    print('app is already running')

之前在 Qt Wiki 上有几个比较简单的 C++ 解决方案,但现在似乎找不到了。我把其中一个转换成了 PyQt,并在下面提供了一个示例脚本。原来的 C++ 解决方案被分成了两个类,因为可能不需要消息传递的功能。

PyQt5

from PyQt5 import QtWidgets, QtCore, QtNetwork

class SingleApplication(QtWidgets.QApplication):
    messageAvailable = QtCore.pyqtSignal(object)

    def __init__(self, argv, key):
        super().__init__(argv)
        # cleanup (only needed for unix)
        QtCore.QSharedMemory(key).attach()
        self._memory = QtCore.QSharedMemory(self)
        self._memory.setKey(key)
        if self._memory.attach():
            self._running = True
        else:
            self._running = False
            if not self._memory.create(1):
                raise RuntimeError(self._memory.errorString())

    def isRunning(self):
        return self._running

class SingleApplicationWithMessaging(SingleApplication):
    def __init__(self, argv, key):
        super().__init__(argv, key)
        self._key = key
        self._timeout = 1000
        self._server = QtNetwork.QLocalServer(self)
        if not self.isRunning():
            self._server.newConnection.connect(self.handleMessage)
            self._server.listen(self._key)

    def handleMessage(self):
        socket = self._server.nextPendingConnection()
        if socket.waitForReadyRead(self._timeout):
            self.messageAvailable.emit(
                socket.readAll().data().decode('utf-8'))
            socket.disconnectFromServer()
        else:
            QtCore.qDebug(socket.errorString())

    def sendMessage(self, message):
        if self.isRunning():
            socket = QtNetwork.QLocalSocket(self)
            socket.connectToServer(self._key, QtCore.QIODevice.WriteOnly)
            if not socket.waitForConnected(self._timeout):
                print(socket.errorString())
                return False
            if not isinstance(message, bytes):
                message = message.encode('utf-8')
            socket.write(message)
            if not socket.waitForBytesWritten(self._timeout):
                print(socket.errorString())
                return False
            socket.disconnectFromServer()
            return True
        return False

class Window(QtWidgets.QWidget):
    def __init__(self):
        super().__init__()
        self.edit = QtWidgets.QLineEdit(self)
        self.edit.setMinimumWidth(300)
        layout = QtWidgets.QVBoxLayout(self)
        layout.addWidget(self.edit)

    def handleMessage(self, message):
        self.edit.setText(message)

if __name__ == '__main__':

    import sys

    key = 'app-name'

    # send commandline args as message
    if len(sys.argv) > 1:
        app = SingleApplicationWithMessaging(sys.argv, key)
        if app.isRunning():
            print('app is already running')
            app.sendMessage(' '.join(sys.argv[1:]))
            sys.exit(1)
    else:
        app = SingleApplication(sys.argv, key)
        if app.isRunning():
            print('app is already running')
            sys.exit(1)

    window = Window()
    app.messageAvailable.connect(window.handleMessage)
    window.show()

    sys.exit(app.exec_())

PyQt4

# only needed for python2
import sip
sip.setapi('QString', 2)

from PyQt4 import QtGui, QtCore, QtNetwork

class SingleApplication(QtGui.QApplication):
    messageAvailable = QtCore.pyqtSignal(object)

    def __init__(self, argv, key):
        QtGui.QApplication.__init__(self, argv)
        # cleanup (only needed for unix)
        QtCore.QSharedMemory(key).attach()
        self._memory = QtCore.QSharedMemory(self)
        self._memory.setKey(key)
        if self._memory.attach():
            self._running = True
        else:
            self._running = False
            if not self._memory.create(1):
                raise RuntimeError(self._memory.errorString())

    def isRunning(self):
        return self._running

class SingleApplicationWithMessaging(SingleApplication):
    def __init__(self, argv, key):
        SingleApplication.__init__(self, argv, key)
        self._key = key
        self._timeout = 1000
        self._server = QtNetwork.QLocalServer(self)
        if not self.isRunning():
            self._server.newConnection.connect(self.handleMessage)
            self._server.listen(self._key)

    def handleMessage(self):
        socket = self._server.nextPendingConnection()
        if socket.waitForReadyRead(self._timeout):
            self.messageAvailable.emit(
                socket.readAll().data().decode('utf-8'))
            socket.disconnectFromServer()
        else:
            QtCore.qDebug(socket.errorString())

    def sendMessage(self, message):
        if self.isRunning():
            socket = QtNetwork.QLocalSocket(self)
            socket.connectToServer(self._key, QtCore.QIODevice.WriteOnly)
            if not socket.waitForConnected(self._timeout):
                print(socket.errorString())
                return False
            if not isinstance(message, bytes):
                message = message.encode('utf-8')
            socket.write(message)
            if not socket.waitForBytesWritten(self._timeout):
                print(socket.errorString())
                return False
            socket.disconnectFromServer()
            return True
        return False

class Window(QtGui.QWidget):
    def __init__(self):
        QtGui.QWidget.__init__(self)
        self.edit = QtGui.QLineEdit(self)
        self.edit.setMinimumWidth(300)
        layout = QtGui.QVBoxLayout(self)
        layout.addWidget(self.edit)

    def handleMessage(self, message):
        self.edit.setText(message)

if __name__ == '__main__':

    import sys

    key = 'app-name'

    # send commandline args as message
    if len(sys.argv) > 1:
        app = SingleApplicationWithMessaging(sys.argv, key)
        if app.isRunning():
            print('app is already running')
            app.sendMessage(' '.join(sys.argv[1:]))
            sys.exit(1)
    else:
        app = SingleApplication(sys.argv, key)
        if app.isRunning():
            print('app is already running')
            sys.exit(1)

    window = Window()
    app.messageAvailable.connect(window.handleMessage)
    window.show()

    sys.exit(app.exec_())

撰写回答