如何将数据从SQLite复制到PostgreSQL?

0 投票
1 回答
66 浏览
提问于 2025-04-11 21:49

我有一个任务,需要把一个SQLite表里的数据复制到一个几乎相同的PostgreSQL表里。我写的代码不工作,但我搞不清楚问题出在哪里。它显示连接成功,没有错误信息,但当我去终端检查content.film_work表时,用命令SELECT * FROM content.film_work;查看数据,却什么也看不见。显示的是:

movies_database=# SELECT * FROM content.film_work;



created | modified | id | title | description | creation_date | rating | type | file_path 
---------+----------+----+-------+-------------+---------------+--------+------+-----------
(0 rows)

以下是一些步骤:

  1. 先连接到SQLite数据库,把film_work表里的数据复制到数据类中。
  2. 把数据类实例的列表(也就是表的行)传递给另一个函数,在那里我连接到PostgreSQL。
  3. 连接到PostgreSQL,获取PostgreSQL表(content.film_work)的列名,以便传递给INSERT SQL查询。
  4. 在这一切之前,我调整了列的顺序,以便正确地将数据从SQLite传递到PostgreSQL。

SQLite表(film_work):

0|id|TEXT|0||1
1|title|TEXT|1||0
2|description|TEXT|0||0
3|creation_date|DATE|0||0
4|file_path|TEXT|0||0
5|rating|FLOAT|0||0
6|type|TEXT|1||0
7|created_at|timestamp with time zone|0||0
8|updated_at|timestamp with time zone|0||0

PostgreSQL表(content.film_work):

 created | modified | id | title | description | creation_date | rating | type | file_path

代码片段:

psycopg2.extras.register_uuid()
db_path = 'db.sqlite'

@contextmanager
def conn_context(db_path: str):
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row

    try:
        yield conn
    finally:
        conn.close() 

@dataclass
class FilmWork:
    created_at: date = None
    updated_at: date = None
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    title: str = ''
    description: str = ''
    creation_date: date = None
    rating: float = 0.0
    type: str = ''
    file_path: str = ''


def __post_init__(self):
    if self.creation_date is None:
        self.creation_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
    if self.created_at is None:
        self.created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
    if self.updated_at is None:
        self.updated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
    if self.description is None:
        self.description = 'Нет описания'
    if self.rating is None:
        self.rating = 0.0


def copy_from_sqlite():
    with conn_context(db_path) as connection:
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM film_work;")

        result = cursor.fetchall()

        films = [FilmWork(**dict(film)) for film in result]

        save_film_work_to_postgres(films)

def save_film_work_to_postgres(films: list):
    psycopg2.extras.register_uuid()
    dsn = {
        'dbname': 'movies_database',
        'user': 'app',
        'password': '123qwe',
        'host': 'localhost',
        'port': 5432,
        'options': '-c search_path=content',
    }

    try:
        conn = psycopg2.connect(**dsn)
        print("Successfull connection!")

        with conn.cursor() as cursor:
            cursor.execute(f"SELECT column_name FROM information_schema.columns WHERE table_name = 'film_work' ORDER BY ordinal_position;")
            column_names_list = [row[0] for row in cursor.fetchall()]            
            column_names_str = ','.join(column_names_list)

            col_count = ', '.join(['%s'] * len(column_names_list)) 
            bind_values = ','.join(cursor.mogrify(f"({col_count})", astuple(film)).decode('utf-8') for film in films)

            cursor.execute(f"""INSERT INTO content.film_work ({column_names_str}) VALUES {bind_values} """)
    except psycopg2.Error as _e:
        print("Ошибка:", _e)
    finally:
        if conn is not None:
            conn.close()
    

copy_from_sqlite()

1 个回答

2

检查一下这段代码是否正确执行,我做了一些修改:数据格式调整、字段初始化,以及修正了列名不匹配的问题。

import sqlite3
import psycopg2
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from psycopg2 import extras
from uuid import UUID

psycopg2.extras.register_uuid()
db_path = 'db.sqlite'

@contextmanager
def conn_context(db_path: str):
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row

    try:
        yield conn
    finally:
        conn.close() 

@dataclass
class FilmWork:
    created_at: datetime = None
    updated_at: datetime = None
    id: UUID = field(default_factory=uuid.uuid4)
    title: str = ''
    description: str = ''
    creation_date: datetime = None
    rating: float = 0.0
    type: str = ''
    file_path: str = ''

    def __post_init__(self):
        if self.creation_date is None:
            self.creation_date = datetime.now()
        if self.created_at is None:
            self.created_at = datetime.now()
        if self.updated_at is None:
            self.updated_at = datetime.now()
        if self.description is None:
            self.description = 'Нет описания'
        if self.rating is None:
            self.rating = 0.0

def copy_from_sqlite():
    with conn_context(db_path) as connection:
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM film_work;")

        result = cursor.fetchall()

        films = [FilmWork(**dict(film)) for film in result]

        save_film_work_to_postgres(films)

def save_film_work_to_postgres(films: list):
    dsn = {
        'dbname': 'movies_database',
        'user': 'app',
        'password': '123qwe',
        'host': 'localhost',
        'port': 5432,
        'options': '-c search_path=content',
    }

    try:
        conn = psycopg2.connect(**dsn)
        print("Successful connection!")

        with conn.cursor() as cursor:
            cursor.execute(f"SELECT column_name FROM information_schema.columns WHERE table_name = 'film_work' ORDER BY ordinal_position;")
            column_names_list = [row[0] for row in cursor.fetchall()]            
            column_names_str = ','.join(column_names_list)

            col_count = ', '.join(['%s'] * len(column_names_list)) 
            bind_values = ','.join(cursor.mogrify(f"({col_count})", film).decode('utf-8') for film in films)

            cursor.execute(f"""INSERT INTO content.film_work ({column_names_str}) VALUES {bind_values} """)
            conn.commit()  # Don't forget to commit changes
    except psycopg2.Error as _e:
        print("Ошибка:", _e)
    finally:
        if conn is not None:
            conn.close()

copy_from_sqlite()

撰写回答