当前位置:   article > 正文

用python实现基于遗传算法求解带AGV的作业车间调度问题_python遗传算法agv调度

python遗传算法agv调度

1 项目描述

针对带AGV的作业车间调度问题,以最小化完工时间为目标,考虑AGV在装载站、机器、卸载站之间的有效负载时间和空载时间,采用遗传算法进行求解,设计了面向工件运输次数的一维编码,和面向工件运输的驱动解码,以此来联动工件排序和AGV指派两个调度子问题,达到接近最优解的效果。算法采用Bilge和Ulusoy等人设计的40个算例进行验证。

算例下载可前往个人CSDN:机器人作业车间的算例JSP_Transbot.zip-制造文档类资源-CSDN文库进行下载。

 2 问题描述

带AGV的作业车间调度问题可描述为:n个工件在m台机器上加工,w台AGV负责工件在机器之间的搬运;每个工件至少有一道工序,每道工序在加工完成后由AGV搬运至下一台机器上进行加工。

对此类问题做如下假设:

(1)机器旁出入缓冲区容量无限大;

(2)所有工件和AGV起始位置和加工完成回到的位置都在L/U站;

(3)不考虑AGV运输过程中的碰撞和故障;

3 算法简述

3.1 编码

编码形式如下,染色体长度为各工序搬运次数的总和,其中VT11表示工件1由L/U站搬运至第一道工序加工处,VT1E表示工序加工完最后一道工序返回L/U站。

3.2 解码

解码采用驱动的方式+间隙解码方法,间隙解码可参考北京科技大学 吴秀丽的某些论文中有提到,贪婪解码的另一种表述。

怎么一个驱动方式呢?

那就是如果这个工件分配了运输任务,那么在运输至下一工序加工机器前,当前工序必须加工完成,以此思想,便可在搬运前驱动该工序加工完成。

3.3 关于AGV的分配

此处由于偷了个小懒,AGV的分配采用随机分配的方式,或者采用最早可用的AGV作为当前搬运工序使用的AGV,这也是算法在某些算例中会陷入局部最优的情形,感兴趣的可加入一维关于AGV的编码。

4 部分算例展示

EX53的算例甘特图如下:

5 部分代码展示

