SQLAlchemy经典映射模型与分片Postgres数据库
情况说明:
我有12张表(每张表代表一个月的数据),这些表分布在6个数据库中。我需要从这些数据库中获取某个月的数据样本。
我为什么选择经典映射模型而不是声明式模型:
我只需要访问12种表中的一种,因为每次运行这段代码时,我只会收集一个特定月份的数据样本。经典映射模型让我可以在运行时动态定义我想要映射的表名,而不是像声明式模型那样为6个数据库中的所有12张表创建映射。
问题所在:
我正在尝试按照这里提供的实体名称示例,将我的月份数据类映射到6个不同数据库中对应月份的每一张表。
但是我遇到了一个UnmappedClassError
错误,提示我的基类(所有新类都从这个基类派生)“没有被映射”。
所以当我尝试初始化我的一个新映射表type: <class '__main__.db1month1'>
时,它报告UnmappedClassError: Class 'audit.db.orm.mappedclasses.MonthData' is not mapped
。
有什么想法吗?
如果需要,我可以把我的代码粘贴在这里,但我担心代码有点长。我使用的是在实体名称示例中定义的map_class_to_some_table
方法来进行映射,并没有做任何修改。
2 个回答
0
我遇到了同样的情况。下面是我的方法:
class_registry = {}
DbBase = declarative_base(bind=engine, class_registry=class_registry)
def get_model(modelname, tablename, metadata=DbBase.metadata):
if modelname not in class_registry:
model = type(modelname, (DbBase,), dict(
__table__ = Table(tablename, metadata, autoload=True)
))
else:
model = class_registry[modelname]
return model
这个方法效果不错。不过,@Katie 的方法更好。
2
最后我决定放弃之前的做法,改为参考这个 ShardedSession 的例子。
我最终的类大概是这样的:
class ShardSessionManager(object):
def __init__(self, month):
self.month = month
#Step1: database engines
self.engines = {}
for name, db in shard_dbs.iteritems():
self.engines[name] = create_engine('postgresql+psycopg2://', creator=db.get_connection, client_encoding='utf8')
#Step2: create session function - bind shard ids to databases within a ShardedSession
self.create_session = sessionmaker(class_=ShardedSession)
self.create_session.configure(shards=self.engines,
shard_chooser=self.shard_chooser,
id_chooser=self.id_chooser,
query_chooser=self.query_chooser)
#Step3: table setup
self._make_tables(self.month)
#Step4: map classes
self._map_tables()
@staticmethod
def shard_chooser(mapper, instance, clause=None):
if isinstance(instance, DataTable):
return id_chooser(instance.brand_id)
@staticmethod
def id_chooser(data_id):
...
@staticmethod
def query_chooser(query):
...
def _make_tables(self, month):
self.meta = MetaData()
self.data_table = DataTable(month, self.meta).table
... other tables ...
def _map_tables(self):
try:
mapper(DataTable, self.data_table,
properties={ ... })
...
def get_random_data(self, parent_id):
session = self.create_session()
return session.query(DataTable).filter(...