Python观察者模式:示例,提示?

2024-05-13 20:22:52 发布

您现在位置:Python中文网/ 问答频道 /正文

有没有用Python实现GoF观察者的示例?我有一个位代码,它当前有一些调试代码放在key类中(如果设置了magic env,则当前会向stderr生成消息)。此外,该类还有一个接口,用于以增量方式返回结果,以及将结果存储(在内存中)以进行后期处理。(类本身是一个作业管理器,用于通过ssh在远程计算机上并发执行命令)。

当前类的用法如下:

job = SSHJobMan(hostlist, cmd)
job.start()
while not job.done():
    for each in job.poll():
        incrementally_process(job.results[each])
        time.sleep(0.2) # or other more useful work
post_process(job.results)

alernative使用模型是:

job = SSHJobMan(hostlist, cmd)
job.wait()  # implicitly performs a start()
process(job.results)

对于当前的实用程序,这一切都可以正常工作。但它确实缺乏灵活性。例如,我目前支持一个简短的输出格式或进度条作为增量结果,我还支持 post_process()函数的简短、完整和“合并消息”输出。

但是,我希望支持多个结果/输出流(到终端的进度条、对日志文件的调试和警告、从成功的作业到一个文件/目录的输出、错误消息和从不成功的作业到另一个作业的其他结果等)。

这听起来像是需要观察者。。。让我的类的实例接受来自其他对象的注册,并在事件发生时用特定类型的事件回调它们。

我在看PyPubSub,因为我在这么相关的问题中看到了几个对它的引用。我不确定是否已经准备好将外部依赖项添加到我的实用程序中,但是我可以看到将它们的接口用作我的模型的价值,如果这将使其他人更容易使用的话。(该项目既是一个独立的命令行实用程序,也是一个用于编写其他脚本/实用程序的类)。

总之,我知道如何做我想做的。。。但是有很多方法可以做到这一点。我想得到一些建议,从长远来看,什么最有可能对代码的其他用户有用。

代码本身位于:classh


Tags: 代码实用程序cmd消息作业jobpostprocess
3条回答

However it does lack flexibility.

嗯。。。实际上,如果您想要异步API,这看起来是一个不错的设计。通常是这样。也许您只需要从stderr切换到Python的^{}模块,它有自己的发布/订阅模型,使用Logger.addHandler()等等。

如果你真的想支持观察员,我的建议是保持简单。你真的只需要几行代码。

class Event(object):
    pass

class Observable(object):
    def __init__(self):
        self.callbacks = []
    def subscribe(self, callback):
        self.callbacks.append(callback)
    def fire(self, **attrs):
        e = Event()
        e.source = self
        for k, v in attrs.iteritems():
            setattr(e, k, v)
        for fn in self.callbacks:
            fn(e)

您的作业类可以子类Observable。当发生感兴趣的事情时,呼叫self.fire(type="progress", percent=50)等。

还有一些方法。。。

示例:日志模块

也许您只需要从stderr切换到Python的^{}模块,该模块具有强大的发布/订阅模型。

很容易开始制作日志记录。

# producer
import logging

log = logging.getLogger("myjobs")  # that's all the setup you need

class MyJob(object):
    def run(self):
        log.info("starting job")
        n = 10
        for i in range(n):
            log.info("%.1f%% done" % (100.0 * i / n))
        log.info("work complete")

在消费者方面,还有更多的工作要做。不幸的是,配置记录器输出需要大约7行代码。;)

# consumer
import myjobs, sys, logging

if user_wants_log_output:
    ch = logging.StreamHandler(sys.stderr)
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    myjobs.log.addHandler(ch)
    myjobs.log.setLevel(logging.INFO)

myjobs.MyJob().run()

另一方面,日志包中有大量的内容。如果您需要将日志数据发送到一组旋转的文件、电子邮件地址和Windows事件日志,我们将为您提供帮助。

示例:最简单的可能观察者

但你根本不需要使用任何库。支持观察者的一个非常简单的方法是调用一个什么也不做的方法。

# producer
class MyJob(object):
    def on_progress(self, pct):
        """Called when progress is made. pct is the percent complete.
        By default this does nothing. The user may override this method
        or even just assign to it."""
        pass

    def run(self):
        n = 10
        for i in range(n):
            self.on_progress(100.0 * i / n)
        self.on_progress(100.0)

# consumer
import sys, myjobs
job = myjobs.MyJob()
job.on_progress = lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()

有时候你可以说job.on_progress = progressBar.update,而不是写一个lambda,这很好。

这很简单。一个缺点是它不能自然地支持订阅同一事件的多个侦听器。

示例:类C事件

通过一些支持代码,您可以在Python中获得类似C#的事件。代码如下:

# glue code
class event(object):
    def __init__(self, func):
        self.__doc__ = func.__doc__
        self._key = ' ' + func.__name__
    def __get__(self, obj, cls):
        try:
            return obj.__dict__[self._key]
        except KeyError, exc:
            be = obj.__dict__[self._key] = boundevent()
            return be

class boundevent(object):
    def __init__(self):
        self._fns = []
    def __iadd__(self, fn):
        self._fns.append(fn)
        return self
    def __isub__(self, fn):
        self._fns.remove(fn)
        return self
    def __call__(self, *args, **kwargs):
        for f in self._fns[:]:
            f(*args, **kwargs)

生产者使用decorator声明事件:

# producer
class MyJob(object):
    @event
    def progress(pct):
        """Called when progress is made. pct is the percent complete."""

    def run(self):
        n = 10
        for i in range(n+1):
            self.progress(100.0 * i / n)

#consumer
import sys, myjobs
job = myjobs.MyJob()
job.progress += lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()

它的工作方式与上面的“简单观察者”代码完全相同,但是您可以使用+=添加任意数量的侦听器。(与C不同,这里没有事件处理程序类型,订阅事件时不必new EventHandler(foo.bar),也不必在触发事件之前检查null。与C#一样,事件不会压制异常。)

如何选择

如果logging做了您需要的一切,那么就使用它。否则就做最简单的事情。需要注意的关键是,您不需要承担很大的外部依赖性。

我认为其他答案的人做得太过分了。用不到15行代码就可以轻松地在Python中实现事件。

简单地说,有两个类:EventObserver。任何想要监听事件的类,都需要继承Observer并设置为监听(observe)特定事件。当初始化并触发Event时,侦听该事件的所有观察者都将运行指定的回调函数。

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observables = {}
    def observe(self, event_name, callback):
        self._observables[event_name] = callback


class Event():
    def __init__(self, name, data, autofire = True):
        self.name = name
        self.data = data
        if autofire:
            self.fire()
    def fire(self):
        for observer in Observer._observers:
            if self.name in observer._observables:
                observer._observables[self.name](self.data)

示例

class Room(Observer):

    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # Observer's init needs to be called
    def someone_arrived(self, who):
        print(who + " has arrived!")

room = Room()
room.observe('someone arrived',  room.someone_arrived)

Event('someone arrived', 'Lenard')

输出:

Room is ready.
Lenard has arrived!

相关问题 更多 >