如何跨Dask分布式工作线程共享大型只读对象

2024-06-17 09:56:00 发布

您现在位置:Python中文网/ 问答频道 /正文

问题

我试图通过apply()向dask分布式工作线程发送一个2GB的CPython只读对象(可以被pickle)。进程消耗了14 GB的内存。在

有没有一种方法可以只将对象加载到内存中一次,并让工作人员同时使用该对象?在

关于这个问题的更多细节

我有2个Dask系列源代码清单和模式清单,分别包含700万和300万个字符串。我试图从Pattern_list(3M)中找到Source_list(7M)中的所有子字符串匹配。在

为了加速子字符串搜索,我使用pyahocorasick包从Pattern_list(该对象是可pickle的)创建一个Cpython数据结构(类对象)。在

我试过了

  1. 使用单个dask调度程序运行大约需要2.5小时来处理,但最终得到正确的结果。在
  2. 正常分布的dask运行会导致:
distributed.worker - WARNING - Memory use is high but worker has no data to 
store to disk. Perhaps some other process is leaking memory? Process memory:  
2.85 GB -- Worker memory limit: 3.00 GB
  1. 使用分布式dask运行,内存限制增加到8GB/16GB:

    • 线程

      distributed.worker - WARNING - Memory use is high but worker has no 
      data to  store to disk. Perhaps some other process is leaking 
      memory? 
      Process memory:  14.5 GB -- Worker memory limit: 16.00 GB 
      distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
      
    • 流程 需要超过2.5小时来处理,我从来没有看到它完成(让它运行了8个多小时,然后取消)。它还消耗10+GB的 内存

  2. 使用矢量化字符串操作Source_list.str.find_all(Pattern_list)需要2.5个小时以上。在
  3. 将对象存储在全局变量中并调用它会导致与第3点中进程和线程相同的错误。在
  4. 在Source_list上使用map_partitions+loop/map会得到和第3点相同的结果。在

分布式代码

# OS = Windows 10
# RAM = 16 GB
# CPU cores = 8
# dask version 1.1.1

import dask.dataframe as dd
import ahocorasick
from dask.distributed import Client, progress

def create_ahocorasick_trie(pattern_list):
    A = ahocorasick.Automaton()
    for index, item in pattern_list.iteritems():
         A.add_word(item,item)
    A.make_automaton()
    return A 

if __name__ == '__main__':
    client = Client(memory_limit="12GB",processes=False)

    # Using Threading, because, the large_object seems to get copied in memory 
    # for each process when processes = True

    Source_list = dd.read_parquet("source_list.parquet") 
    Pattern_list = dd.read_parquet("pattern_list.parquet")

    # Note: 'source_list.parquet' and 'pattern_list.parquet' are generated via dask

    large_object = create_ahocorasick_trie(Pattern_list)

    result = Source_list.apply(lambda source_text: {large_object.iter(source_text)}, meta=(None,'O'))

    # iter() is an ahocorasick Cpython method

    progress(result.head(10))

    client.close()





Tags: to对象内存字符串sourceislistdask
1条回答
网友
1楼 · 发布于 2024-06-17 09:56:00

简单的回答是用达斯克。延迟呼叫

big = dask.delayed(big)
df.apply(func, extra=big)

Dask将根据需要移动它,并将其视为自己的一段数据。也就是说,它需要存在于每个工人身上,所以每个工人的RAM应该比这个东西占用的内存多得多。(至少4倍或更多)。在

相关问题 更多 >