将正数列表分配到指定数量的集合中,使其和尽可能接近

0 投票
1 回答
77 浏览
提问于 2025-04-12 05:27

我发这个帖子的目的是想把我之前的一个帖子链接在这里,在那个帖子里,我被要求提供一个简单的可运行示例,同时也想请教一下代码本身的问题,看看有没有人能建议更好、更快或更可靠的方法。所以我单独发了这个帖子(和可重复性帖子没有直接关系)。

你可以想象这个概念有很多应用场景。
比如,你可能有几个文件需要处理,每个文件的记录数不同,而你想把这些文件的处理分配到N条并行的线路上,确保每条线路处理的记录数大致相同。
或者,你可能有几个不同重量的物体,还有N个人要把它们从A点搬到B点,你希望每个人搬的重量大致相同。

下面是我的代码,基于线性规划(其实就是一个分配问题,目的是最小化绝对差值的总和)。

出于某种原因,当线程数大于1时,运行结果在不同的情况下会有所不同,正如上面提到的帖子所说。
但如果我只用一个线程运行,速度会慢很多,所以这就成了一个两难的局面。

如果有任何改进建议或替代方法,仍然能提供绝对最优解(而不是贪心算法),我将非常感激。

# Take a list of numbers and partition them into a desired number of sets,
# ensuring that the sum within each set is as close as possible
# to the sum of all numbers divided by the number of sets.

# USER SETTINGS
list_of_numbers = [  21614,   22716, 1344708,    8948,  136944,     819,    7109,
         255182,  556354, 1898763, 1239808,  925193,  173237,   64301,
         147896,  824564,   16028, 1021326,  108042,   72221,  368270,
          17467,    2953,   52942,    1855,  739627,  460833,   30955]
N_sets_to_make = 4

import numpy as np
import pulp
from pulp import *

# Calculate the desired size of each set
S = N_sets_to_make
average_size = sum(list_of_numbers) / S
sizes = [average_size] * S

# Create the coefficient matrix
N = len(list_of_numbers)
A = np.array(list_of_numbers, ndmin = 2)

# Create the pulp model
prob = LpProblem("Partitioning", LpMinimize)

# Create the pulp variables
# x_names : binary variables encoding the presence of each initial number in each final set
x_names = ['x_'+str(i) for i in range(N * S)]
x = [LpVariable(x_names[i], lowBound = 0, upBound = 1, cat = 'Integer') for i in range(N * S)]
# X_names : continuous positive variables encoding the absolute difference between each final set sum and the desired size
X_names = ['X_'+str(i) for i in range(S)]
X = [LpVariable(X_names[i], lowBound = 0, cat = 'Continuous') for i in range(S)]

# Add the objective to the model (mimimal sum of X_i)
prob += LpAffineExpression([(X[i], 1) for i in range(S) ])

# Add the constraints to the model

# Constraints forcing each initial number to be in one and only one final set
for c in range(N):
    prob += LpAffineExpression([(x[c+m*N],+1) for m in range(S)]) == 1

# Constraints forcing each final set to be non-empty
for m in range(S):
    prob += LpAffineExpression([(x[i],+1) for i in range(m,(m+1)*N)]) >= 1

# Constraints encoding the absolute values
for m in range(S):    
        cs = [c for c in range(N) if A[0,c] != 0]
        prob += LpAffineExpression([(x[c+m*N],A[0,c]) for c in cs]) - X[m] <= sizes[m]
        prob += LpAffineExpression([(x[c+m*N],A[0,c]) for c in cs]) + X[m] >= sizes[m]

# Solve the model
prob.solve(PULP_CBC_CMD(gapRel = 0, timeLimit = 3600, threads = 1))

# Extract the solution
values_of_x_i = [value(x[i]) for i in range(N * S)]
selected_ind_initial_numbers = [(list(range(N)) * S)[i] for i,l in enumerate(values_of_x_i) if l == 1]
selected_ind_final_sets = [(list((1 + np.repeat(range(S), N)).astype('int64')))[i] for i,l in enumerate(values_of_x_i) if l == 1]
ind_final_set_for_each_initial_number = [x for _, x in sorted(zip(selected_ind_initial_numbers, selected_ind_final_sets))]

# Find the numbers that ended up in each final set
d = dict()
for m, n in sorted(zip(ind_final_set_for_each_initial_number, list_of_numbers)) :
    if m in d :
        d[m].append(n)
    else :
        d[m] = [n]
print(d)

# And their sums
s = [sum(l) for i, l in enumerate(d.values())]
print(s)

# And the absolute differences of their sums from the desired sum
absdiffs = [np.abs(s[i] - sizes[i]) for i in range(len(s))]
print(absdiffs)

# And the absolute fractional differences
print([absdiffs[i]/sizes[i]/S for i in range(len(absdiffs))])

