本次解读的代码来自:
https://github.com/samy-barrech/Flexible-Job-Shop-Scheduling-Problem/tree/master/app
关于生产排程的问题。
6 6 1
6 1 3 1 1 1 3 1 2 6 1 4 7 1 6 3 1 5 6
6 1 2 8 1 3 5 1 5 10 1 6 10 1 1 10 1 4 4
6 1 3 5 1 4 4 1 6 8 1 1 9 1 2 1 1 5 7
6 1 2 5 1 1 5 1 3 5 1 4 3 1 5 8 1 6 9
6 1 3 9 1 2 3 1 5 5 1 6 4 1 1 3 1 4 1
6 1 2 3 1 4 3 1 6 9 1 1 10 1 5 4 1 3 1
第一行:代表6个job,6个机器,每个工序平均需要1台机器
之后的6行代表其中的6job,第二行是第一个job,需要6个工序来完成,第一个工序可以1台机器完成,机器3,需要1个单位时间。
1 parse => jobs_list, machines_list
2 heuristic = Heuristic().select
3 Scheduler(machine_list, jobs_list)
4 Scheduler.run(heuristic)
def run(self, heuristic, verbose=True):# Disable print if verbose is Falseif not verbose:sys.stdout = Nonecurrent_step = 0while len(self.__jobs_to_be_done) > 0: # 只要有未完成工作current_step += 1best_candidates = heuristic(self.__jobs_to_be_done, self.__max_operations, current_step) # 不同规则返回最佳candidatefor id_machine, candidates in best_candidates.items(): # 最佳候选里,是关于多个机器,其能承受最大工序的machine = self.__machines[id_machine - 1] # id转化为对象for activity, operation in candidates:if not (machine.is_working_at_max_capacity() or activity.is_pending): # 符合条件的分配给机器machine.add_operation(activity, operation)for machine in self.__machines: # 机器时间开始流动machine.work()for job in self.__jobs_to_be_done: # 检查更新job是否完整状态if job.is_done:self.__jobs_to_be_done = list(filter(lambda element: element.id_job != job.id_job, self.__jobs_to_be_done))self.__jobs_done.append(job)print(colored("[SCHEDULER]", "green"), "Done in " + str(current_step) + " units of time")# Reenable stdoutif not verbose:sys.stdout = self.__original_stdoutreturn current_step
class Heuristics:# When a choice between multiple operations is available, always pick the first one@staticmethoddef select_first_operation(jobs_to_be_done, max_operations, _):best_candidates = {}for job in jobs_to_be_done: # 从待完成的job里找到best_candidatecurrent_activity = job.current_activity # 向下选择activity/ operation,由于activity要按顺序完成,选择每个job当前best_operation = current_activity.shortest_operation # 当前里最短的工序if best_candidates.get(best_operation.id_machine) is None: # 如果最佳候选里暂时没有这台机器的安排best_candidates.update({best_operation.id_machine: [(current_activity, best_operation)]})elif len(best_candidates.get(best_operation.id_machine)) < max_operations: # 如果最佳候选还能继续安排工作,加上去best_candidates.get(best_operation.id_machine).append((current_activity, best_operation))else: # 机器已经满载了,那么就从里面选list_operations = best_candidates.get(best_operation.id_machine) for key, (_, operation) in enumerate(list_operations):if operation.duration < best_operation.duration: # 原本候选集里比最新这个时间更短的,pop一个出去list_operations.pop(key)breakif len(list_operations) < max_operations: # 如果有pop,就把最新的加上去。否则其实还是原本的候选list_operations.append((current_activity, best_operation))return best_candidates