类似于pandas的cogroup操作

1 投票
1 回答
902 浏览
提问于 2025-04-18 14:02

我在用pandas分析一个相当大的数据集(大约5GB)。我想把这些数据分成几个组,然后对每个组做一个笛卡尔积,最后再把结果汇总起来。

pandas的apply操作非常灵活,我可以先用group把数据分组,然后在每个组上用apply做笛卡尔积,最后用sum来汇总结果。不过,这种方法有个问题,就是apply不是懒惰的,它会在汇总之前计算所有中间结果,而这些中间结果(每个组的笛卡尔积)会非常大。

我在研究Apache Spark时发现了一个很有趣的操作叫cogroup。它的定义是:

当在类型为(K, V)和(K, W)的数据集上调用时,返回一个(K, Iterable, Iterable)的元组数据集。这个操作也叫做groupWith。

这似乎正是我想要的。如果我能先用cogroup,然后再用sum,那么中间结果就不会被扩展(假设cogroup的工作方式和group一样是懒惰的)。

在pandas中有没有类似cogroup的操作,或者我该如何高效地实现我的目标呢?

这是我的例子:

我想按id分组数据,然后对每个组做笛卡尔积,再按cluster_xcluster_y分组,最后用sum汇总count_xcount_y。下面的代码可以运行,但速度非常慢,而且消耗的内存太多。

# add dummy_key to do Cartesian product by merge
df['dummy_key'] = 1

def join_group(g):
    return pandas.merge(g, g, on='dummy_key')\
    [['cache_cluster_x', 'count_x', 'cache_cluster_y', 'count_y']]

df_count_stats = df.groupby(['id'], as_index=True).apply(join_group).\
    groupby(['cache_cluster_x', 'cache_cluster_y'], as_index=False)\
    [['count_x', 'count_y']].sum()

一个玩具数据集

   id cluster  count
0  i1       A      2
1  i1       B      3
2  i2       A      1
3  i2       B      4

apply之后的中间结果(可能很大)

     cluster_x  count_x cluster_y  count_y
id                                        
i1 0         A        2         A        2
   1         A        2         B        3
   2         B        3         A        2
   3         B        3         B        3
i2 0         A        1         A        1
   1         A        1         B        4
   2         B        4         A        1
   3         B        4         B        4

期望的最终结果

  cluster_x cluster_y  count_x  count_y
0         A         A        3        3
1         A         B        3        7
2         B         A        7        3
3         B         B        7        7

1 个回答

0

我第一次尝试没有成功,算是失败了一点:虽然我能通过在每个组内对笛卡尔积进行求和来限制内存使用,但速度比原来的要慢很多。不过针对你想要的输出,我觉得我们可以大大简化这个问题:

import numpy as np, pandas as pd

def fake_data(nids, nclusters, ntile):
    ids = ["i{}".format(i) for i in range(1,nids+1)]
    clusters = ["A{}".format(i) for i in range(nclusters)]
    df = pd.DataFrame(index=pd.MultiIndex.from_product([ids, clusters], names=["id", "cluster"]))
    df = df.reset_index()
    df = pd.concat([df]*ntile)
    df["count"] = np.random.randint(0, 10, size=len(df))
    return df


def join_group(g):
    m= pd.merge(g, g, on='dummy_key')
    return m[['cluster_x', 'count_x', 'cluster_y', 'count_y']]

def old_method(df):
    df["dummy_key"] = 1
    h1 = df.groupby(['id'], as_index=True).apply(join_group)
    h2 = h1.groupby(['cluster_x', 'cluster_y'], as_index=False)
    h3 = h2[['count_x', 'count_y']].sum()
    return h3

def new_method1(df):
    m1 = df.groupby("cluster", as_index=False)["count"].sum()
    m1["dummy_key"] = 1
    m2 = m1.merge(m1, on="dummy_key")
    m2 = m2.sort_index(axis=1).drop(["dummy_key"], axis=1)
    return m2

这段代码会给出(假设df是你的测试数据框):

>>> new_method1(df)
  cluster_x cluster_y  count_x  count_y
0         A         A        3        3
1         A         B        3        7
2         B         A        7        3
3         B         B        7        7
>>> df2 = fake_data(100, 100, 1)
>>> %timeit old_method(df2)
1 loops, best of 3: 954 ms per loop
>>> %timeit new_method1(df2)
100 loops, best of 3: 8.58 ms per loop
>>> (old_method(df2) == new_method1(df2)).all().all()
True

甚至可以这样做:

>>> df2 = fake_data(100, 100, 100)
>>> %timeit new_method1(df2)
10 loops, best of 3: 88.8 ms per loop

至于这是否能在你的实际情况中带来足够的改进,我就不太确定了。

撰写回答