当前位置:   article > 正文

DDPG算法_ddpg算法原理

ddpg算法原理

1. 算法原理

DDPG算法是Actor-Critic (AC) 框架下解决连续动作的一种算法。其本质为深度网络+确定策略梯度 (Deterministic Policy Gradient, DPG),之所以叫确定策略梯度,是因为与之前的动作网络不同,其动作网络输出的是一个确定的动作而不是动作概率。
王树森老师的课本中框架画的很详细。
王树森DPG原理示意图
其本质就是通过优化价值网络使之逼近动作价值函数 Q π ( s , a ) Q_{\pi}(s,a) Qπ(s,a),不断优化策略网络参数,使其输出的动作a在价值网络那里越来越能够得到更高的评分。
价值网络就是使用的经典的TD算法进行更新,为了防止出现bootstrapping导致的高估/低估,引入target_network进行TD_target的估算。
策略网络用到了确定性策略梯度算法,本质也是链式法则,其公式以及更新法则如下:
∇ θ q ( s j , μ ( s j ; θ ) ; w ) = ∇ θ μ ( s j ; θ ) ⋅ ∇ a q ( s j , a ^ j ; w ) θ ← θ + β ⋅ ∇ θ μ ( s j ; θ ) ⋅ ∇ a q ( s j , a ^ j ; w ) \nabla_{\boldsymbol{\theta}} q\left(s_{j}, \boldsymbol{\mu}\left(s_{j} ; \boldsymbol{\theta}\right) ; \boldsymbol{w}\right)=\nabla_{\boldsymbol{\theta}} \boldsymbol{\mu}\left(s_{j} ; \boldsymbol{\theta}\right) \cdot \nabla_{\boldsymbol{a}} q\left(s_{j}, \widehat{\boldsymbol{a}}_{j} ; \boldsymbol{w}\right)\\ \\ \\ \boldsymbol{\theta} \leftarrow \boldsymbol{\theta}+\beta \cdot \nabla_{\boldsymbol{\theta}} \boldsymbol{\mu}\left(s_{j} ; \boldsymbol{\theta}\right) \cdot \nabla_{\boldsymbol{a}} q\left(s_{j}, \widehat{\boldsymbol{a}}_{j} ; \boldsymbol{w}\right) θq(sj,μ(sj;θ);w)=θμ(sj;θ)aq(sj,a j;w)θθ+βθμ(sj;θ)aq(sj,a j;w)
同样的,这个也有一个target_network,用来给出计算价值函数TD-target时候的动作的选取。

2. 算法伪代码

图拍自 肖智清 《强化学习:原理与python实现》
DDPG算法伪代码

3. 关键部分的代码

3.1 环境介绍

基于pytorch框架
以下代码基于gym库的 Pendulum-v1 环境。
截图
相关参数
在这里插入图片描述
reward

3.2 代码

主要就是两个actor网络与两个critic网络的更新,以及动作的选取(使用tanh函数映射到动作的区间上去)。
动作网络:

class ActorNet(nn.Module): # define the network structure for actor and critic
    def __init__(self, s_dim, a_dim):
        super(ActorNet, self).__init__()
        self.fc1 = nn.Linear(s_dim, 30)
        self.fc1.weight.data.normal_(0, 0.1) # initialization of FC1
        self.out = nn.Linear(30, a_dim)
        self.out.weight.data.normal_(0, 0.1) # initilizaiton of OUT
    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.out(x)
        x = torch.tanh(x)   # F.tanh会报warning 这个就是为了映射到[-1,1]
        actions = x * 2 # for the game "Pendulum-v1", action range is [-2, 2]
        return actions
		# print(actions)  
		# 输出结果:tensor([[0.3480]], grad_fn=<MulBackward0>)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

评价网络:基于action和state进行打分:

class CriticNet(nn.Module):
    def __init__(self, s_dim, a_dim):
        super(CriticNet, self).__init__()
        self.fcs = nn.Linear(s_dim, 30)
        self.fcs.weight.data.normal_(0, 0.1)
        self.fca = nn.Linear(a_dim, 30)
        self.fca.weight.data.normal_(0, 0.1)
        self.out = nn.Linear(30, 1)             # 输出q(s,a;w)
        self.out.weight.data.normal_(0, 0.1)
    def forward(self, s, a):
        x = self.fcs(s)
        y = self.fca(a)
        actions_value = self.out(F.relu(x+y))   # 基于s与a进行打分
        return actions_value
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

DDPG class:

