支持多通道订阅者的Python可观察实现

1 投票
1 回答
1228 浏览
提问于 2025-04-17 03:36

在我的一个复杂应用中,有一系列的资源控制器/管理类,它们通过可观察者模式进行互动。一般来说,大多数观察者会订阅一个特定的频道(比如“foo.bar.entity2”),但有一些情况我希望能知道某个特定频道下的所有事件(比如“foo.*”)。为此,我写了如下的代码:

    from collections import defaultdict

    class SimplePubSub(object):

        def __init__(self):
            self.subjects = defaultdict(list)


        def subscribe(self, subject, callbackstr):
            """
                for brevity, callbackstr would be a valid Python function or bound method but here is just a string
            """
            self.subjects[subject].append(callbackstr)

        def fire(self, subject):
            """
                Again for brevity, fire would have *args, **kwargs or some other additional message arguments but not here            
            """

            if subject in self.subjects:
                print "Firing callback %s" % subject
                for callback in self.subjects[subject]:
                    print "callback %s" % callback



    pubSub = SimplePubSub()
    pubSub.subscribe('foo.bar', "foo.bar1")
    pubSub.subscribe('foo.foo', "foo.foo1")

    pubSub.subscribe('foo.ich.tier1', "foo.ich.tier3_1")
    pubSub.subscribe('foo.ich.tier2', "foo.ich.tier2_1")
    pubSub.subscribe('foo.ich.tier3', "foo.ich.tier2_1")

    #Find everything that starts with foo
    #say foo.bar is fired
    firedSubject = "foo.bar"
    pubSub.fire(firedSubject)
    #outputs 
    #>>Firing callback foo.bar
    #>>callback foo.bar1

    #but let's say I want to add a callback for everything undr foo.ich

    class GlobalPubSub(SimplePubSub):

        def __init__(self):
            self.globals = defaultdict(list)
            super(GlobalPubSub, self).__init__()


        def subscribe(self, subject, callback):
            if subject.find("*") > -1:
                #assumes global suscriptions would be like subject.foo.* and we want to catch all subject.foo's 
                self.globals[subject[:-2]].append(callback)
            else:
                super(GlobalPubSub, self).subscribe(subject, callback)

        def fire(self, subject):
            super(GlobalPubSub, self).fire(subject)
            if self.globals:
                for key in self.globals.iterkeys():
                    if subject.startswith(key):
                        for callback in self.globals[key]:
                            print "global callback says", callback

    print "Now with global subscriptions"
    print
    pubSub = GlobalPubSub()
    pubSub.subscribe('foo.bar', "foo.bar1")
    pubSub.subscribe('foo.foo', "foo.foo1")

    pubSub.subscribe('foo.ich.tier1', "foo.ich.tier3_1")
    pubSub.subscribe('foo.ich.tier2', "foo.ich.tier2_1")
    pubSub.subscribe('foo.ich.tier3', "foo.ich.tier2_1")
    pubSub.subscribe("foo.ich.*", "All the ichs, all the time!")

    #Find everything that starts with foo.ich
    firedSubject = "foo.ich.tier2"
    pubSub.fire(firedSubject)
    #outputs 
    #>>Firing callback foo.bar
    #>>callback foo.bar1
    #>>Now with global subscriptions
    #
    #>>Firing callback foo.ich.tier2
    #>>callback foo.ich.tier2_1
    #>>global callback says All the ichs, all the time!

这样做是否已经足够好,还是说我需要使用一些更复杂的结构(比如尝试使用某种特殊的构造)?我希望能得到确认,看看我是否走在正确的路上,或者有没有更好的建议,关于一个纯Python的全局订阅处理器(不使用外部库或服务)。

1 个回答

1

看起来你走在正确的道路上。我之前在一个wxPython的应用中用过PyPubSub,后来自己实现了一个“更简单”的版本,基本上和你现在做的很像,只是多了一些额外的功能,这些功能你可能在完善需求时会逐渐添加上去。

这里给出的答案也和你做的很相似

这个答案提供了一些稍微不同的方法示例。

除了PyPubSub,还有很多现成的库,比如pydispatchblinker,这些也许值得你参考或者获取灵感。

撰写回答