如何高效地使用SQLAlchemy进行批量插入或更新?
我正在使用SQLAlchemy和Postgres数据库进行批量插入或更新。为了提高性能,我尝试每插入大约一千行数据就提交一次:
trans = engine.begin()
for i, rec in enumerate(records):
if i % 1000 == 0:
trans.commit()
trans = engine.begin()
try:
inserter.execute(...)
except sa.exceptions.SQLError:
my_table.update(...).execute()
trans.commit()
但是,这个方法并没有奏效。看起来当插入失败时,系统会把状态搞得很奇怪,这样就导致更新无法进行。是不是系统自动回滚了事务?如果是的话,有办法阻止这种情况吗?我不想因为出现问题而导致整个事务都被回滚,这也是我一开始就想捕捉异常的原因。
顺便提一下,我收到的错误信息是“sqlalchemy.exc.InternalError: (InternalError) 当前事务已中止,命令在事务块结束之前被忽略”,这个错误发生在update().execute()调用的时候。
2 个回答
这个错误来自PostgreSQL数据库。PostgreSQL不允许在同一个事务中执行命令,如果其中一个命令出错了,就会导致整个事务失败。要解决这个问题,你可以使用嵌套事务(通过SQL的保存点来实现),可以用conn.begin_nested()
来开始。这里有一些可能有效的做法。我把代码改成了使用明确的连接,分离了处理数据块的部分,并且使用了上下文管理器来正确管理事务。
from itertools import chain, islice
def chunked(seq, chunksize):
"""Yields items from an iterator in chunks."""
it = iter(seq)
while True:
yield chain([it.next()], islice(it, chunksize-1))
conn = engine.commit()
for chunk in chunked(records, 1000):
with conn.begin():
for rec in chunk:
try:
with conn.begin_nested():
conn.execute(inserter, ...)
except sa.exceptions.SQLError:
conn.execute(my_table.update(...))
不过,这样做的性能可能还是不太好,因为嵌套事务会增加开销。如果你想要更好的性能,可以提前通过查询来检测哪些行会出错,然后使用批量执行的方式(如果所有插入都使用相同的列,execute可以接收一个字典列表)。如果你需要处理并发更新,你仍然需要进行错误处理,可以选择重试或者逐条插入。
你遇到了一些奇怪的Postgresql特有的行为:如果在一个事务中发生错误,整个事务都会被撤销。我认为这是Postgres的设计缺陷;在某些情况下,需要用很多复杂的SQL技巧来解决这个问题。
一个解决方法是先进行更新操作。你可以通过查看cursor.rowcount来判断是否真的修改了某一行;如果没有修改任何行,说明该行并不存在,这时就可以进行插入操作。(当然,如果你更新的频率高于插入的频率,这样做会更快。)
另一个解决方法是使用保存点:
SAVEPOINT a;
INSERT INTO ....;
-- on error:
ROLLBACK TO SAVEPOINT a;
UPDATE ...;
-- on success:
RELEASE SAVEPOINT a;
不过,这对于生产环境的代码来说是个严重的问题:你必须准确地检测到错误。你可能是期待遇到唯一约束检查,但也可能会遇到一些意想不到的错误,而且很难可靠地区分预期的错误和意外的错误。如果错误条件被错误地触发,就会导致一些难以理解的问题,比如什么都没有更新或插入,但也没有看到错误信息。对此要非常小心。你可以通过查看Postgresql的错误代码来缩小错误范围,以确保是你预期的错误类型,但潜在的问题依然存在。
最后,如果你真的想要批量插入或更新,实际上你应该在几条命令中处理多个项目,而不是每个命令处理一个项目。这需要更复杂的SQL:在INSERT中嵌套SELECT,过滤出需要插入和更新的正确项目。