如何用SqlAlchemy进行upsert?

138 投票
12 回答
134594 浏览
提问于 2025-04-17 00:13

我有一条记录,如果数据库里没有这条记录,我希望它能被添加进去;如果已经存在(也就是说主键已经存在),我希望更新它的字段到当前的状态。这种操作通常被称为 upsert

下面这个不完整的代码片段展示了可以实现这个功能的方法,但看起来有点繁琐(特别是如果有很多列的话)。有没有更好的方法呢?

Base = declarative_base()
class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))
    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc

def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desiredDefault.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()

有没有更简单或者更简洁的方法来做到这一点?像这样的方式就很好:

sess.upsert_this(desired_default, unique_key = "name")

虽然 unique_key 这个参数显然是不必要的(ORM应该能轻松搞定这个),我加上它只是因为SQLAlchemy通常只处理主键。例如,我在考虑 Session.merge 是否适用,但这个方法只对主键有效,而在这种情况下,主键是一个自增的ID,这对我来说并不是特别有用。

这个操作的一个简单使用场景是,当启动一个服务器应用程序时,它可能已经升级了默认的数据格式。也就是说,这个upsert操作没有并发问题。

12 个回答

34

现在,SQLAlchemy 提供了两个很有用的功能,分别是 on_conflict_do_nothingon_conflict_do_update。这两个功能很实用,但你需要从 ORM 接口切换到更底层的接口,也就是 SQLAlchemy Core

虽然这两个功能让使用 SQLAlchemy 进行插入或更新(upsert)变得不那么困难,但它们远不能提供一个完整的即插即用的解决方案。

我常见的使用场景是一次性在一个 SQL 查询或会话中插入或更新大量行。在进行插入或更新时,我通常会遇到两个问题:

首先,我们习惯使用的高级 ORM 功能缺失。你不能使用 ORM 对象,而是必须在插入时提供 ForeignKey

为了解决这两个问题,我使用了我自己写的 这个 函数:

def upsert(session, model, rows):
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    seen = set()
    foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
    unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]
    def handle_foreignkeys_constraints(row):
        for c_name, c_value in foreign_keys.items():
            foreign_obj = row.pop(c_value.table.name, None)
            row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None

        for const in unique_constraints:
            unique = tuple([const,] + [row[col.name] for col in const.columns])
            if unique in seen:
                return None
            seen.add(unique)

        return row

    rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
    session.execute(stmt, rows)
75

SQLAlchemy确实有一个“保存或更新”的功能,最近的版本已经把这个功能整合进了session.add里,但之前是通过单独的session.saveorupdate来实现的。这不是一个“插入或更新”的操作,但可能对你的需求来说已经足够了。

你问关于一个有多个唯一键的类,这个问题很好;我认为这正是为什么没有单一的正确方法来处理这个问题。主键也是一个唯一键。如果没有唯一约束,只有主键,那这个问题就简单多了:如果没有给定ID的记录,或者ID是空的,就创建一条新记录;否则就更新已有记录中其他字段的值。

但是,当有额外的唯一约束时,这种简单的方法就会出现逻辑问题。如果你想“插入或更新”一个对象,而这个对象的主键和已有记录匹配,但另一个唯一字段却匹配了一个不同的记录,那你该怎么办?同样的,如果主键没有匹配任何已有记录,但另一个唯一字段却匹配了已有记录,那又该如何处理?对于你的具体情况可能会有一个正确的答案,但一般来说,我认为没有单一的正确答案。

这就是为什么没有内置的“插入或更新”操作的原因。应用程序必须在每个特定情况下定义这意味着什么。

81

SQLAlchemy支持处理冲突的功能,主要有两种方法:on_conflict_do_update()on_conflict_do_nothing()

以下内容摘自官方文档

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)

撰写回答