5.1 GA.py

  1. import random
  2. import numpy as np
  3. import os
  4. from utils import RJSP_Env
  5. from Gantt_graph import Gantt
  6. import time
  7. import pickle
  8. class GA:
  9. def __init__(self, args):
  10. # self.Shop_Floor,self.args=RFSP_Env(args)
  11. self.Shop_Floor, self.args = RJSP_Env(args)
  12. self.Pm = 0.2
  13. self.Pc = 0.9
  14. self.Pop_size =200
  15. # 随机产生染色体
  16. def RCH(self):
  17. Chromo1=[]
  18. for i in range(len(self.args.MT)):
  19. Chromo1.extend([i for _ in range(len(self.args.MT[i])-1)])
  20. random.shuffle(Chromo1)
  21. self.args.Chromo_len=len(Chromo1)
  22. Chromo2=[]
  23. # print(Chromo1)
  24. return Chromo1
  25. # 生成初始种群
  26. def CHS(self):
  27. CHS = []
  28. for i in range(self.Pop_size):
  29. CHS.append(self.RCH())
  30. return CHS
  31. # 选择
  32. def Select(self, Fit_value):
  33. Fit = []
  34. for i in range(len(Fit_value)):
  35. fit = 1 / Fit_value[i]
  36. Fit.append(fit)
  37. Fit = np.array(Fit)
  38. idx = np.random.choice(np.arange(len(Fit_value)), size=len(Fit_value), replace=True,
  39. p=(Fit) / (Fit.sum()))
  40. return idx
  41. # 交叉
  42. def Crossover(self, CHS1, CHS2):
  43. Job_list = [i for i in range(self.args.n)]
  44. random.shuffle(Job_list)
  45. r = random.randint(1, self.args.n - 1)
  46. Set1 = Job_list[0:r]
  47. new_CHS1 = list(np.zeros(self.args.Chromo_len, dtype=int))
  48. new_CHS2 = list(np.zeros(self.args.Chromo_len, dtype=int))
  49. for k, v in enumerate(CHS1):
  50. if v in Set1:
  51. new_CHS1[k] = v+1
  52. for i in CHS2:
  53. if i not in Set1:
  54. Site = new_CHS1.index(0)
  55. new_CHS1[Site] = i + 1
  56. for k, v in enumerate(CHS2):
  57. if v not in Set1:
  58. new_CHS2[k] = v + 1
  59. for i in CHS2:
  60. if i in Set1:
  61. Site = new_CHS2.index(0)
  62. new_CHS2[Site] = i + 1
  63. new_CHS1= np.array([j - 1 for j in new_CHS1])
  64. new_CHS2 = np.array([j - 1 for j in new_CHS2])
  65. return new_CHS1,new_CHS2
  66. # 变异
  67. def Mutation(self, CHS):
  68. Tr = [i_num for i_num in range(self.args.Chromo_len)]
  69. # 机器选择部分
  70. r = random.randint(1, self.args.Chromo_len) # 在变异染色体中选择r个位置
  71. random.shuffle(Tr)
  72. T_r = Tr[0:r]
  73. K = []
  74. for i in T_r:
  75. K.append(CHS[i])
  76. random.shuffle(K)
  77. k = 0
  78. for i in T_r:
  79. CHS[i] = K[k]
  80. k += 1
  81. return CHS
  82. #解码
  83. def decode(self,chs,):
  84. self.Shop_Floor.reset()
  85. k = 0
  86. for i in chs:
  87. agv=random.choice([0,1]) #这里目前写的随机,所以得到的结果不是很好,可改成两层编码
  88. K = self.Shop_Floor.scheduling(i,agv)
  89. # Gantt(self.Shop_Floor.Machines, self.Shop_Floor.AGVs)
  90. k += 1
  91. # if done:
  92. # break
  93. return self.Shop_Floor.C_max()
  94. def main(self,target_C=None,file=None):
  95. start=time.time()
  96. BF = []
  97. C = self.CHS()
  98. Fit = []
  99. for C_i in C:
  100. Fit.append(self.decode(C_i))
  101. best_C = None
  102. best_fit = min(Fit)
  103. BF.append(best_fit)
  104. for i in range(100):
  105. C_id = self.Select(Fit)
  106. C = [C[_] for _ in C_id]
  107. for Ci in range(len(C)):
  108. if random.random() < self.Pc:
  109. _C = [C[Ci]]
  110. CHS1, CHS2 = self.Crossover(C[Ci], random.choice(C))
  111. _C.extend([CHS1, CHS2])
  112. Fi = []
  113. for ic in _C:
  114. Fi.append(self.decode(ic))
  115. C[Ci] = _C[Fi.index(min(Fi))]
  116. Fit.append(min(Fi))
  117. elif random.random() < self.Pm:
  118. CHS1 = self.Mutation(C[Ci])
  119. C[Ci] = CHS1
  120. Fit = []
  121. for C_i in C:
  122. fi=self.decode(C_i)
  123. Fit.append(fi)
  124. # Gantt(self.Shop_Floor.Machines, self.Shop_Floor.AGVs)
  125. if fi<=97:
  126. target_C=fi
  127. Gantt(self.Shop_Floor.Machines, self.Shop_Floor.AGVs)
  128. if min(Fit) < best_fit:
  129. best_fit = min(Fit)
  130. best_C = C[Fit.index(min(Fit))]
  131. # print(best_C)
  132. # Gantt(self.Shop_Floor.Machines, self.Shop_Floor.AGVs)
  133. BF.append(best_fit)
  134. # print('迭代次数:',i+1,'完工时间:',best_fit)
  135. x = [_ for _ in range(len(BF))]
  136. # plt.plot(x, BF)
  137. # plt.show()
  138. # best_C.Gantt()
  139. end=time.time()
  140. T=end-start
  141. print('运行时间--->>>>',T)
  142. return best_C,best_fit
  143. if __name__ == "__main__":
  144. import argparse
  145. from Instance.Text_extract import data, data_for_storer
  146. from Instance.data_extract import change
  147. import numpy as np
  148. K=["11",'21','31','41','51','61','71','81','91','101',"12",'22','32','42','52','62','72','82','92','102',
  149. "13",'23','33','43','53','63','73','83','93','103',"14",'24','34','44','54','64','74','84','94','104']
  150. for fi in K:
  151. f='.\Instance\Bilge_Ulusoy\C1\E'+str(53)+'.pkl'
  152. print(f)
  153. n, m, PT, agv_trans, MT, agv_num = data(f)
  154. # n, m, PT, MT=change('la',1)
  155. # agv_trans=[]
  156. # print(agv_trans)
  157. agv_num = 2
  158. parser = argparse.ArgumentParser("Reinforcement Learning experiments for multiagent environments")
  159. # TWO_AGVs_FS_ENV
  160. parser.add_argument("--n", type=int, default=n, help="Job number")
  161. parser.add_argument("--m", type=int, default=m, help="Machine number")
  162. parser.add_argument("--PT", type=list, default=PT, help="PROCESSING TIME OF JOB")
  163. parser.add_argument("--MT", type=list, default=MT, help="PROCESSING TIME OF JOB")
  164. parser.add_argument("--AGV_num", type=int, default=2, help="transportation time between machines")
  165. parser.add_argument("--agv_trans", type=list, default=agv_trans, help="transportation time between machines")
  166. args = parser.parse_args()
  167. print('测试算例:EX',fi)
  168. for i in range(10):
  169. # print( PT, MT)
  170. g = GA(args)
  171. best_C,best_fit=g.main(96)
  172. print(best_fit)
  173. # print(g.RCH())
  174. # g.decode([4, 4 ,0 ,3, 4, 0, 0, 1, 1, 1, 1, 2, 2, 0, 3, 3, 2, 2])
  175. # Gantt(g.Shop_Floor.Machines,g.Shop_Floor.AGVs)

 5.2 JS_ENV.py

  1. from Flow_Shop_Environment.FS_Env import RFS_Env
  2. from Job_Shop_Environment.JS_Job import Job
  3. import copy
  4. class JS_Env(RFS_Env):
  5. def __init__(self,args):
  6. super().__init__(args)
  7. self.MT=args.MT
  8. def Create_Job(self):
  9. self.Jobs = []
  10. for k in range(self.J_num):
  11. J = Job(k, self.PT[k],self.MT[k])
  12. self.Jobs.append(J)
  13. def reset(self):
  14. self.Create_Item()
  15. for Ji in self.Jobs:
  16. self.Machines[Ji.handling_machine[0]].put_into_job(Ji,0)
  17. Ji.Cur_site=Ji.handling_machine[0]
  18. def scheduling(self,action,agv_num):
  19. done = False
  20. # 空载
  21. agv = self.AGVs[agv_num]
  22. t = agv.end
  23. selected_job = self.Jobs[action]
  24. O_num=copy.copy(selected_job.O_num)
  25. Cur_Machine = self.Machines[selected_job.Cur_site]
  26. target_Machine = self.Machines[selected_job.next_machine()]
  27. trans_t1 = self.agv_trans[agv.Current_Site][Cur_Machine.index]
  28. trans_t2 = self.agv_trans[Cur_Machine.index][target_Machine.index]
  29. # 等待
  30. if selected_job in Cur_Machine.OB:
  31. # 获取AGV的移动开始时间
  32. JS,JE=selected_job.start,selected_job.end
  33. t,trans_t1 = agv.ST_move(selected_job.end, Cur_Machine.index,trans_t1,trans_t2)
  34. else:
  35. JS,JE=Cur_Machine.handling(selected_job)
  36. # 获取AGV的移动开始时间
  37. t,trans_t1 = agv.ST_move(selected_job.end, Cur_Machine.index,trans_t1,trans_t2)
  38. agv.send_to(selected_job.Cur_site, t, trans_t1, load=None)
  39. t1 = t + trans_t1
  40. Cur_Machine.pick_up_job(selected_job)
  41. # 负载转移
  42. agv.send_to(selected_job.next_machine(), t1, trans_t2, load=selected_job.index)
  43. selected_job.Cur_site = selected_job.next_machine()
  44. selected_job.finished_move()
  45. t3 = t1 + trans_t2
  46. selected_job.arrive = t3
  47. target_Machine.put_into_job(selected_job, t3)
  48. if selected_job.Cur_site == self.M_num:
  49. selected_job.J = True
  50. # self.update()
  51. if self.judge_state():
  52. done = True
  53. self.rest_work()
  54. return done,O_num,t,t3,JS,JE
  55. def scheduling1(self,action):
  56. wait_Job_IB, wait_Job_OB=0,0
  57. done = False
  58. # 空载
  59. T=[]
  60. selected_job = self.Jobs[action]
  61. Cur_Machine = self.Machines[selected_job.Cur_site]
  62. target_Machine = self.Machines[selected_job.next_machine()]
  63. Trans=[]
  64. if selected_job not in Cur_Machine.OB:
  65. # 获取AGV的移动开始时间
  66. Cur_Machine.handling(selected_job)
  67. # 获取AGV的移动开始时间
  68. for agv in self.AGVs:
  69. # t = agv.end
  70. trans_t1 = self.agv_trans[agv.Current_Site][Cur_Machine.index]
  71. trans_t2 = self.agv_trans[Cur_Machine.index][target_Machine.index]
  72. Trans.append([trans_t1,trans_t2])
  73. t,trans_t1 = agv.ST_move(selected_job.end, Cur_Machine.index,trans_t1,trans_t2)
  74. T.append(t)
  75. agv=self.AGVs[T.index(min(T))]
  76. t=min(T)
  77. trans_t1,trans_t2=Trans[T.index(min(T))][0],Trans[T.index(min(T))][1]
  78. agv.send_to(selected_job.Cur_site, t, trans_t1, load=None)
  79. t1 = t + trans_t1
  80. wait_Job_IB = selected_job.start - selected_job.arrive
  81. Cur_Machine.pick_up_job(selected_job)
  82. # 负载转移
  83. agv.send_to(selected_job.next_machine(), t1, trans_t2, load=selected_job.index)
  84. # print('这是agv',agv.idx,'动作为',agv._on)
  85. selected_job.Cur_site = selected_job.next_machine()
  86. selected_job.finished_move()
  87. t3 = t1 + trans_t2
  88. selected_job.arrive = t3
  89. target_Machine.put_into_job(selected_job, t3)
  90. if selected_job.Cur_site == self.M_num - 1:
  91. selected_job.J = True
  92. # self.update()
  93. if self.judge_state():
  94. done = True
  95. self.rest_work()
  96. return done, wait_Job_IB, wait_Job_OB
  97. if __name__=="__main__":
  98. from Gantt_graph import Gantt
  99. # from simple_MADRL.Params import args
  100. import random
  101. import pickle
  102. import numpy as np
  103. import argparse
  104. from Env.Instance.Text_extract import data
  105. fee = r"..\Instance\Bilge_Ulusoy\C1\E71.PKl"
  106. n, m, PT, agv_trans, MT, agv_num = data(fee)
  107. # print(PT)
  108. parser = argparse.ArgumentParser("Reinforcement Learning experiments for multiagent environments")
  109. # TWO_AGVs_FS_ENV
  110. parser.add_argument("--n", type=int, default=n, help="Job number")
  111. parser.add_argument("--m", type=int, default=m, help="Machine number")
  112. parser.add_argument("--PT", type=list, default=PT, help="PROCESSING TIME OF JOB")
  113. parser.add_argument("--MT", type=list, default=MT, help="PROCESSING TIME OF JOB")
  114. parser.add_argument("--AGV_num", type=int, default=2, help="transportation time between machines")
  115. parser.add_argument("--agv_trans", type=list, default=agv_trans, help="transportation time between machines")
  116. args = parser.parse_args()
  117. fs=JS_Env(args)
  118. # print(args.PT,args.MT)
  119. fs.reset()
  120. L=[0, 7, 6, 0, 5, 7, 2, 6, 4, 5, 1, 7, 3, 5, 6, 5, 3, 4, 1, 7, 3, 2, 0, 1, 6, 2, 4]
  121. for action in L:
  122. print(action)
  123. K=fs.scheduling1(action)
  124. # Gantt(fs.Machines, fs.AGVs)
  125. # if done:
  126. # break
  127. print(fs.C_max())
  128. Gantt(fs.Machines, fs.AGVs)
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/600226
推荐阅读
相关标签
  

闽ICP备14008679号