如何高效迭代Pandas DataFrame的连续块

120 投票
8 回答
116978 浏览
提问于 2025-04-20 03:54

我有一个很大的数据表(有几百万行)。

我想对它进行分组操作,但不是根据每一行的某个特定属性来分组,而是想按任意连续的(最好是大小相等的)行来分组。

使用场景是:我想通过IPython中的并行映射对每一行应用一个函数。哪几行分到哪个后端引擎并不重要,因为这个函数是基于每一行单独计算结果的。(至少在概念上是这样;实际上是向量化的。)

我想出了类似这样的做法:

# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to
max_idx = dataframe.index.max()
tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32)

# Use this value to perform a groupby, yielding 10 consecutive chunks
groups = [g[1] for g in dataframe.groupby(tenths)]

# Process chunks in parallel
results = dview.map_sync(my_function, groups)

但是这样做似乎太繁琐了,而且不能保证每个分组的大小相等。特别是当索引稀疏、不是整数或者其他情况时。

有没有更好的方法呢?

谢谢!

8 个回答

13

一个好的环境的标志就是有很多选择,所以我想分享一下来自 Anaconda Blaze 的内容,实际上是使用了 Odo

import blaze as bz
import pandas as pd

df = pd.DataFrame({'col1':[1,2,3,4,5], 'col2':[2,4,6,8,10]})

for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2):
    # Do stuff with chunked dataframe
14

分块生成器函数,用于遍历 pandas 数据框和序列

下面展示了一个生成器版本的分块函数。此外,这个版本可以处理自定义索引的 pd.DataFrame 或 pd.Series(比如浮点类型的索引)。

import numpy as np
import pandas as pd

df_sz = 14

df = pd.DataFrame(
    np.random.rand(df_sz,4), 
    index=np.linspace(0., 10., num=df_sz),
    columns=['a', 'b', 'c', 'd'],
)

def chunker(seq, size):
    for pos in range(0, len(seq), size):
        yield seq.iloc[pos:pos + size] 

chunk_size = 6
for i in chunker(df, chunk_size):
    print(i)

chnk = chunker(df, chunk_size)
print('\n', chnk)
print(next(chnk))
print(next(chnk))
print(next(chnk))

输出结果是

                 a         b         c         d
0.000000  0.560627  0.665897  0.683055  0.611884
0.769231  0.241871  0.357080  0.841945  0.340778
1.538462  0.065009  0.234621  0.250644  0.552410
2.307692  0.431394  0.235463  0.755084  0.114852
3.076923  0.173748  0.189739  0.148856  0.031171
3.846154  0.772352  0.697762  0.557806  0.254476
                 a         b         c         d
4.615385  0.901200  0.977844  0.250316  0.957408
5.384615  0.400939  0.520841  0.863015  0.177043
6.153846  0.356927  0.344220  0.863067  0.400573
6.923077  0.375417  0.156420  0.897889  0.810083
7.692308  0.666371  0.152800  0.482446  0.955556
8.461538  0.242711  0.421591  0.005223  0.200596
                  a         b         c         d
9.230769   0.735748  0.402639  0.527825  0.595952
10.000000  0.420209  0.365231  0.966829  0.514409

- generator object chunker at 0x7f503c9d0ba0

First "next()":
                 a         b         c         d
0.000000  0.560627  0.665897  0.683055  0.611884
0.769231  0.241871  0.357080  0.841945  0.340778
1.538462  0.065009  0.234621  0.250644  0.552410
2.307692  0.431394  0.235463  0.755084  0.114852
3.076923  0.173748  0.189739  0.148856  0.031171
3.846154  0.772352  0.697762  0.557806  0.254476

Second "next()":
                 a         b         c         d
4.615385  0.901200  0.977844  0.250316  0.957408
5.384615  0.400939  0.520841  0.863015  0.177043
6.153846  0.356927  0.344220  0.863067  0.400573
6.923077  0.375417  0.156420  0.897889  0.810083
7.692308  0.666371  0.152800  0.482446  0.955556
8.461538  0.242711  0.421591  0.005223  0.200596

