SQLAlchemy 版本控制(versioning)示例受类的导入顺序影响
我正在按照这里的指南操作:
http://www.sqlalchemy.org/docs/orm/examples.html?highlight=versioning#versioned-objects
但是遇到了一个问题。我定义了我的关系,像这样:
# The target class is given as a string so that it is looked up by name
# later instead of having to be imported before this relation is declared.
generic_ticker = relation('MyClass', backref=backref("stuffs"))
用字符串来定义,这样就不需要担心我的模型模块的导入顺序。通常情况下,这一切都能正常工作,但当我使用版本控制的元数据时,我遇到了以下错误:
sqlalchemy.exc.InvalidRequestError: 在初始化映射器 Mapper|MyClass|stuffs 时,表达式 'MyClass' 找不到名称(“name 'MyClass' is not defined”)。如果这是一个类名,请考虑在两个相关类都定义后再将这个 relationship() 添加到类中。
我追踪到错误的原因是:
File "/home/nick/workspace/gm3/gm3/lib/history_meta.py", line 90, in __init__
mapper = class_mapper(cls)
File "/home/nick/venv/tg2env/lib/python2.6/site-packages/sqlalchemy/orm/util.py", line 622, in class_mapper
mapper = mapper.compile()
class VersionedMeta(DeclarativeMeta):
    """Declarative metaclass that sets up a history mapper for each class.

    The history mapper is built from inside ``__init__``, i.e. while the
    class itself is still being defined.
    """

    def __init__(cls, classname, bases, dict_):
        DeclarativeMeta.__init__(cls, classname, bases, dict_)
        try:
            # class_mapper() compiles the mapper right away; this is the call
            # that fails when a related class has not been defined yet.
            mapper = class_mapper(cls)
            _history_mapper(mapper)
        except UnmappedClassError:
            # The class is not mapped, so there is nothing to version.
            pass
我通过把 try: except 的代码放进一个 lambda 函数里,并在所有导入完成后再运行它们来解决了这个问题。这虽然能用,但感觉有点不太好,有没有更好的解决办法?
谢谢!
更新
实际上,这个问题并不是关于导入顺序的。版本控制的示例设计成映射器需要在每个版本类的构造函数中进行编译。当相关类尚未定义时,编译就会失败。在循环引用的情况下,改变映射类的定义顺序也无法解决这个问题。
更新 2
正如上面的更新所说(我之前不知道可以编辑别人的帖子 :)),这很可能是由于循环引用导致的。在这种情况下,也许有人会觉得我的解决方法有用(我在使用 turbogears 时用到的)(替换 VersionedMeta,并在 history_meta 中添加 create_mappers 全局变量)
# Deferred history-mapper setup callables, one per class created through
# VersionedMeta. Call each of them once all model classes are defined.
create_mappers = []


class VersionedMeta(DeclarativeMeta):
    """Declarative metaclass that defers history-mapper creation.

    Instead of building the history mapper while the class is being
    defined, a zero-argument callable doing that work is appended to the
    module-level ``create_mappers`` list.
    """

    def __init__(cls, classname, bases, dict_):
        DeclarativeMeta.__init__(cls, classname, bases, dict_)

        # Deferred because running this immediately was crashing.
        def make_mapper():
            try:
                mapper = class_mapper(cls)
                _history_mapper(mapper)
            except UnmappedClassError:
                # The class is not mapped, so there is nothing to version.
                pass

        create_mappers.append(make_mapper)
然后你可以在你的模型的 __init__.py 中做类似以下的事情:
# Import your model modules here.
from myproj.lib.history_meta import create_mappers
from myproj.model.misc import *
from myproj.model.actor import *
from myproj.model.stuff1 import *
from myproj.model.instrument import *
from myproj.model.stuff import *

# Set up the history mappers now that every model module has been imported.
for create_mapper in create_mappers:
    create_mapper()
