将concurrent.futures.ProcessPoolExecutor与DataFrame.GroupBy一起使用

2024-05-14 14:58:46 发布

您现在位置:Python中文网/ 问答频道 /正文

这可能是一个常见的问题,但我在网上找不到任何好的/最新的解决方案。我目前正在为n家公司开发一个庞大的数据框架,根据这个数据框架,我们对每家公司进行一些繁重的计算,然后将所有结果汇总为一个新的数据框架。非常简单,我们运行df.groupby('company').apply(function),在它运行时去喝杯咖啡,因为这是一个单线程操作

现在事情已经失去控制了,(3h+等待时间),我们正在试验多处理。我们已经实现了下面的小“概念”,通过它我们将DataFrame.GroupBy传递给executor.map回调函数并等待结果

问题是,它似乎需要很长的时间才能运行,并且对每个线程中实际发生的事情没有反馈。不确定这是否是正确的实现,我所能看到的只是CPU和内存以100%的使用率运行,但执行器从未完成

下面是数据库中每个公司的简化计算版本。建议如何正确使用多处理groupby是非常感谢的

import time
import concurrent

def append_new_company_technicals(group):
    '''
    Takes a dataframe and build new columns with technical information
    '''
    print(group['ticker'].unique())
    group.sort_values(by='date', inplace=True)

    group['halfvol_30_abs'] = group['px'].rolling(30,min_periods = 21).apply(func)
    group['halfvol_180_abs'] = group['px1'].rolling(180,min_periods = 135).apply(func)
    group['halfvol_30_rel'] = group['px2'].rolling(30,min_periods = 21).apply(func)
    group['halfvol_180_rel'] = group['px3'].rolling(180,min_periods = 135).apply(func)
    return group
    
start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = {executor.map(append_new_company_technicals, df_merged.groupby('ticker'))}
end = time.time()
print("MultiProcessing computation: {} secs ".format(end - start))

Tags: 数据框架newtimegroup公司mincompany

热门问题