Unlike the earlier 1D problem, this task extends the scenario to 2D. As shown in the figure below, only three combat units are considered: the blue-side fighter (Fighter), the red-side target (Target), and the red-side surface-to-air missile system (SAM).
Control: the fighter's next heading at each time step is the only action variable, a continuous control variable in $[0, 2\pi]$.
Assumption: the agent chooses the action $a_t$ only from the current state $s_t$ at each time step, independently of the earlier states $s_i\ (i=0,1,\dots,t-1)$. In the original paper the fighter's speed is fixed to a constant in order to limit the size of the state space; in our extension we will also consider adding the fighter's acceleration to the action space. The SAM system has partial omnidirectional detection: it can only detect a fighter that falls inside its range, whereas the SAM system's state is fully observable to the fighter.
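As a purely hypothetical sketch of that extension (it is not implemented in the code below; the bounds and the speed-update rule are assumptions), the action could become a (heading, acceleration) pair:

import numpy as np

# Hypothetical extended action space: action = (heading, acceleration).
HEADING_BOUNDS = (-np.pi, np.pi)   # heading command in radians
ACCEL_BOUNDS = (-5.0, 5.0)         # assumed acceleration limits (illustrative only)

def clip_action(action):
    # Clip a raw 2D action to the assumed (heading, acceleration) bounds
    heading = np.clip(action[0], *HEADING_BOUNDS)
    accel = np.clip(action[1], *ACCEL_BOUNDS)
    return heading, accel

def extended_step(x, y, v, heading, accel, delta_t):
    # One kinematic step when the speed is allowed to change (kept non-negative)
    v = max(0.0, v + accel * delta_t)
    x = x + v * delta_t * np.cos(heading)
    y = y + v * delta_t * np.sin(heading)
    return x, y, v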
Base platform: the original paper uses AFGYM/AFSIM as the base platform. Q-Learning is no longer used as the solver, since it has no advantage over policy-gradient methods on high-dimensional state spaces; when the behaviour was trained with REINFORCE-style Q-Learning the result was poor, essentially a random walk. Because decisions must be made in real time, GANs are also unsuitable, so the paper trains the original problem with A3C and PPO.
The state vector $s_t$ of the original scenario consists of the following 11-dimensional vector:
Note: during training all components of the state vector are normalized except the 0-1 variables; otherwise back-propagation of the gradient is hampered. In the code that follows, SAM pointing and Target heading are not considered for now, so the state space is compressed to 9 dimensions; adding them back is one concrete direction for later improvement. The fighter heading serves as the basic action $a_t$.
Therefore the final state space is $S=[0,1]^{8}$, and the action space is $A=[0,1]$, mapped onto the heading range $[-180^{\circ}, 180^{\circ}]$.
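A minimal sketch of this mapping (the PPO code below performs exactly this rescaling for a Beta policy, and clips a Gaussian sample to $[-\pi,\pi]$ directly):

import numpy as np

def action_to_heading(a):
    # Map a normalized action a in [0, 1] to a heading in [-pi, pi], i.e. [-180°, 180°]
    return (2.0 * a - 1.0) * np.pi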
Each time the aircraft flies for one time step $\Delta t$: if it is not inside the SAM's firing range, the reward is $-1$ and the episode continues; if it enters the SAM's firing range, the reward is $-50$ and the episode continues; if it is inside the SAM's firing range and is shot down, the reward is $-1000$ and the episode ends; if it reaches the vicinity of the Target, the reward is $50$ and the episode continues; if it reaches the Target's vicinity and successfully strikes the Target, the reward is $1000$ and the episode ends.
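A minimal sketch of this reward scheme (the environment class implemented later uses a slightly different variant: its +50 bonus is given for destroying the SAM rather than for approaching the Target, and the -50 case is dropped):

def step_reward(in_sam_range, shot_down, near_target, target_destroyed):
    # Per-step reward following the scheme described above; returns (reward, done)
    if shot_down:            # inside the SAM's range and shot down: episode ends
        return -1000, True
    if target_destroyed:     # near the Target and the strike succeeds: episode ends
        return 1000, True
    if near_target:          # reached the Target's vicinity
        return 50, False
    if in_sam_range:         # inside the SAM's firing range but not shot down
        return -50, False
    return -1, False         # ordinary flight outside the SAM's range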
The model only considers the fighter moving at a constant speed; adding a penetration effect later is a possible breakthrough direction (research direction (2)).
Assume the fighter's speed is a constant $v$. At time $t$ let the fighter's heading be $a_t$ (the action variable) and its position be $(x_J^{(t)}, y_J^{(t)})$; then:
$$x_J^{(t+1)} = x_J^{(t)} + v\,\Delta t \cos a_t, \qquad y_J^{(t+1)} = y_J^{(t)} + v\,\Delta t \sin a_t$$
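For example, with the default values used in the code below ($v = 2175/3600 \approx 0.604$ distance units per second, $\Delta t = 10$), one step moves the fighter about 6.04 units along the commanded heading:

import numpy as np

v, delta_t = 2175.0 / 3600, 10            # defaults of the Fighter class below
x, y, a_t = 0.0, 0.0, np.pi / 4           # start at the origin, heading 45 degrees
x_next = x + v * delta_t * np.cos(a_t)    # ~4.27
y_next = y + v * delta_t * np.sin(a_t)    # ~4.27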
The fighter's ammunition count is $w_t$; if the fighter strikes the target at time $t$, then $w_{t+1} = w_t - dw_t$. Here we do not model ammunition loss and assume ammunition is ample, which is another possible breakthrough direction (research direction (3)). Let the SAM's firing zone be $R$, taken here as a disc centred on the SAM with radius $R_{SAM}$, where $R_{SAM}$ is the SAM's kill radius against the fighter.
The code is given directly below:
import numpy as np

class Fighter:
    def __init__(self, L_limits=100, x=0, y=0, SAM_x=50, SAM_y=50,
                 target_x=70, target_y=80, velocity=2175.0/3600,
                 delta_t=10, dead=False, SAM_range=40, SAM_time=60,
                 Fighter_range=10, firing_range=3.5, weapon_count=20):
        self.L_limits = L_limits          # size of the bounded arena
        self.x = x                        # fighter longitude (x position)
        self.y = y                        # fighter latitude (y position)
        self.SAM_x = SAM_x
        self.SAM_y = SAM_y
        self.target_x = target_x
        self.target_y = target_y
        # The attributes above describe the positions
        self.SAM_disable = False
        self.SAM_time = SAM_time          # preparation time of the SAM system
        self.SAM_range = SAM_range
        self.firing_range = firing_range  # firing range of the fighter
        self.weapon_count = weapon_count  # ammunition capacity of the fighter
        self.velocity = velocity
        self.delta_t = delta_t
        self.dead = dead
        self.wait_time = 0
        self.burn_reward = -1000
        self.win_reward = 1000
        self.reward = -1                  # reward while the fighter flies normally

    # Reset the fighter to a random initial position
    def reset(self):
        self.x = self.L_limits * np.random.random()
        self.y = self.L_limits * np.random.random()
        self.dead = False
        self.wait_time = 0
        self.SAM_disable = False
        return self.x, self.y

    # State-transition function
    def step(self, fighter_heading):
        # The heading is given in radians
        fighter_heading = np.clip(fighter_heading, -np.pi, np.pi)
        self.x = self.x + self.velocity * self.delta_t * np.cos(fighter_heading)
        self.y = self.y + self.velocity * self.delta_t * np.sin(fighter_heading)
        # Keep the fighter inside the arena horizontally
        if self.x <= 0:
            self.x = 0
        elif self.x > self.L_limits:
            self.x = self.L_limits
        # Keep the fighter inside the arena vertically
        if self.y <= 0:
            self.y = 0
        elif self.y > self.L_limits:
            self.y = self.L_limits
        # Check whether the fighter is inside the SAM's range
        if self.SAM_disable == False:
            d = np.sqrt((self.x - self.SAM_x)**2 + (self.y - self.SAM_y)**2)
            if d <= self.SAM_range:
                self.wait_time = self.wait_time + self.delta_t
            else:
                # As soon as the fighter leaves the SAM's range, reset the timer
                self.wait_time = 0
            # Check how long the fighter has stayed inside the SAM's range
            if self.wait_time >= self.SAM_time:  # stayed inside the SAM's range for too long
                # The fighter is shot down and receives a large negative reward
                self.wait_time = 0
                return (self.x, self.y), self.burn_reward, True
            # When the fighter gets close enough to the SAM it destroys the SAM immediately
            # (this is a key waypoint, rewarded with 50)
            if d <= self.firing_range and self.SAM_disable == False:
                self.SAM_disable = True
                self.wait_time = 0
                return (self.x, self.y), 50, False
        # When the fighter gets close to the target
        if np.sqrt((self.target_x - self.x)**2 + (self.target_y - self.y)**2) <= self.firing_range:
            return (self.x, self.y), self.win_reward, True
        # Normal flight: reward of -1
        return (self.x, self.y), -1, False
        ## Reinforcement learning could perhaps be combined with fault-tree analysis for policy evaluation!
The following test code uses a random walk:
import numpy as np
import matplotlib.pyplot as plt

x_array, y_array = [], []
rewards = 0.0
SAM_x, SAM_y = 50, 50
target_x, target_y = 80, 70
SAM_range = 20
fighter = Fighter(SAM_x=SAM_x, SAM_y=SAM_y, target_x=target_x, target_y=target_y, SAM_range=SAM_range)
while True:
    fighter_heading = np.random.random() * 2 * np.pi - np.pi
    next_state, reward, done = fighter.step(fighter_heading)
    rewards += reward
    print("Current fighter position [{},{}], reward {}, cumulative reward {}".format(
        next_state[0], next_state[1], reward, rewards))
    x_array.append(next_state[0])
    y_array.append(next_state[1])
    if done:
        break
# Draw the SAM's firing range as a circle
circle_x, circle_y = [], []
for theta in np.linspace(-np.pi, np.pi):
    circle_x.append(SAM_x + SAM_range * np.cos(theta))
    circle_y.append(SAM_y + SAM_range * np.sin(theta))
plt.figure(figsize=(5, 5))
plt.plot(x_array, y_array, 'b.-', linewidth=1)
plt.plot(SAM_x, SAM_y, 'ro', linewidth=5)
plt.plot(circle_x, circle_y, 'r.-', linewidth=1)
plt.plot(target_x, target_y, 'go', linewidth=5)
plt.title('one road')
plt.show()
The test results are as follows.
The following case shows the fighter being shot down by the SAM:
The following case shows the fighter successfully skirting the SAM:
To solve the problem above with the continuous-action PPO algorithm (with the usual optimization tricks), the following libraries are imported:
import torch
import matplotlib.pyplot as plt
import numpy as np
from normalization import Normalization, RewardScaling
from replaybuffer import ReplayBuffer
from ppo_continuous import PPO_continuous
from Fighter_env import Fighter
from CPPO_main import args_param,train_network,test_network
The problem is then solved with the following main routine:
# Declare the hyperparameters
args = args_param(max_episode_steps=2000, batch_size=64, max_train_steps=5000, K_epochs=3)
# Declare the environment
env = Fighter(delta_t=25, SAM_range=30, Fighter_range=10, SAM_time=30)
env.test(show_pictures=True, show_tips=False)
agent = train_network(args, env)
x_array, y_array, _ = test_network(args, env, agent)
This produces the following convergence curve of the average reward:
and the following test result:
It seems that training the network with continuous-action PPO on this problem is still a real struggle! The complete code is listed below, starting with normalization.py (state normalization and reward scaling):
import numpy as np
class RunningMeanStd:
# Dynamically calculate mean and std
def __init__(self, shape): # shape:the dimension of input data
self.n = 0
self.mean = np.zeros(shape)
self.S = np.zeros(shape)
self.std = np.sqrt(self.S)
def update(self, x):
x = np.array(x)
self.n += 1
if self.n == 1:
self.mean = x
self.std = x
else:
old_mean = self.mean.copy()
self.mean = old_mean + (x - old_mean) / self.n
self.S = self.S + (x - old_mean) * (x - self.mean)
self.std = np.sqrt(self.S / self.n)
class Normalization:
def __init__(self, shape):
self.running_ms = RunningMeanStd(shape=shape)
def __call__(self, x, update=True):
# Whether to update the mean and std,during the evaluating,update=False
if update:
self.running_ms.update(x)
x = (x - self.running_ms.mean) / (self.running_ms.std + 1e-8)
return x
class RewardScaling:
def __init__(self, shape, gamma):
self.shape = shape # reward shape=1
self.gamma = gamma # discount factor
self.running_ms = RunningMeanStd(shape=self.shape)
self.R = np.zeros(self.shape)
def __call__(self, x):
self.R = self.gamma * self.R + x
self.running_ms.update(self.R)
x = x / (self.running_ms.std + 1e-8) # Only divided std
return x
def reset(self): # When an episode is done,we should reset 'self.R'
self.R = np.zeros(self.shape)
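A minimal usage sketch of these two helpers (the shape of 2 below is an assumption matching the 2D observation used later):

import numpy as np
from normalization import Normalization, RewardScaling

state_norm = Normalization(shape=2)                  # running mean/std per state dimension
reward_scaling = RewardScaling(shape=1, gamma=0.99)

s = np.array([12.0, 34.0])
s_norm = state_norm(s)             # updates the running statistics and whitens s
r_scaled = reward_scaling(-1.0)    # divides the reward by the std of the running discounted return
reward_scaling.reset()             # reset the running return at the end of every episode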
# Fighter_env.py
import numpy as np
import matplotlib.pyplot as plt

# The class below defines the environment
class Fighter:
    def __init__(self, L_limits=100, x=0, y=0, SAM_x=50, SAM_y=50,
                 target_x=70, target_y=80, velocity=2175.0/3600,
                 delta_t=10, dead=False, SAM_range=40, SAM_time=60,
                 Fighter_range=10, weapon_count=20):
        self.L_limits = L_limits          # size of the bounded arena
        self.x = x                        # fighter longitude (x position)
        self.y = y                        # fighter latitude (y position)
        self.SAM_x = SAM_x
        self.SAM_y = SAM_y
        self.target_x = target_x
        self.target_y = target_y
        # The attributes above describe the positions
        self.SAM_disable = False
        self.SAM_time = SAM_time          # preparation time of the SAM system
        self.SAM_range = SAM_range
        self.firing_range = Fighter_range # firing range of the fighter
        self.weapon_count = weapon_count  # ammunition capacity of the fighter
        self.velocity = velocity
        self.delta_t = delta_t
        self.dead = dead
        self.wait_time = 0
        self.burn_reward = -1000
        self.win_reward = 1000
        self.reward = -1                  # reward while the fighter flies normally
        # Observation and action spaces
        self.observation_space = np.array([[0, self.L_limits], [0, self.L_limits]])
        self.action_space = np.array([[-np.pi, np.pi]])

    # Reset the fighter's initial position
    def reset(self):
        # self.x = self.L_limits*np.random.random()
        # self.y = self.L_limits*np.random.random()
        self.x = 0
        self.y = 0
        self.dead = False
        self.wait_time = 0
        self.SAM_disable = False
        return np.array([self.x, self.y], dtype=np.float32)

    # State-transition function
    def step(self, fighter_heading):
        # The heading is given in radians
        fighter_heading = np.clip(fighter_heading, -np.pi, np.pi)
        self.x = self.x + self.velocity * self.delta_t * np.cos(fighter_heading)
        self.y = self.y + self.velocity * self.delta_t * np.sin(fighter_heading)
        self.x = np.clip(self.x, a_min=0.0, a_max=self.L_limits)
        self.y = np.clip(self.y, a_min=0.0, a_max=self.L_limits)
        # When the fighter gets close to the target
        if np.linalg.norm(np.array([self.target_x - self.x, self.target_y - self.y], dtype=np.float32)) <= self.firing_range:
            return np.array([self.x, self.y], dtype=np.float32), self.win_reward, True
        # Check whether the fighter is inside the SAM's range
        d = np.linalg.norm(np.array([self.x - self.SAM_x, self.y - self.SAM_y], dtype=np.float32))
        if self.SAM_disable == False:
            if d <= self.SAM_range:
                self.wait_time = self.wait_time + self.delta_t
            else:
                # As soon as the fighter leaves the SAM's range, reset the timer
                self.wait_time = 0
            # Check how long the fighter has stayed inside the SAM's range
            if self.wait_time >= self.SAM_time:  # stayed inside the SAM's range for too long
                # The fighter is shot down and receives a large negative reward
                self.wait_time = 0
                return np.array([self.x, self.y], dtype=np.float32), self.burn_reward, True
            # When the fighter gets close enough to the SAM it destroys the SAM immediately
            # (this is a key waypoint, rewarded with 50)
            if d <= self.firing_range and self.SAM_disable == False:
                self.SAM_disable = True
                self.wait_time = 0
                return np.array([self.x, self.y], dtype=np.float32), 50, False
        # Normal flight: reward of -1
        return np.array([self.x, self.y], dtype=np.float32), -1, False
        ## Reinforcement learning could perhaps be combined with fault-tree analysis for policy evaluation!

    # Run one test episode with a random-walk policy
    def test(self, show_tips=True, show_pictures=True):
        x_array, y_array = [], []
        rewards = 0.0
        while True:
            fighter_headings = np.random.random() * 2 * np.pi - np.pi
            next_state, reward, done = self.step(fighter_headings)
            rewards += reward
            if show_tips:
                print("Current fighter position [{},{}], reward {}, cumulative reward {}".format(
                    next_state[0], next_state[1], reward, rewards))
            x_array.append(next_state[0])
            y_array.append(next_state[1])
            if done:
                break
        # If requested, plot the trajectory together with the SAM's firing range
        if show_pictures:
            circle_x, circle_y = [], []
            for theta in np.linspace(-np.pi, np.pi):
                circle_x.append(self.SAM_x + self.SAM_range * np.cos(theta))
                circle_y.append(self.SAM_y + self.SAM_range * np.sin(theta))
            plt.figure(figsize=(5, 5))
            plt.plot(x_array, y_array, 'b.-', linewidth=1)
            plt.plot(self.SAM_x, self.SAM_y, 'ro', linewidth=5)
            plt.plot(circle_x, circle_y, 'r.-', linewidth=1)
            plt.plot(self.target_x, self.target_y, 'go', linewidth=5)
            plt.plot(x_array[-1], y_array[-1], 'm*', linewidth=5)
            plt.title('one road')
            plt.show()
The replay buffer class ReplayBuffer is stored in replaybuffer.py:
import torch
import numpy as np

class ReplayBuffer:
    def __init__(self, args):
        self.state_dim = args.state_dim
        self.action_dim = args.action_dim
        self.batch_size = args.batch_size
        self.s = np.zeros((args.batch_size, args.state_dim))
        self.a = np.zeros((args.batch_size, args.action_dim))
        self.a_logprob = np.zeros((args.batch_size, args.action_dim))
        self.r = np.zeros((args.batch_size, 1))
        self.s_ = np.zeros((args.batch_size, args.state_dim))
        self.dw = np.zeros((args.batch_size, 1))
        self.done = np.zeros((args.batch_size, 1))
        self.count = 0

    def store(self, s, a, a_logprob, r, s_, dw, done):
        self.s[self.count] = s
        self.a[self.count] = a
        self.a_logprob[self.count] = a_logprob
        self.r[self.count] = r
        self.s_[self.count] = s_
        self.dw[self.count] = dw
        self.done[self.count] = done
        self.count += 1

    def numpy_to_tensor(self):
        s = torch.tensor(self.s, dtype=torch.float)
        a = torch.tensor(self.a, dtype=torch.float)
        a_logprob = torch.tensor(self.a_logprob, dtype=torch.float)
        r = torch.tensor(self.r, dtype=torch.float)
        s_ = torch.tensor(self.s_, dtype=torch.float)
        dw = torch.tensor(self.dw, dtype=torch.float)
        done = torch.tensor(self.done, dtype=torch.float)
        return s, a, a_logprob, r, s_, dw, done
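The actor and critic networks and the PPO agent itself live in ppo_continuous.py: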
import torch
import torch.nn.functional as F
from torch.utils.data.sampler import BatchSampler, SubsetRandomSampler
import torch.nn as nn
from torch.distributions import Beta, Normal
# Trick 8: orthogonal initialization
def orthogonal_init(layer, gain=1.0):
nn.init.orthogonal_(layer.weight, gain=gain)
nn.init.constant_(layer.bias, 0)
class Actor_Beta(nn.Module):
def __init__(self, args):
super(Actor_Beta, self).__init__()
self.fc1 = nn.Linear(args.state_dim, args.hidden_width)
self.fc2 = nn.Linear(args.hidden_width, args.hidden_width)
self.alpha_layer = nn.Linear(args.hidden_width, args.action_dim)
self.beta_layer = nn.Linear(args.hidden_width, args.action_dim)
self.activate_func = [nn.ReLU(), nn.Tanh()][args.use_tanh] # Trick10: use tanh
if args.use_orthogonal_init:
print("------use_orthogonal_init------")
orthogonal_init(self.fc1)
orthogonal_init(self.fc2)
orthogonal_init(self.alpha_layer, gain=0.01)
orthogonal_init(self.beta_layer, gain=0.01)
def forward(self, s):
s = self.activate_func(self.fc1(s))
s = self.activate_func(self.fc2(s))
# alpha and beta need to be larger than 1,so we use 'softplus' as the activation function and then plus 1
alpha = F.softplus(self.alpha_layer(s)) + 1.0
beta = F.softplus(self.beta_layer(s)) + 1.0
return alpha, beta
def get_dist(self, s):
alpha, beta = self.forward(s)
dist = Beta(alpha, beta)
return dist
def mean(self, s):
alpha, beta = self.forward(s)
mean = alpha / (alpha + beta) # The mean of the beta distribution
return mean
class Actor_Gaussian(nn.Module):
def __init__(self, args):
super(Actor_Gaussian, self).__init__()
self.max_action = args.max_action
self.fc1 = nn.Linear(args.state_dim, args.hidden_width)
self.fc2 = nn.Linear(args.hidden_width, args.hidden_width)
self.mean_layer = nn.Linear(args.hidden_width, args.action_dim)
self.log_std = nn.Parameter(torch.zeros(1, args.action_dim)) # We use 'nn.Parameter' to train log_std automatically
self.activate_func = [nn.ReLU(), nn.Tanh()][args.use_tanh] # Trick10: use tanh
if args.use_orthogonal_init:
print("------use_orthogonal_init------")
orthogonal_init(self.fc1)
orthogonal_init(self.fc2)
orthogonal_init(self.mean_layer, gain=0.01)
def forward(self, s):
s = self.activate_func(self.fc1(s))
s = self.activate_func(self.fc2(s))
mean = self.max_action * torch.tanh(self.mean_layer(s)) # [-1,1]->[-max_action,max_action]
return mean
def get_dist(self, s):
mean = self.forward(s)
log_std = self.log_std.expand_as(mean) # To make 'log_std' have the same dimension as 'mean'
std = torch.exp(log_std) # The reason we train the 'log_std' is to ensure std=exp(log_std)>0
dist = Normal(mean, std) # Get the Gaussian distribution
return dist
class Critic(nn.Module):
def __init__(self, args):
super(Critic, self).__init__()
self.fc1 = nn.Linear(args.state_dim, args.hidden_width)
self.fc2 = nn.Linear(args.hidden_width, args.hidden_width)
self.fc3 = nn.Linear(args.hidden_width, 1)
self.activate_func = [nn.ReLU(), nn.Tanh()][args.use_tanh] # Trick10: use tanh
if args.use_orthogonal_init:
print("------use_orthogonal_init------")
orthogonal_init(self.fc1)
orthogonal_init(self.fc2)
orthogonal_init(self.fc3)
def forward(self, s):
s = self.activate_func(self.fc1(s))
s = self.activate_func(self.fc2(s))
v_s = self.fc3(s)
return v_s
class PPO_continuous():
def __init__(self, args):
self.policy_dist = args.policy_dist
self.max_action = args.max_action
self.batch_size = args.batch_size
self.mini_batch_size = args.mini_batch_size
self.max_train_steps = args.max_train_steps
self.lr_a = args.lr_a # Learning rate of actor
self.lr_c = args.lr_c # Learning rate of critic
self.gamma = args.gamma # Discount factor
self.lamda = args.lamda # GAE parameter
self.epsilon = args.epsilon # PPO clip parameter
self.K_epochs = args.K_epochs # PPO parameter
self.entropy_coef = args.entropy_coef # Entropy coefficient
self.set_adam_eps = args.set_adam_eps
self.use_grad_clip = args.use_grad_clip
self.use_lr_decay = args.use_lr_decay
self.use_adv_norm = args.use_adv_norm
if self.policy_dist == "Beta":
self.actor = Actor_Beta(args)
else:
self.actor = Actor_Gaussian(args)
self.critic = Critic(args)
if self.set_adam_eps: # Trick 9: set Adam epsilon=1e-5
self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.lr_a, eps=1e-5)
self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.lr_c, eps=1e-5)
else:
self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.lr_a)
self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.lr_c)
def evaluate(self, s): # When evaluating the policy, we only use the mean
s = torch.unsqueeze(torch.tensor(s, dtype=torch.float), 0)
if self.policy_dist == "Beta":
a = self.actor.mean(s).detach().numpy().flatten()
else:
a = self.actor(s).detach().numpy().flatten()
return a
def choose_action(self, s):
s = torch.unsqueeze(torch.tensor(s, dtype=torch.float), 0)
if self.policy_dist == "Beta":
with torch.no_grad():
dist = self.actor.get_dist(s)
a = dist.sample() # Sample the action according to the probability distribution
a_logprob = dist.log_prob(a) # The log probability density of the action
else:
with torch.no_grad():
dist = self.actor.get_dist(s)
a = dist.sample() # Sample the action according to the probability distribution
a = torch.clamp(a, -self.max_action, self.max_action) # [-max,max]
a_logprob = dist.log_prob(a) # The log probability density of the action
return a.numpy().flatten(), a_logprob.numpy().flatten()
def update(self, replay_buffer, total_steps):
s, a, a_logprob, r, s_, dw, done = replay_buffer.numpy_to_tensor() # Get training data
"""
Calculate the advantage using GAE
'dw=True' means dead or win, there is no next state s'
'done=True' represents the terminal of an episode(dead or win or reaching the max_episode_steps). When calculating the adv, if done=True, gae=0
"""
adv = []
gae = 0
with torch.no_grad(): # adv and v_target have no gradient
vs = self.critic(s)
vs_ = self.critic(s_)
deltas = r + self.gamma * (1.0 - dw) * vs_ - vs
for delta, d in zip(reversed(deltas.flatten().numpy()), reversed(done.flatten().numpy())):
gae = delta + self.gamma * self.lamda * gae * (1.0 - d)
adv.insert(0, gae)
adv = torch.tensor(adv, dtype=torch.float).view(-1, 1)
v_target = adv + vs
if self.use_adv_norm: # Trick 1:advantage normalization
adv = ((adv - adv.mean()) / (adv.std() + 1e-5))
# Optimize policy for K epochs:
for _ in range(self.K_epochs):
# Random sampling and no repetition. 'False' indicates that training will continue even if the number of samples in the last time is less than mini_batch_size
for index in BatchSampler(SubsetRandomSampler(range(self.batch_size)), self.mini_batch_size, False):
dist_now = self.actor.get_dist(s[index])
dist_entropy = dist_now.entropy().sum(1, keepdim=True) # shape(mini_batch_size X 1)
a_logprob_now = dist_now.log_prob(a[index])
# a/b=exp(log(a)-log(b)) In multi-dimensional continuous action space,we need to sum up the log_prob
ratios = torch.exp(a_logprob_now.sum(1, keepdim=True) - a_logprob[index].sum(1, keepdim=True)) # shape(mini_batch_size X 1)
surr1 = ratios * adv[index] # Only calculate the gradient of 'a_logprob_now' in ratios
surr2 = torch.clamp(ratios, 1 - self.epsilon, 1 + self.epsilon) * adv[index]
actor_loss = -torch.min(surr1, surr2) - self.entropy_coef * dist_entropy # Trick 5: policy entropy
# Update actor
self.optimizer_actor.zero_grad()
actor_loss.mean().backward()
if self.use_grad_clip: # Trick 7: Gradient clip
torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
self.optimizer_actor.step()
v_s = self.critic(s[index])
critic_loss = F.mse_loss(v_target[index], v_s)
# Update critic
self.optimizer_critic.zero_grad()
critic_loss.backward()
if self.use_grad_clip: # Trick 7: Gradient clip
torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5)
self.optimizer_critic.step()
if self.use_lr_decay: # Trick 6:learning rate Decay
self.lr_decay(total_steps)
def lr_decay(self, total_steps):
lr_a_now = self.lr_a * (1 - total_steps / self.max_train_steps)
lr_c_now = self.lr_c * (1 - total_steps / self.max_train_steps)
for p in self.optimizer_actor.param_groups:
p['lr'] = lr_a_now
for p in self.optimizer_critic.param_groups:
p['lr'] = lr_c_now
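Finally, the hyperparameter container, the training loop, and the evaluation routine are collected in CPPO_main.py: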
import matplotlib.pyplot as plt
import numpy as np
from normalization import Normalization, RewardScaling
from replaybuffer import ReplayBuffer
from ppo_continuous import PPO_continuous
from Fighter_env import Fighter
# First define a parameter class that stores the hyperparameters
class args_param(object):
def __init__(self,max_train_steps=int(3e6),
evaluate_freq=5e3,
save_freq=20,
policy_dist="Gaussian",
batch_size=2048,
mini_batch_size=64,
hidden_width=64,
lr_a=3e-4,
lr_c=3e-4,
gamma=0.99,
lamda=0.95,
epsilon=0.2,
K_epochs=10,
max_episode_steps = 1000,
use_adv_norm=True,
use_state_norm=True,
use_reward_norm=False,
use_reward_scaling=True,
entropy_coef=0.01,
use_lr_decay=True,
use_grad_clip=True,
use_orthogonal_init=True,
set_adam_eps=True,
use_tanh=True):
self.max_train_steps = max_train_steps
self.evaluate_freq = evaluate_freq
self.save_freq = save_freq
self.policy_dist = policy_dist
self.batch_size = batch_size
self.mini_batch_size = mini_batch_size
self.hidden_width = hidden_width
self.lr_a = lr_a
self.lr_c = lr_c
self.gamma = gamma
self.lamda = lamda
self.epsilon = epsilon
self.K_epochs = K_epochs
self.use_adv_norm = use_adv_norm
self.use_state_norm = use_state_norm
self.use_reward_norm = use_reward_norm
self.use_reward_scaling = use_reward_scaling
self.entropy_coef = entropy_coef
self.use_lr_decay = use_lr_decay
self.use_grad_clip = use_grad_clip
self.use_orthogonal_init = use_orthogonal_init
self.set_adam_eps = set_adam_eps
self.use_tanh = use_tanh
self.max_episode_steps = max_episode_steps
def print_information(self):
print("Maximum number of training steps:",self.max_train_steps)
print("Evaluate the policy every 'evaluate_freq' steps:",self.evaluate_freq)
print("Save frequency:",self.save_freq)
print("Beta or Gaussian:",self.policy_dist)
print("Batch size:",self.batch_size)
print("Minibatch size:",self.mini_batch_size)
print("The number of neurons in hidden layers of the neural network:",self.hidden_width)
print("Learning rate of actor:",self.lr_a)
print("Learning rate of critic:",self.lr_c)
print("Discount factor:",self.gamma)
print("GAE parameter:",self.lamda)
print("PPO clip parameter:",self.epsilon)
print("PPO parameter:",self.K_epochs)
print("Trick 1:advantage normalization:",self.use_adv_norm)
print("Trick 2:state normalization:",self.use_state_norm)
print("Trick 3:reward normalization:",self.use_reward_norm)
print("Trick 4:reward scaling:",self.use_reward_scaling)
print("Trick 5: policy entropy:",self.entropy_coef)
print("Trick 6:learning rate Decay:",self.use_lr_decay)
print("Trick 7: Gradient clip:",self.use_grad_clip)
print("Trick 8: orthogonal initialization:",self.use_orthogonal_init)
print("Trick 9: set Adam epsilon=1e-5:",self.set_adam_eps)
print("Trick 10: tanh activation function:",self.use_tanh)
# The function below trains the network
def train_network(args, env, show_picture=True):
    episode_rewards = []
    episode_mean_rewards = []
    # Read the environment parameters into args
    args.state_dim = env.observation_space.shape[0]
    args.action_dim = env.action_space.shape[0]
    args.max_action = float(env.action_space[0][1])
    # Create the replay buffer
    replay_buffer = ReplayBuffer(args)
    # Create the PPO agent
    agent = PPO_continuous(args)
    # Trick 2: state normalization
    state_norm = Normalization(shape=args.state_dim)
    if args.use_reward_norm:
        reward_norm = Normalization(shape=1)
    elif args.use_reward_scaling:  # Trick 4: reward scaling
        reward_scaling = RewardScaling(shape=1, gamma=args.gamma)
    # Training loop
    for episode in range(args.max_train_steps):
        # Initialise the per-episode quantities
        episode_reward = 0.0
        done = False
        episode_count = 0
        # Get a fresh initial state
        s = env.reset()
        # Normalize the state
        if args.use_state_norm:
            s = state_norm(s)
        if args.use_reward_scaling:
            reward_scaling.reset()
        # Loop until the episode terminates
        while True:
            # Count the steps of the current episode
            episode_count += 1
            a, a_logprob = agent.choose_action(s)
            # Rescale the action depending on whether a Gaussian or Beta policy is used
            if args.policy_dist == "Beta":
                action = 2 * (a - 0.5) * args.max_action
            else:
                action = a
            # Interact with the environment
            s_, r, done = env.step(action)  ## NOTE: this environment is hand-built; its outputs may differ for everyone
            s_ = s_.T[0]  ## NOTE: this step is not strictly necessary; it is here because of a quirk in my environment
            episode_reward += r
            # State normalization
            if args.use_state_norm:
                s_ = state_norm(s_)
            # Reward normalization or reward scaling
            if args.use_reward_norm:
                r = reward_norm(r)
            elif args.use_reward_scaling:
                r = reward_scaling(r)
            # dw is set when the episode ends or the maximum number of steps is reached
            if done or episode_count >= args.max_episode_steps:
                dw = True
            else:
                dw = False
            # Store the transition in the replay buffer
            replay_buffer.store(s, action, a_logprob, r, s_, dw, done)
            # Move to the next state
            s = s_
            # Once the replay buffer reaches batch_size, run an update
            if replay_buffer.count == args.batch_size:
                agent.update(replay_buffer, episode)
                replay_buffer.count = 0
            # Leave the loop when the episode ends (or is truncated at max_episode_steps)
            if done or episode_count >= args.max_episode_steps:
                episode_rewards.append(episode_reward)
                episode_mean_rewards.append(np.mean(episode_rewards))
                print("Episode {}: reward {:.2f}, mean reward {:.2f}".format(
                    episode, episode_reward, episode_mean_rewards[-1]))
                break
    # Plot the learning curves if requested
    if show_picture:
        plt.plot(episode_rewards)
        plt.plot(episode_mean_rewards)
        plt.xlabel("episodes")
        plt.ylabel("rewards")
        plt.title("Continuous PPO With Optimization")
        plt.legend(["rewards", "mean_rewards"])
        plt.show()
    return agent
# Run one episode with the trained network
def test_network(args, env, agent, show_pictures=True):
    episode_reward = 0.0
    done = False
    episode_count = 0
    state_norm = Normalization(shape=args.state_dim)
    # Get a fresh initial state
    s = env.reset()
    x_array, y_array = [s[0]], [s[1]]
    if args.use_state_norm:
        s = state_norm(s)
    if args.use_reward_norm:
        reward_norm = Normalization(shape=1)
    elif args.use_reward_scaling:  # Trick 4: reward scaling
        reward_scaling = RewardScaling(shape=1, gamma=args.gamma)
        reward_scaling.reset()
    # Loop until the episode terminates
    while True:
        # Count the steps of the current episode
        episode_count += 1
        a, a_logprob = agent.choose_action(s)
        # Rescale the action depending on whether a Gaussian or Beta policy is used
        if args.policy_dist == "Beta":
            action = 2 * (a - 0.5) * args.max_action
        else:
            action = a
        # Interact with the environment
        s_, r, done = env.step(action)  ## NOTE: this environment is hand-built; its outputs may differ for everyone
        episode_reward += r
        s_ = s_.T[0]  ## NOTE: this step is not strictly necessary; it is here because of a quirk in my environment
        x_array.append(s_[0])
        y_array.append(s_[1])
        # State normalization
        if args.use_state_norm:
            s_ = state_norm(s_)
        # Move to the next state
        s = s_
        # Leave the loop when the episode ends or after 2000 steps
        if done or episode_count >= 2000:
            print("Test score: {}".format(episode_reward))
            break
    # Plot the trajectory
    if show_pictures:
        circle_x, circle_y = [], []
        for theta in np.linspace(-np.pi, np.pi):
            circle_x.append(env.SAM_x + env.SAM_range * np.cos(theta))
            circle_y.append(env.SAM_y + env.SAM_range * np.sin(theta))
        plt.plot(x_array, y_array, 'b.-', linewidth=1)
        plt.plot(env.SAM_x, env.SAM_y, 'ro', linewidth=5)
        plt.plot(circle_x, circle_y, 'r.-', linewidth=1)
        plt.plot(env.target_x, env.target_y, 'go', linewidth=5)
        plt.plot(x_array[-1], y_array[-1], 'm*', linewidth=5)
        plt.title("one road rewards:{:.2f}".format(episode_reward))
        plt.show()
    return x_array, y_array, episode_reward
# Main entry point
if __name__ == "__main__":
    # Declare the hyperparameters
    args = args_param(max_episode_steps=1000, batch_size=64, max_train_steps=1000, K_epochs=3)
    # Declare the environment
    env = Fighter(delta_t=25, SAM_range=30, Fighter_range=10, SAM_time=30)
    env.test(show_pictures=True, show_tips=False)
    agent = train_network(args, env)
    x_array, y_array, _ = test_network(args, env, agent)