这样就能在所有类都定义后再创建映射器。
更新 3
稍微有点离题,但在某些情况下我遇到了重复主键的错误(一次提交中对同一个对象做了两次更改)。我的解决办法是给历史表添加一个新的自增主键 hist_id。当然,在 MySQL 中一张表不能有多个主键,所以在创建历史表时,我不得不去掉从原表复制过来的列上的主键。下面是我的完整代码(包括 hist_id 以及去掉的外键约束):
"""Stolen from the offical sqlalchemy recpies
"""
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.orm import mapper, class_mapper, attributes, object_mapper
from sqlalchemy.orm.exc import UnmappedClassError, UnmappedColumnError
from sqlalchemy import Table, Column, ForeignKeyConstraint, Integer
from sqlalchemy.orm.interfaces import SessionExtension
from sqlalchemy.orm.properties import RelationshipProperty
from sqlalchemy.types import DateTime
import datetime
from sqlalchemy.orm.session import Session
def col_references_table(col, table):
    """Return True if any foreign key of ``col`` references ``table``.

    ``col.foreign_keys`` is iterated and ``fk.references(table)`` is asked
    for each entry; a column without foreign keys yields False.
    """
    return any(fk.references(table) for fk in col.foreign_keys)
def _history_mapper(local_mapper):
    """Build the history table/class for a mapped class and map it.

    For a class with its own table, a ``<table name>_history`` table is
    created from copies of the mapped table's columns plus ``hist_id``,
    ``version`` and ``changed`` columns.  For single table inheritance the
    parent's history table is extended with any missing columns instead.
    The resulting mapper is stored on the class as ``__history_mapper__``,
    and a ``version`` column is added to classes that had no history mapper
    before this call.

    :param local_mapper: the mapper of the class to be versioned.
    """
    cls = local_mapper.class_

    # Set the "active_history" flag on the mapped attributes so that the old
    # version of the info is always loaded (currently sets it on all
    # attributes).
    for prop in local_mapper.iterate_properties:
        getattr(cls, prop.key).impl.active_history = True

    super_mapper = local_mapper.inherits
    super_history_mapper = getattr(cls, '__history_mapper__', None)

    polymorphic_on = None
    super_fks = []
    if not super_mapper or local_mapper.local_table is not super_mapper.local_table:
        # No parent mapper, or a table different from the parent's: this
        # class gets a history table of its own.
        cols = []
        for column in local_mapper.local_table.c:
            if column.name == 'version':
                continue

            col = column.copy()
            col.unique = False
            # Don't auto increment stuff copied from the normal db table.
            if col.autoincrement:
                col.autoincrement = False
            # SQLite falls over with auto incrementing keys if we have a
            # composite key, so copied columns are never primary keys; the
            # 'hist_id' column added below is the only primary key.
            if col.primary_key:
                col.primary_key = False

            if super_mapper and col_references_table(column, super_mapper.local_table):
                super_fks.append(
                    (col.key,
                     list(super_history_mapper.base_mapper.local_table.primary_key)[0])
                )

            cols.append(col)

            if column is local_mapper.polymorphic_on:
                polymorphic_on = col

        # NOTE: a foreign key from 'version' to the parent history table's
        # 'version' column used to be added here for subclasses; it was
        # disabled in the original code and is intentionally left out.
        cols.append(Column('hist_id', Integer, primary_key=True, autoincrement=True))
        cols.append(Column('version', Integer))
        cols.append(Column('changed', DateTime, default=datetime.datetime.now))

        if super_fks:
            cols.append(ForeignKeyConstraint(*zip(*super_fks)))

        table = Table(local_mapper.local_table.name + '_history',
                      local_mapper.local_table.metadata,
                      *cols, mysql_engine='InnoDB')
    else:
        # Single table inheritance. Take any additional columns that may have
        # been added and add them to the history table.
        for column in local_mapper.local_table.c:
            if column.key not in super_history_mapper.local_table.c:
                col = column.copy()
                super_history_mapper.local_table.append_column(col)
        table = None

    if super_history_mapper:
        bases = (super_history_mapper.class_,)
    else:
        bases = local_mapper.base_mapper.class_.__bases__
    # type.__new__ is called directly rather than type(...); presumably so
    # that no metaclass __init__ runs for the history class -- TODO confirm.
    versioned_cls = type.__new__(type, "%sHistory" % cls.__name__, bases, {})

    m = mapper(
        versioned_cls,
        table,
        inherits=super_history_mapper,
        polymorphic_on=polymorphic_on,
        polymorphic_identity=local_mapper.polymorphic_identity
    )
    cls.__history_mapper__ = m

    if not super_history_mapper:
        cls.version = Column('version', Integer, default=1, nullable=False)
# Deferred history-mapper setup callables, one per class created through
# VersionedMeta. Call each of them once all model classes are defined.
create_mappers = []


