从远程服务器提取大量数据到DataFrame

9 投票
4 回答
10481 浏览
提问于 2025-04-19 11:46

为了提供尽可能多的背景信息,我正在尝试从一个远程的Postgres服务器(Heroku)中提取一些数据,并使用psycopg2来连接。

我对两个特定的表格感兴趣,分别是usersevents,连接是正常的,因为在提取用户数据时,

import pandas.io.sql as sql 
# [...]
users = sql.read_sql("SELECT * FROM users", conn)

等了几秒钟后,数据框(DataFrame)如预期返回了。

<class 'pandas.core.frame.DataFrame'>
Int64Index: 67458 entries, 0 to 67457
Data columns (total 35 columns): [...]

但是当我尝试直接从iPython提取更大、更复杂的events数据时,经过很长时间,它就崩溃了:

In [11]: events = sql.read_sql("SELECT * FROM events", conn)
vagrant@data-science-toolbox:~$

而当我在iPython笔记本中尝试时,出现了Dead kernel的错误。

内核已死,您想要重启它吗?如果您不重启内核,您可以保存笔记本,但在重新打开笔记本之前,运行代码将无法工作。


更新 #1:

为了更好地了解我尝试提取的events表的大小,这里是每个表的记录数和属性数:

In [11]: sql.read_sql("SELECT count(*) FROM events", conn)
Out[11]:
     count
0  2711453

In [12]: len(sql.read_sql("SELECT * FROM events LIMIT 1", conn).columns)
Out[12]: 18

更新 #2:

内存绝对是当前read_sql实现的瓶颈:在提取events数据并尝试运行另一个iPython实例时,结果是

vagrant@data-science-toolbox:~$ sudo ipython
-bash: fork: Cannot allocate memory

更新 #3:

我最开始尝试使用read_sql_chunked的实现,这样可以返回部分数据框的数组:

def read_sql_chunked(query, conn, nrows, chunksize=1000):
    start = 0
    dfs = []
    while start < nrows:
        df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), conn)
        start += chunksize
        dfs.append(df)
        print "Events added: %s to %s of %s" % (start-chunksize, start, nrows)
    # print "concatenating dfs"
    return dfs

event_dfs = read_sql_chunked("SELECT * FROM events", conn, events_count, 100000)

这个方法效果不错,但在尝试合并数据框时,内核又崩溃了。
而且这是在给虚拟机分配了2GB内存之后。

根据Andy对read_sqlread_csv在实现和性能上的解释,我接下来尝试将记录追加到一个CSV文件中,然后再将它们全部读入一个数据框:

event_dfs[0].to_csv(path+'new_events.csv', encoding='utf-8')

for df in event_dfs[1:]:
    df.to_csv(path+'new_events.csv', mode='a', header=False, encoding='utf-8')

再次,写入CSV成功完成——生成了一个657MB的文件——但从CSV读取时却从未完成。

如何估算读取一个657MB的CSV文件所需的内存,因为2GB似乎不够用?


感觉我对数据框或psycopg2的基本理解有些欠缺,但我卡住了,甚至无法确定瓶颈在哪里或如何优化。

从远程(Postgres)服务器提取大量数据的正确策略是什么?

4 个回答

0

使用https://github.com/sfu-db/connector-x,你可以实现更快的数据加载速度:

在他们的说明文档中提到:

ConnectorX可以让你以最快和最节省内存的方式将数据库中的数据加载到Python中。

你只需要一行代码:

import connectorx as cx

cx.read_sql("postgresql://username:password@server:port/database", "SELECT * FROM lineitem")

如果你想更快地加载数据,可以通过指定一个分区列来使用并行处理。

import connectorx as cx

cx.read_sql("postgresql://username:password@server:port/database", "SELECT * FROM lineitem", partition_on="l_orderkey", partition_num=10)

这个功能会根据你指定的列,将查询结果均匀分成多个部分。ConnectorX会为每个部分分配一个线程,来并行加载和写入数据。

注意:我自己没有使用过这个工具,但我看到我朋友的项目中使用connector-x后,性能有了很大的提升。


虽然这与问题没有直接关系,但如果查询比较复杂,使用connector-x可能会有一些额外的开销,具体可以查看常见问题

在这种情况下,使用Arrow作为中间目的地可能会更快。 (Arrow可以通过pip install pyarrow安装)

table = cx.read_sql(db_uri, query, return_type="arrow") # or arrow2 https://github.com/jorgecarleitao/arrow2
df = table.to_pandas(split_blocks=False, date_as_object=False)
0

这里有一个简单的游标示例,可能对你有帮助:

首先,我们需要导入一个叫做psycopg2的库。

接着,我们还要导入psycopg2的额外功能库!

然后,我们再导入系统库。

接下来是一个主函数:

在这个函数里,我们定义了一个连接字符串,这个字符串包含了连接数据库所需的信息,比如主机名、数据库名、用户名和密码。

### 这里我们打印出将要用来连接的连接字符串

conn = psycopg2.connect(conn_string)

### HERE IS THE IMPORTANT PART, by specifying a name for the cursor
### psycopg2 creates a server-side cursor, which prevents all of the
### records from being downloaded at once from the server.
cursor = conn.cursor('cursor_unique_name', cursor_factory=psycopg2.extras.DictCursor)
cursor.execute('SELECT * FROM my_table LIMIT 1000')

### Because cursor objects are iterable we can just call 'for - in' on
### the cursor object and the cursor will automatically advance itself
### each iteration.
### This loop should run 1000 times, assuming there are at least 1000
### records in 'my_table'
row_count = 0
for row in cursor:
    row_count += 1
    print "row: %s    %s\n" % (row_count, row)

最后,如果这个文件是直接运行的,就调用主函数。

0

试着使用 pandas:

mysql_cn = mysql.connector.connect(host='localhost', port=123, user='xyz',  passwd='****', db='xy_db')**

data= pd.read_sql('SELECT * FROM table;', con=mysql_cn)

mysql_cn.close()

对我来说,这个方法有效。

5

我觉得这里有几个相关的原因导致速度慢:

  1. read_sql是用Python写的,所以速度有点慢(特别是跟read_csv比,后者是用Cython写的,速度优化得很好!),而且它依赖于sqlalchemy,而不是一些可能更快的C-DBAPI。之所以选择sqlalchemy,是为了将来更容易迁移(还有跨SQL平台的支持)。
  2. 你可能内存不够,因为内存中有太多Python对象(这和不使用C-DBAPI有关),但这个问题可能可以解决……

我认为直接的解决办法是使用分块处理(而且有一个功能请求希望在pandas的read_sqlread_sql_table中原生支持这个功能)。

编辑:从Pandas v0.16.2开始,这种分块处理在read_sql中已经原生实现了。


因为你在使用Postgres,所以可以使用LIMIT和OFFSET查询,这让分块处理变得很简单。(我没记错的话,这些在所有SQL语言中都不一定有吧?)

首先,获取你表中的行数(或者一个估算值):

nrows = con.execute('SELECT count(*) FROM users').fetchone()[0]  # also works with an sqlalchemy engine

用这个来遍历表(为了调试,你可以加一些打印语句来确认它在正常工作/没有崩溃!),然后把结果合并起来:

def read_sql_chunked(query, con, nrows, chunksize=1000):
    start = 1
    dfs = []  # Note: could probably make this neater with a generator/for loop
    while start < nrows:
        df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), con)
        dfs.append(df)
    return pd.concat(dfs, ignore_index=True)

注意:这假设数据库可以放进内存!如果不行,你需要对每个分块进行处理(像mapreduce那样)……或者投资更多内存!

撰写回答