从多线程计算中更新TKinter GUI

3 投票
1 回答
4404 浏览
提问于 2025-04-16 20:37

我正在为一个Python模拟器创建一个图形用户界面(GUI)。这个界面提供了一些工具,可以用来设置模拟并运行它。在模拟运行的时候,我想把进度信息传递给这个界面,并在我的simulation_frame中的一个Label上显示出来。因为模拟需要使用多进程,所以我使用了一个Queue来把更新的信息传回给界面。

我现在的设置是,运行模拟会阻塞Tk的主循环,因为我需要在调用结束时关闭我的Pool。我调用update_idletasks()来强制界面更新进度信息。

我觉得这样做既不优雅,也可能有风险。而且,虽然在Ubuntu上可以正常工作,但在Windows XP上似乎不行——运行一会儿后窗口就会变空。我可能可以通过调用update()而不是update_idletasks()来让它在Windows上工作,但我觉得那样更糟糕。

有没有更好的解决方案呢?

相关代码:

sims = []
queues = []
svars = []
names = []
i = 0
manager = mp.Manager()
for config in self.configs:
    name, file, num = config.get()
    j = 0
    for _ in range(num):
        #progress monitor label
        q = manager.Queue()
        s_var = StringVar()
        label = Label(self.sim_frame, textvariable = s_var, bg = "white")
        s_var.set("%d: Not Started"%i)
        label.grid(row = i, column = 0, sticky = W+N)
        self.sim_labels.append(label)
        queues.append(q)
        svars.append(s_var)
        names.append("%s-%d"%(name, j))
        sims.append(("%s-%d"%(name, j),file, data, verbose, q))
        i += 1
        j += 1
self.update()

# The progress tracking is pretty hacky.

pool = mp.Pool(parallel)
num_sims = len(sims)
#start simulating
tracker = pool.map_async(run_1_sim,sims)
while not tracker.ready():
    pass
    for i in range(num_sims):
        q = queues[i]
        try:
            gen = q.get(timeout = .001)
            # if the sim has updated, update the label
            #print gen
            svars[i].set(gen)
            self.update()
        except Empty:
            pass
# The results of the map, if necessary
tracker.get()

    def update(self):
        """
        Redraws everything
        """
        self.master.update_idletasks()

def run_1_sim(args):
    """
    Runs one simulation with the specified args, output updates to the supplied
    pipe every generation
    """
    name,config,data, verbose, q = args
    sim = Simulation(config, name=name, data = data)
    generation = 0
    q.put(sim.name + ": 0")
    try:
        while sim.run(verbose=verbose, log=True, generations = sim_step):
            generation += sim_step
            q.put(sim.name + ": " + str(generation))
    except Exception as err:
        print err

1 个回答

2

这可能对你有帮助,也可能没什么用,但你可以通过确保在创建根窗口的线程上执行 tkinter 的代码和方法来让它线程安全。有一个项目尝试了这个概念,你可以在 Python Cookbook 上找到,叫做 recipe 577633(目录修剪器 2)。下面的代码来自第76到253行,扩展起来也比较简单,可以添加小部件。


主要的线程安全支持

# Import several GUI libraries.
import tkinter.ttk
import tkinter.filedialog
import tkinter.messagebox

# Import other needed modules.
import queue
import _thread
import operator

################################################################################

class AffinityLoop:

    "Restricts code execution to thread that instance was created on."

    __slots__ = '__action', '__thread'

    def __init__(self):
        "Initialize AffinityLoop with job queue and thread identity."
        self.__action = queue.Queue()
        self.__thread = _thread.get_ident()

    def run(self, func, *args, **keywords):
        "Run function on creating thread and return result."
        if _thread.get_ident() == self.__thread:
            self.__run_jobs()
            return func(*args, **keywords)
        else:
            job = self.__Job(func, args, keywords)
            self.__action.put_nowait(job)
            return job.result

    def __run_jobs(self):
        "Run all pending jobs currently in the job queue."
        while not self.__action.empty():
            job = self.__action.get_nowait()
            job.execute()

    ########################################################################

    class __Job:

        "Store information to run a job at a later time."

        __slots__ = ('__func', '__args', '__keywords',
                     '__error', '__mutex', '__value')

        def __init__(self, func, args, keywords):
            "Initialize the job's info and ready for execution."
            self.__func = func
            self.__args = args
            self.__keywords = keywords
            self.__error = False
            self.__mutex = _thread.allocate_lock()
            self.__mutex.acquire()

        def execute(self):
            "Run the job, store any error, and return to sender."
            try:
                self.__value = self.__func(*self.__args, **self.__keywords)
            except Exception as error:
                self.__error = True
                self.__value = error
            self.__mutex.release()

        @property
        def result(self):
            "Return execution result or raise an error."
            self.__mutex.acquire()
            if self.__error:
                raise self.__value
            return self.__value

################################################################################

