Python: simple merging of lists based on intersection

Published 2024-04-26 12:45:15


Suppose there are a number of lists of integers:

#--------------------------------------
0 [0,1,3]
1 [1,0,3,4,5,10,...]
2 [2,8]
3 [3,1,0,...]
...
n []
#--------------------------------------

The task is to merge the lists that share at least one common element. So the result for just the part shown above would be:

#--------------------------------------
0 [0,1,3,4,5,10,...]
2 [2,8]
#--------------------------------------

What is the most efficient way to do this on big data (where the elements are just numbers)? Is a tree structure worth considering? I currently do the job by converting the lists to sets and iterating over intersections, but it is slow! Besides, I have a feeling that the problem must be quite elementary. Moreover, the implementation seems to be missing something (what, I do not know), because some lists sometimes remain unmerged! That said, if you propose your own implementation, please be generous and provide simple sample code [Python is clearly my favourite :)] or pseudo-code.

Update 1: Here is the code I have been using:

#--------------------------------------
lsts = [[0,1,3],
        [1,0,3,4,5,10,11],
        [2,8],
        [3,1,0,16]]
#--------------------------------------

The function is (buggy!!):

#--------------------------------------
def merge(lsts):
    sts = [set(l) for l in lsts]
    i = 0
    while i < len(sts):
        j = i + 1
        while j < len(sts):
            if sts[i].intersection(sts[j]):   # at least one common element?
                sts[i] = sts[i].union(sts[j]) # absorb sts[j] into sts[i]
                sts.pop(j)                    # drop it and re-test this j
            else:
                j += 1                        #---corrected
        i += 1
    return [list(s) for s in sts]
#--------------------------------------

The result is:

#--------------------------------------
>>> merge(lsts)
[[0, 1, 3, 4, 5, 10, 11, 16], [8, 2]]
#--------------------------------------

Update 2: In my experience, the code given below by Niklas Baumstark proved to be a bit faster for the simple cases. I have not tested the method given by "Hooked" yet, since it is a completely different approach (by the way, it looks interesting). The testing procedure for all of these could be really hard, or even impossible, to be sure of the results. The real data set I am going to use is so large and complex that it is impossible to trace any error just by repetition. That is, I need to be 100% satisfied with the reliability of the method before pushing it into its place within a large code base as a module. So for now, Niklas' method is faster, and the answer for simple sets is of course correct. But how can I make sure that it works well for really large data sets, given that I cannot trace the errors visually?
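
One way to build that confidence mechanically rather than visually is to assert the obvious invariants on any merge result. Here is a minimal sketch (it mirrors the checks that appear in the test cases further down this page):

#--------------------------------------
from itertools import combinations

def verify(original, merged):
    merged = [set(m) for m in merged]
    # 1) the results must be pairwise disjoint
    assert all(a.isdisjoint(b) for a, b in combinations(merged, 2))
    # 2) the results must cover exactly the original elements
    assert set().union(*merged) == set().union(*map(set, original))
    # 3) every non-empty original list must sit inside a single result
    assert all(any(set(lst) <= m for m in merged) for lst in original if lst)
#--------------------------------------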

Update 3: Note that for this problem the reliability of the method is much more important than its speed. I hope to eventually be able to translate the Python code to Fortran for maximum performance.

Update 4: There are many interesting points in this post, along with generously given answers and constructive comments. I would recommend reading all of it thoroughly. Please accept my appreciation for the development of the question, the amazing answers, and the constructive comments and discussion.


3 Answers

I have tried to summarize everything that has been said and done about this topic, in this question and in the duplicate one.

I tried to test every solution (all the code is here).

Testing

This is the TestCase from the testing module:

class MergeTestCase(unittest.TestCase):

    def setUp(self):
        with open('./lists/test_list.txt') as f:
            self.lsts = json.loads(f.read())
        self.merged = self.merge_func(deepcopy(self.lsts))

    def test_disjoint(self):
        """Check disjoint-ness of merged results"""
        from itertools import combinations
        for a,b in combinations(self.merged, 2):
            self.assertTrue(a.isdisjoint(b))

    def test_coverage(self):    # Credit to katrielalex
        """Check coverage original data"""
        merged_flat = set()
        for s in self.merged:
            merged_flat |= s

        original_flat = set()
        for lst in self.lsts:
            original_flat |= set(lst)

        self.assertTrue(merged_flat == original_flat)

    def test_subset(self):      # Credit to WolframH
        """Check that every original data is a subset"""
        for lst in self.lsts:
            self.assertTrue(any(set(lst) <= e for e in self.merged))

This test assumes a list of sets as the result, so I could not test a couple of solutions that work with lists.
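
(If one wanted to test them anyway, a small adapter could normalize list-returning solutions before the checks run; this is my sketch, not part of the actual test module:)

def returns_sets(merge_func):
    # wrap a merge function that returns lists of lists so it returns sets
    def wrapper(lsts):
        return [set(lst) for lst in merge_func(lsts)]
    return wrapper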

I was not able to test the following:

katrielalex
steabert

Of the solutions I could test, two failed:

  -- Going to test: agf (optimized) --
Check disjoint-ness of merged results ... FAIL

  -- Going to test: robert king --
Check disjoint-ness of merged results ... FAIL

Timing

Performance is strongly tied to the data employed for the test.

So far, three answers have tried to time their own and other solutions. Since they used different test data, they got different results.

  1. Niklas' benchmark is very handy. With his benchmark one can run different tests by changing some parameters.

    I used the same three sets of parameters he used in his own answer, and I put them in three different files:

    filename = './lists/timing_1.txt'
    class_count = 50,
    class_size = 1000,
    list_count_per_class = 100,
    large_list_sizes = (100, 1000),
    small_list_sizes = (0, 100),
    large_list_probability = 0.5,
    
    filename = './lists/timing_2.txt'
    class_count = 15,
    class_size = 1000,
    list_count_per_class = 300,
    large_list_sizes = (100, 1000),
    small_list_sizes = (0, 100),
    large_list_probability = 0.5,
    
    filename = './lists/timing_3.txt'
    class_count = 15,
    class_size = 1000,
    list_count_per_class = 300,
    large_list_sizes = (100, 1000),
    small_list_sizes = (0, 100),
    large_list_probability = 0.1,
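
    These files presumably store the lists as JSON, like the test_list.txt read by the TestCase above; under that assumption, loading one of them is straightforward:

    import json
    with open('./lists/timing_1.txt') as f:
        lsts = json.load(f)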
    

    These are the results that I got:

    From file: timing_1.txt

    Timing with: >> Niklas << Benchmark
    Info: 5000 lists, average size 305, max size 999
    
    Timing Results:
    10.434  -- alexis
    11.476  -- agf
    11.555  -- Niklas B.
    13.622  -- Rik. Poggi
    14.016  -- agf (optimized)
    14.057  -- ChessMaster
    20.208  -- katrielalex
    21.697  -- steabert
    25.101  -- robert king
    76.870  -- Sven Marnach
    133.399  -- hochl
    

    From file: timing_2.txt

    Timing with: >> Niklas << Benchmark
    Info: 4500 lists, average size 305, max size 999
    
    Timing Results:
    8.247  -- Niklas B.
    8.286  -- agf
    8.637  -- Rik. Poggi
    8.967  -- alexis
    9.090  -- ChessMaster
    9.091  -- agf (optimized)
    18.186  -- katrielalex
    19.543  -- steabert
    22.852  -- robert king
    70.486  -- Sven Marnach
    104.405  -- hochl
    

    From file: timing_3.txt

    Timing with: >> Niklas << Benchmark
    Info: 4500 lists, average size 98, max size 999
    
    Timing Results:
    2.746  -- agf
    2.850  -- Niklas B.
    2.887  -- Rik. Poggi
    2.972  -- alexis
    3.077  -- ChessMaster
    3.174  -- agf (optimized)
    5.811  -- katrielalex
    7.208  -- robert king
    9.193  -- steabert
    23.536  -- Sven Marnach
    37.436  -- hochl
    
  2. With Sven's test data, I got the following results:

    Timing with: >> Sven << Benchmark
    Info: 200 lists, average size 10, max size 10
    
    Timing Results:
    2.053  -- alexis
    2.199  -- ChessMaster
    2.410  -- agf (optimized)
    3.394  -- agf
    3.398  -- Rik. Poggi
    3.640  -- robert king
    3.719  -- steabert
    3.776  -- Niklas B.
    3.888  -- hochl
    4.610  -- Sven Marnach
    5.018  -- katrielalex
    
  3. Finally, with agf's benchmark, I got:

    Timing with: >> Agf << Benchmark
    Info: 2000 lists, average size 246, max size 500
    
    Timing Results:
    3.446  -- Rik. Poggi
    3.500  -- ChessMaster
    3.520  -- agf (optimized)
    3.527  -- Niklas B.
    3.527  -- agf
    3.902  -- hochl
    5.080  -- alexis
    15.997  -- steabert
    16.422  -- katrielalex
    18.317  -- robert king
    1257.152  -- Sven Marnach
    

As I said at the beginning, all the code is available at this git repository. All the merging functions are in a file called core.py, and every function there whose name ends with _merge is loaded automatically during the tests, so it should not be hard to add, test, and improve your own solution.
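
For illustration, that kind of auto-loading can be done with a little introspection. A sketch of how it might look (this is just my guess at the mechanism; the actual repository may do it differently):

import inspect
import core  # the file holding the merge functions, as described above

merge_funcs = {name: func
               for name, func in inspect.getmembers(core, inspect.isfunction)
               if name.endswith('_merge')}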

Please also let me know if something is wrong; it has been a lot of coding, and I could use a couple of fresh eyes :)

My attempt:

def merge(lsts):
    sets = [set(lst) for lst in lsts if lst]
    merged = True
    while merged:
        merged = False
        results = []
        while sets:
            common, rest = sets[0], sets[1:]
            sets = []
            for x in rest:
                if x.isdisjoint(common):
                    sets.append(x)
                else:
                    merged = True
                    common |= x
            results.append(common)
        sets = results
    return sets

lst = [[65, 17, 5, 30, 79, 56, 48, 62],
       [6, 97, 32, 93, 55, 14, 70, 32],
       [75, 37, 83, 34, 9, 19, 14, 64],
       [43, 71],
       [],
       [89, 49, 1, 30, 28, 3, 63],
       [35, 21, 68, 94, 57, 94, 9, 3],
       [16],
       [29, 9, 97, 43],
       [17, 63, 24]]
print merge(lst)

Benchmark:

import random

# adapt parameters to your own usage scenario
class_count = 50
class_size = 1000
list_count_per_class = 100
large_list_sizes = list(range(100, 1000))
small_list_sizes = list(range(0, 100))
large_list_probability = 0.5

if False:  # change to True to generate the test data file (takes a while)
    with open("/tmp/test.txt", "w") as f:
        lists = []
        classes = [
            range(class_size * i, class_size * (i + 1)) for i in range(class_count)
        ]
        for c in classes:
            # distribute each class across list_count_per_class lists
            for i in xrange(list_count_per_class):
                lst = []
                if random.random() < large_list_probability:
                    size = random.choice(large_list_sizes)
                else:
                    size = random.choice(small_list_sizes)
                nums = set(c)
                for j in xrange(size):
                    x = random.choice(list(nums))
                    lst.append(x)
                    nums.remove(x)
                random.shuffle(lst)
                lists.append(lst)
        random.shuffle(lists)
        for lst in lists:
            f.write(" ".join(str(x) for x in lst) + "\n")

setup = """
# Niklas'
def merge_niklas(lsts):
    sets = [set(lst) for lst in lsts if lst]
    merged = 1
    while merged:
        merged = 0
        results = []
        while sets:
            common, rest = sets[0], sets[1:]
            sets = []
            for x in rest:
                if x.isdisjoint(common):
                    sets.append(x)
                else:
                    merged = 1
                    common |= x
            results.append(common)
        sets = results
    return sets

# Rik's
def merge_rik(data):
    sets = (set(e) for e in data if e)
    results = [next(sets)]
    for e_set in sets:
        to_update = []
        for i, res in enumerate(results):
            if not e_set.isdisjoint(res):
                to_update.insert(0, i)

        if not to_update:
            results.append(e_set)
        else:
            last = results[to_update.pop(-1)]
            for i in to_update:
                last |= results[i]
                del results[i]
            last |= e_set
    return results

# katrielalex's
def pairs(lst):
    i = iter(lst)
    first = prev = item = i.next()
    for item in i:
        yield prev, item
        prev = item
    yield item, first

import networkx

def merge_katrielalex(lsts):
    g = networkx.Graph()
    for lst in lsts:
        for edge in pairs(lst):
            g.add_edge(*edge)
    return networkx.connected_components(g)
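
# (pairs() threads each list into a closed ring of edges, so any two lists
#  that share an element end up in the same connected component of g.)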

# agf's (optimized)
from collections import deque

def merge_agf_optimized(lists):
    sets = deque(set(lst) for lst in lists if lst)
    results = []
    disjoint = 0
    current = sets.pop()
    while True:
        merged = False
        newsets = deque()
        for _ in xrange(disjoint, len(sets)):
            this = sets.pop()
            if not current.isdisjoint(this):
                current.update(this)
                merged = True
                disjoint = 0
            else:
                newsets.append(this)
                disjoint += 1
        if sets:
            newsets.extendleft(sets)
        if not merged:
            results.append(current)
            try:
                current = newsets.pop()
            except IndexError:
                break
            disjoint = 0
        sets = newsets
    return results

# agf's (simple)
def merge_agf_simple(lists):
    newsets, sets = [set(lst) for lst in lists if lst], []
    while len(sets) != len(newsets):
        sets, newsets = newsets, []
        for aset in sets:
            for eachset in newsets:
                if not aset.isdisjoint(eachset):
                    eachset.update(aset)
                    break
            else:
                newsets.append(aset)
    return newsets

# alexis'
def merge_alexis(data):
    bins = range(len(data))  # Initialize each bin[n] == n
    nums = dict()

    data = [set(m) for m in data]  # Convert to sets
    for r, row in enumerate(data):
        for num in row:
            if num not in nums:
                # New number: tag it with a pointer to this row's bin
                nums[num] = r
                continue
            else:
                dest = locatebin(bins, nums[num])
                if dest == r:
                    continue  # already in the same bin

                if dest > r:
                    dest, r = r, dest  # always merge into the smallest bin

                data[dest].update(data[r])
                data[r] = None
                # Update our indices to reflect the move
                bins[r] = dest
                r = dest

    # Filter out the empty bins
    have = [m for m in data if m]
    return have

def locatebin(bins, n):
    while bins[n] != n:
        n = bins[n]
    return n
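
# (bins plus locatebin form a simple union-find: bins[n] points toward the
#  root of n's group. Path compression -- writing the root back into bins[n]
#  while walking -- would speed up repeated lookups, though it isn't
#  required for correctness.)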

lsts = []
size = 0
num = 0
max = 0
for line in open("/tmp/test.txt", "r"):
    lst = [int(x) for x in line.split()]
    size += len(lst)
    if len(lst) > max:
        max = len(lst)
    num += 1
    lsts.append(lst)
"""

setup += """
print "%i lists, {class_count} equally distributed classes, average size %i, max size %i" % (num, size/num, max)
""".format(class_count=class_count)

import timeit
print "niklas"
print timeit.timeit("merge_niklas(lsts)", setup=setup, number=3)
print "rik"
print timeit.timeit("merge_rik(lsts)", setup=setup, number=3)
print "katrielalex"
print timeit.timeit("merge_katrielalex(lsts)", setup=setup, number=3)
print "agf (1)"
print timeit.timeit("merge_agf_optimized(lsts)", setup=setup, number=3)
print "agf (2)"
print timeit.timeit("merge_agf_simple(lsts)", setup=setup, number=3)
print "alexis"
print timeit.timeit("merge_alexis(lsts)", setup=setup, number=3)

These timings obviously depend on the specific parameters of the benchmark, such as the number of classes, the number of lists, the list sizes, and so on. Adapt those parameters to your needs to get more meaningful results.

Below are some example outputs on my machine for different parameter sets. They show that all the algorithms have their strengths and weaknesses, depending on the kind of input they get:

=====================
# many disjoint classes, large lists
class_count = 50
class_size = 1000
list_count_per_class = 100
large_list_sizes = list(range(100, 1000))
small_list_sizes = list(range(0, 100))
large_list_probability = 0.5
=====================

niklas
5000 lists, 50 equally distributed classes, average size 298, max size 999
4.80084705353
rik
5000 lists, 50 equally distributed classes, average size 298, max size 999
9.49251699448
katrielalex
5000 lists, 50 equally distributed classes, average size 298, max size 999
21.5317108631
agf (1)
5000 lists, 50 equally distributed classes, average size 298, max size 999
8.61671280861
agf (2)
5000 lists, 50 equally distributed classes, average size 298, max size 999
5.18117713928
=> alexis
=> 5000 lists, 50 equally distributed classes, average size 298, max size 999
=> 3.73504281044

===================
# fewer classes, large lists
class_count = 15
class_size = 1000
list_count_per_class = 300
large_list_sizes = list(range(100, 1000))
small_list_sizes = list(range(0, 100))
large_list_probability = 0.5
===================

niklas
4500 lists, 15 equally distributed classes, average size 296, max size 999
1.79993700981
rik
4500 lists, 15 equally distributed classes, average size 296, max size 999
2.58237695694
katrielalex
4500 lists, 15 equally distributed classes, average size 296, max size 999
19.5465381145
agf (1)
4500 lists, 15 equally distributed classes, average size 296, max size 999
2.75445604324
=> agf (2)
=> 4500 lists, 15 equally distributed classes, average size 296, max size 999
=> 1.77850699425
alexis
4500 lists, 15 equally distributed classes, average size 296, max size 999
3.23530197144

===================
# fewer classes, smaller lists
class_count = 15
class_size = 1000
list_count_per_class = 300
large_list_sizes = list(range(100, 1000))
small_list_sizes = list(range(0, 100))
large_list_probability = 0.1
===================

niklas
4500 lists, 15 equally distributed classes, average size 95, max size 997
0.773697137833
rik
4500 lists, 15 equally distributed classes, average size 95, max size 997
1.0523750782
katrielalex
4500 lists, 15 equally distributed classes, average size 95, max size 997
6.04466891289
agf (1)
4500 lists, 15 equally distributed classes, average size 95, max size 997
1.20285701752
=> agf (2)
=> 4500 lists, 15 equally distributed classes, average size 95, max size 997
=> 0.714507102966
alexis
4500 lists, 15 equally distributed classes, average size 95, max size 997
1.1286110878

Using matrix manipulations

Let me preface this answer with the following comment:

This is the wrong way to do it. It is prone to numerical instability and is much slower than the other methods presented; use at your own risk.

That said, I could not resist solving the problem from a dynamical point of view (and I hope it gives you a fresh perspective on it). In theory this should always work, but eigenvalue calculations can often fail. The idea is to think of your list as a flow from rows to columns. If two rows share a common value, there is a connecting flow between them. If we think of these flows as water, we see that they cluster into little pools whenever there is a connecting path between them. For simplicity, I am going to use a smaller set, though it works with your data set as well:

from numpy import where, newaxis
from scipy import linalg, array, zeros

X = [[0,1,3],[2],[3,1]]

We need to convert the data into a flow graph: if row i flows into value j, we put an entry into the matrix. Here we have 3 rows and 4 unique values:

A = zeros((4,len(X)), dtype=float)
for i,row in enumerate(X):
    for val in row: A[val,i] = 1
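
(A side note, and my own addition rather than part of the original answer: the matrix height can be derived from the data instead of being hard-coded, assuming the values are non-negative integers. The filling loop above stays exactly the same.)

n_vals = 1 + max(max(row) for row in X if row)  # number of unique value slots
A = zeros((n_vals, len(X)), dtype=float)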

In general, you will need to change the 4 to match the number of unique values you have. If the sets are lists of non-negative integers starting from 0, as here, you can simply make it one more than the largest value (the sketch above derives it automatically). We now perform an eigenvalue decomposition; to be exact an SVD, since our matrix is not square.

S  = linalg.svd(A)

From this decomposition, we only want to keep the 3x3 portion, since it represents the flow between the pools. In fact, we only need the absolute values of this matrix; we only care whether there is a flow in this space.

M  = abs(S[2])

We can treat this matrix M as a Markov matrix by making it explicit through row normalization. Once we have that, we compute the (left) eigenvalue decomposition of the matrix:

M /=  M.sum(axis=1)[:,newaxis]
U,V = linalg.eig(M,left=True, right=False)
V = abs(V)

Now, a disconnected (non-ergodic) Markov matrix has the nice property that, for each disconnected cluster, there is an eigenvalue of unity. The eigenvectors associated with these unit values are the ones we want:

idx = where(U > .999)[0]
C = V.T[idx] > 0

I had to use .999 because of the numerical instability mentioned earlier. At this point we are done! Each independent cluster can now pull its corresponding rows out:

for cluster in C:
    print where(A[:,cluster].sum(axis=1))[0]

Which gives:

[0 1 3]
[2]

Change X to your lst and you will get: [ 0 1 3 4 5 10 11 16] [2 8]


Addendum

Why could this be useful? I do not know where your underlying data comes from, but what happens when the connections are not absolute? Say row 1 has entry 3 80% of the time; how would you generalize the problem? The flow method above would work just fine, and is completely parametrized by that .999 value: the further it is from unity, the looser the association.


Visual representation

Since a picture is worth 1K words, here are the plots of the matrices A and V, for my example and for your lst respectively. Notice how V splits into two clusters (after permutation it is a block-diagonal matrix with two blocks), since for each example there were only two unique lists!

[Figures: "My Example" and "Your sample data"]


Faster implementation

In hindsight, I realized you can skip the SVD step and compute only a single decomposition:

from numpy import dot  # dot was not among the earlier imports

M = dot(A.T, A)
M /= M.sum(axis=1)[:, newaxis]
U, V = linalg.eig(M, left=True, right=False)

The advantage of this method (besides speed) is that M is now symmetric, and hence the computation can be faster and more accurate (no imaginary values to worry about).
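
Putting the pieces together, here is a self-contained sketch of this faster variant (my own assembly of the steps above; the function name merge_flow and the threshold parameter are mine, with threshold playing the role of the .999 cutoff and thereby also covering the loose-association case from the addendum):

import numpy as np
from scipy import linalg

def merge_flow(lists, threshold=0.999):
    lists = [row for row in lists if row]        # drop empty lists
    n_vals = 1 + max(max(row) for row in lists)  # assumes non-negative ints
    A = np.zeros((n_vals, len(lists)))
    for i, row in enumerate(lists):
        for val in row:
            A[val, i] = 1
    M = A.T.dot(A)                               # list-to-list flow matrix
    M /= M.sum(axis=1)[:, np.newaxis]            # row-normalize -> Markov
    U, V = linalg.eig(M, left=True, right=False)
    V = np.abs(V)
    idx = np.where(U.real > threshold)[0]        # near-unit eigenvalues
    # small tolerance instead of "> 0" to guard against numerical noise
    return [np.where(A[:, V.T[i] > 1e-6].sum(axis=1))[0] for i in idx]

print(merge_flow([[0, 1, 3], [2], [3, 1]]))      # two pools: [0 1 3] and [2]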
