Sql Alchemy QueuePool 限制溢出

64 投票
3 回答
101679 浏览
提问于 2025-04-18 14:45

我有一个使用Sql Alchemy的应用程序,它出现了超时错误:

TimeoutError: 队列池的限制是5,溢出10,连接超时,超时时间30

我在其他帖子里看到,这种情况发生是因为我没有关闭会话,但我不确定这是否适用于我的代码:

我在init.py文件中连接到数据库:

from .dbmodels import (
    DBSession,
    Base,    

engine = create_engine("mysql://" + loadConfigVar("user") + ":" + loadConfigVar("password") + "@" + loadConfigVar("host") + "/" + loadConfigVar("schema"))

#Sets the engine to the session and the Base model class
DBSession.configure(bind=engine)
Base.metadata.bind = engine

然后在另一个Python文件中,我在两个函数里收集一些数据,但使用的是我在init.py中初始化的DBSession:

from .dbmodels import DBSession
from .dbmodels import resourcestatsModel

def getFeaturedGroups(max = 1):

    try:
        #Get the number of download per resource
        transaction.commit()
        rescount = DBSession.connection().execute("select resource_id,count(resource_id) as total FROM resourcestats")

        #Move the data to an array
        resources = []
        data = {}
        for row in rescount:
            data["resource_id"] = row.resource_id
            data["total"] = row.total
            resources.append(data)

        #Get the list of groups
        group_list = toolkit.get_action('group_list')({}, {})
        for group in group_list:
            #Get the details of each group
            group_info = toolkit.get_action('group_show')({}, {'id': group})
            #Count the features of the group
            addFesturedCount(resources,group,group_info)

        #Order the FeaturedGroups by total
        FeaturedGroups.sort(key=lambda x: x["total"],reverse=True)

        print FeaturedGroups
        #Move the data of the group to the result array.
        result = []
        count = 0
        for group in FeaturedGroups:
            group_info = toolkit.get_action('group_show')({}, {'id': group["group_id"]})
            result.append(group_info)
            count = count +1
            if count == max:
                break

        return result
    except:
        return []


    def getResourceStats(resourceID):
        transaction.commit()
        return  DBSession.query(resourcestatsModel).filter_by(resource_id = resourceID).count()

会话变量是这样创建的:

#Basic SQLAlchemy types
from sqlalchemy import (
    Column,
    Text,
    DateTime,
    Integer,
    ForeignKey
    )
# Use SQLAlchemy declarative type
from sqlalchemy.ext.declarative import declarative_base

#
from sqlalchemy.orm import (
    scoped_session,
    sessionmaker,
    )

#Use Zope' sqlalchemy  transaction manager
from zope.sqlalchemy import ZopeTransactionExtension

#Main plugin session
DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))

因为会话是在init.py中创建的,而在后续的代码中我只是使用它;那么我应该在什么地方关闭会话呢?或者我还需要做些什么来管理连接池的大小呢?

3 个回答

1
rescount = DBSession.connection().execute()

rescount 是一种叫做 <class 'sqlalchemy.engine.cursor.CursorResult'> 的类型。

你应该调用一下 close() 这个函数。

7

在你的代码中添加以下方法。它会自动关闭所有未使用或挂起的连接,防止你的代码出现瓶颈。特别是当你使用以下语法 Model.query.filter_by(attribute=var).first() 以及关系/懒加载时。

   @app.teardown_appcontext
    def shutdown_session(exception=None):
        db.session.remove()

关于这个的详细说明可以在这里找到: http://flask.pocoo.org/docs/1.0/appcontext/

78

你可以通过在函数 create_engine 中添加参数 pool_size 和 max_overflow 来管理连接池的大小。

engine = create_engine("mysql://" + loadConfigVar("user") + ":" + loadConfigVar("password") + "@" + loadConfigVar("host") + "/" + loadConfigVar("schema"), 
                        pool_size=20, max_overflow=0)

相关信息可以在 这里 找到。

你不需要关闭会话,但在你的事务完成后,连接应该被关闭。把:

rescount = DBSession.connection().execute("select resource_id,count(resource_id) as total FROM resourcestats")

替换为:

connection = DBSession.connection()
try:
    rescount = connection.execute("select resource_id,count(resource_id) as total FROM resourcestats")
    #do something
finally:
    connection.close()

相关信息可以在 这里 找到。

另外,要注意,MySQL 的连接如果长时间没有使用会在一段时间后自动关闭(这个时间可以在 MySQL 中配置,我不记得默认值是什么),所以你需要在创建引擎时传入 pool_recycle 的值。

撰写回答