如何将DataFrame写入Postgres表

183 投票
8 回答
292190 浏览
提问于 2025-04-18 02:56

有一个叫做 DataFrame.to_sql 的方法,但它只适用于 mysql、sqlite 和 oracle 这几种数据库。我不能把 postgres 的连接或者 sqlalchemy 的引擎传给这个方法。

8 个回答

2

更快地将数据框(df)写入自定义结构的表格,带或不带索引:

"""
Faster way to write df to table.
Slower way is to use df.to_sql()
"""

from io import StringIO

from pandas import DataFrame
from sqlalchemy.engine.base import Engine


class WriteDfToTableWithIndexMixin:
    @classmethod
    def write_df_to_table_with_index(
            cls,
            df: DataFrame,
            table_name: str,
            schema_name: str,
            engine: Engine
    ):
        """
        Truncate existing table and load df into table.
        Keep each column as string to avoid datatype conflicts.
        """
        df.head(0).to_sql(table_name, engine, if_exists='replace',
                          schema=schema_name, index=True, index_label='id')

        conn = engine.raw_connection()
        cur = conn.cursor()
        output = StringIO()
        df.to_csv(output, sep='\t', header=False,
                  index=True, index_label='id')
        output.seek(0)
        contents = output.getvalue()
        cur.copy_expert(f"COPY {schema_name}.{table_name} FROM STDIN", output)
        conn.commit()


class WriteDfToTableWithoutIndexMixin:
    @classmethod
    def write_df_to_table_without_index(
            cls,
            df: DataFrame,
            table_name: str,
            schema_name: str,
            engine: Engine
    ):
        """
        Truncate existing table and load df into table.
        Keep each column as string to avoid datatype conflicts.
        """
        df.head(0).to_sql(table_name, engine, if_exists='replace',
                          schema=schema_name, index=False)

        conn = engine.raw_connection()
        cur = conn.cursor()
        output = StringIO()
        df.to_csv(output, sep='\t', header=False, index=False)
        output.seek(0)
        contents = output.getvalue()
        cur.copy_expert(f"COPY {schema_name}.{table_name} FROM STDIN", output)
        conn.commit()

如果你的数据框中有一列是JSON格式的数据,使用上面的方法仍然可以正确加载所有数据,但这列JSON数据可能会变得很奇怪。所以把这个JSON列转换成::json可能会出错。你需要使用to_sql()。为了加快速度,可以加上method=multi,并且设置chunksize来防止你的电脑卡住:

df.to_sql(table_name, engine, if_exists='replace', schema=schema_name, index=False, method='multi', chunksize=1000)
44

这是我怎么做的。

这样做可能会更快,因为它使用了 execute_batch

# df is the dataframe
if len(df) > 0:
    df_columns = list(df)
    # create (col1,col2,...)
    columns = ",".join(df_columns)

    # create VALUES('%s', '%s",...) one '%s' per column
    values = "VALUES({})".format(",".join(["%s" for _ in df_columns])) 

    #create INSERT INTO table (columns) VALUES('%s',...)
    insert_stmt = "INSERT INTO {} ({}) {}".format(table,columns,values)

    cur = conn.cursor()
    psycopg2.extras.execute_batch(cur, insert_stmt, df.values)
    conn.commit()
    cur.close()
52

Pandas 0.24.0+ 的解决方案

在 Pandas 0.24.0 版本中,新增了一个专门为快速写入 Postgres 数据库而设计的功能。你可以在这里了解更多信息: https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method

import csv
from io import StringIO

from sqlalchemy import create_engine

def psql_insert_copy(table, conn, keys, data_iter):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

engine = create_engine('postgresql://myusername:mypassword@myhost:5432/mydatabase')
df.to_sql('table_name', engine, method=psql_insert_copy)
147

更快的选项:

下面的代码可以比使用 df.to_sql 方法更快地将你的 Pandas 数据框(DF)复制到 Postgres 数据库,而且你不需要任何中间的 CSV 文件来存储数据框。

首先,根据你的数据库设置创建一个引擎。

然后,在你的 Postgres 数据库中创建一个表,这个表的列数要和数据框(df)一样。

数据框中的数据将会被 插入 到你的 Postgres 表中。

from sqlalchemy import create_engine
import psycopg2 
import io

如果你想替换这个表,可以使用普通的 to_sql 方法,利用数据框中的表头,然后将整个耗时较长的数据框加载到数据库中。

engine = create_engine(
    'postgresql+psycopg2://username:password@host:port/database')

# Drop old table and create new empty table
df.head(0).to_sql('table_name', engine, if_exists='replace',index=False)

conn = engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, 'table_name', null="") # null values become ''
conn.commit()
cur.close()
conn.close()
242

从pandas 0.14版本开始(2014年5月底发布),它开始支持postgresql数据库。现在的sql模块使用sqlalchemy来支持不同类型的数据库。你可以为postgresql数据库传递一个sqlalchemy引擎(具体可以查看文档)。例如:

from sqlalchemy import create_engine
engine = create_engine('postgresql://username:password@localhost:5432/mydatabase')
df.to_sql('table_name', engine)

你说得对,在pandas 0.13.1版本之前是没有支持postgresql的。如果你需要使用旧版本的pandas,这里有一个修补过的pandas.io.sql版本:https://gist.github.com/jorisvandenbossche/10841234
我写这个的时候已经有一段时间了,所以不能完全保证它总是有效,但基本功能应该是可以的。如果你把这个文件放在你的工作目录里并导入它,那么你应该能够做到(其中con是一个postgresql连接):

import sql  # the patched version (file is named sql.py)
sql.write_frame(df, 'table_name', con, flavor='postgresql')

撰写回答