Python中具有依赖关系的惰性数据流(类似电子表格)属性
我的问题是这样的:我有一些Python类,这些类里面有一些属性是从其他属性计算得来的;一旦这些属性被计算出来,就应该把结果缓存起来,并且每次基础属性发生变化时,缓存的结果都要失效。
我可以手动去做这些事情,但如果属性的数量增加,这样维护起来会很麻烦。所以我希望能在我的对象里有类似于Makefile的规则,自动跟踪哪些东西需要重新计算。
我希望的语法和行为大概是这样的:
# Sketch of the desired (hypothetical) API: `dataflow_class` and
# `managed_property` are the names the question wishes existed.
# this does dirty magic, like generating the reverse dependency graph,
# and preparing the setters that invalidate the cached values
@dataflow_class
class Test(object):

    def calc_a(self):
        return self.b + self.c

    def calc_c(self):
        return self.d * 2

    a = managed_property(calculate=calc_a, depends_on=('b', 'c'))
    b = managed_property(default=0)
    c = managed_property(calculate=calc_c, depends_on=('d',))
    d = managed_property(default=0)

t = Test()

print t.a
# a has not been initialized, so it calls calc_a
# gets b value
# c has not been initialized, so it calls calc_c
# c value is calculated and stored in t.__c
# a value is calculated and stored in t.__a

t.b = 1
# invalidates the calculated value stored in t.__a

print t.a
# a has been invalidated, so it calls calc_a
# gets b value
# gets c value, from t.__c
# a value is calculated and stored in t.__a

print t.a
# gets value from t.__a

t.d = 2
# invalidates the calculated values stored in t.__a and t.__c
那么,是否已经有类似的东西可以用,还是我应该自己去实现一个?如果是后者,欢迎提供建议 :-)
3 个回答
1
import collections
# Marker meaning "no default supplied"; lets None be used as a real default.
sentinel = object()


class ManagedProperty(object):
    '''
    Descriptor for a cached attribute that is reset when its inputs change.

    The value lives on the instance under ``'_' + property_name``.  On first
    read it is taken from ``default`` if one was given, otherwise from the
    instance method named by ``calculate``, and is then cached.

    Assigning to the property deletes the cached value of every property
    listed for it in ``obj._dependencies`` (defined by @register).  If
    _dependencies = {'a': set(['b', 'c'])}, then ManagedProperties `b` and
    `c` will be reset whenever `a` is modified.
    '''

    def __init__(self, property_name, calculate=None, depends_on=tuple(),
                 default=sentinel):
        '''
        property_name -- public attribute name this descriptor is bound to.
        calculate     -- name (string) of the instance method that computes
                         the value when no default is given.
        depends_on    -- names of the properties this value is computed from.
        default       -- initial value; `sentinel` means "no default".
        '''
        self.property_name = property_name
        self.private_name = '_' + property_name
        self.calculate = calculate
        self.depends_on = depends_on
        self.default = default

    def __get__(self, obj, objtype=None):
        '''
        Return the cached value, computing and caching it on first access.

        Raises AttributeError if the value was never set and the property
        has neither a default nor a calculate method.
        '''
        if obj is None:
            # Allows getattr(cls,mprop) to return the ManagedProperty instance
            return self
        try:
            return getattr(obj, self.private_name)
        except AttributeError:
            pass
        if self.default is not sentinel:
            result = self.default
        elif self.calculate is None:
            # A pure input property that has not been assigned yet.
            raise AttributeError(
                '%r has no value, default or calculate method'
                % self.property_name)
        else:
            result = getattr(obj, self.calculate)()
        setattr(obj, self.private_name, result)
        return result

    def __set__(self, obj, value):
        '''Store `value` and invalidate every property that depends on it.'''
        # obj._dependencies is defined by @register
        dependents = getattr(obj, '_dependencies').get(self.property_name,
                                                       tuple())
        # An explicit loop is required: map() is lazy on Python 3, so
        # map(obj.__delattr__, ...) would never run any of the deletions.
        for dependent in dependents:
            delattr(obj, dependent)
        setattr(obj, self.private_name, value)

    def __delete__(self, obj):
        '''Drop the cached value, if any, so the next read recomputes it.'''
        if hasattr(obj, self.private_name):
            delattr(obj, self.private_name)
