赞
踩
最近几个月一直在看强化学习,把强化学习导论(Reinforcement Learning:An Introduction)跟着b站视频看了一遍,发现书上的大部分是理论,实践代码并不多,所以可以看看知乎David Silver强化学习公开课中文讲解及实践,看完之后呢就会发现,强化学习在沿着解决连续动作和深度卷积神经网络逼近值函数来解决问题。下面先写出目前所学到的知识,后期会继续更新。
Q-Learning是强化学习算法中极其重要的算法,Q即为Q(s,a)就是在某一时刻的 s 状态下,采取 动作a 动作所获得的收益,环境会根据动作反馈相应的回报reward,所以Q-Learning算法的主要思想就是将状态与动作构建成一张Q-table来存储Q值,然后根据Q值来选取能够获得最大的收益的动作。
缺点:Q-learning需要一个Q table,在状态很多的情况下,Q table会很大,查找和存储都需要消耗大量的时间和空间。而且这些表中的动作都是离散的动作,不能是连续的动作。
问题:在实际问题中,环境中有许多状态,其每个状态有许多动作,这样使得Q-table显得没有那么的好。
解决方法:提出DQN
DQN对Q-learning的修改主要体现在以下三个方面:
(1)DQN利用深度卷积神经网络逼近值函数;
(2)DQN利用了经验回放对强化学习的学习过程进行训练;
(3)DQN独立设置了目标网络来单独处理时间差分算法中的TD偏差。
(1)DQN利用深度卷积神经网络:
网络的作用是通过输入状态和行为可直接输出这个行为的价值Q,不需要再记录表格。
(2)DQN利用经验回放:
在训练过程中,会维护一个序列样本池Dt={e1,…,et},其中et={st,at,rt,s(t+1)},et就是在状态st下,采取了动作at,转移到了状态s(t+1),得到回报rt,这样就形成了一个样本(经验)。回放的意思就是在训练中,比如让agent玩游戏,并不是把样本按照时间顺序喂给网络,而是在一局游戏未结束之前,把生成的样本(经验)都更新地扔到经验池中,从池中平均采样minBatch个作为训练样本。
好处:
这样回放机制就会减少应用于高度相关的状态序列。:因为前后样本存在关联导致的强化学习震荡和发散的问题。
(3)DQN独立设置目标网络
Off-policy是Q-Learning的特点,DQN中也延用了这一特点。而不同的是,Q-Learning中用来计算target和预测值的Q是同一个Q,也就是说使用了相同的神经网络。这样带来的一个问题就是,每次更新神经网络的时候,target也都会更新,这样会容易导致参数不收敛。回忆在有监督学习中,标签label都是固定的,不会随着参数的更新而改变。因此DQN在原来的Q网络的基础上又引入了一个target Q网络,即用来计算target的网络。它和Q网络结构一样,初始的权重也一样,只是Q网络每次迭代都会更新,而target Q网络是每隔一段时间才会更新。
最后附上DQN过程直观图:
DQN代码实现如下:(代码中有自己加上去的解释)
import torch import torch.nn as nn import torch.nn.functional as F import numpy as np import gym # Hyper Parameters 参数 BATCH_SIZE = 32 LR = 0.01 # learning rate EPSILON = 0.9 # greedy policy GAMMA = 0.9 # reward discount TARGET_REPLACE_ITER = 100 # target update frequency MEMORY_CAPACITY = 2000 env = gym.make('CartPole-v0') #创造环境 env = env.unwrapped#据说不做这个动作会有很多限制,unwrapped是打开限制的意思,用env.unwrapped可以得到原始的类,原始类想step多久就多久,不会200步后失败: N_ACTIONS = env.action_space.n #动作的个数 N_STATES = env.observation_space.shape[0] #查看这个环境中observation的特征即状态有多少个 ENV_A_SHAPE = 0 if isinstance(env.action_space.sample(), int) else env.action_space.sample().shape # to confirm the shape class Net(nn.Module): #神经网络 def __init__(self, ): super(Net, self).__init__() self.fc1 = nn.Linear(N_STATES, 50) #输入状态 self.fc1.weight.data.normal_(0, 0.1) # initialization self.out = nn.Linear(50, N_ACTIONS) #输出该状态下的所有动作的价值 self.out.weight.data.normal_(0, 0.1) # initialization def forward(self, x): x = self.fc1(x) x = F.relu(x) actions_value = self.out(x) return actions_value #返回动作的价值 class DQN(object): def __init__(self): self.eval_net, self.target_net = Net(), Net() #两个相同的网络,参数不同,需要两个网络:target网络每隔段时间更新一次。避免参数一直更新 self.learn_step_counter = 0 #学了多少步 # for target updating self.memory_counter = 0 #记忆库位置的计数 # for storing memory self.memory = np.zeros((MEMORY_CAPACITY, N_STATES * 2 + 2)) # 初始化记忆库 self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR) self.loss_func = nn.MSELoss() def choose_action(self, x): #根据观测值决定动作 x = torch.unsqueeze(torch.FloatTensor(x), 0) #输入观测者,用变量包起来传入神经网络 # input only one sample if np.random.uniform() < EPSILON: # 如果概率小于一个随机数,采取贪婪算法,选取最大值 actions_value = self.eval_net.forward(x) #扔到网络中,输出行为价值 action = torch.max(actions_value, 1)[1].data.numpy() #选取最大的行为价值 # torch.max()[0]:只返回最大值的每个数 #torch.max()[1]:只返回最大值的每个索引 action = action[0] if ENV_A_SHAPE == 0 else action.reshape(ENV_A_SHAPE) # return the argmax index else: #随机选取动作 action = np.random.randint(0, N_ACTIONS) #随机选取动作 action = action if ENV_A_SHAPE == 0 else action.reshape(ENV_A_SHAPE) #返回的动作的下标(第几个) return action def store_transition(self, s, a, r, s_): #记忆库,记录之前学习的东西,就是回放机制 transition = np.hstack((s, [a, r], s_)) #所有的记忆捆到一起,并且存到相对应的位置 # replace the old memory with new memory index = self.memory_counter % MEMORY_CAPACITY #新的覆盖旧的记忆 self.memory[index, :] = transition self.memory_counter += 1 def learn(self): #学习 # target parameter update if self.learn_step_counter % TARGET_REPLACE_ITER == 0: #隔多少步(学习多少次)target Q网络更新一下 self.target_net.load_state_dict(self.eval_net.state_dict()) #val_net的数值复制到target_net self.learn_step_counter += 1 # sample batch transitions sample_index = np.random.choice(MEMORY_CAPACITY, BATCH_SIZE) b_memory = self.memory[sample_index, :] #随机抽取记忆,并进行下面的分开打包 b_s = torch.FloatTensor(b_memory[:, :N_STATES]) b_a = torch.LongTensor(b_memory[:, N_STATES:N_STATES+1].astype(int)) b_r = torch.FloatTensor(b_memory[:, N_STATES+1:N_STATES+2]) b_s_ = torch.FloatTensor(b_memory[:, -N_STATES:]) # q_eval w.r.t the action in experience QDN的学习过程 q_eval = self.eval_net(b_s).gather(1, b_a) # shape (batch, 1) 之前做完的动作的价值 q_next = self.target_net(b_s_).detach() # detach 就是不需要反向传播 # Q(St,At)(真实值)←Q(St,At)(旧值)+α[R(t+1)+γmax Q(S(t+1),a,θ)−Q(St,At,θ)] q_target = b_r + GAMMA * q_next.max(1)[0].view(BATCH_SIZE, 1) # shape (batch, 1) loss = self.loss_func(q_eval, q_target) #预测值和真实值 self.optimizer.zero_grad() loss.backward() self.optimizer.step() dqn = DQN() print('\nCollecting experience...') for i_episode in range(400): s = env.reset() #现在所处的状态,复位 ep_r = 0 while True: env.render() #环境渲染,可看到屏幕的环境 a = dqn.choose_action(s) #根据现在的状态采取一个行为 # take action s_, r, done, info = env.step(a) #环境根据我的行为所给的反馈 # modify the reward x, x_dot, theta, theta_dot = s_ r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8 r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5 r = r1 + r2 dqn.store_transition(s, a, r, s_) #存储环境给我的反馈 ep_r += r if dqn.memory_counter > MEMORY00_CAPACITY: dqn.learn() #学习 if done: print('Ep: ', i_episode, '| Ep_r: ', round(ep_r, 2)) if done: break s = s_
在学习DDPG算法之前首先要学习AC(Actor Critic)算法。因为AC算法思想很重要。
Actor Critic算法
我们有了像 Q-learning这么伟大的算法, 为什么还要瞎折腾出一个 Actor-Critic? 原来 Actor-Critic 的 Actor 的前生是 Policy Gradients, 这能让它毫不费力地在连续动作中选取合适的动作, 而 Q-learning 做这件事会瘫痪. 那为什么不直接用 Policy Gradients 呢? 原来 Actor Critic 中的 Critic 的前生是 Q-learning 或者其他的 以值为基础的学习法 , 能进行单步更新, 而传统的 Policy Gradients 则是回合更新, 这降低了学习效率。
普通的Actor-Critic
AC算法其实用到了两个网络:
一个输出策略负责选择动作,我们把这个网络成为Actor;
一个负责计算每个动作的分数,我们把这个网络成为Critic。
Actor在台上跳舞,一开始舞姿并不好看,Critic根据Actor的舞姿打分。Actor通过Critic给出的分数,去学习:如果Critic给的分数高,那么Actor会调整这个动作的输出概率;相反,如果Critic给的分数低,那么就减少这个动作输出的概率。
DDPG算法中的Actor-Critic
Critic:
Critic网络的作用是预估Q,虽然它还叫Critic,但和普通AC中的Critic不一样,这里预估的是Q不是V;
注意Critic的输入有两个:动作和状态,需要一起输入到Critic中;
Critic网络的loss其还是和普通AC一样,用的是TD-error。
Actor:
和普通AC不同,Actor输出的是一个动作;
Actor的功能是,输出一个动作A,这个动作A输入到Critic后,能够获得最大的Q值。
所以Actor的更新方式和AC不同,不是用带权重梯度更新,而是用梯度上升。
DDPG要解决的问题
尽管DQN解决了高维观测空间的问题,但它只能处理离散的和低维的动作空间。许多令人关注的任务,尤其是物理控制任务,具有连续的(实际值)和高维度的动作空间。 DQN无法直接应用于连续域,因为DQN依赖于找到使动作值函数最大化的动作(是因为maxQ(s’,a’)函数只能处理离散型的),在连续值的情况下,每个步骤都需要迭代优化过程。
DDPG是采用的也是Actor-Critic架构,是基于DQN进行改进的。DQN中的action space必须是离散的,所以不能处理连续的动作空间的问题。DDPG在其基础上进行改动,引入了一个Actor Network,让一个网络来的输出来得到连续的动作空间。
DDPG的核心思想
DDPG的工作过程
为了稳定这个Q_target,DDPG分别给critic网络和策略网络搭建了target_network。
DDPG的四个网络
DDPG既然来自DQN,那当然也有经验回放与双网络,所以在 Critic 部分增加一个目标网络去计算目标 Q值,但是还需要选取动作用来估计目标Q值,由于我们有自己的Actor策略网络,用类似的做法,再增加一个Actor目标网络,所以DDPG算法一共包含了四个网络,分别是:Actor当前网络,Actor目标网络,Critic当前网络,Critic目标网络,2个Actor网络的结构相同,2个Critic网络的结构相同。这四个网络的功能如下:
Actor当前网络:负责策略网络参数 θ 的迭代更新,负责根据当前状态 S 选择当前动作 A,用于和环境交互生成 S’和 R。
Actor目标网络:负责根据经验回放池中采样的下一状态 S’ 选择最优下一动作 A’,网络参数 θ定期从 θ 复制。
Critic当前网络:负责价值网络参数w的迭代更新,负责计算负责计算当前Q值 Q(S,A,w) 。目标Q 值 yi=R+γQ′(S′,A′,w′) 。
Critic目标网络:负责计算目标Q值中的 Q′(S′,A′,w′)部分,网络参数 w′定期从 w复制。
这里训练需要用到的数据就是s,a,r,s’,只需用到这四个数据,用Replay Memory把这些数据存起来,然后sample进来训练就好了。因为DDPG使用了经验回放这个技巧,所以DDPG是一个off-poilcy算法。
最后回顾DQN在Q-Learning基础上所做的改进:使用了深度神经网络做函数近似;使用经验回放;使用target网络。DDPG类似的也使用了深度神经网络,经验回放和target网络。不过DQN中的target更新是hard update,即每隔固定步数更新一次target网络,DDPG使用soft update,每一步都会更新target网络,只不过更新的幅度非常小。
下面附DDPG代码和代码相应的解释
import argparse from itertools import count import os, sys, random import numpy as np import gym import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torch.distributions import Normal from tensorboardX import SummaryWriter ''' Implementation of Deep Deterministic Policy Gradients (DDPG) with pytorch riginal paper: https://arxiv.org/abs/1509.02971 Not the author's implementation ! ''' parser = argparse.ArgumentParser() parser.add_argument('--mode', default='train', type=str) # mode = 'train' or 'test' # OpenAI gym environment name, # ['BipedalWalker-v2', 'Pendulum-v0'] or any continuous environment # Note that DDPG is feasible about hyper-parameters. # You should fine-tuning if you change to another environment. parser.add_argument("--env_name", default="Pendulum-v0") parser.add_argument('--tau', default=0.005, type=float) # target smoothing coefficient parser.add_argument('--target_update_interval', default=1, type=int) parser.add_argument('--test_iteration', default=10, type=int) parser.add_argument('--learning_rate', default=1e-4, type=float) parser.add_argument('--gamma', default=0.99, type=int) # discounted factor parser.add_argument('--capacity', default=1000000, type=int) # replay buffer size parser.add_argument('--batch_size', default=100, type=int) # mini batch size parser.add_argument('--seed', default=False, type=bool) parser.add_argument('--random_seed', default=9527, type=int) # optional parameters parser.add_argument('--sample_frequency', default=2000, type=int) parser.add_argument('--render', default=False, type=bool) # show UI or not parser.add_argument('--log_interval', default=50, type=int) # parser.add_argument('--load', default=False, type=bool) # load model parser.add_argument('--render_interval', default=100, type=int) # after render_interval, the env.render() will work parser.add_argument('--exploration_noise', default=0.1, type=float) parser.add_argument('--max_episode', default=100000, type=int) # num of games parser.add_argument('--print_log', default=5, type=int) parser.add_argument('--update_iteration', default=200, type=int) args = parser.parse_args() device = 'cuda' if torch.cuda.is_available() else 'cpu' script_name = os.path.basename(__file__) env = gym.make(args.env_name) #制作环境 if args.seed: env.seed(args.random_seed) torch.manual_seed(args.random_seed) np.random.seed(args.random_seed) state_dim = env.observation_space.shape[0] #状态数 action_dim = env.action_space.shape[0] #动作数 max_action = float(env.action_space.high[0]) #最大化动作 min_Val = torch.tensor(1e-7).float().to(device) # min value 最小值 directory = './exp' + script_name + args.env_name +'./' class Replay_buffer(): #初始化Replay Buffer R,因为强化学习的马尔科夫序列之间的数据具有非常大的关联性, #采用R的目的就是打乱数据之间的相关性,使得数据之间满足独立同分布。 # Replay Buffer就是用来存储一系列等待学习的SARS片段。 #将学习的序列存储到R中,然后随机批量的读取R中的序列进行学习模型。 ''' Code based on: https://github.com/openai/baselines/blob/master/baselines/deepq/replay_buffer.py Expects tuples of (state, next_state, action, reward, done) ''' def __init__(self, max_size=args.capacity): self.storage = [] self.max_size = max_size self.ptr = 0 def push(self, data): if len(self.storage) == self.max_size: #判断存储池是否满,如果满了,就让新的值代替旧值 self.storage[int(self.ptr)] = data self.ptr = (self.ptr + 1) % self.max_size else: self.storage.append(data) #如果没有满,就直接在后面存储即可 def sample(self, batch_size): ind = np.random.randint(0, len(self.storage), size=batch_size) x, y, u, r, d = [], [], [], [], [] #可以设置Replay Buffer的容量,push函数是向buffer中添加一个SARS片段;sample代表从buffer中采样batch size个片段 for i in ind: X, Y, U, R, D = self.storage[i] x.append(np.array(X, copy=False)) y.append(np.array(Y, copy=False)) u.append(np.array(U, copy=False)) r.append(np.array(R, copy=False)) d.append(np.array(D, copy=False)) return np.array(x), np.array(y), np.array(u), np.array(r).reshape(-1, 1), np.array(d).reshape(-1, 1) class Actor(nn.Module): def __init__(self, state_dim, action_dim, max_action): super(Actor, self).__init__() self.l1 = nn.Linear(state_dim, 400) self.l2 = nn.Linear(400, 300) self.l3 = nn.Linear(300, action_dim) #用Actor来得到连续的动作,此时输出的真实动作而不是选择动作的概率 self.max_action = max_action def forward(self, x): x = F.relu(self.l1(x)) x = F.relu(self.l2(x)) x = self.max_action * torch.tanh(self.l3(x)) #Actor作用是接收状态描述,输出一个action, 由于DDPG中的动作空间要求是连续的,所以使用了一个tanh return x class Critic(nn.Module): #critic的输入是状态和该状态的动作 def __init__(self, state_dim, action_dim): super(Critic, self).__init__() self.l1 = nn.Linear(state_dim + action_dim, 400) self.l2 = nn.Linear(400 , 300) self.l3 = nn.Linear(300, 1) #最后输出不是动作,是1,表示价值 def forward(self, x, u): x = F.relu(self.l1(torch.cat([x, u], 1))) #在第一维度上对状态和动作进行连接 x = F.relu(self.l2(x)) x = self.l3(x) return x #Critic批评者,在DDPG中,接受来自Actor的一个Action值和当前的状态,输出的是当前状态下,采用Action动作以后得到的关于Q的期望。 class DDPG(object): #DDPG用到了以上的所有对象,包括Critic、Target Critic、Actor、Target Actor、memory。 def __init__(self, state_dim, action_dim, max_action): #定义四个网络:actor,actor_target,critic,critic_target self.actor = Actor(state_dim, action_dim, max_action).to(device) self.actor_target = Actor(state_dim, action_dim, max_action).to(device) #创建actor_target模型 self.actor_target.load_state_dict(self.actor.state_dict()) #将actor数据加载到actor_target中 self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4) #优化策略网络参数 self.critic = Critic(state_dim, action_dim).to(device) self.critic_target = Critic(state_dim, action_dim).to(device) #创建critic_target模型 self.critic_target.load_state_dict(self.critic.state_dict()) #将critic的数据加载到critic_target模型中 self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3) #优化Q网络 self.replay_buffer = Replay_buffer() # self.writer = SummaryWriter(directory) #tensorbord中的可视化函数 self.num_critic_update_iteration = 0 #critic的迭代次数 self.num_actor_update_iteration = 0 #actor的迭代次数 self.num_training = 0 def select_action(self, state): state = torch.FloatTensor(state.reshape(1, -1)).to(device) return self.actor(state).cpu().data.numpy().flatten() def update(self): #将所有变量转换为张量 for it in range(args.update_iteration): # Sample replay buffer x, y, u, r, d = self.replay_buffer.sample(args.batch_size) #读取存储池中的数据 state = torch.FloatTensor(x).to(device) # action = torch.FloatTensor(u).to(device) next_state = torch.FloatTensor(y).to(device) done = torch.FloatTensor(1-d).to(device) reward = torch.FloatTensor(r).to(device) # Compute the target Q value target_Q = self.critic_target(next_state, self.actor_target(next_state)) #传入下一个状态和下一状态的动作 target_Q = reward + (done * args.gamma * target_Q).detach() #target=r + r*target_Q(在Q网络中传来的) # Get current Q estimate,注意critic将(s_t,a)作为输入 current_Q = self.critic(state, action) #得到真实的Q值 # Compute critic loss critic_loss = F.mse_loss(current_Q, target_Q) #计算真实Q值和目标Q值的差值损失 self.writer.add_scalar('Loss/critic_loss', critic_loss, global_step=self.num_critic_update_iteration) # Optimize the critic self.critic_optimizer.zero_grad() #梯度下降来优化Q网络参数 critic_loss.backward() self.critic_optimizer.step() # Compute actor loss actor_loss = -self.critic(state, self.actor(state)).mean() #计算策略网络的损失,第二个参数是actor做的动作 self.writer.add_scalar('Loss/actor_loss', actor_loss, global_step=self.num_actor_update_iteration) # Optimize the actor 优化策略网络的参数 self.actor_optimizer.zero_grad() actor_loss.backward() self.actor_optimizer.step() # Update the frozen target models 更新封住的目标模型2 #更新Critic目标网络和Actor目标网络参数: #w′←τw+(1−τ)w′ 平滑的更新一点点,软更新 #θ′←τθ+(1−τ)θ′ for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()): target_param.data.copy_(args.tau * param.data + (1 - args.tau) * target_param.data) for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()): target_param.data.copy_(args.tau * param.data + (1 - args.tau) * target_param.data) self.num_actor_update_iteration += 1 self.num_critic_update_iteration += 1 def save(self): torch.save(self.actor.state_dict(), directory + 'actor.pth') torch.save(self.critic.state_dict(), directory + 'critic.pth') # print("====================================") # print("Model has been saved...") # print("====================================") def load(self): self.actor.load_state_dict(torch.load(directory + 'actor.pth')) self.critic.load_state_dict(torch.load(directory + 'critic.pth')) print("====================================") print("model has been loaded...") print("====================================") #将state放到actor对象得到action #将state,action放到critic对象得到policy loss #然后target actor和target critic也按照以上过程得到target value #根据target value 计算expected value: def main(): agent = DDPG(state_dim, action_dim, max_action) ep_r = 0 if args.mode == 'test': #测试 agent.load() for i in range(args.test_iteration): state = env.reset() for t in count(): action = agent.select_action(state) next_state, reward, done, info = env.step(np.float32(action)) ep_r += reward env.render() if done or t >= args.max_length_of_trajectory: print("Ep_i \t{}, the ep_r is \t{:0.2f}, the step is \t{}".format(i, ep_r, t)) ep_r = 0 break state = next_state elif args.mode == 'train': #训练 if args.load: agent.load() total_step = 0 for i in range(args.max_episode): total_reward = 0 step =0 state = env.reset() for t in count(): action = agent.select_action(state) action = (action + np.random.normal(0, args.exploration_noise, size=env.action_space.shape[0])).clip( env.action_space.low, env.action_space.high) #对动作增加噪音,对其保持探索 next_state, reward, done, info = env.step(action) if args.render and i >= args.render_interval : env.render() agent.replay_buffer.push((state, next_state, action, reward, np.float(done))) state = next_state if done: break step += 1 total_reward += reward total_step += step+1 print("Total T:{} Episode: \t{} Total Reward: \t{:0.2f}".format(total_step, i, total_reward)) agent.update() # "Total T: %d Episode Num: %d Episode T: %d Reward: %f if i % args.log_interval == 0: agent.save() else: raise NameError("mode wrong!!!") if __name__ == '__main__': main()
编写来之不易,对大家有帮助就点个赞赞哦,谢谢大家!后期我会继续更新深度学习,强化学习等经典算法。关注我及时了解最新科研进展。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。