我使用 Python 的 OR-Tools 优化库实现了一个作业车间调度(job shop scheduling)算法。问题是:当我建立带有换型(setup)时间的柔性作业车间模型时,模型无法正常求解。我认为问题出在我构造弧(arc)的方式上。如果有人能进一步解释回路约束(circuit constraint),对我会很有帮助。顺便说一下,当我只使用一台机器时,模型可以正常工作。
代码如下:
from __future__ import print_function
import collections
from google.protobuf import text_format
from ortools.sat.python import cp_model
# Import Python wrapper for or-tools CP-SAT solver.
from ortools.sat.python import cp_model
# Intermediate solution printer
class SolutionPrinter(cp_model.CpSolverSolutionCallback):
    """Solver callback that reports every intermediate solution on stdout."""

    def __init__(self):
        # Initialise the callback base class first, then the running counter
        # used to number the solutions in the printed report.
        super(SolutionPrinter, self).__init__()
        self.__solution_count = 0

    def on_solution_callback(self):
        """Print the index, wall time and objective of the newly found solution."""
        report = (
            self.__solution_count,
            self.WallTime(),
            self.ObjectiveValue(),
        )
        print('Solution %i, time = %f s, objective = %i' % report)
        self.__solution_count = self.__solution_count + 1
def MinimalJobshopSat():
    """Build, solve and print a small job-shop model with setup times.

    The instance is hard-coded: three jobs, each made of three tasks that run
    on machines 0, 1 and 2 in that order. On every machine the tasks must not
    overlap, and a sequence-dependent setup time separates two tasks that run
    directly after one another. The objective is to minimise the makespan
    (the latest end among the last tasks of all jobs).

    The sequencing on each machine is modelled with one circuit constraint
    per machine: node 0 is a dummy start/end node and node ``i + 1`` stands
    for the task of job ``i`` on that machine.

    Data-layout assumptions used throughout (true for the instance below):
    task ``k`` of every job runs on machine ``k``, and every job has exactly
    one task on every machine, so ``starts[machine][i]`` is the start of
    job ``i`` on ``machine``.

    Returns:
        None. Intermediate solutions, solver statistics and (when a solution
        is found) the per-machine schedule are printed to stdout.
    """
    # Create the model.
    model = cp_model.CpModel()

    # jobs_data[job] is the ordered list of (machine_id, processing_time).
    jobs_data = [
        [(0, 2546), (1, 2000), (2, 1400)],
        [(0, 1289), (1, 2546), (2, 2546)],
        [(0, 2839), (1, 1576), (2, 1200)],
    ]
    # setup_times[machine][i][j] is the setup needed on `machine` when the
    # task of job j runs directly after the task of job i.
    setup_times = [
        [
            [3559, 1638, 2000],
            [1442, 3010, 1641],
            [1728, 3583, 3243],
        ],
        [
            [3559, 1638, 2000],
            [1442, 3010, 1641],
            [1728, 3583, 3243],
        ],
        [
            [3559, 1638, 2000],
            [1442, 3010, 1641],
            [1728, 3583, 3243],
        ],
    ]

    all_jobs = range(len(jobs_data))
    machines_count = 1 + max(task[0] for job in jobs_data for task in job)
    all_machines = range(machines_count)

    # Pre-processing: the smallest setup that can precede a job on a machine
    # is paid whatever the sequence is, so it is moved into the task duration
    # and subtracted from every incoming setup of that job.
    # NOTE(review): the minimum also ranges over the diagonal entry, and the
    # first task on a machine pays the transferred setup as well; durations
    # printed at the end therefore include this transferred setup.
    for machine in all_machines:
        for job_id in all_jobs:
            min_incoming_setup = min(
                setup_times[machine][j][job_id] for j in all_jobs)
            if min_incoming_setup == 0:
                continue
            print('job %i at machine %i has a min incoming setup of %i' %
                  (job_id, machine, min_incoming_setup))
            # Relies on task index == machine id (see docstring).
            jobs_data[job_id][machine] = (
                machine, jobs_data[job_id][machine][1] + min_incoming_setup)
            # Decrease corresponding incoming setup times.
            for j in all_jobs:
                setup_times[machine][j][job_id] -= min_incoming_setup

    # Horizon: sum of all durations plus, for every machine and predecessor,
    # the largest remaining setup. This is a safe upper bound on any end time.
    horizon = sum(task[1] for job in jobs_data for task in job)
    for machine_setups in setup_times:
        for row in machine_setups:
            horizon += max(row)

    # Named tuple to store information about created variables.
    task_type = collections.namedtuple('task_type', 'start end interval')
    # Named tuple to manipulate solution information.
    assigned_task_type = collections.namedtuple('assigned_task_type',
                                                'start job index duration')

    # Create one interval per task and register it with its machine. The
    # per-machine lists are filled in job order, so index i refers to job i.
    all_tasks = {}
    machine_to_intervals = collections.defaultdict(list)
    starts = collections.defaultdict(list)
    ends = collections.defaultdict(list)
    for job_id, job in enumerate(jobs_data):
        for task_id, task in enumerate(job):
            machine = task[0]
            duration = task[1]
            suffix = '_%i_%i' % (job_id, task_id)
            start_var = model.NewIntVar(0, horizon, 'start' + suffix)
            end_var = model.NewIntVar(0, horizon, 'end' + suffix)
            interval_var = model.NewIntervalVar(start_var, duration, end_var,
                                                'interval' + suffix)
            all_tasks[job_id, task_id] = task_type(
                start=start_var, end=end_var, interval=interval_var)
            machine_to_intervals[machine].append(interval_var)
            starts[machine].append(start_var)
            ends[machine].append(end_var)

    # Create and add disjunctive constraints.
    for machine in all_machines:
        model.AddNoOverlap(machine_to_intervals[machine])

    # Transition times: one circuit constraint per machine.
    for machine in all_machines:
        arcs = []
        for i in all_jobs:
            # Arc from the dummy node (0) to task i: task i is first on the
            # machine. No start-time constraint is attached to this arc.
            # Pinning the first task to time 0 contradicts the precedences
            # inside a job on every machine that is not the first step of a
            # job, which made the multi-machine model infeasible.
            arcs.append([0, i + 1, model.NewBoolVar('')])
            # Arc from task i back to the dummy node: task i is last.
            arcs.append([i + 1, 0, model.NewBoolVar('')])
            for j in all_jobs:
                if i == j:
                    continue
                lit = model.NewBoolVar(
                    '%i_%i follows %i_%i' % (j, machine, i, machine))
                arcs.append([i + 1, j + 1, lit])
                # Reified precedence: when the arc i -> j is selected, task j
                # starts after task i has ended plus the remaining setup.
                model.Add(
                    starts[machine][j] >=
                    ends[machine][i] + setup_times[machine][i][j]
                ).OnlyEnforceIf(lit)
        model.AddCircuit(arcs)

    # Precedences inside a job.
    for job_id, job in enumerate(jobs_data):
        for task_id in range(len(job) - 1):
            model.Add(all_tasks[job_id, task_id + 1].start >=
                      all_tasks[job_id, task_id].end)

    # Makespan objective.
    obj_var = model.NewIntVar(0, horizon, 'makespan')
    model.AddMaxEquality(obj_var, [
        all_tasks[job_id, len(job) - 1].end
        for job_id, job in enumerate(jobs_data)
    ])
    model.Minimize(obj_var)

    # Solve once, with the solution printer attached, and keep the status of
    # that very solve (the model used to be solved twice, and the status of
    # the first, callback-less run was the one that got checked).
    solver = cp_model.CpSolver()
    solver.parameters.max_time_in_seconds = 60
    solution_printer = SolutionPrinter()
    status = solver.SolveWithSolutionCallback(model, solution_printer)
    print(solver.ResponseStats())

    # A proven-optimal run reports OPTIMAL rather than FEASIBLE; both carry a
    # usable schedule.
    if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        # Create one list of assigned tasks per machine.
        assigned_jobs = collections.defaultdict(list)
        for job_id, job in enumerate(jobs_data):
            for task_id, task in enumerate(job):
                machine = task[0]
                assigned_jobs[machine].append(
                    assigned_task_type(
                        start=solver.Value(all_tasks[job_id, task_id].start),
                        job=job_id,
                        index=task_id,
                        duration=task[1]))

        # Create per machine output lines.
        output = ''
        for machine in all_machines:
            # Sort by starting time.
            assigned_jobs[machine].sort()
            sol_line_tasks = 'Machine ' + str(machine) + ': '
            sol_line = ' '
            for assigned_task in assigned_jobs[machine]:
                name = 'job_%i_%i' % (assigned_task.job, assigned_task.index)
                # Add spaces to output to align columns.
                sol_line_tasks += '%-10s' % name
                start = assigned_task.start
                duration = assigned_task.duration
                sol_tmp = '[%i,%i]' % (start, start + duration)
                # Add spaces to output to align columns.
                sol_line += '%-10s' % sol_tmp
            sol_line += '\n'
            sol_line_tasks += '\n'
            output += sol_line_tasks
            output += sol_line

        # Finally print the solution found.
        print('Optimal Schedule Length: %i' % solver.ObjectiveValue())
        print(output)
if __name__ == '__main__':
    # MinimalJobshopSat() returns None, so it must be called exactly once;
    # a second pair of parentheses would call None and raise TypeError.
    MinimalJobshopSat()
如果某个任务是可选的,你需要在该任务对应的节点上添加一条自环弧(self-loop arc)。
因此,假设 task_i 的布尔存在字面量(presence literal)为 lit_i,你需要添加以下内容:
arcs.append([i + 1, i + 1, lit_i.Not()])