I studied Morvan's (莫烦) PPO tutorial and wrote down some notes on my own understanding; I hope they help.
You can download and run the code yourself from the link above. Below is the version I wrote while following Morvan's tutorial; it is essentially the same:
import gym
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

EP_MAX = 1000
EP_LEN = 200
BATCH = 32
GAMMA = 0.9
C_LR = 0.0002
A_LR = 0.0001
A_UPDATE_STEPS = 10
C_UPDATE_STEPS = 10
METHOD = [
    dict(name='kl_pen', kl_target=0.01, lam=0.5),   # KL penalty
    dict(name='clip', epsilon=0.2),                 # Clipped surrogate objective, find this is better
][1]


class PPO:
    def __init__(self):
        self.sess = tf.Session()
        self.tfs = tf.placeholder(tf.float32, [None, S_DIM], 'state')

        self._build_anet('Critic')
        with tf.variable_scope('closs'):
            self.tfdc_r = tf.placeholder(tf.float32, [None, 1], name='discounted_r')
            self.adv = self.tfdc_r - self.v
            closs = tf.reduce_mean(tf.square(self.adv))
            self.ctrain = tf.train.AdamOptimizer(C_LR).minimize(closs)

        pi, pi_params = self._build_anet('pi', trainable=True)
        oldpi, oldpi_params = self._build_anet('oldpi', trainable=False)
        with tf.variable_scope('sample_action'):
            self.sample_op = tf.squeeze(pi.sample(1), axis=0)
        with tf.variable_scope('update_oldpi'):
            self.update_oldpi_op = [oldp.assign(p) for p, oldp in zip(pi_params, oldpi_params)]

        with tf.variable_scope('aloss'):
            self.tfa = tf.placeholder(tf.float32, [None, A_DIM], 'action')
            self.tfadv = tf.placeholder(tf.float32, [None, 1], 'advantage')
            with tf.variable_scope('surrogate'):
                ratio = pi.prob(self.tfa) / oldpi.prob(self.tfa)
                surr = ratio * self.tfadv
            if METHOD['name'] == 'kl_pen':
                self.tflam = tf.placeholder(tf.float32, None, 'lambda')
                kl = tf.distributions.kl_divergence(oldpi, pi)
                self.kl_mean = tf.reduce_mean(kl)
                self.aloss = -(tf.reduce_mean(surr - self.tflam * kl))
            else:   # clipping method, find this is better
                self.aloss = -tf.reduce_mean(tf.minimum(
                    surr,
                    tf.clip_by_value(ratio, 1. - METHOD['epsilon'], 1. + METHOD['epsilon']) * self.tfadv))
        self.atrain = tf.train.AdamOptimizer(A_LR).minimize(self.aloss)

        tf.summary.FileWriter('log/', self.sess.graph)
        self.sess.run(tf.global_variables_initializer())

    def _build_anet(self, name, trainable=True):
        if name == 'Critic':
            with tf.variable_scope(name):
                # self.s_Critic = tf.placeholder(tf.float32, [None, S_DIM], 'state')
                l1_Critic = tf.layers.dense(self.tfs, 100, tf.nn.relu, trainable=trainable, name='l1')
                self.v = tf.layers.dense(l1_Critic, 1, trainable=trainable, name='value_predict')
        else:
            with tf.variable_scope(name):
                # self.s_Actor = tf.placeholder(tf.float32, [None, S_DIM], 'state')
                l1_Actor = tf.layers.dense(self.tfs, 100, tf.nn.relu, trainable=trainable, name='l1')
                mu = 2 * tf.layers.dense(l1_Actor, A_DIM, tf.nn.tanh, trainable=trainable, name='mu')
                sigma = tf.layers.dense(l1_Actor, A_DIM, tf.nn.softplus, trainable=trainable, name='sigma')
                norm_list = tf.distributions.Normal(loc=mu, scale=sigma)
            params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope=name)
            return norm_list, params

    def update(self, s, a, r):
        self.sess.run(self.update_oldpi_op)
        adv = self.sess.run(self.adv, {self.tfdc_r: r, self.tfs: s})
        if METHOD['name'] == 'kl_pen':
            for _ in range(A_UPDATE_STEPS):
                _, kl = self.sess.run(
                    [self.atrain, self.kl_mean],
                    {self.tfa: a, self.tfadv: adv, self.tfs: s, self.tflam: METHOD['lam']})
                if kl > 4 * METHOD['kl_target']:    # this is in google's paper
                    break
            if kl < METHOD['kl_target'] / 1.5:      # adaptive lambda, this is in OpenAI's paper
                METHOD['lam'] /= 2
            elif kl > METHOD['kl_target'] * 1.5:
                METHOD['lam'] *= 2
            METHOD['lam'] = np.clip(METHOD['lam'], 1e-4, 10)    # sometimes explode, this clipping is my solution
        else:
            [self.sess.run(self.atrain, {self.tfs: s, self.tfa: a, self.tfadv: adv}) for _ in range(A_UPDATE_STEPS)]
        [self.sess.run(self.ctrain, {self.tfs: s, self.tfdc_r: r}) for _ in range(C_UPDATE_STEPS)]

    def choose_action(self, s):
        s = s[np.newaxis, :]
        a = self.sess.run(self.sample_op, {self.tfs: s})[0]
        return np.clip(a, -2, 2)

    def get_v(self, s):
        if s.ndim < 2:
            s = s[np.newaxis, :]
        return self.sess.run(self.v, {self.tfs: s})


env = gym.make('Pendulum-v0').unwrapped
S_DIM = env.observation_space.shape[0]
A_DIM = env.action_space.shape[0]
ppo = PPO()
all_ep_r = []
for ep in range(EP_MAX):
    s = env.reset()
    buffer_s, buffer_a, buffer_r = [], [], []
    ep_r = 0
    for t in range(EP_LEN):
        env.render()
        a = ppo.choose_action(s)
        s_, r, done, _ = env.step(a)
        buffer_s.append(s)
        buffer_a.append(a)
        buffer_r.append((r + 8) / 8)
        s = s_
        ep_r += r
        if (t + 1) % BATCH == 0 or t == EP_LEN - 1:
            v_s_ = ppo.get_v(s_)
            discounted_r = []
            for r in buffer_r[::-1]:
                v_s_ = r + GAMMA * v_s_
                discounted_r.append(v_s_)
            discounted_r.reverse()
            bs, ba, br = np.vstack(buffer_s), np.vstack(buffer_a), np.vstack(discounted_r)
            buffer_s, buffer_a, buffer_r = [], [], []
            ppo.update(bs, ba, br)
    if ep == 0:
        all_ep_r.append(ep_r)
    else:
        all_ep_r.append(all_ep_r[-1] * 0.9 + ep_r * 0.1)
    print('Ep:%d | Ep_r:%f' % (ep, ep_r))

plt.plot(np.arange(len(all_ep_r)), all_ep_r)
plt.xlabel('Episode')
plt.ylabel('Moving averaged episode reward')
plt.show()
PPO is essentially an actor-critic (AC) algorithm, with an Actor network and a Critic network. The Critic is updated in much the same way as in a plain AC algorithm, while the Actor, much like Q-Learning, keeps a new and an old network and periodically copies the new one into the old one. I won't say much about the Critic; if it is unclear, see Morvan's tutorial and an earlier write-up of mine. The adv here plays the role of the TD error in AC. The Actor's job is to decide the policy π. In this implementation the policy is assumed to be a normal distribution, so the Actor network mainly predicts a suitable μ (mu) and σ (sigma); an action is then sampled from that distribution and applied to the environment, which returns the next state and so on. When updating the networks, the program stores 32 transitions (actions and the environment's responses), which are the batch and buffer mentioned in the flow chart above, and the buffered rewards still need to be converted into discounted (i.e., cumulative) rewards.
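That discounted-reward conversion is just a backward pass over the buffer, bootstrapping from the critic's value of the state after the last step. A minimal sketch with made-up rewards and a made-up bootstrap value (buffer_r, GAMMA and v_s_ are named as in the code above):

import numpy as np

GAMMA = 0.9
buffer_r = [0.5, -0.2, 1.0]   # made-up rewards, oldest first
v_s_ = 0.3                    # made-up critic estimate V(s_) for the state after the last step

discounted_r = []
for r in buffer_r[::-1]:      # walk backwards through the buffer
    v_s_ = r + GAMMA * v_s_   # R_t = r_t + gamma * R_{t+1}
    discounted_r.append(v_s_)
discounted_r.reverse()        # restore chronological order
print(np.vstack(discounted_r))    # shape (3, 1): the tfdc_r fed into the critic loss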
The program has two main parts: the PPO class and the main program. Morvan has already explained the implementation idea very clearly, so I won't repeat it here.
The main program handles the interaction between the environment (env) and the algorithm: it feeds in what the environment returns, outputs the algorithm's decision, and updates the algorithm's parameters.
Here are a few points that puzzled me while reading the code, mainly in the PPO class:
The first is in __init__, under sample_action: self.sample_op = tf.squeeze(pi.sample(1), axis=0). Here pi is a normal distribution, and sample(1) draws one sample from it (i.e., picks one action). Let's modify the program and run it to see what the squeeze does:
# self.sample_op = tf.squeeze(pi.sample(1), axis=0)
self.sample_op = pi.sample(1)
…………
a = self.sess.run(self.sample_op, {self.tfs: s})
Look at the output (the screenshot is not reproduced here): for a single input state with A_DIM = 1, the sampled tensor has shape (1, 1, 1).
Now change the program back:
self.sample_op = tf.squeeze(pi.sample(1), axis=0)
# self.sample_op = pi.sample(1)
…………
a = self.sess.run(self.sample_op, {self.tfs: s})
The output: after the squeeze, the shape becomes (1, 1).
One dimension has clearly been squeezed away. Simply put, squeeze changes only the shape, not the contents: it removes all dimensions of size 1. The axis argument pins down which of those dimensions to remove; with axis=0, as in the small program below, only the first 1 in the shape is removed.
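Here is a minimal, self-contained sketch of that behaviour (the array and its shape are made up purely for illustration; TF 1.x API as in the code above):

import numpy as np
import tensorflow as tf

# A made-up array whose shape contains two size-1 dimensions.
x = np.arange(6, dtype=np.float32).reshape(1, 2, 3, 1)    # shape (1, 2, 3, 1)

with tf.Session() as sess:
    all_squeezed = sess.run(tf.squeeze(x))           # removes every size-1 dimension
    first_only = sess.run(tf.squeeze(x, axis=0))     # removes only the leading size-1 dimension

print(all_squeezed.shape)   # (2, 3)
print(first_only.shape)     # (2, 3, 1)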
ratio = pi.prob(self.tfa) / oldpi.prob(self.tfa)
The prob function computes the probability density of the distribution at the given point (here, the chosen action).
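A minimal sketch of prob and the resulting ratio (the means, standard deviations and the action value below are made up for illustration); the ratio of the new density to the old one is exactly the importance-sampling ratio used in the surrogate objective:

import tensorflow as tf

# Two normal distributions standing in for pi and oldpi (parameters are made up).
pi = tf.distributions.Normal(loc=[0.5], scale=[1.0])
oldpi = tf.distributions.Normal(loc=[0.0], scale=[1.0])

a = tf.constant([0.3])                 # an example action
ratio = pi.prob(a) / oldpi.prob(a)     # density under pi divided by density under oldpi

with tf.Session() as sess:
    print(sess.run(pi.prob(a)))        # a probability density, not a probability mass
    print(sess.run(ratio))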