如何使用多处理在一个非常大的列表中删除重复项?

2024-05-15 04:01:56 发布

您现在位置:Python中文网/ 问答频道 /正文

比如说,我有一个包含随机数的巨大列表

L = [random.randrange(0,25000000000) for _ in range(1000000000)]

我需要去掉这个列表中的重复项

我为包含较少元素的列表编写了这段代码

def remove_duplicates(list_to_deduplicate):
seen = set()
result=[]
for i in list_to_deduplicate:
    if i not in seen:
        result.append(i)
        seen.add(i)
return result

在上面的代码中,我创建了一个集合,这样我就可以记住我正在处理的列表中已经出现的数字。如果该数字不在集合中,那么我将其添加到结果列表中,我需要返回并将其保存在集合中,这样它就不会再次添加到结果列表中

现在对于列表中的1000000个数字,一切都很好,我可以很快得到结果,但是对于优于100000000的数字,比如说出现100000000个问题,我需要在我的机器上使用不同的内核来尝试解决问题,然后合并多个进程的结果

我的第一个猜测是让所有流程都可以访问一个集合,但会出现许多复杂情况 一个进程如何读取,而另一个进程正在添加到集合中,我甚至不知道是否可以在进程之间共享一个集合我知道我们可以使用队列或管道,但我不确定如何使用它

有人能给我一个建议,什么是解决这个问题的最好方法 我愿意接受任何新想法


Tags: to代码in元素列表for进程range
2条回答

我怀疑即使是你最棒的列表也足够大,以至于多处理可以改善计时。使用numpy和多线程可能是最好的选择

多处理引入了相当多的开销并增加了内存消耗,就像前面正确提到的@Frank Merrow一样。 不过,对于多线程来说,情况并非如此。重要的是不要混淆这些术语,因为进程和线程是不同的。 同一进程内的线程共享内存,不同进程则不共享

在Python中使用多核的问题是GIL,它不允许多个线程(在同一进程中)并行执行Python字节码。一些C扩展(如numpy)可以释放GIL,这可以通过多线程从多核并行中获益。在这里,您有机会通过使用numpy在巨大改进的基础上获得一些加速

from multiprocessing.dummy import Pool  # .dummy uses threads
import numpy as np

r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)
n_threads = 8

result = np.unique(np.concatenate(
    Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()

使用numpy和线程池,拆分数组,使子数组在单独的线程中唯一,然后连接子数组并使重新组合的数组再次唯一。 由于在子阵列中只能识别本地重复项,因此有必要最终删除重组阵列的重复项

对于低熵数据(许多重复项),使用^{}代替numpy.unique可以快得多。与^{}不同,它还保留了外观的顺序

请注意,只有当numpy函数通过调用低级数学库而不是多线程时,使用上述线程池才有意义。所以,总是测试它是否真的提高了性能,不要想当然


在以下范围内使用100M随机生成的整数进行测试:

  • 高熵:0-25_000_000_000(199560个副本)
  • 低熵:0-1000

代码

import time
import timeit
from multiprocessing.dummy import Pool  # .dummy uses threads

import numpy as np
import pandas as pd


def time_stmt(stmt, title=None):
    t = timeit.repeat(
        stmt=stmt,
        timer=time.perf_counter_ns, repeat=3, number=1, globals=globals()
    )
    print(f"\t{title or stmt}")
    print(f"\t\t{min(t) / 1e9:.2f} s")


if __name__ == '__main__':

    n_threads = 8  # machine with 8 cores (4 physical cores)

    stmt_np_unique_pool = \
"""
np.unique(np.concatenate(
    Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()
"""

    stmt_pd_unique_pool = \
"""
pd.unique(np.concatenate(
    Pool(n_threads).map(pd.unique, np.array_split(r, n_threads)))
).tolist()
"""
    #                                     -

    print(f"\nhigh entropy (few duplicates) {'-' * 30}\n")
    r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)

    r = list(r)
    time_stmt("list(set(r))")

    r = np.asarray(r)
    # numpy.unique
    time_stmt("np.unique(r).tolist()")
    # pandas.unique
    time_stmt("pd.unique(r).tolist()")    
    # numpy.unique & Pool
    time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
    # pandas.unique & Pool
    time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")

    #  -
    print(f"\nlow entropy (many duplicates) {'-' * 30}\n")
    r = np.random.RandomState(42).randint(0, 1000, 100_000_000)

    r = list(r)
    time_stmt("list(set(r))")

    r = np.asarray(r)
    # numpy.unique
    time_stmt("np.unique(r).tolist()")
    # pandas.unique
    time_stmt("pd.unique(r).tolist()")
    # numpy.unique & Pool
    time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
    # pandas.unique() & Pool
    time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")

正如您在下面的计时中所看到的,仅使用numpy而不使用多线程已经是最大的性能改进。还请注意pandas.unique()对于许多重复项来说比numpy.unique()(仅)快

high entropy (few duplicates)                

    list(set(r))
        32.76 s
    np.unique(r).tolist()
        12.32 s
    pd.unique(r).tolist()
        23.01 s
    numpy.unique() & Pool
        9.75 s
    pandas.unique() & Pool
        28.91 s

low entropy (many duplicates)                

    list(set(r))
        5.66 s
    np.unique(r).tolist()
        4.59 s
    pd.unique(r).tolist()
        0.75 s
    numpy.unique() & Pool
        1.17 s
    pandas.unique() & Pool
        0.19 s

我不能说我喜欢这个,但它应该会起作用的

将数据分成N只读部分。为每个员工分发一份调查数据。所有内容都是只读的,因此可以共享。每个工人i 1…N对照所有其他“未来”列表检查其列表i+1…N

每个workeri为其i+1…N列表维护一个位表,记录其任何项是否命中任何未来项

当每个人都完成后,workeri将其位表发送回master,在那里可以编辑tit。然后删除零。没有排序没有集合。不过,检查速度不快

如果您不想麻烦处理多个位表,您可以让每个工人i在他们自己的责任区域上方发现dup时写入零。但是,现在您遇到了真正的共享内存问题。对于这一点,您甚至可以让每个工作只删除其区域上方的dup,但同上

甚至把工作分成两半也回避了这个问题。对于每个员工来说,为每个条目浏览其他人的列表是很昂贵的*(N-1)len(region)/2。每个工人可以创建一组其区域,或对其区域进行排序。这两种方法都可以加快检查速度,但成本会增加

相关问题 更多 >

    热门问题