编辑 修改了代码,使用了期望大小的行归一化版本和系数矩阵,通常能更快执行(并且使问题不再依赖于数字的总和)

之后:

sizes = [average_size] * S

添加:

fracs = [1 / S] * S

之后:

A = np.array(list_of_numbers, ndmin = 2)

添加:

An = A / np.sum(A, axis = 1, keepdims = True)

替换:

prob += LpAffineExpression([(x[c+m*N],A[0,c]) for c in cs]) - X[m] <= sizes[m]
prob += LpAffineExpression([(x[c+m*N],A[0,c]) for c in cs]) + X[m] >= sizes[m]

为:

prob += LpAffineExpression([(x[c+m*N],An[0,c]) for c in cs]) - X[m] <= fracs[m]
prob += LpAffineExpression([(x[c+m*N],An[0,c]) for c in cs]) + X[m] >= fracs[m]

此外,出乎我意料的是:

prob.solve(PULP_CBC_CMD(gapRel = 0, timeLimit = 3600, threads = None))

结果比指定线程数时的解决方案快得多(在时间限制到达之前就得到了结果)。


编辑2 报告了使用AirSquid建议的更改和第一版编辑中展示的新代码运行结果。

修改后的代码版本,使用了fracsAn代替sizesA,并且threads = None:运行时间非常快(大约10秒):

{1: [8948, 255182, 1021326, 1344708], 2: [2953, 7109, 17467, 64301, 72221, 108042, 136944, 556354, 739627, 925193], 3: [819, 1855, 21614, 173237, 368270, 824564, 1239808], 4: [16028, 22716, 30955, 52942, 147896, 460833, 1898763]}
[2630164, 2630211, 2630167, 2630133]
[4.75, 42.25, 1.75, 35.75]
[4.514919432450865e-07, 4.015902021495769e-06, 1.6633913698503185e-07, 3.3980709412656508e-06]

我原来的代码,使用sizesA,但没有X变量,按照AirSquid的建议,gapRel = 0.0001threads = None:几乎瞬间运行:

{1: [819, 2953, 64301, 72221, 739627, 824564, 925193], 2: [108042, 255182, 368270, 1898763], 3: [7109, 16028, 22716, 1239808, 1344708], 4: [1855, 8948, 17467, 21614, 30955, 52942, 136944, 147896, 173237, 460833, 556354, 1021326]}
[2629678, 2630257, 2630369, 2630371]
[490.75, 88.25, 200.25, 202.25]
[4.664624655737393e-05, 8.388245050816607e-06, 1.9033949817858644e-05, 1.922405168869868e-05]

两者的结合(fracsAn,没有X变量,gapRel = 0.0001threads = None):几乎瞬间运行:

{1: [819, 17467, 21614, 22716, 64301, 136944, 1021326, 1344708], 2: [1855, 7109, 8948, 30955, 460833, 556354, 739627, 824564], 3: [108042, 255182, 368270, 1898763], 4: [2953, 16028, 52942, 72221, 147896, 173237, 925193, 1239808]}
[2629895, 2630245, 2630257, 2630278]
[273.75, 76.25, 88.25, 109.25]
[2.6020193571229983e-05, 7.247633825776388e-06, 8.388245050816607e-06, 1.0384314694636988e-05]

结论:如果使用max_sum代替绝对差值的单独总和,总是能接近最优解(不仅仅是最大的那一组),那么这个方法可能会非常有利,特别是如果结合归一化,前提是可以接受一个小的非零相对差距。


附录 在进一步测试后

结果发现,即使让max_sum方法运行到完成(即允许找到最优解,而不是被差距或时间限制停止),也并不能得到期望的最优解

举个例子:

list_of_numbers = [10000] + [100, 200, 300, 400, 500] * 5
N_sets_to_make = 3

使用原始方法(最小化绝对差值的总和)基于分数(fracsAn):

# Final sets
{1: [10000],
 2: [200, 300, 300, 400, 500],
 3: [100,
  100,
  100,
  100,
  100,
  200,
  200,
  200,
  200,
  300,
  300,
  300,
  400,
  400,
  400,
  400,
  500,
  500,
  500,
  500]}

# Their sums
[10000, 1700, 5800]

# Absolute differences
[4166.666666666667, 4133.333333333333, 33.33333333333303]

# Fractional differences
[0.23809523809523814, 0.23619047619047617, 0.0019047619047618874]

# Their sum
0.4761904761904762

使用max_sum方法(最小化分数的最大总和):

# Final sets
{1: [10000],
 2: [500],
 3: [100,
  100,
  100,
  100,
  100,
  200,
  200,
  200,
  200,
  200,
  300,
  300,
  300,
  300,
  300,
  400,
  400,
  400,
  400,
  400,
  500,
  500,
  500,
  500]}

# Their sums
[10000, 500, 7000]

