如何在Python中使用异步迭代器实现观察者模式?
我正在用Python的asyncio和异步迭代器来实现观察者模式。我的目标是创建一个“变化流”,让任务可以添加变化,其他任务可以作为异步迭代器来订阅这些变化。我想做一个类似于Dart中广播流的接口。
下面是我目前的简化版本:
from asyncio import Condition
class ChangeStream:
def __init__(self):
self._condition = Condition()
self._change = None
async def add_change(self, change):
async with self._condition:
self._change = change
self._condition.notify_all()
async def __aiter__(self):
async with self._condition:
while True:
await self._condition.wait()
yield self._change
这样实现观察者模式是否是最好的方法?有没有更高效或者更容易理解的实现方式?
补充说明:我的目标是让观察者能看到他们订阅后所有的变化,但看不到他们订阅前的变化。
1 个回答
3
根据你的描述和评论,看来你其实是在寻找一种“发布-订阅”模式,而不是“观察者”模式。这两者有点相似,但观察者模式通常是由主题来知道观察者,并直接给他们更新;而在发布-订阅模式中,发布者会把信息发布到一个公共的地方(就像你的流),这个地方会跟踪那些对数据感兴趣的消费者。
这里有一个基于队列
的实现示例,还有一些额外的代码来展示这一切是如何运作的——当然,你可以用不同的方式使用这个类,任务组
只是启动一些任务的一种方法:
from asyncio import Queue, TaskGroup, sleep, run
from random import random
class ChangeStream:
def __init__(self):
self._subscribers = []
async def add_change(self, change):
for queue in self._subscribers:
await queue.put(change)
async def __aiter__(self):
queue = Queue()
self._subscribers.append(queue)
try:
while True:
yield (value := await queue.get())
if value is None:
raise GeneratorExit
except GeneratorExit:
self._subscribers.remove(queue)
async def produce_changes(stream: ChangeStream):
for i in range(10):
await sleep(random() * 3)
print(f"Add value: {i}")
await stream.add_change(i)
print("Done adding values, writing None to stream.")
await stream.add_change(None)
async def consume_changes(stream: ChangeStream, name: str, start_delay: int):
await sleep(start_delay)
print(f"{name} starting...")
async for value in stream:
print(f"{name} received: {value}")
if value is None:
break
async def main():
# create a stream, pass it to a producer task to publish to
stream = ChangeStream()
async with TaskGroup() as tg:
tg.create_task(produce_changes(stream))
tg.create_task(consume_changes(stream, 'consumer 1', 0))
tg.create_task(consume_changes(stream, 'consumer 2', 5)) # start after 5 sec
if __name__ == "__main__":
run(main())
输出(示例,由于随机性):
consumer 1 starting...
Add value: 0
consumer 1 received: 0
Add value: 1
consumer 1 received: 1
Add value: 2
consumer 1 received: 2
consumer 2 starting...
Add value: 3
consumer 1 received: 3
consumer 2 received: 3
Add value: 4
consumer 1 received: 4
consumer 2 received: 4
Add value: 5
consumer 1 received: 5
consumer 2 received: 5
Add value: 6
consumer 1 received: 6
consumer 2 received: 6
Add value: 7
consumer 1 received: 7
consumer 2 received: 7
Add value: 8
consumer 1 received: 8
consumer 2 received: 8
Add value: 9
Done adding values, writing None to stream.
consumer 1 received: 9
consumer 1 received: None
consumer 2 received: 9
consumer 2 received: None