如何并行化数据帧的apply()方法

2024-05-16 10:07:40 发布

您现在位置:Python中文网/ 问答频道 /正文

我有以下代码:

import pandas as pd
import time

def enrich_str(str):
        
    val1 = f'{str}_1'
    val2 = f'{str}_2'
    val3 = f'{str}_3'
    time.sleep(3)
    
    return val1, val2, val3
    
def enrich_row(passed_row):
    col_name = str(passed_row['colName'])
    my_string = str(passed_row[col_name])
    
    val1, val2, val3 = enrich_str(my_string)
    
    passed_row['enriched1'] = val1
    passed_row['enriched2'] = val2
    passed_row['enriched3'] = val3
    
    return passed_row


df = pd.DataFrame({'numbers': [1, 2, 3, 4, 5], 'colors': ['red', 'white', 'blue', 'orange', 'red']}, 
                  columns=['numbers', 'colors'])

df['colName'] = 'colors'

tic = time.perf_counter()
enriched_df = df.apply(enrich_row, col_name='colors', axis=1)
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")

enriched_df

获取输出数据帧需要15秒,如下所示:

enter image description here

现在我想在我的机器上使用多个线程并行化扩展操作。 我探索了很多解决方案,比如Dasknumba,但对我来说,没有一个是直接的

然后我偶然发现了multiprocessing库及其pool.imaps()方法。因此,我尝试运行以下代码:

import multiprocessing as mp

tic = time.perf_counter()
pool = mp.Pool(5)
result = pool.imap(enrich_row, df.itertuples(), chunksize=1)
pool.close()
pool.join()
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")
result

大约需要2秒钟,而且result不是一个数据帧。 我不知道我错在哪里


Tags: dftimecounterticperfrowpoolcolors
2条回答

我接受了@albert的答案,因为它在Linux上工作。不管怎样,我发现Dask dataframe's ^{} method确实向前迈进了。正如我在前面的评论中提到的,最初操作不是在120行的数据集上并行执行的。后来我发现120行只使用了Dask数据帧的一个分区。因此,进行重新分区以获得所需的并行性就足够了Here一个使用Dask的代码示例(它会引发一些奇怪的警告…)

我建议您使用multiprocessingpathos fork,因为它可以更好地处理数据帧的酸洗imap返回迭代器,而不是数据帧,因此必须将其转换回:

def enrich_row(row_tuple):
    passed_row = row_tuple[1]
    col_name = str(passed_row['colName'])
    my_string = str(passed_row[col_name])
    
    val1, val2, val3 = enrich_str(my_string)
    
    passed_row['enriched1'] = val1
    passed_row['enriched2'] = val2
    passed_row['enriched3'] = val3
    
    return passed_row

df = pd.DataFrame({'numbers': [1, 2, 3, 4, 5], 'colors': ['red', 'white', 'blue', 'orange', 'red']}, 
                  columns=['numbers', 'colors'])

df['colName'] = 'colors'

from pathos.multiprocessing import Pool

tic = time.perf_counter()
result = Pool(8).imap(enrich_row, df.iterrows(), chunksize=1)
df = pd.DataFrame(result)
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")
print(df)

注意,我正在使用df.iterrows()返回元组的迭代器(row_number, row),所以我修改了enrich_row来处理这种格式

相关问题 更多 >