class VersionedMeta(DeclarativeMeta):
    """Declarative metaclass that defers history-mapper creation.

    Instead of building the history mapper while the class is being
    defined, a zero-argument callable doing that work is appended to the
    module-level ``create_mappers`` list.
    """

    def __init__(cls, classname, bases, dict_):
        DeclarativeMeta.__init__(cls, classname, bases, dict_)

        # Deferred because running this immediately was crashing.
        def make_mapper():
            try:
                mapper = class_mapper(cls)
                _history_mapper(mapper)
            except UnmappedClassError:
                # The class is not mapped, so there is nothing to version.
                pass

        create_mappers.append(make_mapper)
def versioned_objects(iter):
    """Yield the objects from ``iter`` that have a ``__history_mapper__``.

    Objects without that attribute are skipped.  The parameter name shadows
    the ``iter`` builtin; it is kept so keyword callers keep working.
    """
    for obj in iter:
        if hasattr(obj, '__history_mapper__'):
            yield obj
def create_version(obj, session, deleted=False):
    """Add a history row holding the previous state of ``obj`` to ``session``.

    The pre-change value of every history column is collected from the
    attribute history of ``obj``.  If nothing changed (column attributes or
    relationships) and ``deleted`` is false, nothing is recorded.  Otherwise
    a history object carrying the current ``obj.version`` is added to the
    session and ``obj.version`` is incremented.

    :param obj: versioned instance (has ``__history_mapper__``).
    :param session: session the history object is added to.
    :param deleted: when true, record a version even without changes.
    """
    obj_mapper = object_mapper(obj)
    history_mapper = obj.__history_mapper__
    history_cls = history_mapper.class_

    obj_state = attributes.instance_state(obj)

    attr = {}

    obj_changed = False

    for om, hm in zip(obj_mapper.iterate_to_root(), history_mapper.iterate_to_root()):
        if hm.single:
            continue

        for hist_col in hm.local_table.c:
            # Bookkeeping columns exist only on the history table.
            if hist_col.key in ('version', 'changed', 'hist_id'):
                continue

            obj_col = om.local_table.c[hist_col.key]

            # Get the value of the attribute based on the MapperProperty
            # related to the mapped column. This will allow usage of
            # MapperProperties that have a different keyname than that of
            # the mapped column.
            try:
                prop = obj_mapper.get_property_by_column(obj_col)
            except UnmappedColumnError:
                # In the case of single table inheritance, there may be
                # columns on the mapped table intended for the subclass only.
                # The "unmapped" status of the subclass column on the base
                # class is a feature of the declarative module as of
                # sqla 0.5.2.
                continue

            # Expired object attributes and also deferred cols might not be
            # in the dict. Force it to load no matter what by using getattr().
            if prop.key not in obj_state.dict:
                getattr(obj, prop.key)

            a, u, d = attributes.get_history(obj, prop.key)

            if d:
                attr[hist_col.key] = d[0]
                obj_changed = True
            elif u:
                attr[hist_col.key] = u[0]
            elif a:
                # The attribute had no value before. Guarded with ``elif a``
                # so an entirely empty history does not raise IndexError.
                attr[hist_col.key] = a[0]
                obj_changed = True

    if not obj_changed:
        # Not changed, but we have relationships. OK, check those too.
        obj_changed = any(
            isinstance(prop, RelationshipProperty)
            and attributes.get_history(obj, prop.key).has_changes()
            for prop in obj_mapper.iterate_properties
        )

    if not obj_changed and not deleted:
        return

    attr['version'] = obj.version
    hist = history_cls()
    # items() instead of iteritems() so this also runs on Python 3.
    for key, value in attr.items():
        setattr(hist, key, value)

    obj.version += 1
    session.add(hist)
class VersionedListener(SessionExtension):
    """Session extension that records a version for versioned objects."""

    def before_flush(self, session, flush_context, instances):
        """Call ``create_version`` for dirty and deleted versioned objects.

        Objects in ``session.deleted`` are passed ``deleted=True``.
        """
        for obj in versioned_objects(session.dirty):
            create_version(obj, session)
        for obj in versioned_objects(session.deleted):
            create_version(obj, session, deleted=True)
1 个回答
我通过把try和except的代码放在一个lambda表达式里,然后在所有导入完成后再运行它们,解决了这个问题。
太好了!