MySQL/SQLAlchemy中的死锁重试

27 投票
2 回答
11712 浏览
提问于 2025-04-18 06:46

我搜索了很久,但还是找不到解决我问题的方法。我们在项目中使用SQLAlchemy和MySQL,遇到了一个让人头疼的错误:

1213,'尝试获取锁时发现死锁;请尝试重新启动事务'。

我们希望在这种情况下最多尝试重新启动事务三次。

我开始写一个装饰器来实现这个功能,但我不知道在失败之前如何保存会话状态,然后在失败后重新尝试相同的事务?(因为SQLAlchemy在出现异常时需要回滚)

到目前为止我的工作:

def retry_on_deadlock_decorator(func):
    lock_messages_error = ['Deadlock found', 'Lock wait timeout exceeded']

    @wraps(func)
    def wrapper(*args, **kwargs):
        attempt_count = 0
        while attempt_count < settings.MAXIMUM_RETRY_ON_DEADLOCK:
            try:
                return func(*args, **kwargs)
            except OperationalError as e:
                if any(msg in e.message for msg in lock_messages_error) \
                        and attempt_count <= settings.MAXIMUM_RETRY_ON_DEADLOCK:
                    logger.error('Deadlock detected. Trying sql transaction once more. Attempts count: %s'
                                 % (attempt_count + 1))
                else:
                    raise
            attempt_count += 1
    return wrapper

2 个回答

1

你是不是用过这样的代码呢?

try: 

     Perform table transaction 
     break 
except: 
     rollback 
     delay 
     try again to perform table transaction 

处理死锁的唯一真正方法就是让你的代码做好应对死锁的准备。如果你的数据库代码写得不错,这通常并不难。你可以在查询执行的逻辑周围加一个尝试/捕获的结构,当出现错误时就检查是否是死锁。如果捕获到了死锁,通常的做法就是重新尝试执行失败的查询。

有用的链接:

3

你其实无法从外部直接操作SessionSession需要在内部支持这种操作。这涉及到保存很多私有状态,所以可能不值得你花时间去做。

我完全放弃了大部分ORM的东西,转而使用更底层的SQLAlchemy Core接口。使用这个(或者任何dbapi接口),你可以很简单地使用你的retry_on_deadlock_decorator装饰器(见上面的提问),来制作一个支持重试的db.execute封装。

 @retry_on_deadlock_decorator
 def deadlock_safe_execute(db, stmt, *args, **kw):
     return db.execute(stmt, *args, **kw)

而不是

 db.execute("UPDATE users SET active=0")

你可以这样做

 deadlock_safe_execute(db, "UPDATE users SET active=0")

这样如果发生死锁,它会自动重试。

撰写回答