为什么threading.Condition.notify_all没有唤醒等待线程?

0 投票
5 回答
59 浏览
提问于 2025-04-14 17:13

下面这段代码是用来演示如何和一个线程进行同步的。

  • 我想要有一个单独的线程来更新一张图片。
  • 从这些图片中,我想创建一个异步生成器。
  • 这些图片只有在异步生成器使用的时候才会被更新。
  • 这个异步生成器应该在等待新图片被创建。

下面是相关的代码。不过,它在等待第一张图片的时候卡住了。

为什么notify_all没有释放image_created.wait呢?

# Output
create new image
waiting for new image
start waiter
notify_all
wait for someone to take it
waiting for image_created
import asyncio
import random
import threading
import time


class ImageUpdater:
    def __init__(self):
        self.image = None
        self.image_used = threading.Event()
        self.image_created = threading.Condition()

    def update_image(self):
        while True:
            self.image_used.clear()
            with self.image_created:
                print("create new image")
                time.sleep(0.6)
                self.image = str(random.random())
                print("notify_all")
                self.image_created.notify_all()
            print("wait for someone to take it")
            self.image_used.wait()
            print("someone took it")

    async def image_generator(self):
        def waiter():
            print("start waiter")
            time.sleep(0.1)
            with self.image_created:
                print("waiting for image_created")
                self.image_created.wait()
            print("waiter finished")
            self.image_used.set()

        while True:
            print("waiting for new image")
            await asyncio.to_thread(waiter)

            yield self.image


async def main():
    updater = ImageUpdater()

    update_thread = threading.Thread(target=updater.update_image)
    update_thread.start()

    async for image in updater.image_generator():
        print(f"Received new image: {image}")


if __name__ == "__main__":
    loop = asyncio.run(main())

5 个回答

0

time.sleepenupdate_imagemethod 中可能会阻塞事件循环。最好使用 await asyncio.sleepen 来代替。

image_generator 方法必须是一个异步生成器函数,这样 async for 才能正常工作。

你可以使用 asyncio 来代替。它可以用来发出信号,表示新图像已经可用。

1

这里有一个替代方案,它使用队列来同步生产者和消费者:

import asyncio
import random
import threading
import queue
import time

class ImageUpdater:
    def __init__(self):
        self.q = queue.Queue(1)

    def update_image(self):
        while True:
            time.sleep(0.6)
            print('Creating new image')
            image = str(random.random())
            self.q.put(image)

    async def image_generator(self):
        while True:
            print("waiting for new image")
            image = self.q.get()
            yield image

async def main():
    updater = ImageUpdater()

    update_thread = threading.Thread(target=updater.update_image)
    update_thread.start()


    async for image in updater.image_generator():
        print(f"Received new image: {image}")

if __name__ == "__main__":
    asyncio.run(main())

示例输出:

waiting for new image
Creating new image
Received new image: 0.004152636324671333
waiting for new image
Creating new image
Received new image: 0.23799099029083715
waiting for new image
Creating new image
Received new image: 0.17644888185774932
waiting for new image

更新:更新了代码,使队列的最大容量为1,这样update_image()方法的线程会被阻塞,直到队列中有足够的空间来放下下一张图片。

1

在你的函数 waiter 中,使用上下文管理器的 __enter__ 方法(也就是 with self.image_created)会获取 self.image_created 的锁。当你调用 wait 时,这个锁会被释放,线程会在这里等待,直到另一个线程调用 self.image_created.notify。但这个情况并不会发生,因为另一个线程被阻塞了,它在等着 self.image_used 被设置。这样一来,两个线程就陷入了死锁。

对于 self.image_created 的上下文管理器来说,进入 with: 块时会获取锁,退出时会释放锁。所以这段代码:

with self.image_created:
    self.image_created.wait()

基本上等同于:

self.image_created.acquire()  
# when you reach here, this thread owns the lock
self.image_created.wait()  # releases the lock
                           # waits for another thread to call notify
# You will never get this far since the other thread is blocked
self.image_created.release()

与其在 with self.image_created 中调用 wait,不如直接处理图像并调用 self.image_process.set()。这样就避免了调用 self.image_process.wait,而这个调用正是导致死锁的原因。不过在这种情况下,你就完全没有利用 Condition 对象的功能了——你只是把它当成了另一个 threading.Event 来用。

这个更简单的程序,使用两个 Event 对象来确保线程轮流工作,是可以正常运行的:

import asyncio
import random
import threading
import time


class ImageUpdater:
    def __init__(self):
        self.image = None
        self.image_used = threading.Event()
        self.image_created = threading.Event()

    def update_image(self):
        while True:
            self.image_used.clear()
            print("create new image")
            time.sleep(0.6)
            self.image = str(random.random())
            print("new image created")
            self.image_created.set()
            print("wait for someone to take it")
            self.image_used.wait()
            print("someone took it")

    async def image_generator(self):
        def waiter():
            print("start waiter")
            time.sleep(0.1)
            print("waiting for image_created")
            self.image_created.wait()
            self.image_created.clear()
            time.sleep(0.3)
            self.image_used.set()    

        while True:
            print("waiting for new image")
            await asyncio.to_thread(waiter)

            yield self.image

async def main():
    updater = ImageUpdater()

    update_thread = threading.Thread(target=updater.update_image)
    update_thread.start()

    async for image in updater.image_generator():
        print(f"Received new image: {image}")


if __name__ == "__main__":
    loop = asyncio.run(main())

你提到过最终想要让多个线程处理图像,所以从长远来看,使用生产者-消费者架构可能会更清晰。可以参考 quamrana 的回答。

撰写回答