Third "next()":
                  a         b         c         d
9.230769   0.735748  0.402639  0.527825  0.595952
10.000000  0.420209  0.365231  0.966829  0.514409
64

我不确定这是不是你想要的,但我在另一个SO讨论帖上找到了这些分组函数,觉得它们在使用多处理池时挺有用的。

这里有个来自那个讨论帖的简单例子,可能能满足你的需求:

import numpy as np
import pandas as pds

df = pds.DataFrame(np.random.rand(14,4), columns=['a', 'b', 'c', 'd'])

def chunker(seq, size):
    return (seq[pos:pos + size] for pos in xrange(0, len(seq), size))

for i in chunker(df,5):
    print i

这个例子会给你类似这样的结果:

          a         b         c         d
0  0.860574  0.059326  0.339192  0.786399
1  0.029196  0.395613  0.524240  0.380265
2  0.235759  0.164282  0.350042  0.877004
3  0.545394  0.881960  0.994079  0.721279
4  0.584504  0.648308  0.655147  0.511390
          a         b         c         d
5  0.276160  0.982803  0.451825  0.845363
6  0.728453  0.246870  0.515770  0.343479
7  0.971947  0.278430  0.006910  0.888512
8  0.044888  0.875791  0.842361  0.890675
9  0.200563  0.246080  0.333202  0.574488
           a         b         c         d
10  0.971125  0.106790  0.274001  0.960579
11  0.722224  0.575325  0.465267  0.258976
12  0.574039  0.258625  0.469209  0.886768
13  0.915423  0.713076  0.073338  0.622967

希望这能帮到你。

编辑

在这个例子中,我用这个函数和处理器池大概是这样使用的:

from multiprocessing import Pool

nprocs = 4

pool = Pool(nprocs)

for chunk in chunker(df, nprocs):
    data = pool.map(myfunction, chunk)
    data.domorestuff()

我想这应该和使用IPython的分布式机制非常相似,但我还没有尝试过。

157

使用numpy的 array_split() 函数:

import numpy as np
import pandas as pd

data = pd.DataFrame(np.random.rand(10, 3))
for chunk in np.array_split(data, 5):
  assert len(chunk) == len(data) / 5, "This assert may fail for the last chunk if data lenght isn't divisible by 5"
59

在实际操作中,你不能保证每一块的大小都是一样的。比如说,如果行数(N)是一个质数,那么你只能把数据分成1块或者N块,无法做到均匀分配。因此,在现实中,通常会使用固定大小的块,最后一块可以小一点。我一般会把一个数组传给groupby。从这里开始:

>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15)
>>> df[0] = range(15)
>>> df
    0         1         2         3         4
0   0  0.746300  0.346277  0.220362  0.172680
0   1  0.657324  0.687169  0.384196  0.214118
0   2  0.016062  0.858784  0.236364  0.963389
[...]
0  13  0.510273  0.051608  0.230402  0.756921
0  14  0.950544  0.576539  0.642602  0.907850

[15 rows x 5 columns]

在这里,我故意把索引设置为0,让它没有太多信息。我们只需决定每块的大小(这里是10),然后用整数除法把数组分成几块:

>>> df.groupby(np.arange(len(df))//10)
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c>
>>> for k,g in df.groupby(np.arange(len(df))//10):
...     print(k,g)
...     
0    0         1         2         3         4
0  0  0.746300  0.346277  0.220362  0.172680
0  1  0.657324  0.687169  0.384196  0.214118
0  2  0.016062  0.858784  0.236364  0.963389
[...]
0  8  0.241049  0.246149  0.241935  0.563428
0  9  0.493819  0.918858  0.193236  0.266257

[10 rows x 5 columns]
1     0         1         2         3         4
0  10  0.037693  0.370789  0.369117  0.401041
0  11  0.721843  0.862295  0.671733  0.605006
[...]
0  14  0.950544  0.576539  0.642602  0.907850

[5 rows x 5 columns]

基于切片的DataFrame方法在索引不兼容时可能会失败,不过你总是可以使用.iloc[a:b]来忽略索引值,按位置访问数据。

撰写回答