如何高效地使用SQLAlchemy进行批量插入或更新?

8 投票
2 回答
8160 浏览
提问于 2025-04-15 13:52

我正在使用SQLAlchemy和Postgres数据库进行批量插入或更新。为了提高性能,我尝试每插入大约一千行数据就提交一次:

trans = engine.begin()
  for i, rec in enumerate(records):
    if i % 1000 == 0:
      trans.commit()
      trans = engine.begin()
    try:
        inserter.execute(...)
    except sa.exceptions.SQLError:
        my_table.update(...).execute()
trans.commit()

但是,这个方法并没有奏效。看起来当插入失败时,系统会把状态搞得很奇怪,这样就导致更新无法进行。是不是系统自动回滚了事务?如果是的话,有办法阻止这种情况吗?我不想因为出现问题而导致整个事务都被回滚,这也是我一开始就想捕捉异常的原因。

顺便提一下,我收到的错误信息是“sqlalchemy.exc.InternalError: (InternalError) 当前事务已中止,命令在事务块结束之前被忽略”,这个错误发生在update().execute()调用的时候。

2 个回答

4

这个错误来自PostgreSQL数据库。PostgreSQL不允许在同一个事务中执行命令,如果其中一个命令出错了,就会导致整个事务失败。要解决这个问题,你可以使用嵌套事务(通过SQL的保存点来实现),可以用conn.begin_nested()来开始。这里有一些可能有效的做法。我把代码改成了使用明确的连接,分离了处理数据块的部分,并且使用了上下文管理器来正确管理事务。

from itertools import chain, islice
def chunked(seq, chunksize):
    """Yields items from an iterator in chunks."""
    it = iter(seq)
    while True:
        yield chain([it.next()], islice(it, chunksize-1))

conn = engine.commit()
for chunk in chunked(records, 1000):
    with conn.begin():
        for rec in chunk:
            try:
                with conn.begin_nested():
                     conn.execute(inserter, ...)
            except sa.exceptions.SQLError:
                conn.execute(my_table.update(...))

不过,这样做的性能可能还是不太好,因为嵌套事务会增加开销。如果你想要更好的性能,可以提前通过查询来检测哪些行会出错,然后使用批量执行的方式(如果所有插入都使用相同的列,execute可以接收一个字典列表)。如果你需要处理并发更新,你仍然需要进行错误处理,可以选择重试或者逐条插入。

5

你遇到了一些奇怪的Postgresql特有的行为:如果在一个事务中发生错误,整个事务都会被撤销。我认为这是Postgres的设计缺陷;在某些情况下,需要用很多复杂的SQL技巧来解决这个问题。

一个解决方法是先进行更新操作。你可以通过查看cursor.rowcount来判断是否真的修改了某一行;如果没有修改任何行,说明该行并不存在,这时就可以进行插入操作。(当然,如果你更新的频率高于插入的频率,这样做会更快。)

另一个解决方法是使用保存点:

SAVEPOINT a;
INSERT INTO ....;
-- on error:
ROLLBACK TO SAVEPOINT a;
UPDATE ...;
-- on success:
RELEASE SAVEPOINT a;

不过,这对于生产环境的代码来说是个严重的问题:你必须准确地检测到错误。你可能是期待遇到唯一约束检查,但也可能会遇到一些意想不到的错误,而且很难可靠地区分预期的错误和意外的错误。如果错误条件被错误地触发,就会导致一些难以理解的问题,比如什么都没有更新或插入,但也没有看到错误信息。对此要非常小心。你可以通过查看Postgresql的错误代码来缩小错误范围,以确保是你预期的错误类型,但潜在的问题依然存在。

最后,如果你真的想要批量插入或更新,实际上你应该在几条命令中处理多个项目,而不是每个命令处理一个项目。这需要更复杂的SQL:在INSERT中嵌套SELECT,过滤出需要插入和更新的正确项目。

撰写回答