在保留最小频率的同时减少集合大小
假设我有以下一组数据:
{(2,), (3,), (1, 4), (1, 2, 3), (2, 3), (3, 4), (2, 4)}
这组数据中每个数字的出现频率如下:
2: 4, 3: 4, 4: 3, 1: 2
你能想出一种方法来减少这组数据吗?要求每个数字至少出现两次,但同时要尽量减少数据组中的元组数量。
比如,可以把元组 (3, 4) 从数据中去掉,这样剩下的频率就变成:
2: 4, 3: 3, 4: 2, 1: 2
这是我尝试解决这个问题的一个很简单的办法:
def reduce(a, limit):
while True:
remove = None
for i in a:
c = Counter([i for s in a for i in s])
if c.most_common(1)[0][0] in i:
if min([c[j] for j in i]) > limit:
remove = i
break
if remove:
a.remove(remove)
else:
break
reduce(a, 2) # we want at least two of each number
这个解决方案的问题在于,它可能会减少数据集的大小,但不一定能让我得到最小的可能数据集。
以我这个例子为例,我想减少的数据集包含字符串,假设像这样:
a = [("one","eighty one","three"), ("eighty five","sixty one","three", "eleven"), ...]
其中 a
的长度是1000。每个元组的长度在3到9之间。元组可以由100个独特的值组成,比如“one”就是其中一个值。我希望在减少数据集后,每个独特的值至少出现25次。计算这个减少后的数据集可能需要多长时间?是几秒钟,还是很多分钟?
4 个回答
这是我的解决方案。我注意到你使用了最多1000个元素的集合,所以我决定用递归的方式来实现这个算法。
首先,我们来定义一个函数,用来获取每个数字在元组中的出现频率:
def get_frequency(tuple_list):
frequency = {}
def mapper(element):
if frequency.has_key(element):
frequency[element] += 1
else:
frequency[element] = 1
map(lambda t: map(mapper, t), tuple_list)
return frequency
这个部分相对简单,所以我不会花太多时间在这里。接下来,我决定实现一个主要的函数,叫做 recursive
。这个函数会返回一个元组,里面包含可以删除的元素列表和算法能达到的最大深度。
这是我在实现之前写的预算法:
if tuple_list is null : return ([], iteration)
best_deletion = None
for elements:
if element can't be deleted : continue
launch the next recursion without the current element in the list
if the iteration is better than best_iteration or best_iteration is None :
set the result of recursion in best_deletion
if best_deletion is None : return ([], iteration)
return the best_iteration with adding the current Node inside, and increment the iteration
下面是结果:
def recursive(tuple_list, limit, iteration):
if tuple_list == []:
return ([], iteration)
frequency = get_frequency(tuple_list)
value = None
for i in xrange(len(tuple_list)):
impossible_to_delete = False
for number in tuple_list[i]:
frequency[number] -= 1
if frequency[number] < limit:
impossible_to_delete = True
break
if impossible_to_delete:
continue
next_recursion_list = tuple_list[:]
next_recursion_list.pop(i)
maximum_deletion = recursive(next_recursion_list, limit, iteration + 1)
if value == None:
maximum_deletion[0].insert(0, tuple_list[i])
value = (maximum_deletion[0], maximum_deletion[1] + 1)
else:
if value[1] < maximum_deletion[1]:
maximum_deletion[0].insert(0, tuple_list[i])
value = (maximum_deletion[0], maximum_deletion[1] + 1)
if value == None:
return ([], iteration)
return value
之后,只需这样调用这个函数:
items_to_delete = recursive(list(tuple_set), 2, 0)
希望这能帮到你。如果有时间的话,我会测试一下之前提出的算法中哪个是最快的。
这个问题至少是 NP 难的,这意味着你无法找到一个高效的(多项式时间)算法。不过,有一些方法可以减少常数时间的影响。除了使用更好的算法外,还可以考虑使用更快的运行环境,比如 PyPy。
下面的代码,如果完整运行,会返回一个尽可能小的子集。此外,它只考虑有效的输入,并且可以逐步输出越来越小的覆盖子集。
from collections import defaultdict
from itertools import product, combinations
def covering_set(sets, min_freq, print_approximations=False):
# dictionary mapping each unique value to the sets that contain it
d = defaultdict(list)
for set_ in sets:
for elem in set_:
d[elem].append(set_)
# we need min_freq number of each unique values
combos = [combinations(x, min_freq) for x in d.values()]
#initial solution
min_cover = sets
min_length = len(sets)
#iterate through valid solutions
#cover is a list of list of sets
for cover in product(*combos):
#we must flatten and remove the duplicates in the cover
covering_set = set()
for elem_cover in cover:
for set_ in elem_cover:
if set_ not in covering_set:
covering_set.add(set_)
#now, we check if it the smallest current solution
if len(covering_set) < min_length:
min_cover = covering_set
min_length = len(covering_set)
if print_approximations:
print(min_length, min_cover)
return min_cover
这里有一个简单粗暴的方法,希望能帮你入门。
不过要注意,这个方法不一定能得到最小的结果集。它会先去掉较小的元组。所以如果你有很多小元组而长元组比较少,这个方法可能对你有用。
一开始是一个有序的集合(列表),但我还没来得及恢复顺序。至少在函数里需要保持顺序,这样计算出来的值才能正确对应。我想把它整理一下,但现在已经很晚了。
def reduce(source, min_count=2):
print "source: {}".format(source)
# [(2,), (3,), (1, 4), (1, 2, 3), (2, 3), (3, 4), (2, 4)]
answer = []
freq = {}
lens = []
for t in source:
lens.append(len(t))
for i in t:
freq[i] = freq.get(i, 0) + 1
print "freq: {}".format(freq) # {1: 2, 2: 4, 3: 4, 4: 3}
print "lens: {}".format(lens) # [1, 1, 2, 3, 2, 2, 2]
from collections import defaultdict
slens = defaultdict(list)
for l, t in zip(lens, source):
slens[l].append(t)
print "slens: {}".format(slens)
# {1: [(2,), (3,)], 2: [(1, 4), (2, 3), (3, 4), (2, 4)], 3: [(1, 2, 3)]}
for l in sorted(slens.keys()):
for t in slens[l]:
save = False
for i in t:
if (freq[i] <= min_count):
save = True
freq[i] -= 1
if save:
answer.append(t)
print "answer: {}".format(answer) # [(1, 4), (1, 2, 3), (3, 4), (2, 4)]
freq = {}
for t in answer:
for i in t:
freq[i] = freq.get(i, 0) + 1
print "freq: {}".format(freq) # {1: 2, 2: 2, 3: 2, 4: 3}
我最初的想法是遍历一遍,保存那些小于最小计数的元组,然后减少工作集。接着对剩下的元组进行评分,频率较低的元素得分更高。然后丢弃那些最低得分的元组,只要它们被移除后不会让任何元素的频率低于最小计数。最后重新计算频率,再从头开始。
正如评论中提到的,NP难题中的集合覆盖问题是这个问题的一个特殊情况,其中最小频率是k = 1
,这也使得这个问题变得NP难。建议使用像PuLP这样的库,下面是一个整数规划的例子。
minimize sum over tuples T of x(T)
subject to
y(e): for all elements e, (sum over tuples T of (count of e in T) * x(T)) >= k
z(T): for all tuples T, x(T) in {0, 1}
PuLP的一个缺点是它需要一个外部求解器。不过,我当时想动手试试,所以我写了一个(测试非常少的)纯Python求解器。它使用深度优先搜索和最佳优先回溯的方法,并有一个简单的传播策略来确定哪些元组必须选择或不能选择,还有一个基于原始-对偶近似的启发式函数,用于之前程序的对偶问题(所以这只是一个复杂的玩具,但仍然是个玩具)。
maximize (sum over elements e of k * y(e)) - (sum over tuples T of z(T))
subject to
x(T): for all tuples T, (sum over elements e in T of y(e)) - z(T) <= 1
for all elements e, y(e) >= 0
for all tuples T, z(T) >= 0
原始-对偶策略是以相同的速度增加那些y
的值,这样增加y
并不需要相应地增加z
,而且不会造成损失。
from collections import Counter, defaultdict, namedtuple
from fractions import Fraction
from heapq import heappop, heappush
from math import ceil
from operator import itemgetter
class _BestFirstSearchDepthFirstBacktracking:
def optimize(self):
node = self._make_root_node()
heap = []
upper_bound = None
while True:
lower_bound = ceil(node.lower_bound)
if upper_bound is None or lower_bound < upper_bound:
child_nodes = list(self._make_child_nodes(node))
if child_nodes:
i, node = min(enumerate(child_nodes), key=itemgetter(1))
del child_nodes[i]
for child_node in child_nodes:
heappush(heap, child_node)
continue
upper_bound = lower_bound
solution = node
if not heap:
return (upper_bound, solution)
node = heappop(heap)
Node = namedtuple('Node', ('lower_bound', 'index', 'maybes', 'yeses', 'variable'))
class UnsolvableException(Exception):
pass
class _Optimizer(_BestFirstSearchDepthFirstBacktracking):
def __init__(self, tuples, min_freq):
self._index = 0
self._tuples = set(tuples)
self._min_freq = min_freq
self._elements = set()
for t in self._tuples:
self._elements.update(t)
def _propagate(self, maybes, yeses):
upper_count = Counter()
for t in maybes:
upper_count.update(t)
for t in yeses:
upper_count.update(t)
if any(upper_count[e] < self._min_freq for e in self._elements):
raise UnsolvableException()
forced_yeses = set()
forced_yeses = {t for t in maybes if any(upper_count[e] - k < self._min_freq for e, k in Counter(t).items())}
maybes = maybes - forced_yeses
yeses = yeses | forced_yeses
lower_count = Counter()
for t in yeses:
lower_count.update(t)
residual = {e for e in self._elements if lower_count[e] < self._min_freq}
maybes = {t for t in maybes if any(e in residual for e in t)}
return (maybes, yeses)
def _compute_heuristic(self, maybes, yeses):
lower_count = Counter()
for t in yeses:
lower_count.update(t)
residual_count = {e: max(self._min_freq - lower_count[e], 0) for e in self._elements}
y = defaultdict(int)
z = defaultdict(int)
variable = None
while True:
slack = {t: 1 + z[t] - sum(y[e] for e in t) for t in maybes}
assert all(s >= 0 for s in slack.values())
inactive_maybes = {t for t, s in slack.items() if s > 0}
if not inactive_maybes:
break
active_maybes = {t for t, s in slack.items() if s == 0}
active_count = Counter()
for t in active_maybes:
active_count.update(t)
dy = {e: 1 for e, k in residual_count.items() if active_count[e] < k}
if not dy:
break
delta_inverse, variable = max(((Fraction(sum(dy.get(e, 0) for e in t), slack[t]), t) for t in inactive_maybes), key=itemgetter(0))
delta = Fraction(1, delta_inverse)
for e, dy_e in dy.items():
y[e] += delta * dy_e
for t in active_maybes:
z[t] += delta * sum(dy.get(e, 0) for e in t)
return (sum(residual_count[e] * y_e for e, y_e in y.items()) - sum(z.values()), variable)
def _make_node(self, maybes, yeses):
maybes, yeses = self._propagate(maybes, yeses)
heuristic, variable = self._compute_heuristic(maybes, yeses)
node = Node(len(yeses) + heuristic, self._index, maybes, yeses, variable)
self._index += 1
return node
def _make_root_node(self):
return self._make_node(self._tuples, set())
def _make_child_nodes(self, node):
if node.variable is None:
return
variable = {node.variable}
maybes = node.maybes - variable
yield self._make_node(maybes, node.yeses)
yield self._make_node(maybes, node.yeses | variable)
def optimize(tuples, min_freq):
optimizer = _Optimizer(tuples, min_freq)
node = optimizer.optimize()[1]
print('Nodes examined:', optimizer._index)
return node.yeses
print(optimize({(2,), (3,), (1, 4), (1, 2, 3), (2, 3), (3, 4), (2, 4)}, 2))
print(optimize({(1, 2, 3, 4, 5, 6, 7), (8, 9, 10, 11, 12, 13, 14), (1, 2, 3, 4, 8, 9, 10, 11), (5, 6, 12, 13), (7, 14)}, 1))