Python中具有依赖关系的惰性数据流(类似电子表格)属性

14 投票
3 回答
1565 浏览
提问于 2025-04-17 07:27

我的问题是这样的:我有一些Python类,这些类里面有一些属性是从其他属性计算得来的;一旦这些属性被计算出来,就应该把结果缓存起来,并且每次基础属性发生变化时,缓存的结果都要失效。

我可以手动去做这些事情,但如果属性的数量增加,这样维护起来会很麻烦。所以我希望能在我的对象里有类似于Makefile的规则,自动跟踪哪些东西需要重新计算。

我希望的语法和行为大概是这样的:

# Desired usage sketch: the class decorator is expected to do the "dirty
# magic", such as generating the reverse dependency graph and preparing
# the setters that invalidate the cached values.
@dataflow_class
class Test(object):
    """Example of the wished-for API.

    ``dataflow_class`` and ``managed_property`` are not defined in this
    snippet; they stand for the helpers the question is asking about.
    """

    def calc_a(self):
        """Compute ``a`` from ``b`` and ``c``."""
        return self.b + self.c

    def calc_c(self):
        """Compute ``c`` from ``d``."""
        return self.d * 2

    # Computed properties name their calculation and the properties they read;
    # plain properties only carry a default value.
    a = managed_property(calculate=calc_a, depends_on=('b', 'c'))
    b = managed_property(default=0)
    c = managed_property(calculate=calc_c, depends_on=('d',))
    d = managed_property(default=0)


# Walk-through of the behaviour the asker wants (Python 2 print statements).
# The comments describe the desired caching/invalidation, step by step.
t = Test()

print t.a
# a has not been initialized, so it calls calc_a
# gets b value
# c has not been initialized, so it calls calc_c
# c value is calculated and stored in t.__c
# a value is calculated and stored in t.__a

t.b = 1
# invalidates the calculated value stored in t.__a

print t.a
# a has been invalidated, so it calls calc_a
# gets b value
# gets c value, from t.__c (c is still cached; only a was invalidated)
# a value is calculated and stored in t.__a

print t.a
# gets value from t.__a (no recalculation)

t.d = 2
# invalidates the calculated values stored in t.__a and t.__c

那么,是否已经有类似的东西可以用,还是我应该自己去实现一个?如果是后者,欢迎提供建议 :-)

3 个回答

1
import collections

# Unique marker meaning "no default was supplied"; lets None be a real default.
sentinel = object()


class ManagedProperty(object):
    '''
    Descriptor for a lazily computed, cached attribute.

    The value is stored on the instance under ``'_' + property_name``.
    On first read it is either the ``default`` or the result of calling
    the instance method named by ``calculate``.

    Assigning to the property deletes the cached value of every property
    listed for it in ``obj._dependencies``.  For example, if
    ``_dependencies == {'a': set(['b', 'c'])}``, then ManagedProperties
    `b` and `c` will be reset whenever `a` is modified.
    '''
    def __init__(self, property_name, calculate=None, depends_on=tuple(),
                 default=sentinel):
        '''
        property_name: attribute name the descriptor is exposed under.
        calculate:     name (string) of the instance method that computes
                       the value; used only when no ``default`` is given.
        depends_on:    names of the properties the calculation reads.
        default:       initial value; when given, ``calculate`` is not used.
        '''
        self.property_name = property_name
        self.private_name = '_' + property_name
        self.calculate = calculate
        self.depends_on = depends_on
        self.default = default

    def __get__(self, obj, objtype=None):
        '''Return the cached value, computing and caching it if necessary.'''
        if obj is None:
            # Allows getattr(cls,mprop) to return the ManagedProperty instance
            return self
        try:
            return getattr(obj, self.private_name)
        except AttributeError:
            pass
        # Cache miss: fall back to the default or run the calculation.
        if self.default is sentinel:
            result = getattr(obj, self.calculate)()
        else:
            result = self.default
        setattr(obj, self.private_name, result)
        return result

    def __set__(self, obj, value):
        '''Store ``value`` and invalidate every property that depends on it.'''
        # obj._dependencies is defined by @register
        dependents = getattr(obj, '_dependencies').get(self.property_name, ())
        # An explicit loop is required: on Python 3 ``map`` is lazy, so using
        # it for its side effects would never invalidate anything.
        for dependent_name in dependents:
            delattr(obj, dependent_name)
        setattr(obj, self.private_name, value)

    def __delete__(self, obj):
        '''Drop the cached value (if any) so the next read recomputes it.'''
        if hasattr(obj, self.private_name):
            delattr(obj, self.private_name)

