尝试对值进行分组?

5 投票
10 回答
808 浏览
提问于 2025-04-16 04:54

我有一些数据,像这样:

1 2
3 4
5 9
2 6
3 7

我想要的输出是这样的(显示组的ID和该组的成员):

1: 1 2 6
2: 3 4 7
3: 5 9

第一行是因为1和2是“连接”的,2又连接到6。第二行是因为3连接到4,3又连接到7。

这让我觉得像是在遍历图形,但最终的顺序并不重要,所以我在想有没有人能推荐一个更简单的解决方案,适合处理大数据集(有数十亿条记录)。


根据评论:

  • 问题是要找到一组不相交的子图,给定一组边。
  • 这些边是无向的;比如'1 2'意味着1和2是连接的,2也连接到1。
  • 示例输出中的'1:'可以改成'A:',而不改变答案的意思。

编辑 1:

问题现在看起来解决了。感谢大家的帮助。我还需要一些帮助,来选择一个可以处理数十亿条记录的最佳解决方案。

编辑 2:

测试输入文件:

1 27
1 134
1 137
1 161
1 171
1 275
1 309
1 413
1 464
1 627
1 744
2 135
2 398
2 437
2 548
2 594
2 717
2 738
2 783
2 798
2 912
5 74
5 223
7 53
7 65
7 122
7 237
7 314
7 701
7 730
7 755
7 821
7 875
7 884
7 898
7 900
7 930
8 115
9 207
9 305
9 342
9 364
9 493
9 600
9 676
9 830
9 941
10 164
10 283
10 380
10 423
10 468
10 577
11 72
11 132
11 276
11 306
11 401
11 515
11 599
12 95
12 126
12 294
13 64
13 172
13 528
14 396
15 35
15 66
15 210
15 226
15 360
15 588
17 263
17 415
17 474
17 648
17 986
21 543
21 771
22 47
23 70
23 203
23 427
23 590
24 286
24 565
25 175
26 678
27 137
27 161
27 171
27 275
27 309
27 413
27 464
27 627
27 684
27 744
29 787

基准测试:

我尝试了所有的方案,TokenMacGuy发布的版本在我测试的样本数据集中是最快的。这个数据集大约有100万条记录,我在一台2.4GHz的双四核机器上花了大约6秒钟。我还没有机会在整个数据集上运行它,但一旦有结果我会尽快发布基准测试的结果。

10 个回答

1

好的,我在@Jonathan发布的另一个解决方案的基础上,做了一些并行的工作(首先,非常感谢你的时间)。我的解决方案看起来很简单,但我希望能得到一些建议,看看这是否正确(也许我在某个边界情况下遗漏了什么?)因为它似乎能产生我想要的输出,但我需要再处理一次来把相同的组号归在一起,这个过程其实很简单。我的逻辑是,每次找到一个不在数组中的新数字时,就增加一个组号计数器:

我的PHP代码:

<?php

//$fp = fopen("./resemblance.1.out", "r");
$fp = fopen("./wrong", "r");

$groups = array();
$group["-1"] = 1;
$groups[] = $group;

$map = array();

//Maintain a count
$group = 1;

while(!feof($fp)) {
        $source = trim(fgets($fp, 4096));
        //echo $source."\n";

        $source = explode(" ", $source);

        if(array_key_exists($source[0], $map) && !array_key_exists($source[1], $map)) {
                $map[$source[1]] = $map[$source[0]];
        } else if(array_key_exists($source[1], $map) && !array_key_exists($source[0], $map)) {
                $map[$source[0]] = $map[$source[1]];
        } else if(array_key_exists($source[1], $map) && array_key_exists($source[0], $map) && $map[$source[1]] != $map[$source[0]]) {
                // Adjust the groups - change the groups of one of the elements to the other
                $keys = array_keys($map, $map[$source[1]]);
                print_r($keys);
                foreach($keys as $key) {
                        $map[$key] = $map[$source[0]];
                }
        } else {
                $group++;
                $map[$source[0]] = $group;
                $map[$source[1]] = $group;
        }
}

print_r($map);
?>

输出:

Array
(
    [1] => 2
    [2] => 2
    [3] => 3
    [4] => 3
    [5] => 4
    [9] => 4
    [6] => 2
    [7] => 3
    [] => 5
)

编辑:修复了评论中提到的错误。出于好奇,我只是随便玩玩 :) 如果你发现其他错误,请随时指出。我目前正在测试哪个解决方案更快。

1

这里有一个可以在原始数据集上运行的Perl示例解决方案:

1 2
3 4
5 9
2 6
3 7

Group 1: 1 2 6
Group 2: 3 4 7
Group 3: 5 9

在大数据集上,它会产生以下输出:

