Python - SQLAlchemy - MySQL - 多个实例操作相同数据

6 投票
1 回答
3296 浏览
提问于 2025-04-18 17:56

我在数据库里有一个表,使用SQLAlchemy这个工具来管理数据(我有一个叫“scoped_session”的变量)。我希望我的程序可以在多个实例中运行(不仅仅是线程,还包括来自不同服务器的实例),并且能够同时处理同一个表里的数据,但不想处理同样的数据。

为此,我写了一个手动的“行锁”机制,确保每一行数据都能被独立处理。在这个过程中,我对整个表使用了“完全锁定”,同时也对行进行了锁定:

def instance:
        s = scoped_session(sessionmaker(bind=engine)
        engine.execute("LOCK TABLES my_data WRITE")
        rows = s.query(Row_model).filter(Row_model.condition == 1).filter(Row_model.is_locked == 0).limit(10)
        for row in rows:
            row.is_locked = 1
            row.lock_time = datetime.now()
        s.commit()
        engine.execute("UNLOCK TABLES")
        for row in row:
            manipulate_data(row)
            row.is_locked = 0
        s.commit()

for i in range(10):
    t = threading.Thread(target=instance)
    t.start()

问题是,当我运行多个实例时,有几个线程出现了崩溃,并产生了这个错误(每个线程都会出现):

sqlalchemy.exc.DatabaseError: (由于查询触发的自动刷新而引发;如果这个刷新发生得太早,可以考虑使用session.no_autoflush块)(DatabaseError) 1205 (HY000): 锁等待超时;请尝试重新启动事务 'UPDATE my_data SET row_var = 1'

问题出在哪里呢?是什么原因导致我的数据库表无法成功解锁?

谢谢。

1 个回答

0

锁是个麻烦事,尽量避免使用。出错的时候情况会变得很糟糕,尤其是当你把会话和原始SQL语句混在一起的时候。

有了作用域会话的好处在于,它可以把数据库操作包裹成一个事务。这个事务确保对数据库的修改是原子性的,也就是说,要么全部成功,要么全部失败,并且在出错时会自动处理清理工作。

你可以这样使用作用域会话:

with scoped_session(sessionmaker(bind=engine) as s:
    <ORM actions using s>

可能需要花一些时间来重写你的代码,让它变得符合事务的要求,但这绝对是值得的!Sqlalchemy有一些技巧可以帮助你做到这一点。

撰写回答