如何在Python中使用异步迭代器实现观察者模式?

2 投票
1 回答
67 浏览
提问于 2025-04-13 02:44

我正在用Python的asyncio和异步迭代器来实现观察者模式。我的目标是创建一个“变化流”,让任务可以添加变化,其他任务可以作为异步迭代器来订阅这些变化。我想做一个类似于Dart中广播流的接口。

下面是我目前的简化版本:

from asyncio import Condition

class ChangeStream:
    def __init__(self):
        self._condition = Condition()
        self._change = None

    async def add_change(self, change):
        async with self._condition:
            self._change = change
            self._condition.notify_all()
    
    async def __aiter__(self):
        async with self._condition:
            while True:
                await self._condition.wait()
                yield self._change

这样实现观察者模式是否是最好的方法?有没有更高效或者更容易理解的实现方式?

补充说明:我的目标是让观察者能看到他们订阅后所有的变化,但看不到他们订阅前的变化。

1 个回答

3

根据你的描述和评论,看来你其实是在寻找一种“发布-订阅”模式,而不是“观察者”模式。这两者有点相似,但观察者模式通常是由主题来知道观察者,并直接给他们更新;而在发布-订阅模式中,发布者会把信息发布到一个公共的地方(就像你的流),这个地方会跟踪那些对数据感兴趣的消费者。

这里有一个基于队列的实现示例,还有一些额外的代码来展示这一切是如何运作的——当然,你可以用不同的方式使用这个类,任务组只是启动一些任务的一种方法:

from asyncio import Queue, TaskGroup, sleep, run
from random import random


class ChangeStream:
    def __init__(self):
        self._subscribers = []

    async def add_change(self, change):
        for queue in self._subscribers:
            await queue.put(change)

    async def __aiter__(self):
        queue = Queue()
        self._subscribers.append(queue)
        try:
            while True:
                yield (value := await queue.get())
                if value is None:
                    raise GeneratorExit
        except GeneratorExit:
            self._subscribers.remove(queue)


async def produce_changes(stream: ChangeStream):
    for i in range(10):
        await sleep(random() * 3)
        print(f"Add value: {i}")
        await stream.add_change(i)
    print("Done adding values, writing None to stream.")
    await stream.add_change(None)


async def consume_changes(stream: ChangeStream, name: str, start_delay: int):
    await sleep(start_delay)
    print(f"{name} starting...")
    async for value in stream:
        print(f"{name} received: {value}")
        if value is None:
            break


async def main():
    # create a stream, pass it to a producer task to publish to
    stream = ChangeStream()

    async with TaskGroup() as tg:
        tg.create_task(produce_changes(stream))
        tg.create_task(consume_changes(stream, 'consumer 1', 0))
        tg.create_task(consume_changes(stream, 'consumer 2', 5))  # start after 5 sec


if __name__ == "__main__":
    run(main())

输出(示例,由于随机性):

consumer 1 starting...
Add value: 0
consumer 1 received: 0
Add value: 1
consumer 1 received: 1
Add value: 2
consumer 1 received: 2
consumer 2 starting...
Add value: 3
consumer 1 received: 3
consumer 2 received: 3
Add value: 4
consumer 1 received: 4
consumer 2 received: 4
Add value: 5
consumer 1 received: 5
consumer 2 received: 5
Add value: 6
consumer 1 received: 6
consumer 2 received: 6
Add value: 7
consumer 1 received: 7
consumer 2 received: 7
Add value: 8
consumer 1 received: 8
consumer 2 received: 8
Add value: 9
Done adding values, writing None to stream.
consumer 1 received: 9
consumer 1 received: None
consumer 2 received: 9
consumer 2 received: None

撰写回答