class _ThreadSafe:

    "Create a thread-safe GUI class for safe cross-threaded calls."

    ROOT = tkinter.Tk

    def __init__(self, master=None, *args, **keywords):
        "Initialize a thread-safe wrapper around a GUI base class."
        if master is None:
            if self.BASE is not self.ROOT:
                raise ValueError('Widget must have a master!')
            self.__job = AffinityLoop() # Use Affinity() if it does not break.
            self.__schedule(self.__initialize, *args, **keywords)
        else:
            self.master = master
            self.__job = master.__job
            self.__schedule(self.__initialize, master, *args, **keywords)

    def __initialize(self, *args, **keywords):
        "Delegate instance creation to later time if necessary."
        self.__obj = self.BASE(*args, **keywords)

    ########################################################################

    # Provide a framework for delaying method execution when needed.

    def __schedule(self, *args, **keywords):
        "Schedule execution of a method till later if necessary."
        return self.__job.run(self.__run, *args, **keywords)

    @classmethod
    def __run(cls, func, *args, **keywords):
        "Execute the function after converting the arguments."
        args = tuple(cls.unwrap(i) for i in args)
        keywords = dict((k, cls.unwrap(v)) for k, v in keywords.items())
        return func(*args, **keywords)

    @staticmethod
    def unwrap(obj):
        "Unpack inner objects wrapped by _ThreadSafe instances."
        return obj.__obj if isinstance(obj, _ThreadSafe) else obj

    ########################################################################

    # Allow access to and manipulation of wrapped instance's settings.

    def __getitem__(self, key):
        "Get a configuration option from the underlying object."
        return self.__schedule(operator.getitem, self, key)

    def __setitem__(self, key, value):
        "Set a configuration option on the underlying object."
        return self.__schedule(operator.setitem, self, key, value)

    ########################################################################

    # Create attribute proxies for methods and allow their execution.

    def __getattr__(self, name):
        "Create a requested attribute and return cached result."
        attr = self.__Attr(self.__callback, (name,))
        setattr(self, name, attr)
        return attr

    def __callback(self, path, *args, **keywords):
        "Schedule execution of named method from attribute proxy."
        return self.__schedule(self.__method, path, *args, **keywords)

    def __method(self, path, *args, **keywords):
        "Extract a method and run it with the provided arguments."
        method = self.__obj
        for name in path:
            method = getattr(method, name)
        return method(*args, **keywords)

    ########################################################################

    class __Attr:

        "Save an attribute's name and wait for execution."

        __slots__ = '__callback', '__path'

        def __init__(self, callback, path):
            "Initialize proxy with callback and method path."
            self.__callback = callback
            self.__path = path

        def __call__(self, *args, **keywords):
            "Run a known method with the given arguments."
            return self.__callback(self.__path, *args, **keywords)

        def __getattr__(self, name):
            "Generate a proxy object for a sub-attribute."
            if name in {'__func__', '__name__'}:
                # Hack for the "tkinter.__init__.Misc._register" method.
                raise AttributeError('This is not a real method!')
            return self.__class__(self.__callback, self.__path + (name,))

################################################################################

# Provide thread-safe classes to be used from tkinter.

class Tk(_ThreadSafe): BASE = tkinter.Tk
class Frame(_ThreadSafe): BASE = tkinter.ttk.Frame
class Button(_ThreadSafe): BASE = tkinter.ttk.Button
class Entry(_ThreadSafe): BASE = tkinter.ttk.Entry
class Progressbar(_ThreadSafe): BASE = tkinter.ttk.Progressbar
class Treeview(_ThreadSafe): BASE = tkinter.ttk.Treeview
class Scrollbar(_ThreadSafe): BASE = tkinter.ttk.Scrollbar
class Sizegrip(_ThreadSafe): BASE = tkinter.ttk.Sizegrip
class Menu(_ThreadSafe): BASE = tkinter.Menu
class Directory(_ThreadSafe): BASE = tkinter.filedialog.Directory
class Message(_ThreadSafe): BASE = tkinter.messagebox.Message

如果你查看应用程序的其余部分,你会发现它是用你在其他 tkinter 应用中常见的 _ThreadSafe 变体小部件构建的。当来自不同线程的方法调用进来时,它们会被自动暂时保存,直到可以在创建它们的线程上执行这些调用。注意 mainloop 是如何通过第291到298行和326到336行进行替换的。


注意 NoDefaltRoot 和 main_loop 调用

@classmethod
def main(cls):
    "Create an application containing a single TrimDirView widget."
    tkinter.NoDefaultRoot()
    root = cls.create_application_root()
    cls.attach_window_icon(root, ICON)
    view = cls.setup_class_instance(root)
    cls.main_loop(root)

main_loop 允许线程执行

@staticmethod
def main_loop(root):
    "Process all GUI events according to tkinter's settings."
    target = time.clock()
    while True:
        try:
            root.update()
        except tkinter.TclError:
            break
        target += tkinter._tkinter.getbusywaitinterval() / 1000
        time.sleep(max(target - time.clock(), 0))

撰写回答