def register(*mproperties):
    '''
    Class decorator factory that installs ManagedProperties on a class.

    Each given ManagedProperty is attached to the class under its
    property_name.  The decorator then stores on the class, as
    `_dependencies`, a dict mapping each underlying property name to the set
    of all properties that depend on it, directly or indirectly.
    '''
    def flatten_dependencies(name, deptree, all_deps=None):
        '''
        A deptree such as {'c': set(['a']), 'd': set(['c'])} means
        'a' depends on 'c' and 'c' depends on 'd'.
        Given such a deptree, flatten_dependencies('d', deptree) returns the set
        of all property_names that depend on 'd' (i.e. set(['a','c']) in the
        above case).
        '''
        if all_deps is None:
            all_deps = set()
        for dep in deptree.get(name, tuple()):
            # Skip names already collected: this terminates on cyclic
            # declarations and avoids re-walking shared sub-graphs.
            if dep not in all_deps:
                all_deps.add(dep)
                flatten_dependencies(dep, deptree, all_deps)
        return all_deps

    def classdecorator(cls):
        deptree = collections.defaultdict(set)
        for mprop in mproperties:
            setattr(cls, mprop.property_name, mprop)
        # Find all ManagedProperties in dir(cls). Note that some of these may be
        # inherited from bases of cls; they may not be listed in mproperties.
        # Doing it this way allows ManagedProperties to be overridden by subclasses.
        for propname in dir(cls):
            # dir() may list names that cannot actually be fetched; treat
            # those as "not a ManagedProperty" instead of failing.
            mprop = getattr(cls, propname, None)
            if not isinstance(mprop, ManagedProperty):
                continue
            for underlying_prop in mprop.depends_on:
                deptree[underlying_prop].add(mprop.property_name)
        # Flatten the dependency tree so no recursion is necessary. If one were
        # to use recursion instead, then a naive algorithm would make duplicate
        # calls to __delete__. By flattening the tree, there are no duplicate
        # calls to __delete__.
        dependencies = {key: flatten_dependencies(key, deptree)
                        for key in list(deptree)}
        setattr(cls, '_dependencies', dependencies)
        return cls
    return classdecorator
