OR工具在柔性工作车间调度中带转换的CP模型性能

0 投票
1 回答
33 浏览
提问于 2025-04-12 00:27

我正在评估OR工具的cp模型性能,参考的例子可以在这个链接找到:https://github.com/google/or-tools/blob/stable/examples/contrib/scheduling_with_transitions_sat.py

我对代码进行了修改,以生成随机的工作任务,并了解获取第一个和“可接受”解决方案的运行时间——“可接受”是指在时间表中没有太多或太大的空白。

# -*- coding: utf-8 -*-
"""Scheduling problem with transition time between tasks and transitions costs.

@author: CSLiu2
"""


import argparse
import collections
import random

from ortools.sat.python import cp_model
from google.protobuf import text_format

#----------------------------------------------------------------------------
# Command line arguments.
PARSER = argparse.ArgumentParser()
PARSER.add_argument(
    '--problem_instance', default=0, type=int, help='Problem instance.')
PARSER.add_argument(
    '--output_proto',
    default='',
    help='Output file to write the cp_model proto to.')
PARSER.add_argument('--params', default='', help='Sat solver parameters.')


#----------------------------------------------------------------------------
# Intermediate solution printer
class SolutionPrinter(cp_model.CpSolverSolutionCallback):
  """Print intermediate solutions."""

  def __init__(self, makespan):
    cp_model.CpSolverSolutionCallback.__init__(self)
    self.__solution_count = 0
    self.__makespan = makespan

  def OnSolutionCallback(self):
    print('Solution %i, time = %f s, objective = %i, makespan = %i' %
          (self.__solution_count, self.WallTime(), self.ObjectiveValue(),
           self.Value(self.__makespan)))
    self.__solution_count += 1


