使用SQLAlchemy ORM进行批量插入

212 投票
12 回答
296386 浏览
提问于 2025-04-16 03:48

有没有办法让SQLAlchemy一次性插入多条数据,而不是一个一个地插入?也就是说,

像这样做:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

而不是:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

我刚把一些代码改成用SQLAlchemy,而不是直接用SQL,虽然现在用起来舒服多了,但速度似乎变慢了(慢了最多10倍),我在想这是不是原因。

也许我可以更有效地使用会话来改善这个情况。目前我设置了autoCommit=False,在添加了一些数据后再执行session.commit()。不过,这样似乎会导致数据变得过时,如果数据库在别的地方被修改了,即使我重新查询,还是会得到旧的结果?

谢谢你的帮助!

12 个回答

36

据我所知,ORM(对象关系映射)没有办法一次性插入很多数据。原因在于,SQLAlchemy需要跟踪每个对象的身份(也就是新的主键),而一次性插入会干扰这个过程。举个例子,假设你的 foo 表里有一个 id 列,并且这个表映射到了一个 Foo 类:

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

因为SQLAlchemy直接从 INSERT 语句中获取了 x.id 的值,而没有发出新的查询,所以我们可以推测它是直接从插入语句中拿到的。如果你不需要通过同一个实例后续访问创建的对象,你可以跳过ORM层,直接插入数据:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

SQLAlchemy无法将这些新插入的行与任何现有对象匹配,所以在后续操作时,你需要重新查询这些对象。

关于过期数据,记住一点很重要:会话(session)没有内置的方法来知道数据库在会话外部发生了变化。为了通过现有实例访问外部修改的数据,这些实例必须被标记为过期。默认情况下,在 session.commit() 时会发生这种情况,但你也可以手动调用 session.expire_all()session.expire(instance) 来实现。举个例子(SQL省略):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit() 会使 x 过期,所以第一个打印语句会隐式地开启一个新事务,并重新查询 x 的属性。如果你注释掉第一个打印语句,你会发现第二个打印语句现在能获取到正确的值,因为新的查询是在更新之后才发出的。

从事务隔离的角度来看,这样的设计是合理的——你应该只在事务之间获取外部的修改。如果这给你带来了麻烦,我建议你先理清楚或重新考虑一下你应用的事务边界,而不是立刻使用 session.expire_all()

86

SQLAlchemy的文档里有一篇关于各种批量插入技术性能的介绍,具体可以查看这里

简单来说,ORM(对象关系映射)并不是为了高效的批量插入而设计的——这就是为什么SQLAlchemy除了ORM之外,还提供了Core作为一个重要的组成部分。

如果你需要快速进行批量插入,ORM所依赖的SQL生成和执行系统其实是Core的一部分。直接使用这个系统,我们可以生成一个INSERT语句,其性能可以和直接使用数据库的原始API相媲美。

另外,SQLAlchemy的ORM还提供了一套批量操作的方法,这些方法可以在工作单元的某些环节中插入Core级别的INSERT和UPDATE语句,同时保留一些ORM的自动化功能。

下面的例子展示了几种不同的插入行的方法的时间测试,从最自动化到最手动的方式。使用的是cPython 2.7,观察到的运行时间如下:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

脚本:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,  create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        n1 = n1 - 10000
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(min(10000, n1))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
281

SQLAlchemy 在 1.0.0 版本中引入了:

批量操作 - SQLAlchemy 文档

通过这些操作,你现在可以进行批量插入或更新了!

比如,你可以这样做:

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

在这里,将进行一次批量插入。

撰写回答