如何并行化调用函数的for循环?

2024-04-29 14:46:14 发布

您现在位置:Python中文网/ 问答频道 /正文

我有下面的代码段,它遍历.csv文件列表,然后使用insert_csv_data函数读取、预处理.csv文件的数据并将其插入.hyper文件(Hyper是Tableau新的内存数据引擎技术,专为大型或复杂数据集的快速数据摄取和分析查询处理而设计):

有关insert_csv_data函数的详细解释,请参见here

for csv in csv_list:
            insert_csv_data(hyper)

上述代码的问题是,它一次将一个.csv文件插入.hyper文件,目前速度相当慢

我想知道当我使用ApacheSpark在Databricks上进行处理时,是否有更快或并行的解决方法。我做了一些研究,发现了类似multiprocessing的模块, joblibasyncio这可能适用于我的场景,但我不确定如何正确实现它们

请告知

编辑:

并行代码:

from joblib import Parallel, delayed
element_run = Parallel(n_jobs=1)(delayed(insert_csv_data)(csv) for csv in csv_list)

Tags: 文件csv数据函数代码infordata
1条回答
网友
1楼 · 发布于 2024-04-29 14:46:14

这并没有直接回答这个问题,而是演示了如何使用concurrent.futures模块轻松地互换多处理和多线程。请注意,这两个循环实现了完全相同的功能,代码两部分之间的唯一区别是work manager类

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def tfunc(n):
    return n * n


N = 1_000


def main():
    with ThreadPoolExecutor() as executor:
        for future in [executor.submit(tfunc, n) for n in range(N)]:
            future.result()

    with ProcessPoolExecutor() as executor:
        for future in [executor.submit(tfunc, n) for n in range(N)]:
            future.result()


if __name__ == '__main__':  
    main()

相关问题 更多 >