访问SQLite DB/w python并获取格式错误的数据库

2024-06-06 11:01:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一些python代码可以跨sftp复制SQLite db。然而,它是一个高度活跃的数据库,所以很多时候我都会遇到一个格式不正确的数据库。我正在考虑这些可能的选项,但我不知道如何实现它们,因为我是python新手

  • 复制sqlite数据库的替代方法
  • 也许有办法从设备中查询sqlite文件?不确定这是否可行,因为sqlite更像是一个本地数据库,不确定如何像查询mysql一样查询它
  • 创建一个循环?我可以在异常中再次调用该函数,但不确定如何重试其余代码
  • 此外,畸形的数据库问题可能会发生在我想的其他部分?也许我需要做一个pragma快速检查

这是我经常看到的。。。。另一个问题是为什么我经常看到它?因为如果我从我的主机加载sqlite文件,它运行查询文件

pragma

query

(venv) dulanic@mediaserver:/opt/python_scripts/rpi$  cd /opt/python_scripts/rpi ; /usr/bin/env /opt/python_scripts/rpi/venv/bin/python /home/dulanic/.vscode-server/extensions/ms-python.python-2021.2.636928669/pythonFiles/lib/python/debugpy/launcher 37599 -- /opt/python_scripts/rpi/rpdb.py 
An error occurred: database disk image is malformed

这是我当前的代码:

#!/usr/bin/env python3
import psycopg2, sqlite3, sys, paramiko, sys, os, socket, time 

scpuser=os.getenv('scpuser')
scppw = os.getenv('scppw')
sqdb = os.getenv('sqdb')
sqlike = os.getenv('sqlike')
pgdb = os.getenv('pgdb')
pguser = os.getenv('pguser')
pgpswd = os.getenv('pgpswd')
pghost = os.getenv('pghost')
pgport = os.getenv('pgport')
pgschema = os.getenv('pgschema')
database = r"./pihole.db"
pihole = socket.gethostbyname('pi.hole')
tabnames=[]
tabgrab = ''

def pullsqlite(): 
    sftp.get('/etc/pihole/pihole-FTL.db','pihole.db')
    sftp.close()

# SFTP pull config
ssh_client=paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(hostname=pihole,username=scpuser,password=scppw)
sftp=ssh_client.open_sftp()

# Pull SQlite
pullsqlite()

# Load sqlite tables to list
consq=sqlite3.connect(sqdb)
cursq=consq.cursor()    
cursq.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name in ({sqlike})" )
tabgrab = cursq.fetchall()

# postgres connection
conpg = psycopg2.connect(database=pgdb, user=pguser, password=pgpswd,
                        host=pghost, port=pgport)

#Load data to postgres from sqlite

for item in tabgrab:
    tabnames.append(item[0])
    start = time.perf_counter() 

for table in tabnames:
    curpg = conpg.cursor()
    if table=='queries':
        curpg.execute(f"SELECT max(id)  FROM {table};")
        max_id = curpg.fetchone()[0]
        cursq.execute(f"SELECT * FROM {table} where id > {max_id};")
    else:
        cursq.execute(f"SELECT * FROM {table};")
    try:     
        rows=cursq.fetchall()
    except sqlite3.Error as e:
        print("An error occurred:", e.args[0])

    colcount=len(rows[0])
    pholder=('%s,'*colcount)[:-1]
 
    try:
 
        curpg.execute(f"SET search_path TO {pgschema};" )
        curpg.executemany(f"INSERT INTO {table} VALUES ({pholder}) ON CONFLICT DO NOTHING;" ,rows)
        conpg.commit()
        print(f'Inserted {len(rows)} rows into {table}')
 
    except psycopg2.DatabaseError as e:
        print (f'Error {e}') 
        sys.exit(1)

if 'start' in locals():
    elapsed = time.perf_counter() - start
    print(f'Time {elapsed:0.4}')
consq.close()

Tags: 数据库executedbsqliteostablescriptsrpi