使用psycopg2的二进制COPY表FROM

41 投票
2 回答
29156 浏览
提问于 2025-04-17 06:23

我有几千万行数据需要从多维数组文件转移到PostgreSQL数据库里。我使用的工具是Python和psycopg2。批量插入数据最有效的方法是用copy_from。不过,我的数据大多数是32位的浮点数(real或float4),所以我不想把它们从实数转换成文本再转换回实数。下面是一个数据库的DDL示例:

CREATE TABLE num_data
(
  id serial PRIMARY KEY NOT NULL,
  node integer NOT NULL,
  ts smallint NOT NULL,
  val1 real,
  val2 double precision
);

这是我用字符串(文本)在Python中的进展:

# Just one row of data
num_row = [23253, 342, -15.336734, 2494627.949375]

import psycopg2
# Python3:
from io import StringIO
# Python2, use: from cStringIO import StringIO

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# Convert floating point numbers to text, write to COPY input
cpy = StringIO()
cpy.write('\t'.join([repr(x) for x in num_row]) + '\n')

# Insert data; database converts text back to floating point numbers
cpy.seek(0)
curs.copy_from(cpy, 'num_data', columns=('node', 'ts', 'val1', 'val2'))
conn.commit()

有没有什么方法可以用二进制模式来实现呢?也就是说,能不能直接保持浮点数为二进制格式?这样不仅能保持浮点数的精度,而且可能会更快。

(注意:要看到和示例一样的精度,可以使用SET extra_float_digits='2'

2 个回答

0

对于那些仍然关心这个问题的人。

我需要用 COPY 导入 bytea 列。为了自己解决这个问题,我先用 COPY FROM 或者 psycopg2 的 copy_from() 将数据导入到一个临时表中。然后,我通过 UPDATE ... FROM 从临时表更新到实际的数据。在 UPDATE 语句中,我把以十六进制表示的字节解码成 SQL 中的实际 bytea 对象。

这种方法相比 PSQL 的二进制导入格式,唯一的好处是数据在传输时更容易被人理解,可能也更容易编写代码。

需要注意的是,psycopg3 提供了更高级的 COPY 功能,但我目前使用的是 SQLALchemy 1.4,也就是 psycopg2。

以下是我的 Python 示例:


        # Temp table is dropped at the end of the session
        # https://www.postgresqltutorial.com/postgresql-tutorial/postgresql-temporary-table/
        # This must match data_as_dicts structure.
        # We will pass binary data as hex strings in an unprefixed format:
        # 01020304
        sql = f"""
        CREATE TEMP TABLE IF NOT EXISTS {temp_table_name}
        (
          id int,
          sync_event_id int,
          sync_reserve0 varchar(128), 
          sync_reserve1 varchar(128)
        );    
        """
        conn.execute(sql)

        # Clean any pending data in the temp table
        # between update chunks.
        # TODO: Not sure why this does not clear itself at conn.close()
        # as I would expect based on the documentation.
        sql = f"TRUNCATE {temp_table_name}"
        conn.execute(sql)

        # Load data from CSV to the temp table
        # https://www.psycopg.org/docs/cursor.html
        cursor = conn.connection.cursor()
        out.seek(0)
        cursor.copy_from(out, temp_table_name, sep=delim, columns=columns)

        # Fill real table from the temp table
        # This copies values from the temp table using
        # UPDATE...FROM and matching by the row id.
        # We will also reconstruct binary from the hex strings.
        sql = f"""
        UPDATE {real_table_name}  
        SET 
            sync_event_id=b.sync_event_id,
            sync_reserve0=decode(b.sync_reserve0, 'hex'),
            sync_reserve1=decode(b.sync_reserve1, 'hex')        
        FROM {temp_table_name} AS b 
        WHERE {real_table_name}.id=b.id;
        """
        res = conn.execute(sql)
        logger.debug("Updated %d rows", res.rowcount)
47

这里是Python 3中COPY FROM的二进制等价实现:

from io import BytesIO
from struct import pack
import psycopg2

# Two rows of data; "id" is not in the upstream data source
# Columns: node, ts, val1, val2
data = [(23253, 342, -15.336734, 2494627.949375),
        (23256, 348, 43.23524, 2494827.949375)]

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# Determine starting value for sequence
curs.execute("SELECT nextval('num_data_id_seq')")
id_seq = curs.fetchone()[0]

# Make a binary file object for COPY FROM
cpy = BytesIO()
# 11-byte signature, no flags, no header extension
cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))

# Columns: id, node, ts, val1, val2
# Zip: (column position, format, size)
row_format = list(zip(range(-1, 4),
                      ('i', 'i', 'h', 'f', 'd'),
                      ( 4,   4,   2,   4,   8 )))
for row in data:
    # Number of columns/fields (always 5)
    cpy.write(pack('!h', 5))
    for col, fmt, size in row_format:
        value = (id_seq if col == -1 else row[col])
        cpy.write(pack('!i' + fmt, size, value))
    id_seq += 1  # manually increment sequence outside of database

