有用的发布订阅语义
我在寻找一些像维基百科那样的参考资料,关于轻量级的发布-订阅机制的设计和实现,这些机制真的能用。我会根据回答和评论,以及我自己的研究来更新这个问题。
我查阅了我的书籍和网络,寻找与Python和Delphi相关的发布/订阅的工作,但结果让我不太满意。这些设计依赖于函数签名、位图或槽来过滤消息或决定什么应该发送给谁,有的设计限制太多(绑定到一个消息服务器),有的则太开放(每个人都可以订阅任何东西)。
我不想自己写一个。我想找到一个已经设计得很好、经过讨论并且在实际中验证过的方案。
今天我在Delphi Pascal中实现了一个设计(因为Delphi是我首先需要的)。像这个API那样根据参数类型进行分发并不是一个新主意(这在设计模式中的Visitor
模式里有解释),我觉得我以前见过类似的东西(但我不记得在哪里了;Taligent?)。它的核心是,订阅、过滤和分发都是基于类型系统的。
unit JalSignals;
// A publish/subscribe mechanism.
// 1. Signal payloads are objects, and their class is their signal type.
// 2. Free is called on the payloads after they have been delivered.
// 3. Members subscribe by providing a callback method (of object).
// 4. Members may subscribe with the same method to different types of signals.
// 5. A member subscribes to a type, which means that all signals
// with payloads of that class or any of its subclasses will be delivered
// to the callback, with one important exception
// 6. A forum breaks the general class hierarchy into independent branches.
// A signal will not be delivered to members subscribed to classes that
// are not in the branch.
// 7. This is a GPL v3 design.
interface
uses
SysUtils;
type
TSignal = TObject;
TSignalType = TClass;
TSignalAction = (soGo, soStop);
TCallback = function(signal :TSignal) :TSignalAction of object;
procedure signal(payload: TSignal);
procedure subscribe( callback :TCallback; atype :TSignalType);
procedure unsubscribe(callback :TCallback; atype :TSignalType = nil); overload;
procedure unsubscribe(obj :TObject; atype :TSignalType = nil); overload;
procedure openForum( atype :TSignalType);
procedure closeForum(atype :TSignalType);
上面的“回调”就像Python中的绑定方法。
Delphi实现的完整源代码可以在这里找到:
这是Python中的实现。我改变了键的名称,因为signal和message已经被用得太多了。与Delphi实现不同的是,结果,包括异常,会被收集并以列表的形式返回给信号发送者。
"""
A publish/subscribe mechanism.
1. Signal payloads are objects, and their class is their signal type.
2. Free is called on the payloads after they have been delivered.
3. Members subscribe by providing a callback method (of object).
4. Members may subscribe with the same method to different types of signals.
5. A member subscribes to a type, which means that all signals
with payloads of that class or any of its subclasses will be delivered
to the callback, with one important exception:
6. A forum breaks the general class hierarchy into independent branches.
A signal will not be delivered to members subscribed to classes that
are not in the branch.
"""
__all__ = ['open_forum', 'close_forum', 'announce',
'subscribe', 'unsubscribe'
]
def _is_type(atype):
return issubclass(atype, object)
class Sub(object):
def __init__(self, callback, atype):
assert callable(callback)
assert issubclass(atype, object)
self.atype = atype
self.callback = callback
__forums = set()
__subscriptions = []
def open_forum(forum):
assert issubclass(forum, object)
__forums.add(forum)
def close_forum(forum):
__forums.remove(forum)
def subscribe(callback, atype):
__subscriptions.append(Sub(callback, atype))
def unsubscribe(callback, atype=None):
for i, sub in enumerate(__subscriptions):
if sub.callback is not callback:
continue
if atype is None or issubclass(sub.atype, atype):
del __subscriptions[i]
def _boundary(atype):
assert _is_type(atype)
lower = object
for f in __forums:
if (issubclass(atype, f)
and issubclass(f, lower)):
lower = f
return lower
def _receivers(news):
bound = _boundary(type(news))
for sub in __subscriptions:
if not isinstance(news, sub.atype):
continue
if not issubclass(sub.atype, bound):
continue
yield sub
def announce(news):
replies = []
for sub in _receivers(news):
try:
reply = sub.callback(news)
replies.append(reply)
except Exception as e:
replies.append(e)
return replies
if __name__ == '__main__':
i = 0
class A(object):
def __init__(self):
global i
self.msg = type(self).__name__ + str(i)
i += 1
class B(A): pass
class C(B): pass
assert _is_type(A)
assert _is_type(B)
assert _is_type(C)
assert issubclass(B, A)
assert issubclass(C, B)
def makeHandler(atype):
def handler(s):
assert isinstance(s, atype)
return 'handler' + atype.__name__ + ' got ' + s.msg
return handler
handleA = makeHandler(A)
handleB = makeHandler(B)
handleC = makeHandler(C)
def failer(s):
raise Exception, 'failed on' + s.msg
assert callable(handleA) and callable(handleB) and callable(handleC)
subscribe(handleA, A)
subscribe(handleB, B)
subscribe(handleC, C)
subscribe(failer, A)
assert _boundary(A) is object
assert _boundary(B) is object
assert _boundary(C) is object
print announce(A())
print announce(B())
print announce(C())
print
open_forum(B)
assert _boundary(A) is object
assert _boundary(B) is B
assert _boundary(C) is B
assert issubclass(B, B)
print announce(A())
print announce(B())
print announce(C())
print
close_forum(B)
print announce(A())
print announce(B())
print announce(C())
我搜索的原因有以下几点:
- 我正在维护几千行的Delphi代码。它们使用观察者模式来解耦MVC,但由于观察者和主题之间的依赖关系太明确,整体还是很耦合。
- 我在学习PyQt4,如果每个事件都要在Qt4Designer里点来点去,我会崩溃的。
- 在另一个个人数据应用中,我需要抽象事件的传递和处理,因为持久化和用户界面会因平台而异,并且必须完全独立。
参考资料
自己和其他人找到的资料应该放在这里
- PybubSub使用字符串作为主题,并使用方法签名(第一个信号定义了签名)。
- FinalBuilder的博客中有一篇文章提到,他们成功使用了一个系统,使用整数结构作为负载,作为消息,并使用整数掩码进行过滤。
- PyDispatcher的文档很少。
- D-Bus已经被Gnome和KDE等项目采用。还有一个Python绑定可用。
2 个回答
2
你也可以试试DDS。数据分发服务是一种完整的标准,用于通过发布/订阅的方式进行通信。
2
你可以看看企业集成模式这个网站,它对发布-订阅这种方式进行了非常详细的讲解,虽然它主要是讲不同程序之间如何传递消息。