def main(args):
  """Solves the scheduling with transitions problem."""

  instance = args.problem_instance
  parameters = args.params
  output_proto = args.output_proto

  #----------------------------------------------------------------------------
  # Data.
  # get random jobs
  jobs = []
  sum_tasks = 0

  num_machines = 8
  for i in range(0, 30): # create jobs
      tasks = []
      for j in range(0, random.randint(3, 5)): # create tasks per job
          alternatives = []
          sum_tasks += 1
          machines = list(range(0, num_machines)) # list of machines to randomly pick one for each task
          for k in range(0, 2): # create two alternatives per task
              machineInd = random.randint(0,len(machines)-1)
              alternatives.append((random.randint(6, 35), machines[machineInd], 'R'+str(random.randint(0, 6))))
              del machines[machineInd]
          tasks.append(alternatives)
      jobs.extend([tasks])

  print('total number of tasks: %i' % sum_tasks)
  print('total number of jobs: %i' % len(jobs))
  #----------------------------------------------------------------------------
  # Helper data.
  num_jobs = len(jobs)
  all_jobs = range(num_jobs)
  all_machines = range(num_machines)

  #----------------------------------------------------------------------------
  # Model.
  model = cp_model.CpModel()

  #----------------------------------------------------------------------------
  # Compute a maximum makespan greedily.
  horizon = 0
  for job in jobs:
    for task in job:
      max_task_duration = 0
      for alternative in task:
        max_task_duration = max(max_task_duration, alternative[0])
      horizon += max_task_duration

  print('Horizon = %i' % horizon)

  #----------------------------------------------------------------------------
  # Global storage of variables.
  intervals_per_machines = collections.defaultdict(list)
  presences_per_machines = collections.defaultdict(list)
  starts_per_machines = collections.defaultdict(list)
  ends_per_machines = collections.defaultdict(list)
  resources_per_machines = collections.defaultdict(list)
  ranks_per_machines = collections.defaultdict(list)

  job_starts = {}  # indexed by (job_id, task_id).
  job_presences = {}  # indexed by (job_id, task_id, alt_id).
  job_ranks = {}  # indexed by (job_id, task_id, alt_id).
  job_ends = []  # indexed by job_id

  #----------------------------------------------------------------------------
  # Scan the jobs and create the relevant variables and intervals.
  for job_id in all_jobs:
    job = jobs[job_id]
    num_tasks = len(job)
    previous_end = None
    for task_id in range(num_tasks):
      task = job[task_id]

      min_duration = task[0][0]
      max_duration = task[0][0]

      num_alternatives = len(task)
      all_alternatives = range(num_alternatives)

      for alt_id in range(1, num_alternatives):
        alt_duration = task[alt_id][0]
        min_duration = min(min_duration, alt_duration)
        max_duration = max(max_duration, alt_duration)

      # Create main interval for the task.
      suffix_name = '_j%i_t%i' % (job_id, task_id)
      start = model.NewIntVar(0, horizon, 'start' + suffix_name)
      duration = model.NewIntVar(min_duration, max_duration,
                                 'duration' + suffix_name)
      end = model.NewIntVar(0, horizon, 'end' + suffix_name)

      # Store the start for the solution.
      job_starts[(job_id, task_id)] = start

      # Add precedence with previous task in the same job.
      if previous_end is not None:
        model.Add(start >= previous_end)
      previous_end = end

      # Create alternative intervals.
      l_presences = []
      for alt_id in all_alternatives:
        ### add to link interval with eqp constraint.
        ### process time = -1 cannot be processed at this machine.
        if jobs[job_id][task_id][alt_id][0] == -1:
          continue
        alt_suffix = '_j%i_t%i_a%i' % (job_id, task_id, alt_id)
        l_presence = model.NewBoolVar('presence' + alt_suffix)
        l_start = model.NewIntVar(0, horizon, 'start' + alt_suffix)
        l_duration = task[alt_id][0]
        l_end = model.NewIntVar(0, horizon, 'end' + alt_suffix)
        l_interval = model.NewOptionalIntervalVar(
            l_start, l_duration, l_end, l_presence, 'interval' + alt_suffix)
        l_rank = model.NewIntVar(-1, 1000, 'rank' + alt_suffix)
        l_presences.append(l_presence)
        l_machine = task[alt_id][1]
        l_type = task[alt_id][2]

        # Link the original variables with the local ones.
        model.Add(start == l_start).OnlyEnforceIf(l_presence)
        model.Add(duration == l_duration).OnlyEnforceIf(l_presence)
        model.Add(end == l_end).OnlyEnforceIf(l_presence)

        # Add the local variables to the right machine.
        intervals_per_machines[l_machine].append(l_interval)
        starts_per_machines[l_machine].append(l_start)
        ends_per_machines[l_machine].append(l_end)
        presences_per_machines[l_machine].append(l_presence)
        resources_per_machines[l_machine].append(l_type)
        ranks_per_machines[l_machine].append(l_rank)

        # Store the variables for the solution.
        job_presences[(job_id, task_id, alt_id)] = l_presence
        job_ranks[(job_id, task_id, alt_id)] = l_rank

      # Only one machine can process each lot.
      model.Add(sum(l_presences) == 1)

    job_ends.append(previous_end)

  #----------------------------------------------------------------------------
  # Create machines constraints nonoverlap process
  for machine_id in all_machines:
    intervals = intervals_per_machines[machine_id]
    if len(intervals) > 1:
      model.AddNoOverlap(intervals)

  #----------------------------------------------------------------------------
  # Transition times and transition costs using a circuit constraint.
  switch_literals = []
  for machine_id in all_machines:
    machine_starts = starts_per_machines[machine_id]
    machine_ends = ends_per_machines[machine_id]
    machine_presences = presences_per_machines[machine_id]
    machine_resources = resources_per_machines[machine_id]
    machine_ranks = ranks_per_machines[machine_id]
    intervals = intervals_per_machines[machine_id]
    arcs = []
    num_machine_tasks = len(machine_starts)
    all_machine_tasks = range(num_machine_tasks)

    for i in all_machine_tasks:
      # Initial arc from the dummy node (0) to a task.
      start_lit = model.NewBoolVar('')
      arcs.append([0, i + 1, start_lit])
      # If this task is the first, set both rank and start to 0.
      model.Add(machine_ranks[i] == 0).OnlyEnforceIf(start_lit)
      model.Add(machine_starts[i] == 0).OnlyEnforceIf(start_lit)
      # Final arc from an arc to the dummy node.
      arcs.append([i + 1, 0, model.NewBoolVar('')])
      # Self arc if the task is not performed.
      arcs.append([i + 1, i + 1, machine_presences[i].Not()])
      model.Add(machine_ranks[i] == -1).OnlyEnforceIf(
          machine_presences[i].Not())

      for j in all_machine_tasks:
        if i == j:
          continue

        lit = model.NewBoolVar('%i follows %i' % (j, i))
        arcs.append([i + 1, j + 1, lit])
        model.AddImplication(lit, machine_presences[i])
        model.AddImplication(lit, machine_presences[j])

        # Maintain rank incrementally.
        model.Add(machine_ranks[j] == machine_ranks[i] + 1).OnlyEnforceIf(lit)

        # Compute the transition time if task j is the successor of task i.
        if machine_resources[i] != machine_resources[j]:
          transition_time = 3
          switch_literals.append(lit)
        else:
          transition_time = 0
        # We add the reified transition to link the literals with the times
        # of the tasks.
        model.Add(machine_starts[j] == machine_ends[i] +
                  transition_time).OnlyEnforceIf(lit)
    if arcs:
        model.AddCircuit(arcs)

  #----------------------------------------------------------------------------
  # Objective.
  makespan = model.NewIntVar(0, horizon, 'makespan')
  model.AddMaxEquality(makespan, job_ends)
  makespan_weight = 1
  transition_weight = 5
  #model.Minimize(makespan * makespan_weight +
  #               sum(switch_literals) * transition_weight)
  model.Minimize(makespan)

  #----------------------------------------------------------------------------
  # Write problem to file.
  if output_proto:
    print('Writing proto to %s' % output_proto)
    with open(output_proto, 'w') as text_file:
      text_file.write(str(model))

  #----------------------------------------------------------------------------
  # Solve.
  solver = cp_model.CpSolver()
  solver.parameters.num_search_workers = 16
  solver.parameters.max_time_in_seconds = 60 * 60 * 2
  if parameters:
    text_format.Merge(parameters, solver.parameters)
  solution_printer = SolutionPrinter(makespan)
  status = solver.Solve(model, solution_printer)

  #----------------------------------------------------------------------------
  # Print solution.
  if status == cp_model.FEASIBLE or status == cp_model.OPTIMAL:
    for job_id in all_jobs:
      for task_id in range(len(jobs[job_id])):
        start_value = solver.Value(job_starts[(job_id, task_id)])
        machine = 0
        duration = 0
        select = 0
        rank = -1

        for alt_id in range(len(jobs[job_id][task_id])):
          if jobs[job_id][task_id][alt_id][0] == -1:
            continue

          if solver.BooleanValue(job_presences[(job_id, task_id, alt_id)]):
            duration = jobs[job_id][task_id][alt_id][0]
            machine = jobs[job_id][task_id][alt_id][1]
            select = alt_id
            rank = solver.Value(job_ranks[(job_id, task_id, alt_id)])

        print(
            '  Job %i starts at %i (alt %i, duration %i) with rank %i on machine %i'
            % (job_id, start_value, select, duration, rank, machine))

    print('Solve status: %s' % solver.StatusName(status))
    print('Objective value: %i' % solver.ObjectiveValue())
    print('Makespan: %i' % solver.Value(makespan))


