工作者/时间段排列/约束过滤算法
希望大家能帮我一下。这不是工作上的帮助,而是为了一个非常努力的志愿者慈善组织,他们真的需要一个比现在更简单、更不烦人的排班系统。
如果有人知道一个好的第三方应用程序可以自动化这个,那就太好了。请不要推荐一些随机的排班工具,比如教室预定的那些,因为我觉得它们无法满足我们的需求。
提前感谢大家的阅读;我知道这篇帖子很长。我尽量把事情描述得清楚,并且展示我自己也做了一些努力。
问题
我需要一个工人/时间段排班算法,能够为工人生成班次,并满足以下条件:
输入数据
import datetime.datetime as dt
class DateRange:
def __init__(self, start, end):
self.start = start
self.end = end
class Shift:
def __init__(self, range, min, max):
self.range = range
self.min_workers = min
self.max_workers = max
tue_9th_10pm = dt(2009, 1, 9, 22, 0)
wed_10th_4am = dt(2009, 1, 10, 4, 0)
wed_10th_10am = dt(2009, 1, 10, 10, 0)
shift_1_times = Range(tue_9th_10pm, wed_10th_4am)
shift_2_times = Range(wed_10th_4am, wed_10th_10am)
shift_3_times = Range(wed_10th_10am, wed_10th_2pm)
shift_1 = Shift(shift_1_times, 2,3) # allows 3, requires 2, but only 2 available
shift_2 = Shift(shift_2_times, 2,2) # allows 2
shift_3 = Shift(shift_3_times, 2,3) # allows 3, requires 2, 3 available
shifts = ( shift_1, shift_2, shift_3 )
joe_avail = [ shift_1, shift_2 ]
bob_avail = [ shift_1, shift_3 ]
sam_avail = [ shift_2 ]
amy_avail = [ shift_2 ]
ned_avail = [ shift_2, shift_3 ]
max_avail = [ shift_3 ]
jim_avail = [ shift_3 ]
joe = Worker('joe', joe_avail)
bob = Worker('bob', bob_avail)
sam = Worker('sam', sam_avail)
ned = Worker('ned', ned_avail)
max = Worker('max', max_avail)
amy = Worker('amy', amy_avail)
jim = Worker('jim', jim_avail)
workers = ( joe, bob, sam, ned, max, amy, jim )
处理过程
从上面来看,班次和工人是两个主要的输入变量。
每个班次都有最少和最多需要的工人数。满足班次的最少工人要求是成功的关键,但如果实在不行,有空缺的排班总比“出错”要好 :) 主要的算法问题是,当有足够的工人时,不应该出现不必要的空缺。
理想情况下,班次的最大工人数量应该被填满,但这相对于其他限制来说优先级最低,所以如果必须妥协,应该优先考虑这一点。
灵活的限制
这些限制有点灵活,如果找不到“完美”的解决方案,可以稍微调整一下边界。不过,这种灵活性应该是最后的手段,而不是随意利用。理想情况下,灵活性可以通过一个“调整因子”变量来配置。
- 两个班次之间有最小的时间间隔。例如,一个工人不应该在同一天被安排两个班次。
- 在给定的时间段内(比如一个月),一个工人可以做的班次数量是有限的。
- 在一个月内,可以做的某些班次(比如夜班)的最大数量也是有限的。
最好有,但不是必要的
如果你能想出一个算法,能够完成上述要求并包含这些附加功能,我会非常感激。即使是一个单独处理这些部分的附加脚本也很好。
重叠班次。例如,能够指定一个“前台”班次和一个“后台”班次同时进行会很好。可以通过不同的班次数据分别调用程序来实现,但这样就会错过在给定时间段内安排多人班次的限制。
可以为每个工人单独指定的最小重新安排时间。例如,如果某个工人感觉工作过于繁重,或者正在处理个人问题,或者是一个刚入门的新人,我们可能希望给他安排的班次少于其他工人。
一种自动化/随机/公平的方式来选择员工,以填补最少班次人数的空缺,当没有合适的工人时。
处理突发取消的方式,只填补空缺,而不重新安排其他班次。
输出测试
算法应该尽可能生成多个匹配的解决方案,每个解决方案看起来像这样:
class Solution:
def __init__(self, shifts_workers):
"""shifts_workers -- a dictionary of shift objects as keys, and a
a lists of workers filling the shift as values."""
assert isinstance(dict, shifts_workers)
self.shifts_workers = shifts_workers
这是一个针对单个解决方案的测试函数,基于上述数据。我认为这是正确的,但我也希望能得到一些同行的审查。
def check_solution(solution):
assert isinstance(Solution, solution)
def shift_check(shift, workers, workers_allowed):
assert isinstance(Shift, shift):
assert isinstance(list, workers):
assert isinstance(list, workers_allowed)
num_workers = len(workers)
assert num_workers >= shift.min_workers
assert num_workers <= shift.max_workers
for w in workers_allowed:
assert w in workers
shifts_workers = solution.shifts_workers
# all shifts should be covered
assert len(shifts_workers.keys()) == 3
assert shift1 in shifts_workers.keys()
assert shift2 in shifts_workers.keys()
assert shift3 in shifts_workers.keys()
# shift_1 should be covered by 2 people - joe, and bob
shift_check(shift_1, shifts_workers[shift_1], (joe, bob))
# shift_2 should be covered by 2 people - sam and amy
shift_check(shift_2, shifts_workers[shift_2], (sam, amy))
# shift_3 should be covered by 3 people - ned, max, and jim
shift_check(shift_3, shifts_workers[shift_3], (ned,max,jim))
尝试
我尝试用遗传算法来实现这个,但似乎无法调整到位,所以虽然基本原理在单个班次上似乎有效,但即使是几个班次和几个工人的简单情况也无法解决。
我最近的尝试是生成每一个可能的排列作为解决方案,然后剔除不符合限制的排列。这似乎运行得更快,也让我更进一步,但我在使用python 2.6的itertools.product()来帮助生成排列时,无法完全搞定。如果有很多错误也不奇怪,老实说,这个问题我脑子里没法完全理解 :)
目前我的代码分为两个文件:models.py和rota.py。models.py看起来是:
# -*- coding: utf-8 -*-
class Shift:
def __init__(self, start_datetime, end_datetime, min_coverage, max_coverage):
self.start = start_datetime
self.end = end_datetime
self.duration = self.end - self.start
self.min_coverage = min_coverage
self.max_coverage = max_coverage
def __repr__(self):
return "<Shift %s--%s (%r<x<%r)" % (self.start, self.end, self.min_coverage, self.max_coverage)
class Duty:
def __init__(self, worker, shift, slot):
self.worker = worker
self.shift = shift
self.slot = slot
def __repr__(self):
return "<Duty worker=%r shift=%r slot=%d>" % (self.worker, self.shift, self.slot)
def dump(self, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<Duty shift=%s slot=%s" % (self.shift, self.slot)
self.worker.dump(indent=indent, depth=depth+1)
print ind + ">"
class Avail:
def __init__(self, start_time, end_time):
self.start = start_time
self.end = end_time
def __repr__(self):
return "<%s to %s>" % (self.start, self.end)
class Worker:
def __init__(self, name, availabilities):
self.name = name
self.availabilities = availabilities
def __repr__(self):
return "<Worker %s Avail=%r>" % (self.name, self.availabilities)
def dump(self, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<Worker %s" % self.name
for avail in self.availabilities:
print ind + " " * indent + repr(avail)
print ind + ">"
def available_for_shift(self, shift):
for a in self.availabilities:
if shift.start >= a.start and shift.end <= a.end:
return True
print "Worker %s not available for %r (Availability: %r)" % (self.name, shift, self.availabilities)
return False
class Solution:
def __init__(self, shifts):
self._shifts = list(shifts)
def __repr__(self):
return "<Solution: shifts=%r>" % self._shifts
def duties(self):
d = []
for s in self._shifts:
for x in s:
yield x
def shifts(self):
return list(set([ d.shift for d in self.duties() ]))
def dump_shift(self, s, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<ShiftList"
for duty in s:
duty.dump(indent=indent, depth=depth+1)
print ind + ">"
def dump(self, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<Solution"
for s in self._shifts:
self.dump_shift(s, indent=indent, depth=depth+1)
print ind + ">"
class Env:
def __init__(self, shifts, workers):
self.shifts = shifts
self.workers = workers
self.fittest = None
self.generation = 0
class DisplayContext:
def __init__(self, env):
self.env = env
def status(self, msg, *args):
raise NotImplementedError()
def cleanup(self):
pass
def update(self):
pass
而rota.py看起来是:
#!/usr/bin/env python2.6
# -*- coding: utf-8 -*-
from datetime import datetime as dt
am2 = dt(2009, 10, 1, 2, 0)
am8 = dt(2009, 10, 1, 8, 0)
pm12 = dt(2009, 10, 1, 12, 0)
def duties_for_all_workers(shifts, workers):
from models import Duty
duties = []
# for all shifts
for shift in shifts:
# for all slots
for cov in range(shift.min_coverage, shift.max_coverage):
for slot in range(cov):
# for all workers
for worker in workers:
# generate a duty
duty = Duty(worker, shift, slot+1)
duties.append(duty)
return duties
def filter_duties_for_shift(duties, shift):
matching_duties = [ d for d in duties if d.shift == shift ]
for m in matching_duties:
yield m
def duty_permutations(shifts, duties):
from itertools import product
# build a list of shifts
shift_perms = []
for shift in shifts:
shift_duty_perms = []
for slot in range(shift.max_coverage):
slot_duties = [ d for d in duties if d.shift == shift and d.slot == (slot+1) ]
shift_duty_perms.append(slot_duties)
shift_perms.append(shift_duty_perms)
all_perms = ( shift_perms, shift_duty_perms )
# generate all possible duties for all shifts
perms = list(product(*shift_perms))
return perms
def solutions_for_duty_permutations(permutations):
from models import Solution
res = []
for duties in permutations:
sol = Solution(duties)
res.append(sol)
return res
def find_clashing_duties(duty, duties):
"""Find duties for the same worker that are too close together"""
from datetime import timedelta
one_day = timedelta(days=1)
one_day_before = duty.shift.start - one_day
one_day_after = duty.shift.end + one_day
for d in [ ds for ds in duties if ds.worker == duty.worker ]:
# skip the duty we're considering, as it can't clash with itself
if duty == d:
continue
clashes = False
# check if dates are too close to another shift
if d.shift.start >= one_day_before and d.shift.start <= one_day_after:
clashes = True
# check if slots collide with another shift
if d.slot == duty.slot:
clashes = True
if clashes:
yield d
def filter_unwanted_shifts(solutions):
from models import Solution
print "possibly unwanted:", solutions
new_solutions = []
new_duties = []
for sol in solutions:
for duty in sol.duties():
duty_ok = True
if not duty.worker.available_for_shift(duty.shift):
duty_ok = False
if duty_ok:
print "duty OK:"
duty.dump(depth=1)
new_duties.append(duty)
else:
print "duty **NOT** OK:"
duty.dump(depth=1)
shifts = set([ d.shift for d in new_duties ])
shift_lists = []
for s in shifts:
shift_duties = [ d for d in new_duties if d.shift == s ]
shift_lists.append(shift_duties)
new_solutions.append(Solution(shift_lists))
return new_solutions
def filter_clashing_duties(solutions):
new_solutions = []
for sol in solutions:
solution_ok = True
for duty in sol.duties():
num_clashing_duties = len(set(find_clashing_duties(duty, sol.duties())))
# check if many duties collide with this one (and thus we should delete this one
if num_clashing_duties > 0:
solution_ok = False
break
if solution_ok:
new_solutions.append(sol)
return new_solutions
def filter_incomplete_shifts(solutions):
new_solutions = []
shift_duty_count = {}
for sol in solutions:
solution_ok = True
for shift in set([ duty.shift for duty in sol.duties() ]):
shift_duties = [ d for d in sol.duties() if d.shift == shift ]
num_workers = len(set([ d.worker for d in shift_duties ]))
if num_workers < shift.min_coverage:
solution_ok = False
if solution_ok:
new_solutions.append(sol)
return new_solutions
def filter_solutions(solutions, workers):
# filter permutations ############################
# for each solution
solutions = filter_unwanted_shifts(solutions)
solutions = filter_clashing_duties(solutions)
solutions = filter_incomplete_shifts(solutions)
return solutions
def prioritise_solutions(solutions):
# TODO: not implemented!
return solutions
# prioritise solutions ############################
# for all solutions
# score according to number of staff on a duty
# score according to male/female staff
# score according to skill/background diversity
# score according to when staff last on shift
# sort all solutions by score
def solve_duties(shifts, duties, workers):
# ramify all possible duties #########################
perms = duty_permutations(shifts, duties)
solutions = solutions_for_duty_permutations(perms)
solutions = filter_solutions(solutions, workers)
solutions = prioritise_solutions(solutions)
return solutions
def load_shifts():
from models import Shift
shifts = [
Shift(am2, am8, 2, 3),
Shift(am8, pm12, 2, 3),
]
return shifts
def load_workers():
from models import Avail, Worker
joe_avail = ( Avail(am2, am8), )
sam_avail = ( Avail(am2, am8), )
ned_avail = ( Avail(am2, am8), )
bob_avail = ( Avail(am8, pm12), )
max_avail = ( Avail(am8, pm12), )
joe = Worker("joe", joe_avail)
sam = Worker("sam", sam_avail)
ned = Worker("ned", sam_avail)
bob = Worker("bob", bob_avail)
max = Worker("max", max_avail)
return (joe, sam, ned, bob, max)
def main():
import sys
shifts = load_shifts()
workers = load_workers()
duties = duties_for_all_workers(shifts, workers)
solutions = solve_duties(shifts, duties, workers)
if len(solutions) == 0:
print "Sorry, can't solve this. Perhaps you need more staff available, or"
print "simpler duty constraints?"
sys.exit(20)
else:
print "Solved. Solutions found:"
for sol in solutions:
sol.dump()
if __name__ == "__main__":
main()
在结果之前剪掉调试输出,目前给出的结果是:
Solved. Solutions found:
<Solution
<ShiftList
<Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
<Worker joe
<2009-10-01 02:00:00 to 2009-10-01 08:00:00>
>
>
<Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
<Worker sam
<2009-10-01 02:00:00 to 2009-10-01 08:00:00>
>
>
<Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
<Worker ned
<2009-10-01 02:00:00 to 2009-10-01 08:00:00>
>
>
>
<ShiftList
<Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
<Worker bob
<2009-10-01 08:00:00 to 2009-10-01 12:00:00>
>
>
<Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
<Worker max
<2009-10-01 08:00:00 to 2009-10-01 12:00:00>
>
>
>
>
3 个回答
我没有具体的算法选择,但可以分享一些实际的考虑因素。
因为这个算法是处理取消的,所以每当出现调度异常时,它都需要运行,以便重新安排每个人的时间。
要注意,有些算法并不是很线性,可能会从那一刻起大幅度重新安排每个人的时间。你可能想要避免这种情况,因为大家都喜欢提前知道自己的日程安排。
你可以在不重新运行算法的情况下处理一些取消,因为它可以提前安排下一个可用的人或两个人。
虽然可能并不总是能生成最优的解决方案,但你可以记录每个工作人员的“次优”事件数量,并在需要分配另一个“糟糕选择”时,总是选择数量最少的工作人员。这是人们通常关心的事情(频繁或不公平的几个“糟糕”调度决策)。
我尝试用遗传算法来实现这个,但总是调试不好。虽然基本原理在单个班次上似乎有效,但对于几个班次和几个工人,即使是简单的情况也无法解决。
总之,别这样做!除非你对遗传算法有很多经验,否则你很难做到这一点。
- 遗传算法是一种近似方法,不能保证能找到一个可行的解决方案。
- 它们只有在你能合理评估当前解决方案的质量时才能有效(比如,未满足的标准数量)。
- 它们的效果严重依赖于你用来组合或变异之前解决方案的操作方法的质量。
如果你对遗传算法几乎没有经验,想在小的Python程序中搞定这个问题是非常困难的。如果你只有一小组人,穷举搜索其实是个不错的选择。问题是,对于n
个人可能还行,但对于n+1
个人就会变得很慢,而对于n+2
个人则会慢得让人无法忍受,可能你的n
最终会低到10。
你正在处理一个NP完全问题,没有简单的解决方案。如果你选择的复杂排班问题效果不好,那么用你的Python脚本得到更好的结果的可能性非常小。
如果你坚持要用自己的代码来做,使用最小-最大算法或者模拟退火算法会简单得多,能得到一些结果。
好的,我对某个特定的算法不太了解,但我可以告诉你一些需要考虑的事情。
评估
无论你用什么方法,都需要一个函数来评估你的解决方案有多符合要求。你可以采取“比较”的方式(没有全局评分,但可以比较两个解决方案),不过我建议还是进行评估。
如果你能在较短的时间内得到一个评分,比如每天评估一次,那就太好了。这样在算法中,如果你能从部分解决方案(比如仅仅是7天中的前3天)预测最终评分的范围,那就很有帮助。这样一来,如果这个部分解决方案的评分已经低到达不到你的期望,你就可以提前停止计算。
对称性
在这200个人中,很可能有一些人有相似的特征,比如可用性、经验、意愿等等。如果你拿两个有相同特征的人,他们的角色是可以互换的:
- 方案1: (班次1: 乔)(班次2: 吉姆)...其他工人...
- 方案2: (班次1: 吉姆)(班次2: 乔)...其他工人...
从你的角度来看,这实际上是同一个解决方案。
好消息是,通常情况下,特征的数量会少于人数,这样可以大大减少计算所需的时间!
比如,假设你是基于方案1生成所有解决方案的,那么就不需要再基于方案2进行任何计算。
迭代
与其一次性生成整个日程,不如考虑逐步生成,比如每次生成一周的安排。这样做的好处是,每周的复杂度降低了(可能性减少了)。
然后,一旦你有了这一周的安排,就可以计算下一周,当然要注意把第一周的安排考虑进去。
这样做的好处是,你可以明确设计你的算法,使其考虑到已经使用的解决方案,这样在下次生成日程时,就能确保不会让某个人连续工作24小时!
序列化
你应该考虑对你的解决方案对象进行序列化(选择你喜欢的方式,Python中的pickle就不错)。在生成新的日程时,你需要之前的安排,我敢打赌你不想为200个人手动输入这些信息。
穷举
经过这些考虑后,我其实更倾向于使用穷举搜索,因为利用对称性和评估后,可能性并不会太多(不过这个问题仍然是NP完全的,没有绝对的解决办法)。
你可以尝试一下回溯算法。
另外,你还应该看看以下链接,它们涉及类似的问题:
- Python数独求解器
- 使用Knuth舞蹈链接的Java数独求解器(使用回溯算法)
这两个链接都讨论了在实现过程中遇到的问题,看看它们应该会对你有帮助。