class DDPG(object):
    def __init__(self, a_dim, s_dim, a_bound):
        self.a_dim, self.s_dim, self.a_bound = a_dim, s_dim, a_bound
        self.memory = np.zeros((MEMORY_CAPACITY, s_dim * 2 + a_dim + 1), dtype=np.float32)
        self.pointer = 0 # serves as updating the memory data 
        # Create the 4 network objects
        self.actor_eval = ActorNet(s_dim, a_dim)
        self.actor_target = ActorNet(s_dim, a_dim)
        self.critic_eval = CriticNet(s_dim, a_dim)
        self.critic_target = CriticNet(s_dim, a_dim)
        # create 2 optimizers for actor and critic
        # lr-learning ratio 学习率
        self.actor_optimizer = torch.optim.Adam(self.actor_eval.parameters(), lr=LR_ACTOR)
        self.critic_optimizer = torch.optim.Adam(self.critic_eval.parameters(), lr=LR_CRITIC)
        # Define the loss function for critic network update
        self.loss_func = nn.MSELoss()
    def store_transition(self, s, a, r, s_): # how to store the episodic data to buffer
        transition = np.hstack((s, a, [r], s_))
        # 按照列的顺序对于[]进行堆叠,返回[]
        index = self.pointer % MEMORY_CAPACITY # replace the old data with new data 
        self.memory[index, :] = transition
        self.pointer += 1
    
    def choose_action(self, s):
        # print(s)
        s = torch.unsqueeze(torch.FloatTensor(s), 0)
        # print('1',self.actor_eval(s)[0].detach())
        # print('2',self.actor_eval(s).detach())
        return self.actor_eval(s)[0].detach()
        # 1 tensor([0.2417])
        # 2 tensor([[0.2417]])
    
    def learn(self):
        # softly update the target networks
        # 对应伪代码2.2.5,对于AC Target网络进行软更新
        for x in self.actor_target.state_dict().keys():
            eval('self.actor_target.' + x + '.data.mul_((1-TAU))')
            eval('self.actor_target.' + x + '.data.add_(TAU*self.actor_eval.' + x + '.data)')
        for x in self.critic_target.state_dict().keys():
            eval('self.critic_target.' + x + '.data.mul_((1-TAU))')
            eval('self.critic_target.' + x + '.data.add_(TAU*self.critic_eval.' + x + '.data)')           
        # sample from buffer a mini-batch data
        indices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE)
        batch_trans = self.memory[indices, :]
        # extract data from mini-batch of transitions including s, a, r, s_
        batch_s = torch.FloatTensor(batch_trans[:, :self.s_dim])
        batch_a = torch.FloatTensor(batch_trans[:, self.s_dim:self.s_dim + self.a_dim])
        batch_r = torch.FloatTensor(batch_trans[:, -self.s_dim - 1: -self.s_dim])
        batch_s_ = torch.FloatTensor(batch_trans[:, -self.s_dim:])
        # make action and evaluate its action values
        a = self.actor_eval(batch_s)
        # torch.Size([32, 3]) torch.Size([32, 1])
        print(batch_s.size(),a.size())
        q = self.critic_eval(batch_s, a)
        actor_loss = -torch.mean(q)
        # optimize the loss of actor network
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        
        # compute the target Q value using the information of next state
        a_target = self.actor_target(batch_s_)
        q_tmp = self.critic_target(batch_s_, a_target)
        q_target = batch_r + GAMMA * q_tmp
        # compute the current q value and the loss
        q_eval = self.critic_eval(batch_s, batch_a)
        td_error = self.loss_func(q_target, q_eval)
        # optimize the loss of critic network
        self.critic_optimizer.zero_grad()
        td_error.backward()
        self.critic_optimizer.step()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

主程序:

ddpg = DDPG(a_dim, s_dim, a_bound)
var = 3 # the controller of exploration which will decay during training process
t1 = time.time()
for i in range(EPISODES):
    s = env.reset()
    ep_r = 0
    #没有清零的是网络参数
    for j in range(EP_STEPS):
        if RENDER: env.render()
        # render()函数在这里扮演图像引擎的角色。
        # 一个仿真环境必不可少的两部分是物理引擎和图像引擎。物理引擎模拟环境中物体的运动规律;图像引擎用来显示环境中的物体图像
        # add explorative noise to action 伪代码2.1.1
        a = ddpg.choose_action(s)
        # print(a) 输出:tensor([0.5961])
        a = np.clip(np.random.normal(a, var), a_low_bound, a_bound)
        # np.random.normal()的意思是一个正态分布
        # clip函数给出了一个上下界范围

        s_, r, done, info, _ = env.step(a)
        ddpg.store_transition(s, a, r / 10, s_) # store the transition to memory
        #reward = -costs = -(angle_normalize(th) ** 2 + .1 * thdot ** 2 + .001 * (u ** 2))
        
        if ddpg.pointer > MEMORY_CAPACITY:
            var *= 0.9995 #衰减动作的随机性
            ddpg.learn()
            
        s = s_
        ep_r += r

        # 这玩意200步返回一个值
        if j == EP_STEPS - 1:
            print('Episode: ', i, ' Reward: %i' % (ep_r), 'Explore: %.2f' % var)
            if ep_r > -300 : RENDER = True
            break
print('Running time: ', time.time() - t1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

着重强调随机性加入:
使用一个正态分布随机化,并且方差随着探索的数量增大逐渐变小。

		a = ddpg.choose_action(s)
        # print(a) 输出:tensor([0.5961])
        a = np.clip(np.random.normal(a, var), a_low_bound, a_bound)
  • 1
  • 2
  • 3

为什么有人说强化学习其实是一种进化算法,类似于遗传算法之类的。
因为这两类算法选优的核心机制是Exploration and Exploitation(探索和开发)

4. 部分注意事项

在扩展到多维动作空间时,要注意更改:
action_net动作网络的输出、choose_action选择的动作、随机化探索中的随机化;并且要注意之间类型的转化。

4.1 动作网络修改:

主要改动forward之后的输出部分,将两个变量分别映射:

 def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.out(x)
        x[:,0] =torch.tanh(x[:,0])  * 20 -65  
        x[:,1] =torch.tanh(x[:,1]) * 180 + 90
        return x
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

4.2 动作选择修改:

不再进行下标索引:

def choose_action(self, s):
	# print(s)
    s = torch.unsqueeze(torch.FloatTensor(s), 0)
    action = self.actor_eval(s).detach()
    return(action)
  • 1
  • 2
  • 3
  • 4
  • 5

使用原语句 return self.actor_eval(s)[0].detach(),只有一个输出;

4.3 加入随机性部分:

使用下标索引,分别对动作进行随机化:

a[0,0] =  np.clip(np.random.normal(a[0,0], var1), -85, -45)
a[0,1] =  np.clip(np.random.normal(a[0,1], var2), -90, 270)
# np.clip()函数强制将动作限定在一定范围区间
  • 1
  • 2
  • 3
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/376102
推荐阅读
相关标签
  

闽ICP备14008679号