如何在多线程Python应用中共享单一SQLite连接

15 投票
3 回答
37328 浏览
提问于 2025-04-18 00:31

我正在尝试写一个多线程的Python应用程序,想让多个线程共享一个SQLite连接。但我遇到了一些问题,无法让它正常工作。实际上,我的应用是一个cherrypy网络服务器,但下面这段简单的代码可以展示我遇到的问题。

我需要做什么修改,才能成功运行下面的示例代码呢?

当我把THREAD_COUNT设置为1时,程序运行得很好,数据库也按我预期更新了(也就是说,字母“X”被添加到了SectorGroup列的文本值中)。

但是当我把THREAD_COUNT设置为大于1的值时,除了一个线程外,其他所有线程都提前终止,并出现与SQLite相关的异常。不同的线程抛出不同的异常(没有明显的规律),包括:

OperationalError: cannot start a transaction within a transaction 

(发生在UPDATE语句上)

OperationalError: cannot commit - no transaction is active 

(发生在.commit()调用上)

InterfaceError: Error binding parameter 0 - probably unsupported type. 

(发生在UPDATESELECT语句上)

IndexError: tuple index out of range

(这个让我完全困惑,它发生在语句group = rows[0][0] or ''上,但只有在多个线程同时运行时才会出现)

以下是代码:

CONNECTION = sqlite3.connect('./database/mydb', detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread = False)
CONNECTION.row_factory = sqlite3.Row

def commands(start_id):

    # loop over 100 records, read the SectorGroup column, and write it back with "X" appended.
    for inv_id in range(start_id, start_id + 100):

        rows = CONNECTION.execute('SELECT SectorGroup FROM Investment WHERE InvestmentID = ?;', [inv_id]).fetchall()
        if rows:
            group = rows[0][0] or ''
            msg = '{} inv {} = {}'.format(current_thread().name, inv_id, group)
            print msg
            CONNECTION.execute('UPDATE Investment SET SectorGroup = ? WHERE InvestmentID = ?;', [group + 'X', inv_id])

        CONNECTION.commit()

if __name__ == '__main__':

    THREAD_COUNT = 10

    for i in range(THREAD_COUNT):
        t = Thread(target=commands, args=(i*100,))
        t.start()

3 个回答

0

我在这里猜测一下,但看起来你这样做的原因是为了提高性能。

在这个情况下,Python 的线程并不能有效提升性能。相反,你应该使用 sqlite 的事务,这样会快得多。

如果你把所有的更新都放在一个事务中进行,你会发现速度提升非常明显。

7

我在写一个简单的WSGI服务器时遇到了SqLite的线程问题,这个项目是为了好玩和学习。WSGI在Apache下本身就是多线程的。以下代码对我来说似乎可以正常工作:

import sqlite3
import threading

class LockableCursor:
    def __init__ (self, cursor):
        self.cursor = cursor
        self.lock = threading.Lock ()

    def execute (self, arg0, arg1 = None):
        self.lock.acquire ()

        try:
            self.cursor.execute (arg1 if arg1 else arg0)

            if arg1:
                if arg0 == 'all':
                    result = self.cursor.fetchall ()
                elif arg0 == 'one':
                    result = self.cursor.fetchone ()
        except Exception as exception:
            raise exception

        finally:
            self.lock.release ()
            if arg1:
                return result

def dictFactory (cursor, row):
    aDict = {}
    for iField, field in enumerate (cursor.description):
        aDict [field [0]] = row [iField]
    return aDict

class Db:
    def __init__ (self, app):
        self.app = app

    def connect (self):
        self.connection = sqlite3.connect (self.app.dbFileName, check_same_thread = False, isolation_level = None)  # Will create db if nonexistent
        self.connection.row_factory = dictFactory
        self.cs = LockableCursor (self.connection.cursor ())

使用示例:

if not ok and self.user:    # Not logged out
    # Get role data for any later use
    userIdsRoleIds = self.cs.execute ('all', 'SELECT role_id FROM users_roles WHERE user_id == {}'.format (self.user ['id']))

    for userIdRoleId in userIdsRoleIds:
        self.userRoles.append (self.cs.execute ('one', 'SELECT name FROM roles WHERE id == {}'.format (userIdRoleId ['role_id'])))

另一个示例:

self.cs.execute ('CREATE TABLE users (id INTEGER PRIMARY KEY, email_address, password, token)')         
self.cs.execute ('INSERT INTO users (email_address, password) VALUES ("{}", "{}")'.format (self.app.defaultUserEmailAddress, self.app.defaultUserPassword))

# Create roles table and insert default role
self.cs.execute ('CREATE TABLE roles (id INTEGER PRIMARY KEY, name)')
self.cs.execute ('INSERT INTO roles (name) VALUES ("{}")'.format (self.app.defaultRoleName))

# Create users_roles table and assign default role to default user
self.cs.execute ('CREATE TABLE users_roles (id INTEGER PRIMARY KEY, user_id, role_id)') 

defaultUserId = self.cs.execute ('one', 'SELECT id FROM users WHERE email_address = "{}"'.format (self.app.defaultUserEmailAddress)) ['id']         
defaultRoleId = self.cs.execute ('one', 'SELECT id FROM roles WHERE name = "{}"'.format (self.app.defaultRoleName)) ['id']

self.cs.execute ('INSERT INTO users_roles (user_id, role_id) VALUES ({}, {})'.format (defaultUserId, defaultRoleId))

完整的程序可以在这里下载: http://www.josmith.org/

注意:上面的代码是实验性的,当用在(很多)并发请求时,可能会有一些(根本性的)问题(比如作为WSGI服务器的一部分)。对于我的应用来说,性能并不是关键。其实最简单的做法可能就是直接用MySql,但我喜欢尝试一些新东西,而且SqLite不需要安装这一点让我很感兴趣。如果有人觉得上面的代码有根本性的缺陷,请告诉我,因为我的目的是学习。如果没有,我希望这对其他人有帮助。

19

在不同的线程之间共享连接是不安全的;至少你需要使用锁来确保访问是有序的。你还应该查看一下这个链接,因为旧版本的SQLite还有更多问题。

关于check_same_thread这个选项,它的文档似乎故意写得不够详细,具体可以看看这个链接

你可以考虑每个线程使用一个连接,或者使用SQLAlchemy来管理连接池(这样可以更高效地处理工作和排队)。

撰写回答