需要检查和填充SQLite数据库的管理和操作代码
大家好,
更新:根据谷歌的结果和回答,我添加了更多提示,但还没有完成。
在使用sqlite3和学习sqlalchemy的过程中,我发现为了管理数据,有必要写下面的代码来处理一些日常维护的工作。不过,对于我来说,在sqlalchemy中实现这些功能可能比较困难,所以我又回到了sqlite3模块。
下面的代码列出了10个日常维护的步骤,大部分内容来自网络。我怀疑有经验的人能帮忙检查并补充缺失的部分。如果有人知道如何在SQLAlchemy中做到这些,也请分享一下,谢谢!
1. 检查数据库文件是否存在
import sqlite3
import os
database_name = "newdb.db"
if not os.path.isfile(database_name):
print "the database already exist"
# connect to to db, refer #2
db_connection = sqlite3.connect(database_name)
db_cursor = db_connection.cursor()
2. 检查数据库文件是否是有效的sqlite3格式
http://stackoverflow.com/questions/1516508/sqlite3-in-python
>>> c.execute("SELECT * FROM tbl")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
sqlite3.DatabaseError: file is encrypted or is not a database
=========sqlalchemy way ===============
http://www.mail-archive.com/sqlalchemy@googlegroups.com/msg20860.html
import os, os.path as osp
try:
from pysqlite2 import dbapi2 as sqlite
except:
import sqlite3 as sqlite
def isSQLite(filename):
"""True if filename is a SQLite database
File is database if: (1) file exists, (2) length is non-zero,
(3) can connect, (4) has sqlite_master table
"""
# validate file exists
if not osp.isfile(filename):
return False
# is not an empty file
if not os.stat(filename).st_size:
return False
# can open a connection
try:
conn = sqlite.connect(filename)
except:
return False
# has sqlite_master
try:
result = conn.execute('pragma table_info(sqlite_master)').fetchall()
if len(result) == 0:
conn.close()
return False
except:
conn.close()
return False
# looks like a good database
conn.close()
return True
3. 检查表是否存在
c=conn.cursor()
if table_name in [row for row in c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='table_name';")]
4. 备份磁盘上的数据库文件
http://stuvel.eu/archive/55/safely-copy-a-sqlite-database
import shutil, os, sqlite3
if not os.path.isdir ( backupdir ):
raise Exception
backupfile = os.path.join ( backupdir, os.path.basename(dbfile) + time.strftime(".%Y%m%d-%H%M") )
db = sqlite3.connect ( dbfile )
cur = db.cursor ()
cur.execute ( 'begin immediate' )
shutil.copyfile ( dbfile, backupfile )
cur.execute ( 'rollback' )
=========or========
http://github.com/husio/python-sqlite3-backup
=========or========
http://docs.python.org/release/2.6/library/sqlite3.html#sqlite3.Connection.iterdump
5. 在同一个数据库文件中备份表
c=conn.cursor()
c.execute("CREATE TABLE demo_backup AS SELECT * FROM demo;")
6. 重命名表
c.execute("ALTER TABLE foo RENAME TO bar;")
7. 从不同的数据库中复制表:
Thanks, MPelletier
Connect to one database
db_connection = sqlite3.connect(database_file)
Attach the second database
db_connection.execute("ATTACH database_file2 AS database_name2")
Insert from one to the other:
db_connection.execute("INSERT INTO FooTable SELECT * FROM database_name2.FooTable")
==========or============
db_connection.execute("INSERT INTO database_name2.FooTable SELECT * FROM FooTable")
========sqlalchemy way======
http://www.mail-archive.com/sqlalchemy@googlegroups.com/msg11563.html
def duplicateToDisk(self, file):
'''Tohle ulozi databazi, ktera byla pouze v pameti, na disk'''
cur = self.connection()
import os
if os.path.exists(file):
os.remove(file)
cur.execute("attach %s as extern" % file)
self.checkTable('extern.dictionary')
cur.execute("insert into extern.dictionary select * from dictionary")
cur.execute("detach extern")
self.commit()
8. 检查数据库是否被锁定?
#possible?
try:
c = sqlite.connect(database_name, timeout=0)
c.commit()
except OperationalError # OperationalError: database is locked
9. 连接数据库超时,等待其他调用者释放锁
c = sqlite.connect(database_name, timeout=30.0) # default 5sec
10. 强制释放所有数据库连接/提交,也就是释放所有锁?
refer #12
11. 在Python中使用sqlite的多线程:
http://code.activestate.com/recipes/526618/
http://www.yeraze.com/2009/01/python-sqlite-multiple-threads/
12. 从SQLAlchemy获取连接?
#from FAQ
#try to reuse the connection pool from SQLAlchemy
engine = create_engine(...)
conn = engine.connect() #****1
conn.connection.<do DBAPI things>
cursor = conn.connection.cursor(<DBAPI specific arguments..>)
===or ==== can out of pool's manage
conn = engine.connect()
conn.detach() # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close() # connection is closed for real, the pool replaces it with a new connect
========and not sure if this works ===========
#from sqlalchemy document #http://www.sqlalchemy.org/docs/reference/sqlalchemy/pooling.html?highlight=connection%20pool
import sqlalchemy.pool as pool
import sqlite3 as sqlite3
conn_proxy = pool.manage(sqlite3)
# then connect normally
connection = conn_proxy.connect(...)
=====================================================================
#****1 : what is #****1 on above code invoked =_=!!
A engine.raw_connection()
=
A pool.unique_connection()
=
A _ConnectionFairy(self).checkout()
=
A return _ConnectionFairy <== cls
= _connection_record.get_connection()
= _ConnectionRecord.connection
= return a pool.creator **which is a callable function that returns a DB-API connection object**
感谢大家的时间!
祝好,
KC
1 个回答
2
要在不同的数据库之间复制数据,SQLite的一般步骤是:
先连接到一个数据库
db_connection = sqlite3.connect(database_file)
然后把第二个数据库附加上来
db_connection.execute("ATTACH database_file2 AS database_name2")
接着把数据从一个数据库插入到另一个数据库:
db_connection.execute("INSERT INTO FooTable SELECT * FROM database_name2.FooTable")
或者
db_connection.execute("INSERT INTO database_name2.FooTable SELECT * FROM FooTable")