def register(*mproperties):
    '''
    Class decorator factory that installs ManagedProperties on a class.

    Each ManagedProperty in ``mproperties`` is set on the decorated class
    under its ``property_name``.  The class then receives a
    ``_dependencies`` attribute mapping each property name to the set of
    all property names that (directly or transitively) depend on it.
    '''
    def flatten_dependencies(name, deptree, all_deps=None):
        '''
        A deptree such as {'c': set(['a']), 'd': set(['c'])} means
        'a' depends on 'c' and 'c' depends on 'd'.

        Given such a deptree, flatten_dependencies('d', deptree) returns the set
        of all property_names that depend on 'd' (i.e. set(['a','c']) in the
        above case).

        Names already collected are not visited again, so cyclic
        declarations terminate and shared sub-trees are walked only once.
        '''
        if all_deps is None:
            all_deps = set()
        for dep in deptree.get(name, ()):
            if dep in all_deps:
                # Already expanded (or being expanded, for a cycle).
                continue
            all_deps.add(dep)
            flatten_dependencies(dep, deptree, all_deps)
        return all_deps

    def classdecorator(cls):
        deptree = collections.defaultdict(set)
        for mprop in mproperties:
            setattr(cls, mprop.property_name, mprop)
        # Find all ManagedProperties in dir(cls). Note that some of these may be
        # inherited from bases of cls; they may not be listed in mproperties.
        # Doing it this way allows ManagedProperties to be overridden by subclasses.
        for propname in dir(cls):
            mprop = getattr(cls, propname)
            if not isinstance(mprop, ManagedProperty):
                continue
            # Reverse edge: underlying property -> properties computed from it.
            for underlying_prop in mprop.depends_on:
                deptree[underlying_prop].add(mprop.property_name)

        # Flatten the dependency tree so no recursion is necessary. If one were
        # to use recursion instead, then a naive algorithm would make duplicate
        # calls to __delete__. By flattening the tree, there are no duplicate
        # calls to __delete__.
        dependencies = {key: flatten_dependencies(key, deptree)
                        for key in deptree}
        setattr(cls, '_dependencies', dependencies)
        return cls
    return classdecorator