if __name__ == "__main__":
    # Unit tests for ManagedProperty / register.
    import functools
    import unittest
    import sys

    def count(meth):
        '''Decorator counting calls to `meth` in `<method name>_count`.'''
        @functools.wraps(meth)
        def wrapper(self, *args):
            # __name__ exists on both Python 2 and 3; func_name is 2-only.
            countname = meth.__name__ + '_count'
            setattr(self, countname, getattr(self, countname, 0) + 1)
            return meth(self, *args)
        return wrapper

    class Test(unittest.TestCase):

        def setUp(self):
            @register(
                ManagedProperty('d', default=0),
                ManagedProperty('b', default=0),
                ManagedProperty('c', calculate='calc_c', depends_on=('d',)),
                ManagedProperty('a', calculate='calc_a', depends_on=('b', 'c')))
            class Foo(object):
                @count
                def calc_a(self):
                    return self.b + self.c

                @count
                def calc_c(self):
                    return self.d * 2

            # Bar overrides `c` so that it depends on `b` instead of `d`.
            @register(ManagedProperty('c', calculate='calc_c', depends_on=('b',)),
                      ManagedProperty('a', calculate='calc_a', depends_on=('b', 'c')))
            class Bar(Foo):
                @count
                def calc_c(self):
                    return self.b * 3

            self.Foo = Foo
            self.Bar = Bar
            self.foo = Foo()
            self.foo2 = Foo()
            self.bar = Bar()

        def test_two_instances(self):
            self.foo.b = 1
            self.assertEqual(self.foo.a, 1)
            self.assertEqual(self.foo.b, 1)
            self.assertEqual(self.foo.c, 0)
            self.assertEqual(self.foo.d, 0)

            self.assertEqual(self.foo2.a, 0)
            self.assertEqual(self.foo2.b, 0)
            self.assertEqual(self.foo2.c, 0)
            self.assertEqual(self.foo2.d, 0)

        def test_initialization(self):
            self.assertEqual(self.foo.a, 0)
            self.assertEqual(self.foo.calc_a_count, 1)
            self.assertEqual(self.foo.a, 0)
            self.assertEqual(self.foo.calc_a_count, 1)
            self.assertEqual(self.foo.b, 0)
            self.assertEqual(self.foo.c, 0)
            self.assertEqual(self.foo.d, 0)
            self.assertEqual(self.bar.a, 0)
            self.assertEqual(self.bar.b, 0)
            self.assertEqual(self.bar.c, 0)
            self.assertEqual(self.bar.d, 0)

        def test_dependence(self):
            self.assertEqual(self.Foo._dependencies,
                             {'c': set(['a']), 'b': set(['a']), 'd': set(['a', 'c'])})
            self.assertEqual(self.Bar._dependencies,
                             {'c': set(['a']), 'b': set(['a', 'c'])})

        def test_setting_property_updates_dependent(self):
            self.assertEqual(self.foo.a, 0)
            self.assertEqual(self.foo.calc_a_count, 1)

            self.foo.b = 1
            # invalidates the calculated value stored in foo.a
            self.assertEqual(self.foo.a, 1)
            self.assertEqual(self.foo.calc_a_count, 2)
            self.assertEqual(self.foo.b, 1)
            self.assertEqual(self.foo.c, 0)
            self.assertEqual(self.foo.d, 0)

            self.foo.d = 2
            # invalidates the calculated values stored in foo.a and foo.c
            self.assertEqual(self.foo.a, 5)
            self.assertEqual(self.foo.calc_a_count, 3)
            self.assertEqual(self.foo.b, 1)
            self.assertEqual(self.foo.c, 4)
            self.assertEqual(self.foo.d, 2)

            self.assertEqual(self.bar.a, 0)
            self.assertEqual(self.bar.calc_a_count, 1)
            self.assertEqual(self.bar.b, 0)
            self.assertEqual(self.bar.c, 0)
            self.assertEqual(self.bar.calc_c_count, 1)
            self.assertEqual(self.bar.d, 0)

            self.bar.b = 2
            self.assertEqual(self.bar.a, 8)
            self.assertEqual(self.bar.calc_a_count, 2)
            self.assertEqual(self.bar.b, 2)
            self.assertEqual(self.bar.c, 6)
            self.assertEqual(self.bar.calc_c_count, 2)
            self.assertEqual(self.bar.d, 0)

            # In Bar nothing depends on d, so no value is recalculated.
            self.bar.d = 2
            self.assertEqual(self.bar.a, 8)
            self.assertEqual(self.bar.calc_a_count, 2)
            self.assertEqual(self.bar.b, 2)
            self.assertEqual(self.bar.c, 6)
            self.assertEqual(self.bar.calc_c_count, 2)
            self.assertEqual(self.bar.d, 2)

    sys.argv.insert(1, '--verbose')
    unittest.main(argv=sys.argv)
上面代码末尾(`if __name__ == "__main__":` 之后的部分)是我用来验证其行为的单元测试。
1
我想要一些类似于Makefile规则的东西。
那就直接用Makefile吧!你可以考虑下面这个模型:
- 一个规则 = 一个Python文件
- 一个结果 = 一个*.data文件
- 管道可以通过makefile或者其他依赖分析工具(比如cmake、scons)来实现
我们公司的硬件测试团队使用这样的框架进行深入的探索性测试:
- 你可以轻松地集成其他语言和工具
- 你会得到一个稳定且经过验证的解决方案
- 计算可以分布在多个CPU/计算机上进行
- 你可以追踪值和规则之间的依赖关系
- 调试中间值很简单
这种方法的一个(大)缺点是:你必须放弃Python的import语句,因为它会引入隐式的(且未被追踪的)依赖关系(不过也有一些变通办法可以绕过这个问题)。
8
下面这个方法应该能解决你的问题。
描述符机制(也就是语言实现“属性”的方式)完全可以满足你的需求。
如果下面的代码在某些特殊情况下不管用,随时告诉我。
class DependentProperty(object):
    """Descriptor for a value that is either a default or derived from a setter input.

    A property built with ``calculate`` stores ``calculate(instance, value)``
    whenever ``value`` is assigned, remembers ``value`` so the result can be
    recomputed later, and caches the result on the instance.  A property
    built without ``calculate`` is a plain input holding ``default`` until
    it is assigned.

    Assigning to any property deletes the cached result of every property
    listed for it in ``dependence_tree``.
    """

    def __init__(self, calculate=None, default=None, depends_on=()):
        """
        calculate  -- callable taking (instance, assigned_value).
        default    -- initial value, used only when `calculate` is not given.
        depends_on -- names of the properties `calculate` reads.
        """
        # "name" and "dependence_tree" properties are attributes
        # set up by the metaclass of the owner class
        if calculate:
            self.calculate = calculate
        else:
            self.default = default
        self.depends_on = set(depends_on)

    def __get__(self, instance, owner=None):
        """Return the current value, recomputing it if it was invalidated.

        Raises AttributeError for a calculated property that has never been
        assigned, since there is no input value to compute from.
        """
        if instance is None:
            # Class-level access returns the descriptor itself.
            return self
        private = "_" + self.name
        if hasattr(self, "default"):
            # Per-instance override if one was assigned, else the default.
            return getattr(instance, private, self.default)
        if not hasattr(instance, private):
            last_value = getattr(instance, private + "_last_value")
            setattr(instance, private, self.calculate(instance, last_value))
        return getattr(instance, private)

    def __set__(self, instance, value):
        """Store `value` (or its calculated result) and invalidate dependents."""
        private = "_" + self.name
        if hasattr(self, "default"):
            # Plain input property: no calculate function to call.
            setattr(instance, private, value)
        else:
            setattr(instance, private + "_last_value", value)
            setattr(instance, private, self.calculate(instance, value))
        # delattr goes through each dependent's __delete__, dropping its cache.
        for attr in self.dependence_tree.get(self.name, ()):
            delattr(instance, attr)

    def __delete__(self, instance):
        """Drop the stored value; deleting an unset value is a no-op."""
        try:
            delattr(instance, "_" + self.name)
        except AttributeError:
            pass
