赞
踩
- #动作数量
- self.n_actions
- #状态数量
- self.n_features
- #learning_rate学习速率
- self.lr
- #Q-learning中reward衰减因子
- self.gamma
- #e-greedy的选择概率最大值
- self.epsilon_max
- #更新Q现实网络参数的步骤数
- self.replace_target_iter
- #存储记忆的数量
- self.memory_size
- #每次从记忆库中取的样本数量
- self.batch_size = batch_size
- self.epsilon_increment = e_greedy_increment
- self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max
- #学习的步骤
- self.learn_step_counter
- #记忆库,此刻的n_feature + 下一步的n_feature + reward + action
- self.memory = np.zeros((self.memory_size, n_features * 2 + 2))
-
- #利用Q目标的参数替换Q估计中的参数
- t_params = tf.get_collection('target_net_params')
- e_params = tf.get_collection('eval_net_params')
- #生成了一个tensorflow操作列表[tf.assign(t1,e1), tf.assign(t2,e2), tf.assign(t3,e3)]
- self.replace_target_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)]
- def _build_net(self):
- #输入
- self.s = tf.placeholder(tf.float32, [None, self.n_features], name='s')
- #Q现实输入
- self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name='Q_target')
-
- with tf.variable_scope('eval_net'):
- #collection
- c_names = ['eval_net_params', tf.GraphKeys.GLOBAL_VARIABLES]
- #神经元数量
- n_l1 = 10
- #权值
- w_initializer = tf.random_normal_initializer(0., 0.3)
- #偏置
- b_initializer = tf.constant_initializer(0.1)
-
- #第一层神经元
- with tf.variable_scope('l1'):
- w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
- b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
- l1 = tf.nn.relu(tf.matmul(self.s, w1) + b1)
- #第二层神经元
- with tf.variable_scope('l2'):
- w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
- b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names)
- self.q_eval = tf.matmul(l1, w2) + b2
-
- #基于Q估计与Q现实,构造loss-function
- with tf.variable_scope('loss'):
- self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval))
-
- #训练
- with tf.variable_scope('train'):
- self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)
- #输入
- self.s_sub = tf.placeholder(tf.float32, [None, self.n_features], name='s_sub')
- with tf.variable_scope('target_net'):
- #collection
- c_names = ['target_net_params', tf.GraphKeys.GLOBAL_VARIABLES]
-
- #第一层神经元
- with tf.variable_scope('l1'):
- w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
- b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
- l1 = tf.nn.relu(tf.matmul(self.s_, w1) + b1)
-
- #第二层神经元
- with tf.variable_scope('l2'):
- w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
- b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names)
- self.q_next = tf.matmul(l1, w2) + b2
- def store_transition(self, s, a, r, s_):
- if not hasattr(self, 'memory_counter'):
- self.memory_counter = 0
-
- #状态信息list ==> [x, y]
- #[action, reward]动作与奖励信息合并为list
- #下一步状态信息 ==> [x_next, y_next]
- transition = np.hstack((s, [a, r], s_))
- #hstack的结果为 ==> [x, y, a, r, x_next, y_next]
-
- #每过memory_size,替换存储值
- index = self.memory_counter % self.memory_size
-
- #memory为二维列表,transition为一行向量,插入index行中
- self.memory[index, :] = transition
- self.memory_counter += 1
- def choose_action(self, observation):
- # 将observation的list[x, y]转为行向量[[x, y]]
- observation = observation[np.newaxis, :]
-
- if np.random.uniform() < self.epsilon:
- # 得到每个action的q的估计值
- actions_value = self.sess.run(self.q_eval, feed_dict={self.s: observation})
- # 选择q值最大的action
- action = np.argmax(actions_value)
- else:
- action = np.random.randint(0, self.n_actions)
- return action
- def learn(self):
- #更换参数
- if self.learn_step_counter % self.replace_target_iter == 0:
- self.sess.run(self.replace_target_op)
-
- if self.memory_counter > self.memory_size:
- sample_index = np.random.choice(self.memory_size, size=self.batch_size)
- else:
- sample_index = np.random.choice(self.memory_counter, size=self.batch_size)
-
- #从memory中抽取一个记忆值,一个行向量
- #[x, y, a, r, x_next, y_next]
- batch_memory = self.memory[sample_index, :]
-
- q_next, q_eval = self.sess.run(
- [self.q_next, self.q_eval],
- feed_dict={
- self.s_: batch_memory[:, -self.n_features:], # fixed params
- self.s: batch_memory[:, :self.n_features], # newest params
- })
-
- q_target = q_eval.copy()
-
- batch_index = np.arange(self.batch_size, dtype=np.int32)
- eval_act_index = batch_memory[:, self.n_features].astype(int)
- reward = batch_memory[:, self.n_features + 1]
-
- q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)
-
- #训练网络
- _, self.cost = self.sess.run([self._train_op, self.loss],
- feed_dict={self.s: batch_memory[:, :self.n_features],
- self.q_target: q_target})
- self.cost_his.append(self.cost)
-
- # increasing epsilon
- self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max
- self.learn_step_counter += 1
举例说明上述过程
action_0 | action_1 | action_2 |
---|---|---|
1 | 2 | 1 |
2 | 3 | 2 |
行:每一个样本
列:每一个action对应的Q值
一维list ==> [0, 1] #长度:bactch_size
一维list ==> [1, 0]
一维list ==> [1, 2]
action_0 | action_1 | action_2 |
---|---|---|
-1 | 2 | 1 |
2 | 3 | -2 |
4. 利用更新后的q-target与q-eval之间的差值进行训练
- def run_maze():
- # 游戏的每一个回合需要的步数
- step = 0
- # 游戏的回合
- for episode in range(300):
- # 初始化观察值
- observation = env.reset()
-
- while True:
- # 开始环境仿真
- env.render()
-
- # 选择动作
- action = RL.choose_action(observation)
-
- # 加入动作后,环境进行仿真
- # 获取了执行action后,下一步的观测值observation
- # 获取了奖励reward
- # 游戏是否结束标志done
- observation_, reward, done = env.step(action)
-
- # 存储样本
- RL.store_transition(observation, action, reward, observation_)
-
- if (step > 200) and (step % 5 == 0):
- # 随机抽取样本,网络进行学习
- RL.learn()
-
- # 交换观测值
- observation = observation_
-
- # 判断游戏是否结束
- if done:
- break
-
- step += 1
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。