如何对基于SQLAlchemy的应用进行性能分析?

57 投票
5 回答
23726 浏览
提问于 2025-04-15 13:06

有没有人有过对Python/SQLAlchemy应用进行性能分析的经验?有没有什么好的方法可以找到瓶颈和设计缺陷?

我们有一个Python应用,数据库部分是用SQLAlchemy来处理的。这个应用采用批处理设计,所以很多数据库请求是一个接一个地顺序进行,并且是在有限的时间内完成的。目前运行的时间有点长,所以需要进行一些优化。我们没有使用ORM的功能,数据库是PostgreSQL。

5 个回答

4

我刚刚发现了一个叫做 sqltap 的库(https://github.com/inconshreveable/sqltap)。这个库可以生成漂亮的HTML页面,帮助我们查看和分析由SQLAlchemy生成的SQL查询。

下面是一个使用的例子:

profiler = sqltap.start()
run_some_queries()
statistics = profiler.collect()
sqltap.report(statistics, "report.html")

这个库已经有两年没有更新了,不过我今天在我的应用中测试的时候,它运行得很好。

56

SQLAlchemy的维基上,有一个非常有用的性能分析方法。

只需做一些小修改,

from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
import logging

logging.basicConfig()
logger = logging.getLogger("myapp.sqltime")
logger.setLevel(logging.DEBUG)

@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, 
                        parameters, context, executemany):
    context._query_start_time = time.time()
    logger.debug("Start Query:\n%s" % statement)
    # Modification for StackOverflow answer:
    # Show parameters, which might be too verbose, depending on usage..
    logger.debug("Parameters:\n%r" % (parameters,))


@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, 
                        parameters, context, executemany):
    total = time.time() - context._query_start_time
    logger.debug("Query Complete!")

    # Modification for StackOverflow: times in milliseconds
    logger.debug("Total Time: %.02fms" % (total*1000))

if __name__ == '__main__':
    from sqlalchemy import *

    engine = create_engine('sqlite://')

    m1 = MetaData(engine)
    t1 = Table("sometable", m1, 
            Column("id", Integer, primary_key=True),
            Column("data", String(255), nullable=False),
        )

    conn = engine.connect()
    m1.create_all(conn)

    conn.execute(
        t1.insert(), 
        [{"data":"entry %d" % x} for x in xrange(100000)]
    )

    conn.execute(
        t1.select().where(t1.c.data.between("entry 25", "entry 7800")).order_by(desc(t1.c.data))
    )

输出结果大概是这样的:

DEBUG:myapp.sqltime:Start Query:
SELECT sometable.id, sometable.data 
FROM sometable 
WHERE sometable.data BETWEEN ? AND ? ORDER BY sometable.data DESC
DEBUG:myapp.sqltime:Parameters:
('entry 25', 'entry 7800')
DEBUG:myapp.sqltime:Query Complete!
DEBUG:myapp.sqltime:Total Time: 410.46ms

如果你发现某个查询特别慢,可以把查询字符串拿出来,按照参数格式化(至少对于psycopg2可以用%这个字符串格式化操作符),然后在前面加上“EXPLAIN ANALYZE”,再把查询计划的输出放到http://explain.depesz.com/上(这个网站可以通过这篇关于PostgreSQL性能的好文章找到)。

77

有时候,简单的SQL日志记录(通过Python的日志模块或在create_engine()中使用echo=True参数来启用)可以让你了解操作花了多长时间。例如,如果你在SQL操作后记录了一些信息,你的日志中可能会看到这样的内容:

17:37:48,325 INFO  [sqlalchemy.engine.base.Engine.0x...048c] SELECT ...
17:37:48,326 INFO  [sqlalchemy.engine.base.Engine.0x...048c] {<params>}
17:37:48,660 DEBUG [myapp.somemessage] 

如果你在操作后记录了myapp.somemessage,那么你就知道SQL部分的操作花了334毫秒。

记录SQL还可以显示是否发出了成百上千的查询,这些查询本可以通过连接(joins)更好地组织成更少的查询。当使用SQLAlchemy的ORM时,"急切加载"功能可以部分(contains_eager())或完全(eagerload(), eagerload_all())自动化这个过程,但如果不使用ORM,这就意味着要使用连接,以便在一个结果集中加载多个表的结果,而不是随着深度增加而增加查询的数量(例如r + r*r2 + r*r2*r3 ...)。

如果日志显示单个查询花费的时间太长,你需要分析一下在数据库中处理查询、通过网络发送结果、被DBAPI处理,以及最终被SQLAlchemy的结果集和/或ORM层接收的每个阶段花了多少时间。这些阶段可能会有各自的瓶颈,具体情况具体分析。

为此,你需要使用性能分析工具,比如cProfile或hotshot。我这里有一个我使用的装饰器:

import cProfile as profiler
import gc, pstats, time

def profile(fn):
    def wrapper(*args, **kw):
        elapsed, stat_loader, result = _profile("foo.txt", fn, *args, **kw)
        stats = stat_loader()
        stats.sort_stats('cumulative')
        stats.print_stats()
        # uncomment this to see who's calling what
        # stats.print_callers()
        return result
    return wrapper

def _profile(filename, fn, *args, **kw):
    load_stats = lambda: pstats.Stats(filename)
    gc.collect()

    began = time.time()
    profiler.runctx('result = fn(*args, **kw)', globals(), locals(),
                    filename=filename)
    ended = time.time()

    return ended - began, load_stats, locals()['result']

要分析一段代码,可以把它放在一个带有装饰器的函数中:

@profile
def go():
    return Session.query(FooClass).filter(FooClass.somevalue==8).all()
myfoos = go()

性能分析的输出可以帮助你了解时间花在哪里了。例如,如果你发现大部分时间都花在cursor.execute()上,那就是低级别的DBAPI调用数据库,这意味着你的查询需要优化,可能是通过添加索引或重构查询和/或底层架构。为此,我建议使用pgadmin及其图形化的EXPLAIN工具来查看查询在做什么。

如果你看到成千上万次与获取行相关的调用,这可能意味着你的查询返回的行数超过了预期——不完整的连接可能导致笛卡尔积的问题。还有一个问题是类型处理上花费的时间——像Unicode这样的SQLAlchemy类型会对绑定参数和结果列进行字符串编码/解码,这在某些情况下可能并不需要。

性能分析的输出可能看起来有点吓人,但经过一些练习后,它们其实很容易理解。曾经有个人在邮件列表上抱怨速度慢,在他发布性能分析结果后,我能够证明速度问题是由于网络延迟造成的——在cursor.execute()和所有Python方法中花费的时间都很快,而大部分时间都花在了socket.receive()上。

如果你觉得有挑战性,还有一个更复杂的SQLAlchemy性能分析示例,可以在SQLAlchemy的单元测试中找到,如果你去看看http://www.sqlalchemy.org/trac/browser/sqlalchemy/trunk/test/aaa_profiling。在那里,我们有使用装饰器的测试,确保特定操作使用的最大方法调用次数,这样如果有低效的代码被提交,测试就会揭示出来(需要注意的是,在Python中,函数调用的开销是所有操作中最高的,调用次数通常与花费的时间成正比)。值得注意的是“zoomark”测试,它使用了一种精巧的“SQL捕获”方案,将DBAPI的开销排除在外——虽然这种技术对于普通的性能分析并不是必需的。

撰写回答