def assemble_tree(name, dict_, all_deps = None):
    """Return the set of every name that `name` depends on, transitively.

    name     -- key in `dict_` to start from.
    dict_    -- mapping of names to objects exposing a `depends_on` iterable.
    all_deps -- accumulator used by the recursion; callers normally omit it.

    Raises KeyError if a dependency name is missing from `dict_`.
    """
    if all_deps is None:
        all_deps = set()
    for dependance in dict_[name].depends_on:
        # Visit each name once: this terminates on cyclic declarations and
        # avoids re-walking shared dependencies.
        if dependance not in all_deps:
            all_deps.add(dependance)
            assemble_tree(dependance, dict_, all_deps)
    return all_deps
def invert_tree(tree):
    """Reverse a mapping of name -> set of names.

    For every `key -> values` entry in `tree`, the result maps each value
    back to the set of keys that listed it.  Keys whose value set is empty
    contribute nothing to the result.
    """
    reversed_tree = {}
    for source, targets in tree.items():
        for target in targets:
            reversed_tree.setdefault(target, set()).add(source)
    return reversed_tree
class DependenceMeta(type):
    """Metaclass that wires up the DependentProperty attributes of a class.

    Every DependentProperty found in the class body gets its `name` and a
    shared `dependence_tree` dict, which maps each property name to the set
    of property names that depend on it.
    """

    def __new__(cls, name, bases, dict_):
        dependence_tree = {}
        managed = []
        for attr_name, attr in dict_.items():
            if isinstance(attr, DependentProperty):
                attr.name = attr_name
                attr.dependence_tree = dependence_tree
                # Each property starts with no dependents.
                dependence_tree[attr_name] = set()
                managed.append(attr)
        # property name -> everything it (transitively) depends on
        prerequisites = {}
        for prop in managed:
            prerequisites[prop.name] = assemble_tree(prop.name, dict_)
        # Reverse the mapping to get: property name -> its dependents.
        dependence_tree.update(invert_tree(prerequisites))
        return type.__new__(cls, name, bases, dict_)
if __name__ == "__main__":
    # Example and visual test:

    # Inheriting from a base built by calling the metaclass selects
    # DependenceMeta on both Python 2 and Python 3; the Python 2 only
    # `__metaclass__` attribute is ignored by Python 3.
    _DependenceBase = DependenceMeta("_DependenceBase", (object,), {})

    class Bla(_DependenceBase):

        def calc_b(self, x):
            print("Calculating b")
            return x + self.a

        def calc_c(self, x):
            print("Calculating c")
            return x + self.b

        a = DependentProperty(default=10)
        b = DependentProperty(depends_on=("a",), calculate=calc_b)
        c = DependentProperty(depends_on=("b",), calculate=calc_c)

    bla = Bla()
    bla.b = 5
    bla.c = 10
    # Single-argument print() calls produce the same output on Python 2 and 3.
    print("%s %s %s" % (bla.a, bla.b, bla.c))
    bla.b = 10
    print(bla.b)
    print(bla.c)