为什么threading.Condition.notify_all没有唤醒等待线程?
下面这段代码是用来演示如何和一个线程进行同步的。
- 我想要有一个单独的线程来更新一张图片。
- 从这些图片中,我想创建一个异步生成器。
- 这些图片只有在异步生成器使用的时候才会被更新。
- 这个异步生成器应该在等待新图片被创建。
下面是相关的代码。不过,它在等待第一张图片的时候卡住了。
为什么notify_all没有释放image_created.wait呢?
# Output
create new image
waiting for new image
start waiter
notify_all
wait for someone to take it
waiting for image_created
import asyncio
import random
import threading
import time
class ImageUpdater:
def __init__(self):
self.image = None
self.image_used = threading.Event()
self.image_created = threading.Condition()
def update_image(self):
while True:
self.image_used.clear()
with self.image_created:
print("create new image")
time.sleep(0.6)
self.image = str(random.random())
print("notify_all")
self.image_created.notify_all()
print("wait for someone to take it")
self.image_used.wait()
print("someone took it")
async def image_generator(self):
def waiter():
print("start waiter")
time.sleep(0.1)
with self.image_created:
print("waiting for image_created")
self.image_created.wait()
print("waiter finished")
self.image_used.set()
while True:
print("waiting for new image")
await asyncio.to_thread(waiter)
yield self.image
async def main():
updater = ImageUpdater()
update_thread = threading.Thread(target=updater.update_image)
update_thread.start()
async for image in updater.image_generator():
print(f"Received new image: {image}")
if __name__ == "__main__":
loop = asyncio.run(main())
5 个回答
time.sleepen
在 update_imagemethod
中可能会阻塞事件循环。最好使用 await asyncio.sleepen
来代替。
image_generator
方法必须是一个异步生成器函数,这样 async for
才能正常工作。
你可以使用 asyncio
来代替。它可以用来发出信号,表示新图像已经可用。
这里有一个替代方案,它使用队列来同步生产者和消费者:
import asyncio
import random
import threading
import queue
import time
class ImageUpdater:
def __init__(self):
self.q = queue.Queue(1)
def update_image(self):
while True:
time.sleep(0.6)
print('Creating new image')
image = str(random.random())
self.q.put(image)
async def image_generator(self):
while True:
print("waiting for new image")
image = self.q.get()
yield image
async def main():
updater = ImageUpdater()
update_thread = threading.Thread(target=updater.update_image)
update_thread.start()
async for image in updater.image_generator():
print(f"Received new image: {image}")
if __name__ == "__main__":
asyncio.run(main())
示例输出:
waiting for new image
Creating new image
Received new image: 0.004152636324671333
waiting for new image
Creating new image
Received new image: 0.23799099029083715
waiting for new image
Creating new image
Received new image: 0.17644888185774932
waiting for new image
更新:更新了代码,使队列的最大容量为1,这样update_image()
方法的线程会被阻塞,直到队列中有足够的空间来放下下一张图片。
在你的函数 waiter
中,使用上下文管理器的 __enter__
方法(也就是 with self.image_created
)会获取 self.image_created
的锁。当你调用 wait
时,这个锁会被释放,线程会在这里等待,直到另一个线程调用 self.image_created.notify
。但这个情况并不会发生,因为另一个线程被阻塞了,它在等着 self.image_used
被设置。这样一来,两个线程就陷入了死锁。
对于 self.image_created
的上下文管理器来说,进入 with:
块时会获取锁,退出时会释放锁。所以这段代码:
with self.image_created:
self.image_created.wait()
基本上等同于:
self.image_created.acquire()
# when you reach here, this thread owns the lock
self.image_created.wait() # releases the lock
# waits for another thread to call notify
# You will never get this far since the other thread is blocked
self.image_created.release()
与其在 with self.image_created
中调用 wait,不如直接处理图像并调用 self.image_process.set()
。这样就避免了调用 self.image_process.wait
,而这个调用正是导致死锁的原因。不过在这种情况下,你就完全没有利用 Condition 对象的功能了——你只是把它当成了另一个 threading.Event 来用。
这个更简单的程序,使用两个 Event 对象来确保线程轮流工作,是可以正常运行的:
import asyncio
import random
import threading
import time
class ImageUpdater:
def __init__(self):
self.image = None
self.image_used = threading.Event()
self.image_created = threading.Event()
def update_image(self):
while True:
self.image_used.clear()
print("create new image")
time.sleep(0.6)
self.image = str(random.random())
print("new image created")
self.image_created.set()
print("wait for someone to take it")
self.image_used.wait()
print("someone took it")
async def image_generator(self):
def waiter():
print("start waiter")
time.sleep(0.1)
print("waiting for image_created")
self.image_created.wait()
self.image_created.clear()
time.sleep(0.3)
self.image_used.set()
while True:
print("waiting for new image")
await asyncio.to_thread(waiter)
yield self.image
async def main():
updater = ImageUpdater()
update_thread = threading.Thread(target=updater.update_image)
update_thread.start()
async for image in updater.image_generator():
print(f"Received new image: {image}")
if __name__ == "__main__":
loop = asyncio.run(main())
你提到过最终想要让多个线程处理图像,所以从长远来看,使用生产者-消费者架构可能会更清晰。可以参考 quamrana 的回答。