让线程待命直到事件X发生

6 投票
3 回答
11220 浏览
提问于 2025-04-15 22:32

我正在写一个多线程的应用程序,每个文件我都创建一个处理器(handler)。我有一个叫做HandlerFactory的类,负责管理这些处理器的分配。我的想法是这样的:

线程A请求并从HandlerFactory类获取foo.txt的文件处理器。

线程B也请求foo.txt的文件处理器。

处理器类识别到这个文件处理器已经被线程A占用了。

处理器类让线程A进入休眠状态。

线程B通过HandlerFactory的一个封装方法关闭了文件处理器。

HandlerFactory通知正在休眠的线程。

线程A醒来,并成功获取到foo.txt的文件处理器。

这是我目前的进展,

def get_handler(self, file_path, type):
    self.lock.acquire()
    if file_path not in self.handlers:
        self.handlers[file_path] = open(file_path, type)
    elif not self.handlers[file_path].closed:
        time.sleep(1)
    self.lock.release()
    return self.handlers[file_path][type]

我认为这部分已经成功处理了线程的休眠和处理器的获取,但我不确定如何唤醒所有线程,或者更好的是,如何唤醒特定的线程。

3 个回答

0

你知道吗,Python有一个很大的锁,这样一来,你就无法充分利用多线程的好处了,对吧?

除非主线程需要处理每个工作线程的结果,否则你可以考虑为每个请求启动一个新的进程。这样你就不用担心锁的问题了。让子进程完成它们需要做的事情,然后结束。如果它们需要回传信息,可以通过管道、XMLRPC,或者使用一个线程安全的sqlite数据库来进行沟通。

2

看起来你想要为每个处理程序使用一个 threading.Semaphore(其他同步对象,比如事件和条件也可以用,但对于你的需求来说,信号量似乎是最简单的选择)。具体来说,使用 BoundedSemaphore:在你的使用场景中,如果编程错误导致信号量被释放的次数超过获取的次数,它会立即抛出异常——这正是“有界”信号量存在的原因;-)

在你创建每个信号量时,将其初始化为 1(这意味着处理程序是可用的)。每个使用线程在获取处理程序时调用 acquire 方法(这可能会让它阻塞),完成处理后调用 release 方法(这会让一个等待的线程继续执行)。这比条件的获取/等待/通知/释放的生命周期简单多了,而且更具未来适应性,因为条件的文档中提到:

当前的实现会唤醒 正在等待的线程中的 一个。然而,依赖这种行为 并不安全。未来的 优化实现可能会 偶尔唤醒多个线程。

而使用信号量则更安全(其语义是可以依赖的:如果信号量初始化为 N,那么在任何时候,成功获取信号量但尚未释放的线程数量都在 0 到 N-1 之间[[包括]])。

6

你要找的东西叫做条件变量。

条件变量

这里是Python 2的库参考。
对于Python 3,你可以在这里找到相关信息。

撰写回答