SqlAlchemy 向类添加新字段并在表中创建相应列
我想在一个已经映射好的类里添加一个字段,那我该怎么让SQL表自动更新呢?SQLAlchemy有没有什么方法可以在类里添加字段后,自动更新数据库,增加一个新列?
6 个回答
4
Alembic 是一个最新的工具,可以帮助我们在数据库中进行数据迁移。
想了解更多关于数据库迁移的信息,可以查看sqlalchemy 的文档。
14
有时候,使用迁移工具太麻烦了——你只是想在运行修改后的代码时,自动添加一个列。这里有一个函数可以做到这一点。
注意事项:这个方法会直接操作SQLAlchemy的内部结构,每次SQLAlchemy进行重大更新时,可能需要做一些小调整。(可能还有更好的方法来实现这个功能——我不是SQLAlchemy的专家)。另外,它也不处理约束条件。
import logging
import re
import sqlalchemy
from sqlalchemy import MetaData, Table, exceptions
import sqlalchemy.engine.ddl
_new_sa_ddl = sqlalchemy.__version__.startswith('0.7')
def create_and_upgrade(engine, metadata):
"""For each table in metadata, if it is not in the database then create it.
If it is in the database then add any missing columns and warn about any columns
whose spec has changed"""
db_metadata = MetaData()
db_metadata.bind = engine
for model_table in metadata.sorted_tables:
try:
db_table = Table(model_table.name, db_metadata, autoload=True)
except exceptions.NoSuchTableError:
logging.info('Creating table %s' % model_table.name)
model_table.create(bind=engine)
else:
if _new_sa_ddl:
ddl_c = engine.dialect.ddl_compiler(engine.dialect, None)
else:
# 0.6
ddl_c = engine.dialect.ddl_compiler(engine.dialect, db_table)
# else:
# 0.5
# ddl_c = engine.dialect.schemagenerator(engine.dialect, engine.contextual_connect())
logging.debug('Table %s already exists. Checking for missing columns' % model_table.name)
model_columns = _column_names(model_table)
db_columns = _column_names(db_table)
to_create = model_columns - db_columns
to_remove = db_columns - model_columns
to_check = db_columns.intersection(model_columns)
for c in to_create:
model_column = getattr(model_table.c, c)
logging.info('Adding column %s.%s' % (model_table.name, model_column.name))
assert not model_column.constraints, \
'Arrrgh! I cannot automatically add columns with constraints to the database'\
'Please consider fixing me if you care!'
model_col_spec = ddl_c.get_column_specification(model_column)
sql = 'ALTER TABLE %s ADD %s' % (model_table.name, model_col_spec)
engine.execute(sql)
# It's difficult to reliably determine if the model has changed
# a column definition. E.g. the default precision of columns
# is None, which means the database decides. Therefore when I look at the model
# it may give the SQL for the column as INTEGER but when I look at the database
# I have a definite precision, therefore the returned type is INTEGER(11)
for c in to_check:
model_column = model_table.c[c]
db_column = db_table.c[c]
x = model_column == db_column
logging.debug('Checking column %s.%s' % (model_table.name, model_column.name))
model_col_spec = ddl_c.get_column_specification(model_column)
db_col_spec = ddl_c.get_column_specification(db_column)
model_col_spec = re.sub('[(][\d ,]+[)]', '', model_col_spec)
db_col_spec = re.sub('[(][\d ,]+[)]', '', db_col_spec)
db_col_spec = db_col_spec.replace('DECIMAL', 'NUMERIC')
db_col_spec = db_col_spec.replace('TINYINT', 'BOOL')
if model_col_spec != db_col_spec:
logging.warning('Column %s.%s has specification %r in the model but %r in the database' %
(model_table.name, model_column.name, model_col_spec, db_col_spec))
if model_column.constraints or db_column.constraints:
# TODO, check constraints
logging.debug('Column constraints not checked. I am too dumb')
for c in to_remove:
model_column = getattr(db_table.c, c)
logging.warning('Column %s.%s in the database is not in the model' % (model_table.name, model_column.name))
def _column_names(table):
# Autoloaded columns return unicode column names - make sure we treat all are equal
return set((unicode(i.name) for i in table.c))
7
SQLAlchemy本身不支持自动更新数据库结构,但有一个第三方工具叫做SQLAlchemy Migrate,可以帮助你自动进行数据库迁移。你可以查看“数据库结构版本控制工作流程”章节,了解它是如何工作的。