在Python中使用psycopg2的Postgres COPY管道

8 投票
2 回答
7098 浏览
提问于 2025-04-16 21:56

我正在写一个脚本,用来在同一网络上的两台机器之间复制一些数据,使用的是psycopg2库。我想用这个新的方法替代一些老旧且难看的bash脚本来完成复制。

psql -c -h remote.host "COPY table TO STDOUT" | psql -c "COPY table FROM STDIN"

这个方法看起来既简单又高效,在Python中用stringIO或者临时文件来实现也很容易,像这样:

buf = StringIO()

from_curs   = from_conn.cursor()
to_curs     = to_conn.cursor()

from_curs.copy_expert("COPY table TO STDOUT", buf)
buf.seek(0, os.SEEK_SET)
to_curs.copy_expert("COPY table FROM STDIN", buf)

...不过这需要把所有数据先保存到磁盘或者内存中。

有没有人找到一种方法,可以在这样的复制过程中模拟Unix管道的行为?我找不到不涉及POpen的unix管道对象——也许最好的解决方案还是使用POpen和subprocess呢。

2 个回答

0

你可以使用一个你自己扩展的双端队列(deque),来支持读和写操作:

from collections import deque
from Exceptions import IndexError

class DequeBuffer(deque):
    def write(self, data):
        self.append(data)
    def read(self):
        try:
            return self.popleft()
        except IndexError:
            return ''

buf = DequeBuffer()

如果读取的速度远远快于写入的速度,并且数据表很大,那么这个双端队列会变得很大,但它的大小会比存储整个数据表要小。

另外,我不太确定当双端队列为空时,return ''是否安全,而不是一直重试直到不为空,但我猜应该是安全的。如果你试过了,告诉我结果如何。

记得在确认复制完成后使用del buf,特别是当脚本在那时并不是直接退出的时候。

16

你需要把你的一些调用放到一个单独的线程里。我刚刚意识到你可以使用 os.pipe(),这样后面的事情就简单多了:

#!/usr/bin/python
import psycopg2
import os
import threading

fromdb = psycopg2.connect("dbname=from_db")
todb = psycopg2.connect("dbname=to_db")

r_fd, w_fd = os.pipe()

def copy_from():
    cur = todb.cursor()
    cur.copy_from(os.fdopen(r_fd), 'table')
    cur.close()
    todb.commit()

to_thread = threading.Thread(target=copy_from)
to_thread.start()

cur = fromdb.cursor()
write_f = os.fdopen(w_fd, 'w')
cur.copy_to(write_f, 'table')
write_f.close()   # or deadlock...

to_thread.join()

撰写回答