Reducing a set's size while preserving minimum frequencies

6 votes
4 answers
569 views
Asked 2025-04-21 06:03

Say I have the following set of data:

{(2,), (3,), (1, 4), (1, 2, 3), (2, 3), (3, 4), (2, 4)}

The frequency of each number in this set is:

2: 4, 3: 4, 4: 3, 1: 2
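
These counts can be reproduced with collections.Counter:

from collections import Counter

a = {(2,), (3,), (1, 4), (1, 2, 3), (2, 3), (3, 4), (2, 4)}
freq = Counter(x for t in a for x in t)
print(freq)  # 2: 4, 3: 4, 4: 3, 1: 2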

Can you think of a way to reduce this set such that each number still appears at least twice, while minimizing the number of tuples in the set?

For example, the tuple (3, 4) could be removed from the data, leaving these frequencies:

2: 4, 3: 3, 4: 2, 1: 2

Here is a very naive attempt at solving this:

from collections import Counter

def reduce(a, limit):
    while True:
        remove = None
        for t in a:
            # recount the frequency of every number in the set
            c = Counter(x for s in a for x in s)

            # only consider tuples containing the most common number
            if c.most_common(1)[0][0] in t:
                # removing t must leave every number in it at or above the limit
                if min(c[x] for x in t) > limit:
                    remove = t
                    break

        if remove:
            a.remove(remove)
        else:
            break

a = {(2,), (3,), (1, 4), (1, 2, 3), (2, 3), (3, 4), (2, 4)}
reduce(a, 2)  # we want at least two of each number
The problem with this solution is that while it may reduce the size of the data set, it is not guaranteed to give me the smallest possible set.

For my actual case, the data set I want to reduce contains strings, something like this:

a = [("one","eighty one","three"), ("eighty five","sixty one","three", "eleven"), ...] where len(a) is 1000. Each tuple is between 3 and 9 elements long, and the tuples draw on 100 unique values ("one" is one such value). After the reduction, every unique value should appear at least 25 times. How long might computing the reduced data set take? Seconds, or many minutes?

4 Answers

1

Here is my solution. I noticed you are working with sets of at most 1000 elements, so I decided to implement the algorithm recursively.

First, let's define a function that gets the frequency of each number across the tuples:

def get_frequency(tuple_list):
    # count how many times each value appears across all tuples
    frequency = {}
    for t in tuple_list:
        for element in t:
            frequency[element] = frequency.get(element, 0) + 1
    return frequency
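
For the example set from the question this gives:

print(get_frequency([(2,), (3,), (1, 4), (1, 2, 3), (2, 3), (3, 4), (2, 4)]))
# {2: 4, 3: 4, 1: 2, 4: 3}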

This part is fairly simple, so I won't spend much time on it. Next, I implemented the main function, called recursive. It returns a tuple containing the list of elements that can be deleted and the maximum depth the algorithm can reach.

Here is the pseudocode I wrote before implementing it:

if tuple_list is empty: return ([], iteration)
best_deletion = None
for each element:
    if the element can't be deleted: continue
    recurse on the list without the current element
    if that recursion reaches a better depth than best_deletion, or best_deletion is None:
        store the recursion result in best_deletion
if best_deletion is None: return ([], iteration)
return best_deletion with the current element prepended, and the iteration incremented

And here is the result:

def recursive(tuple_list, limit, iteration):
    if tuple_list == []:
        return ([], iteration)

    frequency = get_frequency(tuple_list)

    value = None

    for i in range(len(tuple_list)):

        # tuple_list[i] can only be deleted if every value it contains
        # stays at or above the limit after removal
        impossible_to_delete = False
        for number in tuple_list[i]:
            frequency[number] -= 1
            if frequency[number] < limit:
                impossible_to_delete = True
        # restore the counts before testing the next candidate
        for number in tuple_list[i]:
            frequency[number] += 1

        if impossible_to_delete:
            continue

        next_recursion_list = tuple_list[:]
        next_recursion_list.pop(i)

        maximum_deletion = recursive(next_recursion_list, limit, iteration + 1)

        # keep the deletion sequence that reaches the greatest depth
        if value is None or value[1] < maximum_deletion[1]:
            maximum_deletion[0].insert(0, tuple_list[i])
            value = (maximum_deletion[0], maximum_deletion[1] + 1)

    if value is None:
        return ([], iteration)
    return value

Then simply call the function like this:

items_to_delete = recursive(list(tuple_set), 2, 0)

Hope this helps. If I have time, I'll test which of the previously proposed algorithms is fastest.

1

This problem is at least NP-hard, which means you shouldn't expect an efficient (polynomial-time) algorithm. There are, however, ways to reduce the constant factors. Besides a better algorithm, consider a faster runtime such as PyPy.

The following code, if run to completion, returns a smallest possible subset. As a bonus, it only considers valid covers and can progressively print smaller and smaller covering subsets as it goes.

from collections import defaultdict
from itertools import product, combinations

def covering_set(sets, min_freq, print_approximations=False):

    # dictionary mapping each unique value to the sets that contain it
    d = defaultdict(list)
    for set_ in sets:
        for elem in set_:
            d[elem].append(set_)

    # we need min_freq number of each unique values
    combos = [combinations(x, min_freq) for x in d.values()]

    #initial solution
    min_cover = sets
    min_length = len(sets)

    #iterate through valid solutions
    #cover is a list of list of sets
    for cover in product(*combos):

        # flatten the cover and remove duplicates
        covering_set = set()
        for elem_cover in cover:
            covering_set.update(elem_cover)

        # now, check whether this is the smallest solution so far
        if len(covering_set) < min_length:
            min_cover = covering_set
            min_length = len(covering_set)
            if print_approximations:
                print(min_length, min_cover)

    return min_cover
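
Running it on the question's example (it enumerates every combination, so this is only practical for small inputs):

sets = {(2,), (3,), (1, 4), (1, 2, 3), (2, 3), (3, 4), (2, 4)}
print(covering_set(sets, 2))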
2

Here's a quick and dirty approach that will hopefully get you started.

Be warned, though: it does not necessarily produce the smallest result set. It eliminates smaller tuples first, so if you have lots of small tuples and few long ones, it may work for you.

It starts out as an ordered collection (a list), but I haven't gotten around to restoring the order at the end. At the very least the order needs to stay consistent inside the function so the computed values line up correctly. I'd like to tidy it up, but it's getting late.

from collections import defaultdict

def reduce(source, min_count=2):
    print("source: {}".format(source))
    # [(2,), (3,), (1, 4), (1, 2, 3), (2, 3), (3, 4), (2, 4)]
    answer = []

    # initial frequencies and tuple lengths
    freq = {}
    lens = []
    for t in source:
        lens.append(len(t))
        for i in t:
            freq[i] = freq.get(i, 0) + 1
    print("freq: {}".format(freq))  # {1: 2, 2: 4, 3: 4, 4: 3}
    print("lens: {}".format(lens))  # [1, 1, 2, 3, 2, 2, 2]

    # group the tuples by length
    slens = defaultdict(list)
    for l, t in zip(lens, source):
        slens[l].append(t)
    print("slens: {}".format(slens))
    # {1: [(2,), (3,)], 2: [(1, 4), (2, 3), (3, 4), (2, 4)], 3: [(1, 2, 3)]}

    # walk the tuples from shortest to longest, keeping a tuple only if
    # some value in it is already at or below min_count
    for l in sorted(slens.keys()):
        for t in slens[l]:
            save = False
            for i in t:
                if freq[i] <= min_count:
                    save = True
                freq[i] -= 1
            if save:
                answer.append(t)
    print("answer: {}".format(answer))  # [(1, 4), (1, 2, 3), (3, 4), (2, 4)]

    # recompute frequencies over the kept tuples
    freq = {}
    for t in answer:
        for i in t:
            freq[i] = freq.get(i, 0) + 1
    print("freq: {}".format(freq))  # {1: 2, 2: 2, 3: 2, 4: 3}

My original idea was to make one pass saving the tuples that are under the min count, then reduce the working set. Next, score the remaining tuples, with less frequent elements scoring higher. Then discard the lowest-scoring tuples, so long as removing them doesn't push any element's frequency below the min count. Finally recompute the frequencies and start over, as sketched below.
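
A rough, untested sketch of that scoring idea (the names are mine, not a final version):

from collections import Counter

def reduce_by_score(source, min_count=2):
    work = list(source)
    while True:
        freq = Counter(i for t in work for i in t)
        # rarer elements contribute more, so tuples made of common
        # elements get the lowest scores and are discarded first
        def score(t):
            return sum(1.0 / freq[i] for i in t)
        for t in sorted(work, key=score):
            # only drop the tuple if no element falls below min_count
            if all(freq[i] - 1 >= min_count for i in t):
                work.remove(t)
                break
        else:
            return work  # nothing left that can be safely removed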

7

As mentioned in the comments, the NP-hard Set Cover problem is the special case of this problem where the minimum frequency is k = 1, which makes this problem NP-hard as well. I'd suggest a library like PuLP; the following integer program describes it.

minimize sum over tuples T of x(T)
subject to
y(e): for all elements e, (sum over tuples T of (count of e in T) * x(T)) >= k
z(T): for all tuples T, x(T) in {0, 1}
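
A minimal PuLP encoding of that program might look like this sketch (the function and variable names are mine, and it is only lightly checked):

import pulp

def min_cover_ip(tuples, k):
    tuples = list(tuples)
    prob = pulp.LpProblem("min_cover", pulp.LpMinimize)
    # x(T) = 1 if tuple T is kept in the reduced set
    x = [pulp.LpVariable("x_{}".format(i), cat="Binary")
         for i in range(len(tuples))]
    prob += pulp.lpSum(x)  # objective: minimize the number of kept tuples
    elements = {e for t in tuples for e in t}
    for e in elements:
        # y(e): element e must occur at least k times among kept tuples
        prob += pulp.lpSum(t.count(e) * x[i]
                           for i, t in enumerate(tuples)) >= k
    prob.solve()
    return [t for i, t in enumerate(tuples) if x[i].value() > 0.5]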

One disadvantage of PuLP is that it requires an external solver. I felt like getting my hands dirty, however, so I wrote a (very lightly tested) pure Python solver. It uses depth-first search with best-first backtracking, with a simple propagation strategy to determine which tuples must or must not be chosen, and a heuristic based on a primal-dual approximation of the dual of the preceding program (so it's a sophisticated toy, but still a toy).

maximize (sum over elements e of k * y(e)) - (sum over tuples T of z(T))
subject to
x(T): for all tuples T, (sum over elements e in T of y(e)) - z(T) <= 1
for all elements e, y(e) >= 0
for all tuples T, z(T) >= 0

The primal-dual strategy is to increase, at the same rate, those values of y whose increase does not require a corresponding increase in z at a loss.

from collections import Counter, defaultdict, namedtuple
from fractions import Fraction
from heapq import heappop, heappush
from math import ceil
from operator import itemgetter


class _BestFirstSearchDepthFirstBacktracking:
    def optimize(self):
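        # best-first search with depth-first backtracking: always dive
        # into the most promising child, pushing its siblings onto a
        # heap; when a node cannot improve on the incumbent, resume
        # from the best node left on the heap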
        node = self._make_root_node()
        heap = []
        upper_bound = None
        while True:
            lower_bound = ceil(node.lower_bound)
            if upper_bound is None or lower_bound < upper_bound:
                child_nodes = list(self._make_child_nodes(node))
                if child_nodes:
                    i, node = min(enumerate(child_nodes), key=itemgetter(1))
                    del child_nodes[i]
                    for child_node in child_nodes:
                        heappush(heap, child_node)
                    continue
                upper_bound = lower_bound
                solution = node
            if not heap:
                return (upper_bound, solution)
            node = heappop(heap)


Node = namedtuple('Node', ('lower_bound', 'index', 'maybes', 'yeses', 'variable'))


class UnsolvableException(Exception):
    pass


class _Optimizer(_BestFirstSearchDepthFirstBacktracking):
    def __init__(self, tuples, min_freq):
        self._index = 0
        self._tuples = set(tuples)
        self._min_freq = min_freq
        self._elements = set()
        for t in self._tuples:
            self._elements.update(t)

    def _propagate(self, maybes, yeses):
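        # propagation: fail fast if even keeping every remaining tuple
        # cannot reach min_freq; force in any tuple whose exclusion
        # would make some element infeasible; then drop maybes that
        # only touch elements already satisfied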
        upper_count = Counter()
        for t in maybes:
            upper_count.update(t)
        for t in yeses:
            upper_count.update(t)
        if any(upper_count[e] < self._min_freq for e in self._elements):
            raise UnsolvableException()
        # a tuple is forced in if dropping it would starve some element
        forced_yeses = {t for t in maybes if any(upper_count[e] - k < self._min_freq for e, k in Counter(t).items())}
        maybes = maybes - forced_yeses
        yeses = yeses | forced_yeses
        lower_count = Counter()
        for t in yeses:
            lower_count.update(t)
        residual = {e for e in self._elements if lower_count[e] < self._min_freq}
        maybes = {t for t in maybes if any(e in residual for e in t)}
        return (maybes, yeses)

    def _compute_heuristic(self, maybes, yeses):
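        # primal-dual heuristic: raise the dual variables y (paying
        # into z when a tuple's constraint goes tight) to obtain a
        # lower bound on the number of additional tuples still needed;
        # the next tuple to go tight becomes the branching variable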
        lower_count = Counter()
        for t in yeses:
            lower_count.update(t)
        residual_count = {e: max(self._min_freq - lower_count[e], 0) for e in self._elements}
        y = defaultdict(int)
        z = defaultdict(int)
        variable = None
        while True:
            slack = {t: 1 + z[t] - sum(y[e] for e in t) for t in maybes}
            assert all(s >= 0 for s in slack.values())
            inactive_maybes = {t for t, s in slack.items() if s > 0}
            if not inactive_maybes:
                break
            active_maybes = {t for t, s in slack.items() if s == 0}
            active_count = Counter()
            for t in active_maybes:
                active_count.update(t)
            dy = {e: 1 for e, k in residual_count.items() if active_count[e] < k}
            if not dy:
                break
            delta_inverse, variable = max(((Fraction(sum(dy.get(e, 0) for e in t), slack[t]), t) for t in inactive_maybes), key=itemgetter(0))
            delta = Fraction(1, delta_inverse)
            for e, dy_e in dy.items():
                y[e] += delta * dy_e
            for t in active_maybes:
                z[t] += delta * sum(dy.get(e, 0) for e in t)
        return (sum(residual_count[e] * y_e for e, y_e in y.items()) - sum(z.values()), variable)

    def _make_node(self, maybes, yeses):
        maybes, yeses = self._propagate(maybes, yeses)
        heuristic, variable = self._compute_heuristic(maybes, yeses)
        node = Node(len(yeses) + heuristic, self._index, maybes, yeses, variable)
        self._index += 1
        return node

    def _make_root_node(self):
        return self._make_node(self._tuples, set())

    def _make_child_nodes(self, node):
        if node.variable is None:
            return
        variable = {node.variable}
        maybes = node.maybes - variable
        yield self._make_node(maybes, node.yeses)
        yield self._make_node(maybes, node.yeses | variable)


def optimize(tuples, min_freq):
    optimizer = _Optimizer(tuples, min_freq)
    node = optimizer.optimize()[1]
    print('Nodes examined:', optimizer._index)
    return node.yeses


print(optimize({(2,), (3,), (1, 4), (1, 2, 3), (2, 3), (3, 4), (2, 4)}, 2))
print(optimize({(1, 2, 3, 4, 5, 6, 7), (8, 9, 10, 11, 12, 13, 14), (1, 2, 3, 4, 8, 9, 10, 11), (5, 6, 12, 13), (7, 14)}, 1))
