类似于pandas的cogroup操作
我在用pandas分析一个相当大的数据集(大约5GB)。我想把这些数据分成几个组,然后对每个组做一个笛卡尔积,最后再把结果汇总起来。
pandas的apply
操作非常灵活,我可以先用group
把数据分组,然后在每个组上用apply
做笛卡尔积,最后用sum
来汇总结果。不过,这种方法有个问题,就是apply
不是懒惰的,它会在汇总之前计算所有中间结果,而这些中间结果(每个组的笛卡尔积)会非常大。
我在研究Apache Spark时发现了一个很有趣的操作叫cogroup
。它的定义是:
当在类型为(K, V)和(K, W)的数据集上调用时,返回一个(K, Iterable, Iterable)的元组数据集。这个操作也叫做groupWith。
这似乎正是我想要的。如果我能先用cogroup
,然后再用sum
,那么中间结果就不会被扩展(假设cogroup
的工作方式和group
一样是懒惰的)。
在pandas中有没有类似cogroup
的操作,或者我该如何高效地实现我的目标呢?
这是我的例子:
我想按id
分组数据,然后对每个组做笛卡尔积,再按cluster_x
和cluster_y
分组,最后用sum
汇总count_x
和count_y
。下面的代码可以运行,但速度非常慢,而且消耗的内存太多。
# add dummy_key to do Cartesian product by merge
df['dummy_key'] = 1
def join_group(g):
return pandas.merge(g, g, on='dummy_key')\
[['cache_cluster_x', 'count_x', 'cache_cluster_y', 'count_y']]
df_count_stats = df.groupby(['id'], as_index=True).apply(join_group).\
groupby(['cache_cluster_x', 'cache_cluster_y'], as_index=False)\
[['count_x', 'count_y']].sum()
一个玩具数据集
id cluster count
0 i1 A 2
1 i1 B 3
2 i2 A 1
3 i2 B 4
在apply
之后的中间结果(可能很大)
cluster_x count_x cluster_y count_y
id
i1 0 A 2 A 2
1 A 2 B 3
2 B 3 A 2
3 B 3 B 3
i2 0 A 1 A 1
1 A 1 B 4
2 B 4 A 1
3 B 4 B 4
期望的最终结果
cluster_x cluster_y count_x count_y
0 A A 3 3
1 A B 3 7
2 B A 7 3
3 B B 7 7
1 个回答
0
我第一次尝试没有成功,算是失败了一点:虽然我能通过在每个组内对笛卡尔积进行求和来限制内存使用,但速度比原来的要慢很多。不过针对你想要的输出,我觉得我们可以大大简化这个问题:
import numpy as np, pandas as pd
def fake_data(nids, nclusters, ntile):
ids = ["i{}".format(i) for i in range(1,nids+1)]
clusters = ["A{}".format(i) for i in range(nclusters)]
df = pd.DataFrame(index=pd.MultiIndex.from_product([ids, clusters], names=["id", "cluster"]))
df = df.reset_index()
df = pd.concat([df]*ntile)
df["count"] = np.random.randint(0, 10, size=len(df))
return df
def join_group(g):
m= pd.merge(g, g, on='dummy_key')
return m[['cluster_x', 'count_x', 'cluster_y', 'count_y']]
def old_method(df):
df["dummy_key"] = 1
h1 = df.groupby(['id'], as_index=True).apply(join_group)
h2 = h1.groupby(['cluster_x', 'cluster_y'], as_index=False)
h3 = h2[['count_x', 'count_y']].sum()
return h3
def new_method1(df):
m1 = df.groupby("cluster", as_index=False)["count"].sum()
m1["dummy_key"] = 1
m2 = m1.merge(m1, on="dummy_key")
m2 = m2.sort_index(axis=1).drop(["dummy_key"], axis=1)
return m2
这段代码会给出(假设df
是你的测试数据框):
>>> new_method1(df)
cluster_x cluster_y count_x count_y
0 A A 3 3
1 A B 3 7
2 B A 7 3
3 B B 7 7
>>> df2 = fake_data(100, 100, 1)
>>> %timeit old_method(df2)
1 loops, best of 3: 954 ms per loop
>>> %timeit new_method1(df2)
100 loops, best of 3: 8.58 ms per loop
>>> (old_method(df2) == new_method1(df2)).all().all()
True
甚至可以这样做:
>>> df2 = fake_data(100, 100, 100)
>>> %timeit new_method1(df2)
10 loops, best of 3: 88.8 ms per loop
至于这是否能在你的实际情况中带来足够的改进,我就不太确定了。