需要检查和填充SQLite数据库的管理和操作代码

3 投票
1 回答
2200 浏览
提问于 2025-04-16 03:12

大家好,

更新:根据谷歌的结果和回答,我添加了更多提示,但还没有完成。

在使用sqlite3和学习sqlalchemy的过程中,我发现为了管理数据,有必要写下面的代码来处理一些日常维护的工作。不过,对于我来说,在sqlalchemy中实现这些功能可能比较困难,所以我又回到了sqlite3模块。

下面的代码列出了10个日常维护的步骤,大部分内容来自网络。我怀疑有经验的人能帮忙检查并补充缺失的部分。如果有人知道如何在SQLAlchemy中做到这些,也请分享一下,谢谢!

1. 检查数据库文件是否存在

import sqlite3 
import os 
database_name = "newdb.db" 
if not os.path.isfile(database_name): 
    print "the database already exist" 

# connect to to db, refer #2
db_connection = sqlite3.connect(database_name) 
db_cursor = db_connection.cursor() 

2. 检查数据库文件是否是有效的sqlite3格式

    http://stackoverflow.com/questions/1516508/sqlite3-in-python


    >>> c.execute("SELECT * FROM tbl") 
    Traceback (most recent call last): 
      File "<stdin>", line 1, in <module> 
    sqlite3.DatabaseError: file is encrypted or is not a database
    =========sqlalchemy way ===============
    http://www.mail-archive.com/sqlalchemy@googlegroups.com/msg20860.html
    import os, os.path as osp
try:
    from pysqlite2 import dbapi2 as sqlite
except:
    import sqlite3 as sqlite

def isSQLite(filename):
    """True if filename is a SQLite database
    File is database if: (1) file exists, (2) length is non-zero,
                        (3) can connect, (4) has sqlite_master table
    """
    # validate file exists
    if not osp.isfile(filename):
        return False
    # is not an empty file
    if not os.stat(filename).st_size:
        return False
    # can open a connection
    try:
        conn = sqlite.connect(filename)
    except:
        return False
    # has sqlite_master
    try:
        result = conn.execute('pragma table_info(sqlite_master)').fetchall()
        if len(result) == 0:
            conn.close()
            return False
    except:
        conn.close()
        return False

    # looks like a good database
    conn.close()
    return True 

3. 检查表是否存在

c=conn.cursor() 
if table_name in [row for row in c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='table_name';")]

4. 备份磁盘上的数据库文件

http://stuvel.eu/archive/55/safely-copy-a-sqlite-database
import shutil, os, sqlite3
if not os.path.isdir ( backupdir ):
    raise Exception 
backupfile = os.path.join ( backupdir, os.path.basename(dbfile) + time.strftime(".%Y%m%d-%H%M") )
db = sqlite3.connect ( dbfile )
cur = db.cursor ()
cur.execute ( 'begin immediate' )
shutil.copyfile ( dbfile, backupfile )
cur.execute ( 'rollback' )
=========or========
http://github.com/husio/python-sqlite3-backup
=========or========
http://docs.python.org/release/2.6/library/sqlite3.html#sqlite3.Connection.iterdump

5. 在同一个数据库文件中备份表

   c=conn.cursor() 
   c.execute("CREATE TABLE demo_backup AS SELECT * FROM demo;") 

6. 重命名表

c.execute("ALTER TABLE foo RENAME TO bar;")

7. 从不同的数据库中复制表:

Thanks, MPelletier

Connect to one database 
db_connection = sqlite3.connect(database_file) 
Attach the second database
db_connection.execute("ATTACH database_file2 AS database_name2")
Insert from one to the other:
db_connection.execute("INSERT INTO FooTable SELECT * FROM database_name2.FooTable")
==========or============
db_connection.execute("INSERT INTO database_name2.FooTable SELECT * FROM FooTable")

    ========sqlalchemy way======
    http://www.mail-archive.com/sqlalchemy@googlegroups.com/msg11563.html
      def duplicateToDisk(self, file):
    '''Tohle ulozi databazi, ktera byla pouze v pameti, na disk'''
    cur = self.connection()
    import os
    if os.path.exists(file):
      os.remove(file)
    cur.execute("attach %s as extern" % file)

    self.checkTable('extern.dictionary')
    cur.execute("insert into extern.dictionary select * from dictionary")
    cur.execute("detach extern")
    self.commit()

8. 检查数据库是否被锁定?

 #possible?
 try:
    c = sqlite.connect(database_name, timeout=0)  
    c.commit() 
 except OperationalError               # OperationalError: database is locked  

9. 连接数据库超时,等待其他调用者释放锁

c = sqlite.connect(database_name, timeout=30.0)  # default 5sec

10. 强制释放所有数据库连接/提交,也就是释放所有锁?

   refer #12

11. 在Python中使用sqlite的多线程:

http://code.activestate.com/recipes/526618/
http://www.yeraze.com/2009/01/python-sqlite-multiple-threads/

12. 从SQLAlchemy获取连接?

   #from FAQ
    #try to reuse the connection pool from SQLAlchemy
    engine = create_engine(...)
    conn = engine.connect()              #****1
    conn.connection.<do DBAPI things>
    cursor = conn.connection.cursor(<DBAPI specific arguments..>)
    ===or ==== can out of pool's manage
    conn = engine.connect()
    conn.detach()  # detaches the DBAPI connection from the connection pool
    conn.connection.<go nuts>
    conn.close()  # connection is closed for real, the pool replaces it with a new connect

    ========and not sure if this works ===========
#from sqlalchemy document                #http://www.sqlalchemy.org/docs/reference/sqlalchemy/pooling.html?highlight=connection%20pool
import sqlalchemy.pool as pool
import sqlite3 as sqlite3

conn_proxy = pool.manage(sqlite3)
# then connect normally
connection = conn_proxy.connect(...)


=====================================================================
    #****1  : what is #****1 on above code invoked                  =_=!!
    A engine.raw_connection()
    =
    A pool.unique_connection()
    =
    A _ConnectionFairy(self).checkout()
    =
    A return _ConnectionFairy <== cls
    =   _connection_record.get_connection()
    =            _ConnectionRecord.connection
    =                return a pool.creator **which is a callable function that returns a DB-API connection object**

感谢大家的时间!

祝好,
KC

1 个回答

2

要在不同的数据库之间复制数据,SQLite的一般步骤是:

  1. 先连接到一个数据库

    db_connection = sqlite3.connect(database_file) 
    
  2. 然后把第二个数据库附加上来

    db_connection.execute("ATTACH database_file2 AS database_name2")
    
  3. 接着把数据从一个数据库插入到另一个数据库:

    db_connection.execute("INSERT INTO FooTable SELECT * FROM database_name2.FooTable")
    

    或者

    db_connection.execute("INSERT INTO database_name2.FooTable SELECT * FROM FooTable")
    

撰写回答