Python SQLAlchemy-“MySQL服务器消失了”

2024-06-16 11:25:30 发布

您现在位置:Python中文网/ 问答频道 /正文

让我们看看下一个片段-

@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):

cursor = dbapi_con.cursor()
try:
    cursor.execute("SELECT 1")  # could also be dbapi_con.ping(),
                                # not sure what is better
except exc.OperationalError, ex:
    if ex.args[0] in (2006,   # MySQL server has gone away
                      2013,   # Lost connection to MySQL server during query
                      2055):  # Lost connection to MySQL server at '%s', system error: %d
        # caught by pool, which will retry with a new connection
        raise exc.DisconnectionError()
    else:
        raise


engine = create_engine('mysql://user:puss123@10.0.51.5/dbname', pool_recycle = 3600,pool_size=10, listeners=[check_connection])

session_factory = sessionmaker(bind = engine, autoflush=True, autocommit=False)
db_session = session_factory()

...
some code that may take several hours to run
...

db_session.execute('SELECT * FROM ' + P_TABLE + " WHERE id = '%s'" % id)        

我以为在checkout事件下注册checkout_连接函数可以解决这个问题,但是它没有 现在的问题是,我应该如何告诉SQLAlchemy处理连接丢失,以便每次调用execute()时,它都会检查连接是否可用,如果不可用,它将再次启动连接?

----更新----

SQLAlchemy的版本是0.7.4

----更新----

def checkout_listener(dbapi_con, con_record, con_proxy):
    try:
        try:
            dbapi_con.ping(False)
        except TypeError:
            dbapi_con.ping()
    except dbapi_con.OperationalError as exc:
        if exc.args[0] in (2006, 2013, 2014, 2045, 2055):
            raise DisconnectionError()
        else:
            raise


engine = create_engine(CONNECTION_URI, pool_recycle = 3600,pool_size=10)
event.listen(engine, 'checkout', checkout_listener)
session_factory = sessionmaker(bind = engine, autoflush=True, autocommit=False)
db_session = session_factory()

会话工厂被发送到每个新创建的线程

class IncidentProcessor(threading.Thread):

    def __init__(self, queue, session_factory):
        if not isinstance(queue, Queue.Queue):
            raise TypeError, "first argument should be of %s" (type(Queue.Queue))
        self.queue = queue
        self.db_session = scoped_session(session_factory)
        threading.Thread.__init__(self)

    def run(self):

    self.db_session().execute('SELECT * FROM ...')

    ...
        some code that takes alot of time
    ...

    self.db_session().execute('SELECT * FROM ...')

现在,当execute在一段时间后运行时,我会得到“MySQL server has gone away”错误


Tags: selfexecutedbsessionfactorydefconnectioncon
3条回答

你可以试试这样的方法:

while True:
    try:
        db_session.execute('SELECT * FROM ' + PONY_TABLE + " WHERE id = '%s'" % incident_id)
        break
    except SQLAlchemyError:
        db_session.rollback()

如果连接已断开,这将引发异常,会话将是rollbackd,它将重试,很可能成功。

尝试将pool_recycle参数设置为create_engine

来自the documentation

Connection Timeouts

MySQL features an automatic connection close behavior, for connections that have been idle for eight hours or more. To circumvent having this issue, use the pool_recycle option which controls the maximum age of any connection:

engine = create_engine('mysql+mysqldb://...', pool_recycle=3600)

有人讨论过这个问题,这个文档很好地描述了这个问题,所以我使用了他们推荐的方法来处理这些错误:http://discorporate.us/jek/talks/SQLAlchemy-EuroPython2010.pdf

看起来像这样:

from sqlalchemy import create_engine, event
from sqlalchemy.exc import DisconnectionError


def checkout_listener(dbapi_con, con_record, con_proxy):
    try:
        try:
            dbapi_con.ping(False)
        except TypeError:
            dbapi_con.ping()
    except dbapi_con.OperationalError as exc:
        if exc.args[0] in (2006, 2013, 2014, 2045, 2055):
            raise DisconnectionError()
        else:
            raise


db_engine = create_engine(DATABASE_CONNECTION_INFO,
                          pool_size=100,
                          pool_recycle=3600)
event.listen(db_engine, 'checkout', checkout_listener)

相关问题 更多 >