<h2>TL;博士</h2>
<p>我们在这里提出了一种有效的算法,虽然最坏的情况仍然可能是指数时间(尽管形式分析可能证明即使最坏的情况也比指数时间好得多),但实际上它具有<code>O(n^2)</code>中值时间,并且适用于数百列。作为测试,在单个核上的<strong>15min31s中发现了随机<code>n=120</code>(选择器列)x<code>nrows=3500</code>{<cd4>}的解决方案</p>
<p>警告:下面的答案相当长</p>
<h2>更好的算法解</h2>
<p>虽然我的<a href="https://stackoverflow.com/a/65361140/758174">other answer</a>集中讨论了如何使用pandas解决这个问题这一琐碎的问题,但它只提供了一个蛮力算法(简单且适用于少量列)。在这个答案中,我讨论了计算复杂性,以及如何降低计算复杂性以处理OP提到的那种规模(超过100列,数千行)</p>
<p>我最初的直觉是,问题可能是<a href="https://en.wikipedia.org/wiki/NP-hardness" rel="nofollow noreferrer">NP-hard</a>。蛮力方法具有指数复杂性(它明确地考虑了<code>O(2^n)</code>列的<code>n</code>组合)。这显然是不可行的,即使是中等规模的<code>n</code>。其他经典问题也属于这一类,例如<a href="https://en.wikipedia.org/wiki/Travelling_salesman_problem" rel="nofollow noreferrer">Travelling Salesman</a>和看似简单的<a href="https://en.wikipedia.org/wiki/Subset_sum_problem" rel="nofollow noreferrer">Subset sum</a></p>
<p>没有明显的方法可以确定所有组合的搜索路径中的部分解将导致最优解。事实上,观察暴力所发现的解决方案,按分数递减排序,表明它们的排列可能相当复杂。以下是随机生成的<code>df</code>和<code>n=9</code>(和500行)的<em>所有</em>列组合的热图。行已按原始<code>df.profit</code>降序排序。列是所有512个解决方案,按从最佳(左)到最差(右)的顺序排列。热图的内容是生成的有效过滤器(或对该解决方案选择的所有布尔列进行筛选),<code>True</code>显示为黑色,<code>False</code>显示为白色:</p>
<p><a href="https://i.stack.imgur.com/QVsvk.png" rel="nofollow noreferrer"><img src="https://i.stack.imgur.com/QVsvk.png" alt="solutions can be intricate"/></a></p>
<p>然而,我们可以通过使用下一节描述的算法找到避免探索完整组合的方法</p>
<h2>波束搜索算法</h2>
<p>A<a href="https://en.wikipedia.org/wiki/Beam_search" rel="nofollow noreferrer">Beam search</a>是“<em>一种启发式算法,通过在有限的集合中展开最有希望的节点来探索一个图。</em>”</p>
<p>我们使用这种方法来维护一组值得探索的候选项(每个候选项都是有前途的列子集)。虽然经常被用作寻找合理好的解决方案(但不一定是最好的)的启发式方法,<strong>在我们的例子中,我们只放弃那些可以证明不会导致最优的路径。因此,只要我们让算法完成,<strong>我们就可以保证提供最佳解决方案。我们还观察到,中途中断搜索仍然会产生很好的解决方案,通常是最优解本身</p>
<h2>候选人的定义</h2>
<p>A<code>candidate</code>是一个特定的列子集(用于筛选<code>df</code>的列的选择),其得分对应于它产生的利润。除了本身是一个解决方案之外,它还可以通过将其与另一个候选项合并,作为“后代”候选项的前缀。例如,一个候选<code>{a,c}</code>可以与另一个<code>{d,z}</code>合并以产生一个新的候选<code>{a,c,d,z}</code></p>
<p>对于候选<code>x</code>,我们存储以下数量:</p>
<ul>
<li><code>x.k</code>:列集合</li>
<li><code>x.p</code>:行的指示符,其中<code>profit > 0</code></li>
<li><code>x.n</code>:行的指示符,其中<code>profit < 0</code>(我们忽略<code>profit == 0</code>)</li>
<li><code>x.tot</code>:<code>sum(profit | (x.p + x.n))</code>(该候选人的总利润)</li>
<li><code>x.totp</code>:<code>sum(profit | x.p)</code>(该候选人的正利润之和)</li>
</ul>
<h2>开辟探索之路</h2>
<p>一些观察结果引导我们积极缩短勘探路径:</p>
<ol>
<li>候选<code>x</code>及其所有后代所能获得的最大利润是<code>x.totp</code>:在列集中添加列时,可能会减少负行集,但永远不会增加一组正数。因此<code>x.totp</code>是该候选人勘探路径上任何利润的上界</李>
<li>每个候选者都应该被唯一访问:在通过将<code>{a}</code>与<code>{b}</code>合并来访问<code>{a,b}</code>之后,我们可能会无意中通过将<code>{b}</code>与<code>{a}</code>合并来重新访问同一候选者。因此,当将<code>x</code>与<code>y</code>合并时,我们要求<code>x.k</code>中的所有列严格小于每一列<code>y.k</code>(<code>max(x.k) < min(y.k)</code>)</李>
<li>当考虑两个候选项的合并时,如果一个中的任何一组负列是另一个的子集,则没有改进的机会:如果<code>x.n < y.n or y.n < x.n</code>(<code><</code>这里是“is strict subset”集合运算符),则拒绝</李>
</ol>
<p>这将导致以下实施:</p>
<pre class="lang-py prettyprint-override"><code>class candidate(namedtuple('Cand', 'tot totp k p n')):
"""
A candidate solution or partial solution.
k: (colset) set of columns (e.g. {'a', 'b', 'c'})
tot: sum(profit | (p | n)) (sum of profit for all rows selected by this colset)
totp: sum(profit | p) (sum of profit for positive-only rows selected by this colset)
p: bool np.array indicating where profit > 0 for this colset
n: bool np.array indicating where profit < 0 for this colset
"""
def name(self):
cols = ''.join(sorted(self.k))
return cols
def __str__(self):
cols = ''.join(sorted(self.k))
return f'{cols} ({self.tot:.2f}, max {self.totp:.2f}, '
f'|p| = {self.p.sum()}, |n| = {self.n.sum()})'
def __repr__(self):
return str(self)
def make_candidate(df, k):
truth = df[k].values if k else np.ones(df.shape[0], dtype=bool)
xk = frozenset({k} if k else {}) # frozenset can be used as dict key, if needed
xp = (truth & (df['profit'] > 0)).values
xn = (truth & (df['profit'] < 0)).values
xtotp = df.loc[xp, 'profit'].sum()
xtot = xtotp + df.loc[xn, 'profit'].sum()
return candidate(xtot, xtotp, xk, xp, xn)
def merge(beam, x, y):
"""merge two candidates x, y if deemed viable, else return None"""
if max(x.k) >= min(y.k):
return None # avoid visiting same colset several times
if (x.totp < y.tot or y.totp < x.tot:
return None # z could never best x or y
zn = x.n * y.n # intersection
zp = x.p * y.p
ztotp = beam.df.loc[zp, 'profit'].sum()
if ztotp < beam.best.tot or ztotp <= x.tot or ztotp <= y.tot:
return None # z could never best the beam's best so far, or x or y
ztot = ztotp + beam.df.loc[zn, 'profit'].sum()
z = candidate(ztot, ztotp, x.k.union(y.k), zp, zn)
return z
class Beam:
def __init__(self, df, best, singles, sol):
self.df = df
self.best = best
self.singles = singles
self.sol = sol
self.loops = 0
@classmethod
def from_df(cls, df):
cols = [k for k in df.columns if k != 'profit']
# make solutions, first: empty set, then single-column ones
oa = make_candidate(df, None)
singles = [make_candidate(df, k) for k in cols]
best = max([oa] + singles, key=lambda x: x.tot)
singles = [x for x in singles if x.totp > best.tot]
return cls(df, best, singles, singles.copy())
def add_candidate(self, z):
if z is None:
return False
self.sol.append(z)
if z.tot > self.best.tot:
self.best = z
self.prune()
return True
def prune(self):
"""remove solutions that cannot become better than the current best"""
self.sol = [x for x in self.sol if x.totp >= self.best.tot]
def __str__(self):
return f'Beam: best: {self.best}, |sol| = {len(self.sol)}, ' \
f'|singles| = {len(self.singles)}, loops = {self.loops}'
def __repr__(self):
return str(self)
def optimize(self, max_iters=None, report_freq=None):
i = 0
while self.sol and (max_iters is None or i < max_iters):
if report_freq is not None and i % report_freq == 0:
print(f'loop {i:5d}, beam = {self}')
x = self.sol.pop(0)
for y in self.singles:
self.add_candidate(merge(self, x, y))
i += 1
self.loops += 1
if report_freq:
print(f'done {i:5d}, beam = {self}')
</code></pre>
<p>对于实验,我们可以生成随机帧:</p>
<pre class="lang-py prettyprint-override"><code>def gen_colnames():
for n in count(1):
for t in combinations(ascii_lowercase, n):
yield ''.join(t)
def colnames(n):
if n > len(ascii_lowercase):
return [f'c{k}' for k in range(n)]
else:
return list(islice(gen_colnames(), n))
def generate_df(ncols, nrows = 500):
df = pd.DataFrame({'profit': np.random.normal(scale=100, size=nrows)})
cols = pd.DataFrame(np.random.choice([False, True], size=(nrows, ncols)), columns=colnames(ncols))
df = pd.concat([df, cols], axis=1)
return df
</code></pre>
<p>然后我们可以测试它:</p>
<pre class="lang-py prettyprint-override"><code>df = generate_df(15)
%%time
res = brute_force_all(df)
# CPU times: user 40.9 s, sys: 68 ms, total: 41 s
%%time
beam = Beam.from_df(df)
beam.optimize()
# CPU times: user 165 ms, sys: 3 µs, total: 165 ms
# verify that we have the same solution as the brute-force
assert beam.best.k == res.colset.iloc[0]
# summary of the beam result:
beam
# Beam: best: bf (2567.64, max 5930.26, |p| = 68, |n| = 51), |sol| = 0, |singles| = 15, loops = 228
</code></pre>
<h2>演出</h2>
<p>我们不提供解决方案的时间或空间复杂性的正式分析(我们将留给读者作为练习…),但以下是在各种大小的随机帧上进行的一系列运行中获得的一些经验观察结果:</p>
<p><a href="https://i.stack.imgur.com/oezFu.png" rel="nofollow noreferrer"><img src="https://i.stack.imgur.com/oezFu.png" alt="log-log plot of times"/></a></p>
<p>此日志图显示了蛮力和波束搜索的单个运行时间(点)和中间时间(线)。正如预期的那样,暴力时间是指数级的。波束搜索速度快得多,中值时间似乎是多项式<code>n</code>(对数图上的一条直线)。使用2次多项式(因此,<code>O(n^2)</code>)获得了对中值时间的合理拟合:</p>
<p><a href="https://i.stack.imgur.com/ND2le.png" rel="nofollow noreferrer"><img src="https://i.stack.imgur.com/ND2le.png" alt="polyfit degree-2"/></a></p>
<p>请注意,直到大小<code>n=64</code>,在数千次运行中,我们从未达到我们设置的最大预算<code>max_iters = 100_000</code>。因此,所有的解决方案都是最优的</p>
<p>还请注意,根据搜索曲面的复杂性,实际时间存在长尾离散。以下是<code>n = 64</code>的单个时间直方图:</p>
<p><a href="https://i.stack.imgur.com/Av1xa.png" rel="nofollow noreferrer"><img src="https://i.stack.imgur.com/Av1xa.png" alt="histogram n=64 beam times"/></a></p>
<p>但提前停止搜索的能力缓解了这一问题。事实上,我们的观察结果是,即使在最长的情况下,非常好的解决方案(或者通常是最优方案本身)也会在搜索的早期出现</p>