if __name__ == "__main__":
    import unittest
    import sys

    def count(meth):
        '''Wrap ``meth`` so every call increments ``<method name>_count`` on the instance.'''
        def wrapper(self, *args):
            # ``__name__`` is available on Python 2 and 3 alike, whereas the
            # ``func_name`` alias exists only on Python 2.
            countname = meth.__name__ + '_count'
            setattr(self, countname, getattr(self, countname, 0) + 1)
            return meth(self, *args)
        return wrapper

    class Test(unittest.TestCase):
        '''Checks caching, invalidation and subclass overrides of ManagedProperty.'''

        def setUp(self):
            # Foo: a = b + c, c = d * 2.
            @register(
                ManagedProperty('d', default=0),
                ManagedProperty('b', default=0),
                ManagedProperty('c', calculate='calc_c', depends_on=('d',)),
                ManagedProperty('a', calculate='calc_a', depends_on=('b', 'c')))
            class Foo(object):
                @count
                def calc_a(self):
                    return self.b + self.c

                @count
                def calc_c(self):
                    return self.d * 2

            # Bar overrides c so that it depends on b instead of d: c = b * 3.
            @register(ManagedProperty('c', calculate='calc_c', depends_on=('b',)),
                      ManagedProperty('a', calculate='calc_a', depends_on=('b', 'c')))
            class Bar(Foo):
                @count
                def calc_c(self):
                    return self.b * 3

            self.Foo = Foo
            self.Bar = Bar
            self.foo = Foo()
            self.foo2 = Foo()
            self.bar = Bar()

        def test_two_instances(self):
            # Values are cached per instance, so foo2 must be unaffected.
            self.foo.b = 1
            self.assertEqual(self.foo.a, 1)
            self.assertEqual(self.foo.b, 1)
            self.assertEqual(self.foo.c, 0)
            self.assertEqual(self.foo.d, 0)

            self.assertEqual(self.foo2.a, 0)
            self.assertEqual(self.foo2.b, 0)
            self.assertEqual(self.foo2.c, 0)
            self.assertEqual(self.foo2.d, 0)

        def test_initialization(self):
            self.assertEqual(self.foo.a, 0)
            self.assertEqual(self.foo.calc_a_count, 1)
            # Second read must come from the cache: no extra calculation.
            self.assertEqual(self.foo.a, 0)
            self.assertEqual(self.foo.calc_a_count, 1)
            self.assertEqual(self.foo.b, 0)
            self.assertEqual(self.foo.c, 0)
            self.assertEqual(self.foo.d, 0)
            self.assertEqual(self.bar.a, 0)
            self.assertEqual(self.bar.b, 0)
            self.assertEqual(self.bar.c, 0)
            self.assertEqual(self.bar.d, 0)

        def test_dependence(self):
            self.assertEqual(self.Foo._dependencies,
                             {'c': set(['a']), 'b': set(['a']), 'd': set(['a', 'c'])})

            self.assertEqual(self.Bar._dependencies,
                             {'c': set(['a']), 'b': set(['a', 'c'])})

        def test_setting_property_updates_dependent(self):
            self.assertEqual(self.foo.a, 0)
            self.assertEqual(self.foo.calc_a_count, 1)

            self.foo.b = 1
            # invalidates the calculated value stored in foo.a
            self.assertEqual(self.foo.a, 1)
            self.assertEqual(self.foo.calc_a_count, 2)
            self.assertEqual(self.foo.b, 1)
            self.assertEqual(self.foo.c, 0)
            self.assertEqual(self.foo.d, 0)

            self.foo.d = 2
            # invalidates the calculated values stored in foo.a and foo.c
            self.assertEqual(self.foo.a, 5)
            self.assertEqual(self.foo.calc_a_count, 3)
            self.assertEqual(self.foo.b, 1)
            self.assertEqual(self.foo.c, 4)
            self.assertEqual(self.foo.d, 2)

            self.assertEqual(self.bar.a, 0)
            self.assertEqual(self.bar.calc_a_count, 1)
            self.assertEqual(self.bar.b, 0)
            self.assertEqual(self.bar.c, 0)
            self.assertEqual(self.bar.calc_c_count, 1)
            self.assertEqual(self.bar.d, 0)

            self.bar.b = 2
            # invalidates bar.a and bar.c (in Bar, c depends on b)
            self.assertEqual(self.bar.a, 8)
            self.assertEqual(self.bar.calc_a_count, 2)
            self.assertEqual(self.bar.b, 2)
            self.assertEqual(self.bar.c, 6)
            self.assertEqual(self.bar.calc_c_count, 2)
            self.assertEqual(self.bar.d, 0)

            self.bar.d = 2
            # nothing depends on d in Bar, so no recalculation happens
            self.assertEqual(self.bar.a, 8)
            self.assertEqual(self.bar.calc_a_count, 2)
            self.assertEqual(self.bar.b, 2)
            self.assertEqual(self.bar.c, 6)
            self.assertEqual(self.bar.calc_c_count, 2)
            self.assertEqual(self.bar.d, 2)

    sys.argv.insert(1, '--verbose')
    unittest.main(argv=sys.argv)

这是我用来检查它运行情况的单元测试。

1

我想要一些类似于Makefile规则的东西。

那就用一个吧!你可以考虑这个模型:

  • 一个规则 = 一个Python文件
  • 一个结果 = 一个*.data文件
  • 管道可以通过makefile或者其他依赖分析工具(比如cmake、scons)来实现

我们公司的硬件测试团队使用这样的框架进行深入的探索性测试:

  • 你可以轻松地集成其他语言和工具
  • 你会得到一个稳定且经过验证的解决方案
  • 计算可以分布在多个CPU/计算机上进行
  • 你可以追踪各个值与规则之间的依赖关系
  • 调试中间值很简单

这种方法的一个(大)缺点是你必须放弃Python的import关键字,因为它会创建一个隐式(且未被追踪的)依赖关系(不过有一些解决方法可以绕过这个问题)。

8

下面这个方法应该能解决你的问题。
描述符机制(也就是语言实现“属性”的方式)完全可以满足你的需求。

如果下面的代码在某些特殊情况下不管用,随时告诉我。