# File trailer
cpy.write(pack('!h', -1))

# Copy data to database
cpy.seek(0)
curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy)

# Update sequence on database
curs.execute("SELECT setval('num_data_id_seq', %s, false)", (id_seq,))
conn.commit()

更新

我重新写了上面提到的COPY文件的方法。我的数据在Python中是以NumPy数组的形式存储的,所以使用这些数组是很合理的。这里有一些示例数据,包含100万行和7列:

import psycopg2
import numpy as np
from struct import pack
from io import BytesIO
from datetime import datetime

conn = psycopg2.connect("dbname=mydb user=postgres")
curs = conn.cursor()

# NumPy record array
shape = (7, 2000, 500)
print('Generating data with %i rows, %i columns' % (shape[1]*shape[2], shape[0]))

dtype = ([('id', 'i4'), ('node', 'i4'), ('ts', 'i2')] +
         [('s' + str(x), 'f4') for x in range(shape[0])])
data = np.empty(shape[1]*shape[2], dtype)
data['id'] = np.arange(shape[1]*shape[2]) + 1
data['node'] = np.tile(np.arange(shape[1]) + 1, shape[2])
data['ts'] = np.repeat(np.arange(shape[2]) + 1, shape[1])
data['s0'] = np.random.rand(shape[1]*shape[2]) * 100
prv = 's0'
for nxt in data.dtype.names[4:]:
    data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10
    prv = nxt

在我的数据库中,有两个表看起来像这样:

CREATE TABLE num_data_binary
(
  id integer PRIMARY KEY,
  node integer NOT NULL,
  ts smallint NOT NULL,
  s0 real,
  s1 real,
  s2 real,
  s3 real,
  s4 real,
  s5 real,
  s6 real
) WITH (OIDS=FALSE);

还有一个类似的表叫做num_data_text

这里有一些简单的辅助函数,用来准备COPY的数据(包括文本和二进制格式),这些函数利用了NumPy记录数组中的信息:

def prepare_text(dat):
    cpy = BytesIO()
    for row in dat:
        cpy.write('\t'.join([repr(x) for x in row]) + '\n')
    return(cpy)

def prepare_binary(dat):
    pgcopy_dtype = [('num_fields','>i2')]
    for field, dtype in dat.dtype.descr:
        pgcopy_dtype += [(field + '_length', '>i4'),
                         (field, dtype.replace('<', '>'))]
    pgcopy = np.empty(dat.shape, pgcopy_dtype)
    pgcopy['num_fields'] = len(dat.dtype)
    for i in range(len(dat.dtype)):
        field = dat.dtype.names[i]
        pgcopy[field + '_length'] = dat.dtype[i].alignment
        pgcopy[field] = dat[field]
    cpy = BytesIO()
    cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))
    cpy.write(pgcopy.tostring())  # all rows
    cpy.write(pack('!h', -1))  # file trailer
    return(cpy)

这是我如何使用这些辅助函数来对比这两种COPY格式的方法:

def time_pgcopy(dat, table, binary):
    print('Processing copy object for ' + table)
    tstart = datetime.now()
    if binary:
        cpy = prepare_binary(dat)
    else:  # text
        cpy = prepare_text(dat)
    tendw = datetime.now()
    print('Copy object prepared in ' + str(tendw - tstart) + '; ' +
          str(cpy.tell()) + ' bytes; transfering to database')
    cpy.seek(0)
    if binary:
        curs.copy_expert('COPY ' + table + ' FROM STDIN WITH BINARY', cpy)
    else:  # text
        curs.copy_from(cpy, table)
    conn.commit()
    tend = datetime.now()
    print('Database copy time: ' + str(tend - tendw))
    print('        Total time: ' + str(tend - tstart))
    return

time_pgcopy(data, 'num_data_text', binary=False)
time_pgcopy(data, 'num_data_binary', binary=True)

以下是最后两个time_pgcopy命令的输出:

Processing copy object for num_data_text
Copy object prepared in 0:01:15.288695; 84355016 bytes; transfering to database
Database copy time: 0:00:37.929166
        Total time: 0:01:53.217861
Processing copy object for num_data_binary
Copy object prepared in 0:00:01.296143; 80000021 bytes; transfering to database
Database copy time: 0:00:23.325952
        Total time: 0:00:24.622095

所以,使用二进制方法时,NumPy到文件和文件到数据库的步骤都快得多。明显的区别在于Python准备COPY文件的方式,对于文本格式来说,这个过程非常慢。一般来说,对于这个模式,二进制格式加载到数据库的时间是文本格式的2/3。

最后,我比较了数据库中两个表的值,看看数字是否不同。大约1.46%的行在s0列的值不同,而在s6列中,这个比例增加到了6.17%(可能与我使用的随机方法有关)。在所有7000万条32位浮点值中,非零绝对差异的范围在9.3132257e-010到7.6293945e-006之间。这些文本和二进制加载方法之间的小差异是由于浮点数到文本再到浮点数的转换过程中精度的损失。

撰写回答