Python Psycopg2: problem looping over a large database

1 vote
3 answers
4532 views
Asked 2025-04-18 07:43

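I am trying to process a large 8 GB database with psycopg2 and Python. I followed the documentation but ran into an error. I want to read the data row by row without using .fetchall(), because the result is too large to load into memory all at once. I also can't use .fetchone(), because it reads each column individually.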

Note that the first iteration of the loop returns a value, but the second iteration raises the error.

The documentation says:

Note cursor objects are iterable, so, instead of calling explicitly fetchone() in a loop, the object itself can be used:
>>> cur.execute("SELECT * FROM test;")
>>> for record in cur:
...     print record
...
(1, 100, "abc'def")
(2, None, 'dada')
(3, 42, 'bar')

My code is:

statement = ("select source_ip,dest_ip,bytes,datetime from IPS")
cursor.execute(statement)

for sip,dip,bytes,datetime in cursor:
    if sip in cidr:
        ip = sip
        in_bytes = bytes
        out_bytes = 0
        time = datetime
    else:
        ip = dip
        out_bytes = bytes
        in_bytes = 0
        time = datetime    
    cursor.execute("INSERT INTO presum (ip, in_bytes, out_bytes, datetime) VALUES (%s,%s,%s,%s);", (ip, in_bytes, out_bytes, time,))
    conn.commit()
    print "writing to presum"

Then I get the following error:

for sip,dip,bytes,datetime in cursor: psycopg2.ProgrammingError: no results to fetch

3 Answers

0

I was interested in this problem. I think you could try the cursor.fetchmany(size) method. For example:

cursor.execute("select * from large_table")

# Set the max number of rows to fetch at each iteration
max_rows = 100
while True:
    rows = cursor.fetchmany(max_rows)
    if not rows:
        # An empty list means the result set is exhausted
        break
    for arow in rows:
        # do some processing of the row
        pass

Maybe this will help?
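
One caveat with this approach: psycopg2's default client-side cursor already transfers the full result set to the client when execute() is called, so fetchmany() on its own does not bound memory use. A named (server-side) cursor, created with conn.cursor(name=...), is what lets the batches stream from the server. The sketch below wraps the fetchmany() loop in a generator. The helper name iter_rows and the demo table contents are made up for illustration, and the demo runs against an in-memory sqlite3 database only so that it works without a PostgreSQL server.

```python
import sqlite3


def iter_rows(cursor, batch_size=100):
    """Yield rows one at a time, fetching them in batches of batch_size.

    Works with any DB-API cursor. With psycopg2, pass a named cursor,
    e.g. conn.cursor(name="ips_reader"), so batches stream from the server.
    """
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        for row in rows:
            yield row


# Demo on sqlite3 (a stand-in so the sketch runs without PostgreSQL).
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE large_table (id INTEGER, val TEXT)")
demo_conn.executemany(
    "INSERT INTO large_table VALUES (?, ?)",
    [(i, "row%d" % i) for i in range(250)],
)

demo_cur = demo_conn.cursor()
demo_cur.execute("SELECT id, val FROM large_table ORDER BY id")
demo_rows = list(iter_rows(demo_cur, batch_size=100))
```

The caller only ever holds one batch at a time, so the loop body can stay a plain "for row in iter_rows(cur)".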

1

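You are replacing the cursor's result set inside the loop here, because the INSERT runs on the same cursor you are iterating over: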

cursor.execute("INSERT INTO presum (ip, in_bytes, out_bytes, datetime) VALUES (%s,%s,%s,%s);", (ip, in_bytes, out_bytes, time,))
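
If the row-by-row logic has to stay in Python, one way around this is to read through one cursor and write through another. The following is a minimal sketch under some assumptions: conn is an open psycopg2 connection, cidr is whatever container the question's code tests membership against, and copy_to_presum and the cursor name "ips_reader" are hypothetical names chosen for illustration.

```python
def copy_to_presum(conn, cidr):
    """Read IPS through one cursor and write presum through another.

    conn is assumed to be an open psycopg2 connection; cidr is assumed
    to support the `in` test used in the question's code.
    """
    # Named cursor: rows are streamed from the server instead of being
    # loaded into client memory all at once.
    read_cur = conn.cursor(name="ips_reader")
    # Separate plain cursor for the INSERTs, so the reader's result set
    # is never replaced.
    write_cur = conn.cursor()

    read_cur.execute("SELECT source_ip, dest_ip, bytes, datetime FROM IPS")
    count = 0
    for sip, dip, nbytes, ts in read_cur:
        if sip in cidr:
            ip, in_bytes, out_bytes = sip, nbytes, 0
        else:
            ip, in_bytes, out_bytes = dip, 0, nbytes
        write_cur.execute(
            "INSERT INTO presum (ip, in_bytes, out_bytes, datetime) "
            "VALUES (%s, %s, %s, %s)",
            (ip, in_bytes, out_bytes, ts),
        )
        count += 1

    read_cur.close()
    write_cur.close()
    # Commit once at the end: committing mid-loop would close a named
    # cursor unless it was created with withhold=True.
    conn.commit()
    return count
```

Committing once after the loop also avoids the cost of one transaction per row, which matters for a table of this size.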

In fact, the whole thing can be done in SQL:

statement = """
    insert into presum (ip, in_bytes, out_bytes, datetime)

    select source_ip, bytes, 0, datetime
    from IPS
    where source_ip << %(cidr)s

    union all

    select dest_ip, 0, bytes, datetime
    from IPS
    where not source_ip << %(cidr)s
"""

# IP is assumed to be the class from the IPy package
from IPy import IP
cidr = IP('200.90.230/23')

cursor.execute(statement, {'cidr': cidr.strNormal()})
conn.commit()

I'm assuming source_ip is of type inet. The << operator checks whether an inet address is contained within a subnet.
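
To sanity-check the containment rule outside the database, a rough Python equivalent of the << test can be sketched with the standard-library ipaddress module. The helper name split_traffic and the sample addresses are invented for illustration; the network is the same /23 as above, written out in full.

```python
import ipaddress


def split_traffic(source_ip, dest_ip, nbytes, network):
    """Return (ip, in_bytes, out_bytes) following the same rule as the SQL.

    network is an ipaddress network object, e.g.
    ipaddress.ip_network("200.90.230.0/23").
    """
    if ipaddress.ip_address(source_ip) in network:
        # Source is inside the subnet: count the bytes as in_bytes.
        return source_ip, nbytes, 0
    # Otherwise attribute the bytes to the destination as out_bytes.
    return dest_ip, 0, nbytes


network = ipaddress.ip_network("200.90.230.0/23")
inside = split_traffic("200.90.231.7", "8.8.8.8", 1500, network)
outside = split_traffic("8.8.8.8", "200.90.230.10", 900, network)
```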

1

It looks like you passed a tuple to cursor.execute. Try passing the SQL string you want to execute.

statement = "select source_ip,dest_ip,bytes,datetime from IPS"
cursor.execute(statement)
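
One thing worth checking here: in Python, parentheses around a single expression do not by themselves create a tuple; a trailing comma does. So the statement in the question is already a plain string, and this change alone may not make the error go away. A quick check (the variable names other than statement are only for illustration):

```python
# Parentheses alone: still a str, same as in the question.
statement = ("select source_ip,dest_ip,bytes,datetime from IPS")
# Trailing comma: this is what actually makes a one-element tuple.
one_tuple = ("select source_ip,dest_ip,bytes,datetime from IPS",)

statement_is_str = isinstance(statement, str)
one_tuple_is_tuple = isinstance(one_tuple, tuple)
```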
