python + sqlalchemy:在循环中调用存储过程

1 投票
1 回答
3213 浏览
提问于 2025-04-17 16:21

我有一个Python脚本,它会定期查询一个MySQL数据库的数据(使用的是sqlalchemy 0.7.4)。它通过运行一个存储过程来完成这个查询。如果这个过程返回了数据,脚本就会尝试处理这些数据(这一部分和数据库没有关系),然后再通过第二个过程把结果保存回去。

处理完后,它会暂停一段时间(通常是一分钟),然后再重复这个过程,直到被停止。这个脚本应该可以连续运行几周。

我经常会遇到一个错误:“在无效事务回滚之前无法重新连接”。我根据找到的各种信息做了一些修改,现在我在想这样做是否能达到我想要的效果:

from sqlalchemy import create_engine, exc
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text, func
import time

class StoredProcedures():
    _engine = None
    _connection = None
    _session = None

    def __init__(self, cs):
        self._engine = create_engine(cs, encoding='utf-8', echo=False, pool_recycle=600)
        self._connection = self._engine.connect()
        Session = sessionmaker(bind=self._engine)
        self._session = Session()

    def sp_test_1(self, user_id):
        t = self._session.begin(subtransactions=True)

        try:
            query = 'call sp_get_files(%d)'%user_id
            result = self._session.execute(query).fetchall()
            t.close()
            return result
        except exc.DBAPIError, e: #Proper way of reconnecting?
            t.rollback()
            time.sleep(5)
            self._connection = self._engine.connect()
            Session = sessionmaker(bind=self._engine)
            self._session = Session()
        except:
            t.rollback()

        return None


cs = "mysql://test:test@127.0.0.1/test_db"
db_stored_procedures = StoredProcedures(cs)

while (True):
    files = db_stored_procedures.sp_test_1(1)
    if len(files) > 0:
        print "This is where processing happens"
        #And this is where the second procedure would be called to store the results
    time.sleep(15)

我测试过这个,但基本上只是写了它,所以还没有进行长期测试。我想先听听你们的意见。

编辑:最开始我是用连接来执行查询的,像这样(省略了大部分与上面相同的脚本):

def sp_test_1(self, user_id):
    t = self._connection.begin()

    try:
        query = 'call sp_get_files(%d)'%user_id
        result = self._connection.execute(query).fetchall()
        t.close()
        return result
    except exc.DBAPIError, e:
        #same as above
    except:
        t.rollback()

    return None

1 个回答

0

你正在使用一个会话接口,这个接口内部使用了一个事务对象,所以我觉得你不需要自己去管理事务。

我觉得只需要一个简单的代码就够了:

def sp_test_1(self, user_id):

    query = 'call sp_get_files(%d)'%user_id
    result = self._session.execute(query).fetchall()
    return result

如果这样还是出现同样的错误,能不能把完整的错误信息发出来?错误其实是我们的朋友,而不是敌人。:)

撰写回答