在SQLAlchemy(声明式样式)中处理插入时的重复主键

2024-04-25 14:27:49 发布

您现在位置:Python中文网/ 问答频道 /正文

我的应用程序正在使用范围会话和SQLALchemy的声明式风格。它是一个web应用程序,许多DB插入都是由任务调度程序Celery执行的。

通常,当决定插入一个对象时,我的代码可能会沿着以下几行执行操作:

from schema import Session
from schema.models import Bike

pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
    new_bike = Bike(pk, "shiny", "bike")
    Session.add(new_bike)
    Session.commit()

这里的问题是,因为很多工作都是由异步工作人员完成的,所以一个工作人员可能在插入带有id=123Bike的过程中完成了一半,而另一个工作人员正在检查它的存在性。在这种情况下,第二个worker将尝试插入具有相同主键的行,SQLAlchemy将引发一个IntegrityError

我一辈子都找不到解决这个问题的好办法,除了把Session.commit()换成:

'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())

def commit(ignore=False):
    try:
        Session.commit()
    except IntegrityError as e:
        reason = e.message
        logger.warning(reason)

        if not ignore:
            raise e

        if "Duplicate entry" in reason:
            logger.info("%s already in table." % e.params[0])
            Session.rollback()

然后在任何地方我都有Session.commit我现在有schema.commit(ignore=True)我不介意行不再被插入。

在我看来,这似乎很脆弱,因为字符串检查。正如仅供参考,当IntegrityError升高时,它看起来是这样的:

(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'")

当然,我插入的主键是Duplicate entry is a cool thing,我想我可能会漏掉IntegrityError,这并不是因为主键重复。

有没有更好的方法可以保持我使用的干净的SQLAlchemy方法(而不是开始用字符串写出语句等等…)

Db是MySQL(尽管对于单元测试,我喜欢使用SQLite,并且不想用任何新方法阻碍这种能力)。

干杯!


Tags: infromimportifsessionschemacommitreason
3条回答

如果使用session.merge(bike)而不是session.add(bike),则不会生成主键错误。将根据需要检索并更新或创建bike

我假设你的主键在某种程度上是自然的,这就是为什么你不能依赖普通的自动增量技术。所以假设这个问题实际上是需要插入的某个唯一列中的一个,这是比较常见的。

如果希望“尝试插入,失败时部分回滚”,可以使用保存点,对于SQLAlchemy,保存点是begin_nested()。下一个rollback()或commit()只作用于该保存点,而不是作用于更大范围的事件。

然而,总的来说,这里的模式只是一个真正应该避免的模式。你真正想在这里做的是三件事之一。一。不要运行处理需要插入的相同密钥的并发作业。2。以某种方式在与和3一起工作的并发密钥上同步作业。使用一些公共服务生成此特定类型的新记录,这些记录由作业共享(或确保在作业运行之前都已设置好)。

如果你仔细想想,2在任何情况下都是高度孤立的。开始两个postgres会话。会议1:

test=> create table foo(id integer primary key);
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "foo_pkey" for table "foo"
CREATE TABLE
test=> begin;
BEGIN
test=> insert into foo (id) values (1);

会议2:

test=> begin;
BEGIN
test=> insert into foo(id) values(1);

您将看到的是,会话2块,因为PK#1的行被锁定。我不确定MySQL是否足够聪明,但这是正确的行为。如果您试图插入另一个PK:

^CCancel request sent
ERROR:  canceling statement due to user request
test=> rollback;
ROLLBACK
test=> begin;
BEGIN
test=> insert into foo(id) values(2);
INSERT 0 1
test=> \q

一切进展顺利,没有阻碍。

关键是,如果您正在进行这种PK/UQ争用,那么您的芹菜任务无论如何都要序列化自己,或者至少应该序列化。

您应该以同样的方式处理每一个IntegrityError:回滚事务,然后有选择地重试。有些数据库甚至不允许您在一个IntegrityError之后做更多的事情。您还可以在两个冲突事务开始时获取表上的锁,或者在数据库允许的情况下获取更细粒度的锁。

使用with语句显式开始事务,并自动提交(或回滚任何异常):

from schema import Session
from schema.models import Bike

session = Session()
with session.begin():
    pk = 123 # primary key
    bike = session.query(Bike).filter_by(bike_id=pk).first()
    if not bike: # no bike in DB
        new_bike = Bike(pk, "shiny", "bike")
        session.add(new_bike)

相关问题 更多 >

    热门问题