并行化在pandas groupby之后应用

2024-05-16 11:30:28 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用了rosetta.parallel.pandas_易于并行化在分组后应用,例如:

from rosetta.parallel.pandas_easy import groupby_to_series_to_frame
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)

但是,有人知道如何并行化返回数据帧的函数吗?如预期的那样,这段代码对于rosetta是失败的。

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)

Tags: totruepandasdfindexparallelusejobs
3条回答

Ivan的回答很好,但看起来可以稍微简化一下,也不需要依赖joblib:

from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pandas.concat(ret_list)

顺便说一下:这不能替换任何groupby.apply(),但它将覆盖典型的情况:例如,它应该覆盖情况2和3in the documentation,而您应该通过将参数axis=1赋给最后的pandas.concat()调用来获得情况1的行为。

这似乎是可行的,尽管它真的应该建立在熊猫身上

import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
    print 'parallel version: '
    print applyParallel(df.groupby(df.index), tmpFunc)

    print 'regular version: '
    print df.groupby(df.index).apply(tmpFunc)

    print 'ideal version (does not work): '
    print df.groupby(df.index).applyParallel(tmpFunc)

我有一个在熊猫身上实现并行化的方法。我将数据帧分成块,将每个块放入列表元素中,然后使用ipython的并行位对数据帧列表执行并行应用。然后我使用pandasconcat函数将列表重新组合在一起。

然而,这通常并不适用。它对我有效,因为我要应用到数据帧的每个块的函数大约需要一分钟。把我的数据拆开放在一起不会花那么长时间。所以这显然是个骗局。尽管如此,这里有一个例子。我正在使用Ipython笔记本,所以您将在我的代码中看到%%time魔力:

## make some example data
import pandas as pd

np.random.seed(1)
n=10000
df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n), 
                   'data' : np.random.rand(n)})
grouped = df.groupby('mygroup')

在这个例子中,我将基于上面的groupby生成“chunks”,但这不一定是数据的分块方式。虽然这是一个很常见的模式。

dflist = []
for name, group in grouped:
    dflist.append(group)

设置并行位

from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True

编写一个愚蠢的函数来应用于我们的数据

def myFunc(inDf):
    inDf['newCol'] = inDf.data ** 10
    return inDf

现在让我们先串行运行代码,然后并行运行代码。 序列号第一:

%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s

现在平行

%%time
parallel_list = lview.map(myFunc, dflist)

CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s

然后只需几毫秒就可以将它们合并回一个数据帧

%%time
combinedDf = pd.concat(parallel_list)
 CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms

我在我的MacBook上运行6个IPython引擎,但是你可以看到它将执行时间从14秒减少到2秒

对于真正长时间运行的随机模拟,我可以通过使用StarCluster启动集群来使用AWS后端。然而,大多数时候,我在MBP上只并行8个cpu。

相关问题 更多 >