# Absolute differences
[4166.666666666667, 5333.333333333333, 1166.666666666667]

# Fractional differences
[0.23809523809523814, 0.30476190476190473, 0.0666666666666667]

# Their sum
0.6095238095238096

1 个回答

2

首先,要明白你的这个小问题在组合上是非常庞大的。

一个非常粗略的下限是:

c(28, 14) * c(14, 7) * c(7,6) * 1

这个数字超过了1e+11。实际上,可能还要大得多。

为了稍微减小模型的规模,可以采用一种迷你-最大策略,意思是尽量减少最大集合的大小。根据你如何定义“比较总和”,这个方法可能是可行的。

这样可以从你的公式中去掉S-1个变量,并大大简化约束矩阵。你的绝对值部分减少了一半,而每个约束只包含1个变量,而不是S个变量。

这样做后,把RelGap调整到一个合理的值,你就能获得几乎瞬间的性能。设置RelGap为0.0001时,几乎是立刻解决。如果你必须保证100%的最优解,那你还是得花费很长时间,因为问题的组合复杂性和底层的整数结构。

我会稍微不同地写这个,但没必要太担心。平面索引和numpy并不是必须的。我会为了清晰起见使用双重索引,而不是像你那样使用仿射表达式,因为模型构建得很快,而且更容易阅读。(可以看看我其他关于pulp的帖子,里面有例子)

稍微的修改

# USER SETTINGS
list_of_numbers = [  21614,   22716, 1344708,    8948,  136944,     819,    7109,
         255182,  556354, 1898763, 1239808,  925193,  173237,   64301,
         147896,  824564,   16028, 1021326,  108042,   72221,  368270,
          17467,    2953,   52942,    1855,  739627,  460833,   30955]
N_sets_to_make = 4

import numpy as np
import pulp
from pulp import *

# Calculate the desired size of each set
S = N_sets_to_make
average_size = sum(list_of_numbers) / S
sizes = [average_size] * S

# Create the coefficient matrix
N = len(list_of_numbers)
A = np.array(list_of_numbers, ndmin = 2)

# Create the pulp model
prob = LpProblem("Partitioning", LpMinimize)

# Create the pulp variables
# x_names : binary variables encoding the presence of each initial number in each final set
x_names = ['x_'+str(i) for i in range(N * S)]
x = [LpVariable(x_names[i], lowBound = 0, upBound = 1, cat = 'Integer') for i in range(N * S)]
# X_names : continuous positive variables encoding the absolute difference between each final set sum and the desired size
X_names = ['X_'+str(i) for i in range(S)]
# X = [LpVariable(X_names[i], lowBound = 0, cat = 'Continuous') for i in range(S)]
max_sum = LpVariable(name='max_sum', cat='Continuous')
# Add the objective to the model (mimimal sum of X_i)
prob += LpAffineExpression(max_sum)

# Add the constraints to the model

# Constraints forcing each initial number to be in one and only one final set
for c in range(N):
    prob += LpAffineExpression([(x[c+m*N],+1) for m in range(S)]) == 1

# Constraints forcing each final set to be non-empty
for m in range(S):
    prob += LpAffineExpression([(x[i],+1) for i in range(m,(m+1)*N)]) >= 1

# Constraints encoding the absolute values
for m in range(S):
        cs = [c for c in range(N) if A[0,c] != 0]
        prob += LpAffineExpression([(x[c+m*N],A[0,c]) for c in cs]) <= max_sum
        # prob += LpAffineExpression([(x[c+m*N],A[0,c]) for c in cs]) + X[m] >= sizes[m]

# Solve the model
prob.solve(PULP_CBC_CMD(gapRel = 0.0001, timeLimit = 3600, threads = 1))

# Extract the solution
values_of_x_i = [value(x[i]) for i in range(N * S)]
selected_ind_initial_numbers = [(list(range(N)) * S)[i] for i,l in enumerate(values_of_x_i) if l == 1]
selected_ind_final_sets = [(list((1 + np.repeat(range(S), N)).astype('int64')))[i] for i,l in enumerate(values_of_x_i) if l == 1]
ind_final_set_for_each_initial_number = [x for _, x in sorted(zip(selected_ind_initial_numbers, selected_ind_final_sets))]

# Find the numbers that ended up in each final set
d = dict()
for m, n in sorted(zip(ind_final_set_for_each_initial_number, list_of_numbers)) :
    if m in d :
        d[m].append(n)
    else :
        d[m] = [n]
print(d)

# And their sums
s = [sum(l) for i, l in enumerate(d.values())]
print(s)

# And the absolute differences of their sums from the desired sum
absdiffs = [np.abs(s[i] - sizes[i]) for i in range(len(s))]
print(absdiffs)

# And the absolute fractional differences
print([absdiffs[i]/sizes[i]/S for i in range(len(absdiffs))])

撰写回答