如何在ipyparallel客户端和远程引擎之间最好地共享静态数据?

2024-06-01 01:14:53 发布

您现在位置:Python中文网/ 问答频道 /正文

我用不同的参数在一个循环中运行相同的模拟。每个模拟都使用一个pandas数据帧(data),它只读取,从不修改。使用ipyparallel(IPython parallel),我可以在模拟开始之前将这些数据帧放入我视图中每个引擎的全局变量空间:

view['data'] = data

然后,引擎可以访问在其上运行的所有模拟的数据帧。复制数据的过程(如果pickle,data是40MB)只需几秒钟。但是,如果模拟的数量增加,内存使用量就会增长得非常大。我想这个共享数据被复制到每个任务中,而不仅仅是每个引擎的拷贝。使用引擎共享来自客户端的静态只读数据的最佳实践是什么?每个引擎复制一次是可以接受的,但理想情况下,每个主机只需要复制一次(我在host1上有4个引擎,host2上有8个引擎)。在

我的代码是:

^{pr2}$

如果模拟计数很小(~50),则需要一段时间才能开始,但我开始看到progress print语句。奇怪的是,多个任务将被分配给同一个引擎,我在该引擎的所有分配任务都完成之前看不到响应。我希望每次完成一个模拟任务时都能看到enumerate(ar)的响应。在

如果模拟计数很大(约1000),则需要很长时间才能开始,我看到所有引擎的CPU都加速了,但很长时间(约40分钟)才看到进度打印语句,当我do查看进度时,似乎有一大块(大于100)的任务进入了同一个引擎,并等待着该引擎的完成提供一些进展。当一个引擎完成时,我看到ar对象每4秒提供一个新的响应-这可能是写入输出pickle文件的时间延迟。在

最后,host1还运行ipycontroller任务,它的内存使用量急剧上升(Python任务显示使用的是>;6gbram,内核任务显示使用3GB)。host2引擎实际上根本没有显示多少内存使用情况。是什么导致了记忆的激增?在


Tags: 数据内存引擎pandasdata参数情况语句
2条回答

有时您需要按类别分散数据分组,以便确保每个子组都完全包含在单个集群中。在

我通常是这样做的:

# Connect to the clusters
import ipyparallel as ipp
client = ipp.Client()
lview  = client.load_balanced_view()
lview.block = True
CORES = len(client[:])

# Define the scatter_by function
def scatter_by(df,grouper,name='df'):
    sz = df.groupby([grouper]).size().sort_values().index.unique()
    for core in range(CORES):
        ids = sz[core::CORES]
        print("Pushing {0} {1}s into cluster {2}...".format(size(ids),grouper,core))
        client[core].push({name:df[df[grouper].isin(ids)]})

# Scatter the dataframe df grouping by `year`
scatter_by(df,'year')

请注意,我建议的scatters函数可以确保每个簇将承载相似数量的观测,这通常是一个好主意。在

几年前,我在一个代码中使用了这个逻辑,我开始使用this。我的代码是这样的:

shared_dict = {
    # big dict with ~10k keys, each with a list of dicts
}

balancer = engines.load_balanced_view()

with engines[:].sync_imports(): # your 'view' variable 
    import pandas as pd
    import ujson as json

engines[:].push(shared_dict)

results = balancer.map(lambda i: (i, my_func(i)), id)
results_data = results.get()

If simulation counts are small (~50), then it takes a while to get started, but i start to see progress print statements. Strangely, multiple tasks will get assigned to the same engine and I don't see a response until all of those assigned tasks are completed for that engine. I would expect to see a response from enumerate(ar) every time a single simulation task completes.

在我的例子中,my_func()是一个复杂的方法,我把许多日志消息写入一个文件,所以我有了print语句。在

关于任务分配,正如我使用load_balanced_view(),我离开了库找到它的方式,它做得很好。在

If simulation counts are large (~1000), it takes a long time to get started, i see the CPUs throttle up on all engines, but no progress print statements are seen until a long time (~40mins), and when I do see progress, it appears a large block (>100) of tasks went to same engine, and awaited completion from that one engine before providing some progress. When that one engine did complete, i saw the ar object provided new responses ever 4 secs - this may have been the time delay to write the output pickle files.

很长时间以来,我都没有经历过,所以我不能说什么。在

我希望这能对你的问题有所启示。在


注:正如我在评论中所说,你可以试试multiprocessing.Pool。我想我还没有尝试过将一个大的只读数据作为一个全局变量来使用它。我想试试看,因为it seems to work。在

相关问题 更多 >