class DependentProperty(object):
    """Descriptor for a value that is either a plain input or derived from others.

    A property created with ``calculate`` is derived: assigning ``x`` to it
    stores ``calculate(instance, x)`` and remembers ``x`` so the result can
    be recomputed after a dependency changes.  A property created without
    ``calculate`` is a plain input that starts at ``default``.

    Assigning to either kind deletes the cached value of every property
    listed for it in ``dependence_tree``.
    """

    def __init__(self, calculate=None, default=None, depends_on=()):
        # "name" and "dependence_tree" properties are attributes
        # set up by the metaclass of the owner class
        if calculate:
            self.calculate = calculate
        else:
            self.default = default
        self.depends_on = set(depends_on)

    def __get__(self, instance, owner=None):
        """Return the current value, recomputing a derived value if needed.

        Reading a derived property that was never assigned raises
        AttributeError, because there is no remembered input to compute from.
        """
        if hasattr(self, "default"):
            name = getattr(self, "name", None)
            if instance is None or name is None:
                # Class-level access, or descriptor not wired up by the
                # metaclass: only the default is available.
                return self.default
            # A per-instance override wins over the shared default.
            return getattr(instance, "_" + name, self.default)
        if instance is None:
            # Class-level access to a derived property yields the descriptor.
            return self
        cache_name = "_" + self.name
        if not hasattr(instance, cache_name):
            last_value = getattr(instance, cache_name + "_last_value")
            setattr(instance, cache_name, self.calculate(instance, last_value))
        return getattr(instance, cache_name)

    def __set__(self, instance, value):
        """Store ``value`` and invalidate the properties that depend on this one."""
        if hasattr(self, "default"):
            # Plain input: there is nothing to calculate, just store it on
            # the instance so it can differ from the class-wide default.
            setattr(instance, "_" + self.name, value)
        else:
            setattr(instance, "_" + self.name + "_last_value", value)
            setattr(instance, "_" + self.name, self.calculate(instance, value))
        for attr in self.dependence_tree[self.name]:
            # Triggers __delete__ on the dependent descriptor.
            delattr(instance, attr)

    def __delete__(self, instance):
        """Drop the stored value; a plain input falls back to its default."""
        try:
            delattr(instance, "_" + self.name)
        except AttributeError:
            # Nothing cached yet: invalidation is a no-op.
            pass


def assemble_tree(name, dict_, all_deps=None):
    """Collect every name that ``name`` depends on, directly or transitively.

    ``dict_`` maps names to objects exposing a ``depends_on`` iterable of
    names.  Results are accumulated into ``all_deps`` (a fresh set when
    omitted), which is also returned.  A name already collected is not
    expanded again, so cyclic declarations terminate instead of recursing
    forever.  Raises KeyError if a dependency is missing from ``dict_``.
    """
    if all_deps is None:
        all_deps = set()
    for dependance in dict_[name].depends_on:
        if dependance in all_deps:
            continue
        all_deps.add(dependance)
        assemble_tree(dependance, dict_, all_deps)
    return all_deps

def invert_tree(tree):
    """Reverse a mapping of ``name -> set of names`` edge by edge.

    For every ``key`` and every ``member`` of ``tree[key]``, the result
    maps ``member`` to a set containing ``key``.  Keys whose set is empty
    contribute nothing to the result.
    """
    reversed_edges = {}
    for source, targets in tree.items():
        for target in targets:
            reversed_edges.setdefault(target, set()).add(source)
    return reversed_edges

class DependenceMeta(type):
    """Metaclass that wires up the DependentProperty objects of a class body.

    Each DependentProperty found in the class dictionary is given its
    attribute name and a shared ``dependence_tree`` mapping every property
    name to the set of property names that depend on it.
    """

    def __new__(cls, name, bases, dict_):
        dependence_tree = {}
        descriptors = []
        for attr_name, candidate in dict_.items():
            if isinstance(candidate, DependentProperty):
                candidate.name = attr_name
                candidate.dependence_tree = dependence_tree
                # Every property gets an entry, even if nothing depends on it.
                dependence_tree[attr_name] = set()
                descriptors.append(candidate)
        # Forward direction: property name -> everything it depends on.
        forward_tree = {}
        for descriptor in descriptors:
            forward_tree[descriptor.name] = assemble_tree(descriptor.name, dict_)
        # Reverse direction: property name -> everything depending on it.
        dependence_tree.update(invert_tree(forward_tree))
        return type.__new__(cls, name, bases, dict_)


if __name__ == "__main__":
    # Example and visual test:
    # (Python 2 only: relies on the __metaclass__ hook and print statements.)

    class Bla:
        __metaclass__ = DependenceMeta

        def calc_b(self, x):
            # b = assigned value + a
            print "Calculating b"
            return x + self.a

        def calc_c(self, x):
            # c = assigned value + b
            print "Calculating c"
            return x + self.b

        a = DependentProperty(default=10)    
        b = DependentProperty(depends_on=("a",), calculate=calc_b)
        c = DependentProperty(depends_on=("b",), calculate=calc_c)




    bla = Bla()
    # Each assignment runs the calculation immediately.
    bla.b = 5
    bla.c = 10

    # All three values are cached here, so nothing is recalculated.
    print bla.a, bla.b, bla.c
    # Re-assigning b recalculates b and invalidates the cached c.
    bla.b = 10
    print bla.b
    # Reading c recalculates it from the value last assigned to c (10).
    print bla.c

撰写回答