Group 1: 1 27 134 137 161 171 275 309 413 464 627 684 744
Group 2: 2 135 398 437 548 594 717 738 783 798 912
Group 3: 5 74 223
Group 4: 7 53 65 122 237 314 701 730 755 821 875 884 898 900 930
Group 5: 8 115
Group 6: 9 207 305 342 364 493 600 676 830 941
Group 7: 10 164 283 380 423 468 577
Group 8: 11 72 132 276 306 401 515 599
Group 9: 12 95 126 294
Group 10: 13 64 172 528
Group 11: 14 396
Group 12: 15 35 66 210 226 360 588
Group 13: 17 263 415 474 648 986
Group 14: 21 543 771
Group 15: 22 47
Group 16: 23 70 203 427 590
Group 17: 24 286 565
Group 18: 25 175
Group 19: 26 678
Group 20: 29 787

至于它是否足够高效,那就是另一个问题了……

use strict;
use warnings;
my %cache = ();
while (<>)
{
    chomp;
    my($x,$y) = split /\s+/;
    #print "$x $y\n";
    $cache{$x}{$y} = 1;
    $cache{$y}{$x} = 1;
}

my $grp = 1;
foreach my $key (sort { $a <=> $b } keys %cache)
{
    #print "key: $key\n";
    if (defined $cache{$key})
    {
        my %result = ();
        subkey_search(\%result, $key);
        print "Group $grp:";
        $grp++;
        foreach my $val (sort { $a <=> $b } keys %result)
        {
            print " $val";
        }
        print "\n";
    }
}

sub subkey_search
{
    my($resultref, $key) = @_;
    my %hash = %{$cache{$key}};
    delete $cache{$key};
    $resultref->{$key} = 1;
    foreach my $subkey (sort keys %hash)
    {
        #print "subkey: $subkey\n";
        subkey_search($resultref, $subkey) if (defined $cache{$subkey});
    }
}
4

我已经搞定了 O(n log n) 的复杂度。

这里有一个(稍微复杂一点的)C++ 实现:

#include <boost/pending/disjoint_sets.hpp>
#include <boost/property_map/property_map.hpp>

#include <map>
#include <set>
#include <iostream>


typedef std::map<int, int> rank_t;
typedef std::map<int, int> parent_t;

typedef boost::associative_property_map< rank_t > rank_pmap_t;
typedef boost::associative_property_map< parent_t > parent_pmap_t;

typedef boost::disjoint_sets< rank_pmap_t, parent_pmap_t > group_sets_t;

typedef std::set<int> key_set;
typedef std::map<int, std::set<int> > output;

在处理了一些类型定义后,接下来就是重点了。我使用了 boost::disjoint_sets,这个工具非常适合解决这个问题。第一个函数是用来检查给定的两个键是否之前出现过,如果没有,就把它们加入到集合中。最重要的部分是 union_set(a, b),它把两个集合连接在一起。如果其中一个集合已经在 groups 集合中,它们也会被连接。

void add_data(int a, int b, group_sets_t & groups, key_set & keys)
{
  if (keys.count(a) < 1) groups.make_set(a);
  if (keys.count(b) < 1) groups.make_set(b);
  groups.union_set(a, b);
  keys.insert(a);
  keys.insert(b);
}

这部分不算太复杂,它只是遍历我们见过的所有键,找到每个键的代表键,然后把这个代表键和当前键的配对(代表键, 键)加入到一个映射中。完成后,就打印出这个映射。

void build_output(group_sets_t & groups, key_set & keys)
{
  output out;
  for (key_set::iterator i(keys.begin()); i != keys.end(); i++)
    out[groups.find_set(*i)].insert(*i);

  for (output::iterator i(out.begin()); i != out.end(); i++)
  {
    std::cout << i->first << ": ";
    for (output::mapped_type::iterator j(i->second.begin()); j != i->second.end(); j++)
      std::cout << *j << " ";
    std::cout << std::endl;
  }
}

int main()
{

  rank_t rank;
  parent_t parent;
  rank_pmap_t rank_index(rank);
  parent_pmap_t parent_index(parent);


  group_sets_t groups( rank_index, parent_index );
  key_set keys;

  int a, b;
  while (std::cin >> a)
  {
    std::cin >> b;
    add_data(a, b, groups, keys);
  }  


  build_output(groups, keys);
  //std::cout << "number of sets: " << 
  //  groups.count_sets(keys.begin()), keys.end()) << std::endl;

}

我花了很多时间学习如何在这个问题上使用 boost::disjoint_sets。似乎没有太多相关的文档。

关于性能方面,disjoint_sets 结构在其主要操作(make_setfind_setunion_set)上的复杂度是 O(α(n)),这个值接近常数,所以如果只是构建这个结构,整个算法的复杂度会是 O(n α(n))(实际上接近 O(n)),但我们还需要打印出来。这就意味着我们需要构建一些关联容器,而这些容器的性能不能好于 O(n log n)。通过选择不同的关联容器(比如 hash_set 等),可能会实现一定的速度提升,因为一旦你填充了初始列表,就可以预留一个最优的空间。

撰写回答