如何在Alembic升级脚本中执行插入和更新?
我需要在进行Alembic升级时修改一些数据。
我现在有一个名为'players'的表,这是我的第一次版本:
def upgrade():
op.create_table('player',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.Unicode(length=200), nullable=False),
sa.Column('position', sa.Unicode(length=200), nullable=True),
sa.Column('team', sa.Unicode(length=100), nullable=True)
sa.PrimaryKeyConstraint('id')
)
我想引入一个' teams'的表。我已经创建了第二个版本:
def upgrade():
op.create_table('teams',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=80), nullable=False)
)
op.add_column('players', sa.Column('team_id', sa.Integer(), nullable=False))
我希望第二次迁移也能添加以下数据:
填充teams表:
INSERT INTO teams (name) SELECT DISTINCT team FROM players;
根据players的team名称更新players.team_id:
UPDATE players AS p JOIN teams AS t SET p.team_id = t.id WHERE p.team = t.name;
我该如何在升级脚本中执行插入和更新操作呢?
3 个回答
我建议使用SQLAlchemy的核心语句,配合临时表,具体可以参考官方文档。这样做的好处是可以使用通用的SQL语句,同时也能用Python的方式来写,而且它是自包含的。SQLAlchemy Core在迁移脚本中是两全其美的选择。
下面是这个概念的一个例子:
from sqlalchemy.sql import table, column
from sqlalchemy import String
from alembic import op
account = table('account',
column('name', String)
)
op.execute(
account.update().\\
where(account.c.name==op.inline_literal('account 1')).\\
values({'name':op.inline_literal('account 2')})
)
# If insert is required
from sqlalchemy.sql import insert
from sqlalchemy import orm
bind = op.get_bind()
session = orm.Session(bind=bind)
data = {
"name": "John",
}
ret = session.execute(insert(account).values(data))
# for use in other insert calls
account_id = ret.lastrowid
你也可以直接使用SQL语句,具体可以参考这个链接:Alembic 操作参考,下面是一个例子:
from alembic import op
# revision identifiers, used by Alembic.
revision = '1ce7873ac4ced2'
down_revision = '1cea0ac4ced2'
branch_labels = None
depends_on = None
def upgrade():
# ### commands made by andrew ###
op.execute('UPDATE STOCK SET IN_STOCK = -1 WHERE IN_STOCK IS NULL')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###
你问的其实是一个 数据迁移 的问题,而不是在 Alembic 文档中常见的 模式迁移。
这个回答假设你是在使用声明式方法(而不是类-映射-表或核心方法)来定义你的模型。把这个方法调整到其他形式应该也比较简单。
要注意的是,Alembic 提供了一些基本的数据操作功能,比如 op.bulk_insert()
和 op.execute()
。如果你的操作比较简单,可以使用这些。如果迁移需要处理关系或其他复杂的交互,我更喜欢使用下面描述的模型和会话的全部功能。
以下是一个示例迁移脚本,它设置了一些声明式模型,用于在会话中操作数据。关键点有:
定义你需要的基本模型,包含你需要的列。你不需要每一列,只需要主键和你会用到的列。
在升级函数中,使用
op.get_bind()
来获取当前连接,并用它创建一个会话。- 或者使用
bind.execute()
,通过 SQLAlchemy 的底层直接写 SQL 查询。这对于简单的迁移很有用。
- 或者使用
像平常一样在你的应用中使用模型和会话。
"""create teams table
Revision ID: 169ad57156f0
Revises: 29b4c2bfce6d
Create Date: 2014-06-25 09:00:06.784170
"""
revision = '169ad57156f0'
down_revision = '29b4c2bfce6d'
from alembic import op
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Player(Base):
__tablename__ = 'players'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.String, nullable=False)
team_name = sa.Column('team', sa.String, nullable=False)
team_id = sa.Column(sa.Integer, sa.ForeignKey('teams.id'), nullable=False)
team = orm.relationship('Team', backref='players')
class Team(Base):
__tablename__ = 'teams'
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.String, nullable=False, unique=True)
def upgrade():
bind = op.get_bind()
session = orm.Session(bind=bind)
# create the teams table and the players.team_id column
Team.__table__.create(bind)
op.add_column('players', sa.Column('team_id', sa.ForeignKey('teams.id'), nullable=False)
# create teams for each team name
teams = {name: Team(name=name) for name in session.query(Player.team).distinct()}
session.add_all(teams.values())
# set player team based on team name
for player in session.query(Player):
player.team = teams[player.team_name]
session.commit()
# don't need team name now that team relationship is set
op.drop_column('players', 'team')
def downgrade():
bind = op.get_bind()
session = orm.Session(bind=bind)
# re-add the players.team column
op.add_column('players', sa.Column('team', sa.String, nullable=False)
# set players.team based on team relationship
for player in session.query(Player):
player.team_name = player.team.name
session.commit()
op.drop_column('players', 'team_id')
op.drop_table('teams')
迁移定义了独立的模型,因为你代码中的模型代表了数据库的 当前状态,而迁移则代表了 过程中的步骤。你的数据库可能处于这个路径上的任何状态,所以模型可能还没有和数据库同步。如果你不小心,直接使用真实模型可能会导致缺少列、数据无效等问题。明确说明在迁移中你将使用哪些列和模型会更清晰。