赞
踩
本文主要是介绍DQN原理和算法伪代码和代码实现,算法实现包括pytorch版本和stable-baselines3版本
强化学习与监督学习最大的不同是监督学习有监督信号,而强化学习只有奖励信号。
举例如下:
例 1:监督方式训练 |
---|
假设我是个用户,我要从北京驾车去上海。从北京出发之前,我有个不太好的模型做预测,模型告诉我总车程是14小时当我到达上海,我知道自己花的实际时间是 16 小时,这样这次旅行就形成了一个训练样本:这样就可以根据这个样本对模型进行更新,让模型更准确一点 |
显然这个例子中,这个
y
=
16
y=16
y=16是监督信号,模型训练步骤如下:
在完成一次梯度下降之后,如果再让模型做一次预测,那么模型的预测值会比原先更接近 y = 16.
强化学习的情形是没有监督信号,只有奖励信号。和上面的例子对应就是如下的情形:
例 2:时间差分方式训练 |
---|
在一局 (Episode) 实验中,把从起始到结束的所有奖励记作:
定义折扣率
γ
∈
[
0
,
1
]
\gamma \in [0, 1]
γ∈[0,1]。折扣回报的定义是:
在游戏尚未结束的
t
t
t 时刻,
U
t
U_t
Ut 是一个未知的随机变量,其随机性来自于
t
t
t 时刻之后的所有状态与动作。动作价值函数的定义是:
最优动作价值函数用最大化消除策略 π:
可以这样理解
Q
∗
Q_*
Q∗:已知
s
t
s_t
st和
a
t
a_t
at,不论未来采取什么样的策略
π
\pi
π,回报
U
t
U_t
Ut的期望不可能超过
Q
∗
Q_*
Q∗。
所以假如知道了
Q
∗
Q_*
Q∗,就可以用
Q
∗
Q_*
Q∗去做控制。举例:对于动作空间
A
=
{
左
,
右
,
上
}
\mathcal{A} = \{左,右,上\}
A={左,右,上},对于当前状态
s
t
s_t
st,有
如果现在智能体选择向左走,不管后面采取什么策略,回报的期望不会超过610,智能体应该选择向上跳。所以我们希望知道
Q
∗
Q_∗
Q∗,因为它就像是先知一般,可以预见未来,在 t 时刻就预见 t 到 n 时刻之间的累计奖励的期望。
所以问题是怎么得到
Q
∗
Q_∗
Q∗?深度强化学习的办法就是用深度神经网络通过训练去近似:
接下来的问题是,怎么用TD的方式训练DQN。其根据就是最优贝尔曼方程
在最优贝尔曼方程中已经看到了TD目标的出现:
所以用TD的方式训练DQN的过程需要收集四元组
(
s
t
,
a
t
,
r
t
,
s
t
+
1
)
(s_t,a_t,r_t,s_{t+1})
(st,at,rt,st+1):
如下两个过程循环往复,直至收敛
经验回放数组是为了减小数据之间的关联性,另一个方法是设置目标网络(用于计算TD目标的网络)和Q网络,二者不同步更新,每次更新Q网络,而不更新目标网络,更新几次之后同步一下。
DQN算法伪代码 |
---|
输入: (1)很小的
ε
\varepsilon
ε; (2)折扣因子
γ
\gamma
γ;(3)步长
α
∈
(
0
,
1
]
\alpha \in(0,1]
α∈(0,1] 输出: Q θ Q_\theta Qθ |
[1] 初始化(1)replay memory D ,容量为N;(2)网络构架、参数
θ
\theta
θ(3)TD目标动作价值函数的参数
θ
−
=
θ
\theta^-=\theta
θ−=θ [2] For episode = 1,2,…,M: [3] 初始化一个状态: s = { x } s=\{x\} s={x},做预处理: ϕ = ϕ ( s ) \phi=\phi(s) ϕ=ϕ(s) [4] 如果 s s s不是结束状态就迭代: [5] 按照 ε \varepsilon ε-贪婪策略,从 Q θ Q_\theta Qθ函数得到一个动作 a a a [6] 执行动作a,观察到 R R R和观察 x ′ x' x′ [7] 令 s ′ = s , a , x ′ s'=s,a,x' s′=s,a,x′,做预处理 ϕ ′ = ϕ ( s ′ ) \phi'=\phi(s') ϕ′=ϕ(s′) [8] 存储一个transition ( ϕ , a , R , ϕ ′ ) (\phi,a,R,\phi') (ϕ,a,R,ϕ′) 放在D中 [9] 从D中随机采样一个minibatch of transitions: ( ϕ j , a j , R j , ϕ j + 1 ) (\phi_j,a_j,R_j,\phi_{j+1}) (ϕj,aj,Rj,ϕj+1) [10] 让 G o a l = { R j ,如果 j + 1 步结束 R j + γ max a ′ Q ^ θ − ( ϕ j + 1 , a ′ ) ,否则 Goal=\left\{Rj,如果j+1步结束Rj+γ\textcolorredmaxa′ˆQθ−(ϕj+1,a′),否则\right. Goal={Rj,如果j+1步结束Rj+γa′maxQ^θ−(ϕj+1,a′),否则 [11] θ ← θ + α ( G o a l − Q θ ( s , a ) ) ∇ θ ( Q θ ( s , a ) ) \theta\leftarrow \theta+\alpha(Goal-Q_\theta(s,a))\nabla_\theta(Q_\theta(s,a)) θ←θ+α(Goal−Qθ(s,a))∇θ(Qθ(s,a)) [12] end [13] 每C步两个网络进行一下同步: θ − = θ \theta^-=\theta θ−=θ [14] end |
分离散和连续的情况。目前只展示离散的情况
import gym import torch import random #定义环境 class MyWrapper(gym.Wrapper): def __init__(self): env = gym.make('CartPole-v1', render_mode='human') # env = gym.make('Pendulum-v1', render_mode='human') super().__init__(env) self.env = env self.step_n = 0 def reset(self): state, _ = self.env.reset() self.step_n = 0 return state def step(self, action): state, reward, done, _, info = self.env.step(action) #一局游戏最多走N步 self.step_n += 1 if self.step_n >= 200: done = True return state, reward, done, info #认识游戏环境 def test_env(env): print('env.observation_space=', env.observation_space) print('env.action_space=', env.action_space) state = env.reset() action = env.action_space.sample() next_state, reward, done, _ = env.step(action) print('state=', state) print('action=', action) print('next_state=', next_state) print('reward=', reward) print('done=', done) #得到一个动作 def get_action(Q_action_net,state): """ 根据行为网络和状态获得动作 Q_action_net:行为网络 state:状态 """ if random.random() < 0.01: return random.choice([0, 1]) #走神经网络,得到一个动作 state = torch.FloatTensor(state).reshape(1, 4) return Q_action_net(state).argmax().item() #向样本池中添加N条数据,删除M条最古老的数据 def update_data(Q_action_net,datas,len_data_buffer=1000): """ datas:经验回放数组,是一个列表,每个列表元素是一个5元组(state, action, reward, next_state, over) len_data_buffer:经验回放数组的长度 """ old_count = len(datas) # ii=1 #玩到新增了N个数据为止 while len(datas) - old_count < len_data_buffer: # print(f'第{ii}局===================') #初始化游戏 state = env.reset() #玩到游戏结束为止 over = False while not over: #根据当前状态得到一个动作 action = get_action(Q_action_net,state) #执行动作,得到反馈 next_state, reward, over, _ = env.step(action) #记录数据样本 datas.append((state, action, reward, next_state, over)) #更新游戏状态,开始下一个动作 state = next_state # ii=ii+1 #数据上限,超出时从最古老的开始删除 while len(datas) > len_data_buffer: datas.pop(0) return datas #获取一批数据样本 def get_sample(datas): """ datas:经验回放数组,是一个列表,每个列表元素是一个5元组(state, action, reward, next_state, over) """ #从样本池中采样 samples = random.sample(datas, 64) #[b, 4] state = torch.FloatTensor([i[0] for i in samples]).reshape(-1, 4) #[b, 1] action = torch.LongTensor([i[1] for i in samples]).reshape(-1, 1) #[b, 1] reward = torch.FloatTensor([i[2] for i in samples]).reshape(-1, 1) #[b, 4] next_state = torch.FloatTensor([i[3] for i in samples]).reshape(-1, 4) #[b, 1] over = torch.LongTensor([i[4] for i in samples]).reshape(-1, 1) return state, action, reward, next_state, over def get_value(Q_net,state, action): """ Q_net:torch模型 state:状态,pytorch tensor,第一个维度是batch维度 action:动作,pytorch tensor,与上面的状态一一对应 """ #使用状态计算出动作的logits #[b, 4] -> [b, 2] value = Q_net(state) #根据实际使用的action取出每一个值 #这个值就是模型评估的在该状态下,执行动作的分数 #在执行动作前,显然并不知道会得到的反馈和next_state #所以这里不能也不需要考虑next_state和reward #[b, 2] -> [b, 1] value = value.gather(dim=1, index=action) return value def get_target(Q_TD_target_net,reward, next_state, over): """ 获取TD目标 ---------- Q_TD_target_net : 目标网络 """ #上面已经把模型认为的状态下执行动作的分数给评估出来了 #下面使用next_state和reward计算真实的分数 #针对一个状态,它到底应该多少分,可以使用以往模型积累的经验评估 #这也是没办法的办法,因为显然没有精确解,这里使用延迟更新的next_model评估 #使用next_state计算下一个状态的分数 #[b, 4] -> [b, 2] with torch.no_grad(): target = Q_TD_target_net(next_state) #取所有动作中分数最大的 #[b, 2] -> [b, 1] target = target.max(dim=1)[0] target = target.reshape(-1, 1) #下一个状态的分数乘以一个系数,相当于权重 target *= 0.98 #如果next_state已经游戏结束,则next_state的分数是0 #因为如果下一步已经游戏结束,显然不需要再继续玩下去,也就不需要考虑next_state了. #[b, 1] * [b, 1] -> [b, 1] target *= (1 - over) #加上reward就是最终的分数 #[b, 1] + [b, 1] -> [b, 1] target += reward return target def test(Q_model): state = env.reset() reward_sum = 0 over = False while not over: action = get_action(Q_model,state) state, reward, over, _ = env.step(action) reward_sum += reward return reward_sum def train(Q_action_net,Q_TD_target_net): Q_action_net.train() optimizer = torch.optim.Adam(Q_action_net.parameters(), lr=2e-3) loss_fn = torch.nn.MSELoss() datas=[] #训练N次 for epoch in range(10): print(f'第{epoch}个epoch========================') #更新N条数据 datas=update_data(Q_action_net,datas,len_data_buffer=200) #每次更新过数据后,学习N次 for i in range(20): #采样一批数据 # print(f'第{epoch}个epoch,第{i}次更新==========') state, action, reward, next_state, over = get_sample(datas) #计算一批样本的value和target value = get_value(Q_action_net,state, action) target = get_target(Q_TD_target_net,reward, next_state, over) #更新参数 loss = loss_fn(value, target) optimizer.zero_grad() loss.backward() optimizer.step() #把model的参数复制给next_model if (i + 1) % 2 == 0: Q_TD_target_net.load_state_dict(Q_action_net.state_dict()) if epoch % 2 == 0: print(epoch, len(datas), sum([test(Q_action_net) for _ in range(5)]) / 5) return Q_action_net if __name__ == "__main__": env = MyWrapper() env.reset() test_env(env) #计算动作的模型,也是真正要用的模型 Q_action_net = torch.nn.Sequential( torch.nn.Linear(4, 128), torch.nn.ReLU(), torch.nn.Linear(128, 2), ) #经验网络,用于评估一个状态的分数 Q_TD_target_net = torch.nn.Sequential( torch.nn.Linear(4, 128), torch.nn.ReLU(), torch.nn.Linear(128, 2), ) DQN_CartPole_model=train(Q_action_net,Q_TD_target_net) torch.save(DQN_CartPole_model, 'save/4.DQN_CartPole') env.close()
另外其他版本可见:
DQN(Deep Q-Network)算法加代码实现
强化学习之stable_baseline3详细说明和各项功能的使用
参考链接:强化学习之stable_baseline3详细说明和各项功能的使用
from stable_baselines3 import DQN from stable_baselines3.common.vec_env.dummy_vec_env import DummyVecEnv from stable_baselines3.common.evaluation import evaluate_policy import gym #定义环境 class MyWrapper(gym.Wrapper): def __init__(self): env = gym.make('CartPole-v1', render_mode='human') super().__init__(env) self.env = env self.step_n = 0 def reset(self): state, _ = self.env.reset() self.step_n = 0 return state def step(self, action): state, reward, done, _, info = self.env.step(action) #一局游戏最多走N步 self.step_n += 1 if self.step_n >= 200: done = True return state, reward, done, info env = MyWrapper() env.reset() # env = gym.make('CartPole-v1', render_mode='human') # env.reset() # # 把环境向量化,如果有多个环境写成列表传入DummyVecEnv中,可以用一个线程来执行多个环境,提高训练效率 # env = DummyVecEnv([lambda : env]) # 定义一个DQN模型,设置其中的各个参数 model = DQN( "MlpPolicy", # MlpPolicy定义策略网络为MLP网络 env=env, learning_rate=5e-4, batch_size=128, buffer_size=50000, learning_starts=0, target_update_interval=250, policy_kwargs={"net_arch" : [256, 256]}, # 这里代表隐藏层为2层256个节点数的网络 verbose=0, # verbose=1代表打印训练信息,如果是0为不打印,2为打印调试信息 tensorboard_log="./tensorboard/CartPole-v1/" # 训练数据保存目录,可以用tensorboard查看 ) # 开始训练 model.learn(total_timesteps=1e4,progress_bar=True) # 策略评估,可以看到倒立摆在平稳运行了 mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10, render=False) #env.close() print("mean_reward:",mean_reward,"std_reward:",std_reward) # 保存模型到相应的目录 model.save("./save/CartPole-DQN.pkl") # 导入模型 model = DQN.load("./save/CartPole-DQN.pkl") state = env.reset() done = False score = 0 while not done: # 预测动作 action, _ = model.predict(observation=state) # 与环境互动 state, reward, done, info = env.step(action=action) score += reward env.render() env.close() print("score=",score)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。