如何将DataFrame写入Postgres表
有一个叫做 DataFrame.to_sql 的方法,但它只适用于 mysql、sqlite 和 oracle 这几种数据库。我不能把 postgres 的连接或者 sqlalchemy 的引擎传给这个方法。
8 个回答
更快地将数据框(df)写入自定义结构的表格,带或不带索引:
"""
Faster way to write df to table.
Slower way is to use df.to_sql()
"""
from io import StringIO
from pandas import DataFrame
from sqlalchemy.engine.base import Engine
class WriteDfToTableWithIndexMixin:
@classmethod
def write_df_to_table_with_index(
cls,
df: DataFrame,
table_name: str,
schema_name: str,
engine: Engine
):
"""
Truncate existing table and load df into table.
Keep each column as string to avoid datatype conflicts.
"""
df.head(0).to_sql(table_name, engine, if_exists='replace',
schema=schema_name, index=True, index_label='id')
conn = engine.raw_connection()
cur = conn.cursor()
output = StringIO()
df.to_csv(output, sep='\t', header=False,
index=True, index_label='id')
output.seek(0)
contents = output.getvalue()
cur.copy_expert(f"COPY {schema_name}.{table_name} FROM STDIN", output)
conn.commit()
class WriteDfToTableWithoutIndexMixin:
@classmethod
def write_df_to_table_without_index(
cls,
df: DataFrame,
table_name: str,
schema_name: str,
engine: Engine
):
"""
Truncate existing table and load df into table.
Keep each column as string to avoid datatype conflicts.
"""
df.head(0).to_sql(table_name, engine, if_exists='replace',
schema=schema_name, index=False)
conn = engine.raw_connection()
cur = conn.cursor()
output = StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_expert(f"COPY {schema_name}.{table_name} FROM STDIN", output)
conn.commit()
如果你的数据框中有一列是JSON格式的数据,使用上面的方法仍然可以正确加载所有数据,但这列JSON数据可能会变得很奇怪。所以把这个JSON列转换成::json
可能会出错。你需要使用to_sql()
。为了加快速度,可以加上method=multi
,并且设置chunksize
来防止你的电脑卡住:
df.to_sql(table_name, engine, if_exists='replace', schema=schema_name, index=False, method='multi', chunksize=1000)
这是我怎么做的。
这样做可能会更快,因为它使用了 execute_batch
:
# df is the dataframe
if len(df) > 0:
df_columns = list(df)
# create (col1,col2,...)
columns = ",".join(df_columns)
# create VALUES('%s', '%s",...) one '%s' per column
values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))
#create INSERT INTO table (columns) VALUES('%s',...)
insert_stmt = "INSERT INTO {} ({}) {}".format(table,columns,values)
cur = conn.cursor()
psycopg2.extras.execute_batch(cur, insert_stmt, df.values)
conn.commit()
cur.close()
Pandas 0.24.0+ 的解决方案
在 Pandas 0.24.0 版本中,新增了一个专门为快速写入 Postgres 数据库而设计的功能。你可以在这里了解更多信息: https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method
import csv
from io import StringIO
from sqlalchemy import create_engine
def psql_insert_copy(table, conn, keys, data_iter):
# gets a DBAPI connection that can provide a cursor
dbapi_conn = conn.connection
with dbapi_conn.cursor() as cur:
s_buf = StringIO()
writer = csv.writer(s_buf)
writer.writerows(data_iter)
s_buf.seek(0)
columns = ', '.join('"{}"'.format(k) for k in keys)
if table.schema:
table_name = '{}.{}'.format(table.schema, table.name)
else:
table_name = table.name
sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
table_name, columns)
cur.copy_expert(sql=sql, file=s_buf)
engine = create_engine('postgresql://myusername:mypassword@myhost:5432/mydatabase')
df.to_sql('table_name', engine, method=psql_insert_copy)
更快的选项:
下面的代码可以比使用 df.to_sql 方法更快地将你的 Pandas 数据框(DF)复制到 Postgres 数据库,而且你不需要任何中间的 CSV 文件来存储数据框。
首先,根据你的数据库设置创建一个引擎。
然后,在你的 Postgres 数据库中创建一个表,这个表的列数要和数据框(df)一样。
数据框中的数据将会被 插入 到你的 Postgres 表中。
from sqlalchemy import create_engine
import psycopg2
import io
如果你想替换这个表,可以使用普通的 to_sql 方法,利用数据框中的表头,然后将整个耗时较长的数据框加载到数据库中。
engine = create_engine(
'postgresql+psycopg2://username:password@host:port/database')
# Drop old table and create new empty table
df.head(0).to_sql('table_name', engine, if_exists='replace',index=False)
conn = engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, 'table_name', null="") # null values become ''
conn.commit()
cur.close()
conn.close()
从pandas 0.14版本开始(2014年5月底发布),它开始支持postgresql数据库。现在的sql
模块使用sqlalchemy
来支持不同类型的数据库。你可以为postgresql数据库传递一个sqlalchemy引擎(具体可以查看文档)。例如:
from sqlalchemy import create_engine
engine = create_engine('postgresql://username:password@localhost:5432/mydatabase')
df.to_sql('table_name', engine)
你说得对,在pandas 0.13.1版本之前是没有支持postgresql的。如果你需要使用旧版本的pandas,这里有一个修补过的pandas.io.sql
版本:https://gist.github.com/jorisvandenbossche/10841234。
我写这个的时候已经有一段时间了,所以不能完全保证它总是有效,但基本功能应该是可以的。如果你把这个文件放在你的工作目录里并导入它,那么你应该能够做到(其中con
是一个postgresql连接):
import sql # the patched version (file is named sql.py)
sql.write_frame(df, 'table_name', con, flavor='postgresql')