赞
踩
针对带AGV的作业车间调度问题,以最小化完工时间为目标,考虑AGV在装载站、机器、卸载站之间的有效负载时间和空载时间,采用遗传算法进行求解,设计了面向工件运输次数的一维编码,和面向工件运输的驱动解码,以此来联动工件排序和AGV指派两个调度子问题,达到接近最优解的效果。算法采用Bilge和Ulusoy等人设计的40个算例进行验证。
算例下载可前往个人CSDN:机器人作业车间的算例JSP_Transbot.zip-制造文档类资源-CSDN文库进行下载。
带AGV的作业车间调度问题可描述为:n个工件在m台机器上加工,w台AGV负责工件在机器之间的搬运;每个工件至少有一道工序,每道工序在加工完成后由AGV搬运至下一台机器上进行加工。
对此类问题做如下假设:
(1)机器旁出入缓冲区容量无限大;
(2)所有工件和AGV起始位置和加工完成回到的位置都在L/U站;
(3)不考虑AGV运输过程中的碰撞和故障;
编码形式如下,染色体长度为各工序搬运次数的总和,其中VT11表示工件1由L/U站搬运至第一道工序加工处,VT1E表示工序加工完最后一道工序返回L/U站。
解码采用驱动的方式+间隙解码方法,间隙解码可参考北京科技大学 吴秀丽的某些论文中有提到,贪婪解码的另一种表述。
怎么一个驱动方式呢?
那就是如果这个工件分配了运输任务,那么在运输至下一工序加工机器前,当前工序必须加工完成,以此思想,便可在搬运前驱动该工序加工完成。
此处由于偷了个小懒,AGV的分配采用随机分配的方式,或者采用最早可用的AGV作为当前搬运工序使用的AGV,这也是算法在某些算例中会陷入局部最优的情形,感兴趣的可加入一维关于AGV的编码。
EX53的算例甘特图如下:
- import random
- import numpy as np
- import os
- from utils import RJSP_Env
- from Gantt_graph import Gantt
- import time
- import pickle
-
- class GA:
- def __init__(self, args):
- # self.Shop_Floor,self.args=RFSP_Env(args)
- self.Shop_Floor, self.args = RJSP_Env(args)
- self.Pm = 0.2
- self.Pc = 0.9
- self.Pop_size =200
-
- # 随机产生染色体
- def RCH(self):
- Chromo1=[]
- for i in range(len(self.args.MT)):
- Chromo1.extend([i for _ in range(len(self.args.MT[i])-1)])
- random.shuffle(Chromo1)
- self.args.Chromo_len=len(Chromo1)
- Chromo2=[]
- # print(Chromo1)
-
- return Chromo1
-
- # 生成初始种群
- def CHS(self):
- CHS = []
- for i in range(self.Pop_size):
- CHS.append(self.RCH())
- return CHS
-
- # 选择
- def Select(self, Fit_value):
- Fit = []
- for i in range(len(Fit_value)):
- fit = 1 / Fit_value[i]
- Fit.append(fit)
- Fit = np.array(Fit)
- idx = np.random.choice(np.arange(len(Fit_value)), size=len(Fit_value), replace=True,
- p=(Fit) / (Fit.sum()))
- return idx
-
- # 交叉
- def Crossover(self, CHS1, CHS2):
- Job_list = [i for i in range(self.args.n)]
- random.shuffle(Job_list)
- r = random.randint(1, self.args.n - 1)
- Set1 = Job_list[0:r]
- new_CHS1 = list(np.zeros(self.args.Chromo_len, dtype=int))
- new_CHS2 = list(np.zeros(self.args.Chromo_len, dtype=int))
- for k, v in enumerate(CHS1):
- if v in Set1:
- new_CHS1[k] = v+1
- for i in CHS2:
- if i not in Set1:
- Site = new_CHS1.index(0)
- new_CHS1[Site] = i + 1
-
- for k, v in enumerate(CHS2):
- if v not in Set1:
- new_CHS2[k] = v + 1
- for i in CHS2:
- if i in Set1:
- Site = new_CHS2.index(0)
- new_CHS2[Site] = i + 1
-
- new_CHS1= np.array([j - 1 for j in new_CHS1])
- new_CHS2 = np.array([j - 1 for j in new_CHS2])
- return new_CHS1,new_CHS2
-
- # 变异
- def Mutation(self, CHS):
- Tr = [i_num for i_num in range(self.args.Chromo_len)]
- # 机器选择部分
- r = random.randint(1, self.args.Chromo_len) # 在变异染色体中选择r个位置
- random.shuffle(Tr)
- T_r = Tr[0:r]
- K = []
- for i in T_r:
- K.append(CHS[i])
- random.shuffle(K)
- k = 0
- for i in T_r:
- CHS[i] = K[k]
- k += 1
- return CHS
-
- #解码
- def decode(self,chs,):
- self.Shop_Floor.reset()
- k = 0
- for i in chs:
- agv=random.choice([0,1]) #这里目前写的随机,所以得到的结果不是很好,可改成两层编码
- K = self.Shop_Floor.scheduling(i,agv)
- # Gantt(self.Shop_Floor.Machines, self.Shop_Floor.AGVs)
- k += 1
- # if done:
- # break
- return self.Shop_Floor.C_max()
-
- def main(self,target_C=None,file=None):
- start=time.time()
- BF = []
- C = self.CHS()
- Fit = []
- for C_i in C:
- Fit.append(self.decode(C_i))
- best_C = None
- best_fit = min(Fit)
- BF.append(best_fit)
- for i in range(100):
- C_id = self.Select(Fit)
- C = [C[_] for _ in C_id]
- for Ci in range(len(C)):
- if random.random() < self.Pc:
- _C = [C[Ci]]
- CHS1, CHS2 = self.Crossover(C[Ci], random.choice(C))
- _C.extend([CHS1, CHS2])
- Fi = []
- for ic in _C:
- Fi.append(self.decode(ic))
- C[Ci] = _C[Fi.index(min(Fi))]
- Fit.append(min(Fi))
- elif random.random() < self.Pm:
- CHS1 = self.Mutation(C[Ci])
- C[Ci] = CHS1
- Fit = []
- for C_i in C:
- fi=self.decode(C_i)
- Fit.append(fi)
- # Gantt(self.Shop_Floor.Machines, self.Shop_Floor.AGVs)
- if fi<=97:
- target_C=fi
- Gantt(self.Shop_Floor.Machines, self.Shop_Floor.AGVs)
- if min(Fit) < best_fit:
- best_fit = min(Fit)
- best_C = C[Fit.index(min(Fit))]
- # print(best_C)
- # Gantt(self.Shop_Floor.Machines, self.Shop_Floor.AGVs)
- BF.append(best_fit)
- # print('迭代次数:',i+1,'完工时间:',best_fit)
- x = [_ for _ in range(len(BF))]
- # plt.plot(x, BF)
- # plt.show()
- # best_C.Gantt()
- end=time.time()
- T=end-start
- print('运行时间--->>>>',T)
- return best_C,best_fit
-
- if __name__ == "__main__":
- import argparse
- from Instance.Text_extract import data, data_for_storer
- from Instance.data_extract import change
- import numpy as np
- K=["11",'21','31','41','51','61','71','81','91','101',"12",'22','32','42','52','62','72','82','92','102',
- "13",'23','33','43','53','63','73','83','93','103',"14",'24','34','44','54','64','74','84','94','104']
- for fi in K:
- f='.\Instance\Bilge_Ulusoy\C1\E'+str(53)+'.pkl'
- print(f)
- n, m, PT, agv_trans, MT, agv_num = data(f)
- # n, m, PT, MT=change('la',1)
- # agv_trans=[]
- # print(agv_trans)
- agv_num = 2
-
-
- parser = argparse.ArgumentParser("Reinforcement Learning experiments for multiagent environments")
- # TWO_AGVs_FS_ENV
- parser.add_argument("--n", type=int, default=n, help="Job number")
- parser.add_argument("--m", type=int, default=m, help="Machine number")
- parser.add_argument("--PT", type=list, default=PT, help="PROCESSING TIME OF JOB")
- parser.add_argument("--MT", type=list, default=MT, help="PROCESSING TIME OF JOB")
- parser.add_argument("--AGV_num", type=int, default=2, help="transportation time between machines")
- parser.add_argument("--agv_trans", type=list, default=agv_trans, help="transportation time between machines")
- args = parser.parse_args()
- print('测试算例:EX',fi)
- for i in range(10):
- # print( PT, MT)
- g = GA(args)
- best_C,best_fit=g.main(96)
- print(best_fit)
- # print(g.RCH())
- # g.decode([4, 4 ,0 ,3, 4, 0, 0, 1, 1, 1, 1, 2, 2, 0, 3, 3, 2, 2])
- # Gantt(g.Shop_Floor.Machines,g.Shop_Floor.AGVs)
- from Flow_Shop_Environment.FS_Env import RFS_Env
- from Job_Shop_Environment.JS_Job import Job
- import copy
-
-
- class JS_Env(RFS_Env):
- def __init__(self,args):
- super().__init__(args)
- self.MT=args.MT
-
- def Create_Job(self):
- self.Jobs = []
- for k in range(self.J_num):
- J = Job(k, self.PT[k],self.MT[k])
- self.Jobs.append(J)
-
- def reset(self):
- self.Create_Item()
- for Ji in self.Jobs:
- self.Machines[Ji.handling_machine[0]].put_into_job(Ji,0)
- Ji.Cur_site=Ji.handling_machine[0]
-
- def scheduling(self,action,agv_num):
- done = False
- # 空载
- agv = self.AGVs[agv_num]
- t = agv.end
- selected_job = self.Jobs[action]
- O_num=copy.copy(selected_job.O_num)
- Cur_Machine = self.Machines[selected_job.Cur_site]
- target_Machine = self.Machines[selected_job.next_machine()]
- trans_t1 = self.agv_trans[agv.Current_Site][Cur_Machine.index]
- trans_t2 = self.agv_trans[Cur_Machine.index][target_Machine.index]
-
- # 等待
- if selected_job in Cur_Machine.OB:
- # 获取AGV的移动开始时间
- JS,JE=selected_job.start,selected_job.end
- t,trans_t1 = agv.ST_move(selected_job.end, Cur_Machine.index,trans_t1,trans_t2)
- else:
- JS,JE=Cur_Machine.handling(selected_job)
- # 获取AGV的移动开始时间
- t,trans_t1 = agv.ST_move(selected_job.end, Cur_Machine.index,trans_t1,trans_t2)
-
- agv.send_to(selected_job.Cur_site, t, trans_t1, load=None)
- t1 = t + trans_t1
- Cur_Machine.pick_up_job(selected_job)
-
- # 负载转移
- agv.send_to(selected_job.next_machine(), t1, trans_t2, load=selected_job.index)
- selected_job.Cur_site = selected_job.next_machine()
- selected_job.finished_move()
- t3 = t1 + trans_t2
- selected_job.arrive = t3
-
- target_Machine.put_into_job(selected_job, t3)
- if selected_job.Cur_site == self.M_num:
- selected_job.J = True
- # self.update()
- if self.judge_state():
- done = True
- self.rest_work()
- return done,O_num,t,t3,JS,JE
-
- def scheduling1(self,action):
- wait_Job_IB, wait_Job_OB=0,0
- done = False
- # 空载
- T=[]
- selected_job = self.Jobs[action]
- Cur_Machine = self.Machines[selected_job.Cur_site]
- target_Machine = self.Machines[selected_job.next_machine()]
- Trans=[]
-
- if selected_job not in Cur_Machine.OB:
- # 获取AGV的移动开始时间
- Cur_Machine.handling(selected_job)
- # 获取AGV的移动开始时间
- for agv in self.AGVs:
- # t = agv.end
- trans_t1 = self.agv_trans[agv.Current_Site][Cur_Machine.index]
- trans_t2 = self.agv_trans[Cur_Machine.index][target_Machine.index]
- Trans.append([trans_t1,trans_t2])
- t,trans_t1 = agv.ST_move(selected_job.end, Cur_Machine.index,trans_t1,trans_t2)
- T.append(t)
- agv=self.AGVs[T.index(min(T))]
- t=min(T)
- trans_t1,trans_t2=Trans[T.index(min(T))][0],Trans[T.index(min(T))][1]
- agv.send_to(selected_job.Cur_site, t, trans_t1, load=None)
- t1 = t + trans_t1
- wait_Job_IB = selected_job.start - selected_job.arrive
- Cur_Machine.pick_up_job(selected_job)
- # 负载转移
- agv.send_to(selected_job.next_machine(), t1, trans_t2, load=selected_job.index)
- # print('这是agv',agv.idx,'动作为',agv._on)
- selected_job.Cur_site = selected_job.next_machine()
- selected_job.finished_move()
- t3 = t1 + trans_t2
- selected_job.arrive = t3
-
- target_Machine.put_into_job(selected_job, t3)
- if selected_job.Cur_site == self.M_num - 1:
- selected_job.J = True
- # self.update()
- if self.judge_state():
- done = True
- self.rest_work()
- return done, wait_Job_IB, wait_Job_OB
-
-
- if __name__=="__main__":
- from Gantt_graph import Gantt
- # from simple_MADRL.Params import args
- import random
- import pickle
- import numpy as np
- import argparse
- from Env.Instance.Text_extract import data
-
- fee = r"..\Instance\Bilge_Ulusoy\C1\E71.PKl"
-
- n, m, PT, agv_trans, MT, agv_num = data(fee)
- # print(PT)
- parser = argparse.ArgumentParser("Reinforcement Learning experiments for multiagent environments")
- # TWO_AGVs_FS_ENV
- parser.add_argument("--n", type=int, default=n, help="Job number")
- parser.add_argument("--m", type=int, default=m, help="Machine number")
- parser.add_argument("--PT", type=list, default=PT, help="PROCESSING TIME OF JOB")
- parser.add_argument("--MT", type=list, default=MT, help="PROCESSING TIME OF JOB")
- parser.add_argument("--AGV_num", type=int, default=2, help="transportation time between machines")
- parser.add_argument("--agv_trans", type=list, default=agv_trans, help="transportation time between machines")
- args = parser.parse_args()
-
-
- fs=JS_Env(args)
- # print(args.PT,args.MT)
- fs.reset()
- L=[0, 7, 6, 0, 5, 7, 2, 6, 4, 5, 1, 7, 3, 5, 6, 5, 3, 4, 1, 7, 3, 2, 0, 1, 6, 2, 4]
- for action in L:
- print(action)
- K=fs.scheduling1(action)
- # Gantt(fs.Machines, fs.AGVs)
- # if done:
- # break
- print(fs.C_max())
- Gantt(fs.Machines, fs.AGVs)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。