if __name__ == '__main__':
  main(PARSER.parse_args())

我在一台有16个虚拟CPU的机器上进行测试,观察到以下性能表现:

  • 当任务总数稍微低于100时,优化器通常能在几秒钟内给出第一个解决方案。但有时也会需要60到90秒才能得到第一个解决方案。
  • 当任务总数稍微超过100时(在上述参数下很可能会出现这种情况),性能就下降了,优化器可能需要很多分钟才能返回第一个解决方案。我让它运行了一次,结果在500秒后才得到了第一个解决方案。

现在我有几个问题:

  1. 这种性能表现正常吗?还是我在代码中忽略了什么配置问题?

  2. 一般来说,在这个例子中,有什么可以做的性能调优吗?

  3. 在这样的工作车间调度任务中,使用16个CPU的机器进行优化,你认为“可实现”的性能是什么?更具体地说,我希望优化器能在大约15秒内返回第一个解决方案,并在60到90秒内返回一个“可接受”的解决方案。在这个灵活的工作车间调度任务中,考虑到转换时间(也许还包括每个任务的截止日期),你认为优化器大约能处理多少个工作、任务、选择和机器,以便在这个特定机器上在期望的时间范围内做出响应?

非常感谢你的反馈!

1 个回答

0

序列依赖的设置时间(灵活的)作业车间是非常复杂的问题。即使是中等规模的问题(比如100个任务),也不要指望能找到最优解。

使用CP-SAT时,最有效的做法是先写一个贪心的初步解决方案,并给求解器一些提示。因为调度的启发式方法并不适合这类问题。

注意:有一些技巧可以提高性能。

  • 一个好的技巧是计算一个时间段与其他所有时间段之间的最小过渡时间,然后把这个最小时间从过渡时间转移到该时间段的持续时间上。
  • 第二个技巧在jobshop_sat.cc中实现。它创建了一个总工期的时间段(从总工期到时间范围的尽头)。这有助于进行更有效的推理。

撰写回答