赞
踩
简介:(1)主要为文献《Intelligent Power Control for Spectrum Sharing in Cognitive Radios:A Deep Reinforcement Learning Approach》在github上的开源代码进行了注释,注释中添加了自己对于问题的一些理解和疑问。这个Python源码的程序入口为main函数。main函数中调用了DQN、Power_control两个作者独立完成的包和numpy、random两个python自带的包。对于这个源码,还有部分位置没有添加好注释,需要再花一点时间理解和消化代码的意义。(2)除了为这部分源码添加注释以外,最近还学习了教材《Python编程:从入门到实践》的一部分,尝试构造几个DQN和深度学习的模板,方便以后使用。
import tensorflow as tf import numpy as np import random from collections import deque #后面的部分可以直接调用,不在需要使用collections.deque了 GAMMA = 0.8 #在更新Q函数时的折损参数 OBSERVE = 300 EXPLORE = 100000 FINAL_EPSILON = 0.0 INITIAL_EPSILON = 0.8 REPLAY_MEMORY = 400 BATCH_SIZE = 256 class BrainDQN: def __init__(self,actions,Sensor): self.replayMemory = deque() #replayMemory是一个双向进出的队列 self.timeStep = 0 self.epsilon = INITIAL_EPSILON self.recording = EXPLORE self.sensor_dim = Sensor self.actions = actions self.hidden1 = 256 self.hidden2 = 256 self.hidden3 = 512 self.createQNetwork() def createQNetwork(self): #以tf的方式搭建graph,然后完成初始化,这里的步骤只是在create,并没有开始跑 W_fc1 = self.weight_variable([self.sensor_dim,self.hidden1]) b_fc1 = self.bias_variable([self.hidden1]) W_fc2 = self.weight_variable([self.hidden1,self.hidden2]) b_fc2 = self.bias_variable([self.hidden2]) W_fc3 = self.weight_variable([self.hidden2,self.hidden3]) b_fc3 = self.bias_variable([self.hidden3]) W_fc4 = self.weight_variable([self.hidden3,self.actions]) b_fc4 = self.bias_variable([self.actions]) #设置Q网络的权重 self.stateInput = tf.placeholder("float",[None,self.sensor_dim]) #DQN的输入值:是这10个感应器的结果 #placeholder和feed_dic是一对相互搭配使用的函数,前者申请了一个必要的空间来放置需要的数据。后者则实时输入数据 h_fc1 = tf.nn.relu(tf.matmul(self.stateInput,W_fc1) + b_fc1) h_fc2 = tf.nn.relu(tf.matmul(h_fc1,W_fc2) + b_fc2) h_fc3 = tf.nn.tanh(tf.matmul(h_fc2,W_fc3) + b_fc3) #前面是在赋初始值,现在便把这个网络的隐藏层搭建好了 self.QValue = tf.matmul(h_fc3,W_fc4) + b_fc4 #现在是确定了输出值,输出值是QValue self.actionInput = tf.placeholder("float",[None,self.actions]) self.yInput = tf.placeholder("float", [None]) #????这里又定义了yInput和actionInput Q_action = tf.reduce_sum(tf.multiply(self.QValue, self.actionInput), reduction_indices = 1) #reduce_sum函数求和,后面的reduction_indices代表了求和的维度 self.cost = tf.reduce_mean(tf.square(self.yInput - Q_action)) #这个self.cost就是我们使用adam算法要优化的东西,这里采用了一个求平均值函数 self.trainStep = tf.train.AdamOptimizer(learning_rate=10**-5).minimize(self.cost) #攒够数据调参的时候,使用的就是这个地方的Adam函数,learning_rate定义的是学习率,后面的minimize是指明了优化的变量是self.cost???那么在这里,调谁的参? self.session = tf.InteractiveSession() self.session.run(tf.global_variables_initializer()) #初始化语句 def trainQNetwork(self): #数据攒够了以后,就开始调用这个部分来训练q网络,没有输入的参数,故直接调用。返回值是self.loss,这里已经开始跑这个程序了 minibatch = random.sample(self.replayMemory,BATCH_SIZE) #随机地从replayMemory中取出BATCH_SIZE的数据,minibatch是一个BATCH_SIZE的数组,这里还没有对replayMemory进行赋值 state_batch = [data[0] for data in minibatch] action_batch = [data[1] for data in minibatch] reward_batch = [data[2] for data in minibatch] nextState_batch = [data[3] for data in minibatch] #猜测:定义了data数组,这个数组的0.1.2.3号元素分别存放了从minibatch中取得的s、a、r、s+1 y_batch = [] QValue_batch = self.QValue.eval(feed_dict={self.stateInput:nextState_batch}) #????这里的.eval是什么意思 for i in range(0,BATCH_SIZE): y_batch.append(reward_batch[i] + GAMMA * np.max(QValue_batch[i])) _, self.loss = self.session.run([self.trainStep,self.cost],feed_dict={ self.yInput : y_batch, self.actionInput : action_batch, self.stateInput : state_batch }) #self.loss = self.session.run([self.trainStep,self.cost],feed_dict={self.yInput : y_batch,self.actionInput : action_batch,self.stateInput : state_batch}) ,要run的是trainStep和self.cost,但输入数据的数组不明白为什么是三组,且使用冒号连接在了一起。 return self.loss #这之前的两个大函数trainqnetwork和creatqnetwork都没有调用后面的函数。 def setPerception(self,nextObservation,action,reward): loss = 0 newState = nextObservation self.replayMemory.append((self.currentState,action,reward,newState)) if len(self.replayMemory) > REPLAY_MEMORY: self.replayMemory.popleft() if self.timeStep > OBSERVE: loss = self.trainQNetwork() #如果时间步大于300,那么训练一次q网络???如何和积攒够400个数据联系在一起 self.currentState = newState self.timeStep += 1 #时间步加1 return loss #loss=self.loss def getAction(self): #Q网络已经生成结果,此时便是要决定是选择Q网络的输出结果还是按照某种概率去探索新的世界:得到当前时刻应该采取的动作 QValue = self.QValue.eval(feed_dict= {self.stateInput:[self.currentState]}) #????这里的.eval是什么意思 action = np.zeros(self.actions) if random.random() <= self.epsilon: action_index = random.randrange(self.actions) #rando.randrange函数返回递增集合中的一个随机数,这里的action_index相当于从action许用的下标中随机选中了一个,然后在下面的语句action[action_index]中调用 action[action_index] = 1 #action集中某个元素被置为1表示选用这个集合中的这个元素作为当前的动作 else: action_index = np.argmax(QValue) #这时选用Q网络输出值中最大值作为当前的动作 action[action_index] = 1 if self.epsilon > FINAL_EPSILON and self.timeStep > OBSERVE: self.epsilon -= (INITIAL_EPSILON - FINAL_EPSILON)/EXPLORE #根据步骤来更新epsilon self.recording = self.recording-1 #迭代的次数加一。换言之,每次迭代都需要调用这个getaction函数一次,迭代标志就是在这里改变的。 return action,self.recording #返回action表(本质上就是告知了应该选择哪个动作,然后再参照这个动作和环境进行互动)和剩余迭代次数。 def getAction_test(self,observation): #这里是所有的迭代次数已经完成,把agent投放到了环境当中实打实地去进行工作时的调用 QValue = self.QValue.eval(feed_dict= {self.stateInput:[observation]}) action = np.zeros(self.actions) action_index = np.argmax(QValue) #这里不再采用贪心算法,直接采用q网络输出值中的最好动作即可 action[action_index] = 1 return action def setInitState(self,observation): #把观察到的情况输入到这个函数中作为当前时间的状态值,然后参与到上面replayMemory的工作中去。 self.currentState = observation def weight_variable(self,shape): #和下面的bias_variable一样,都是在createQnetwork时使用的辅助函数,需要调用TF包中的相关函数 initial = tf.truncated_normal(shape) return tf.Variable(initial) def bias_variable(self,shape): initial = tf.constant(0.01, shape = shape) return tf.Variable(initial
import random import numpy as np import math class GameState: #环境生成模拟器 def __init__(self,P_1,P_2,Noise,Sensor): #初始化函数,将需要的增益、噪声、数量等参数输入 self.P_1 = P_1 self.P_2 = P_2 self.length_P_1 = len(self.P_1) self.length_P_2 = len(self.P_2) self.h_11 = 1;self.h_12 = 1 self.h_21 = 1;self.h_22 = 1 self.ita_1 = 1.2 self.ita_2 = 0.7 self.sigma_sq_1 = 0.01 self.sigma_sq_2 = 0.01 self.lam = 0.1 self.alpha = 0.5 #这个alpha是什么? self.num_sensor = Sensor self.noise = Noise self.P,self.sigma = self.dis() #初始化时级调用了self.dis def dis(self): #猜测:这个diss就是代表了discover,就是这个sense node感知到的具体结果,具体结果的表达式就是38行的sigma数组,这个数组中存放了感知的所有结果 d = [[random.uniform(100,300) for _ in range(self.num_sensor)] for _ in range(2)] #d阵代表了距离 P = np.zeros((2,self.num_sensor)) for i in range(0,self.num_sensor): P[0][i] = ((self.lam/(4*math.pi)/d[0][i])**2) #P阵是一个二维数组,第一行存放的是主用户的信道衰落,第二行存放的是次用户的信道衰落 P[1][i] = ((self.lam/(4*math.pi)/d[1][i])**2) sigma = np.zeros((self.num_sensor)) for i in range(0,self.num_sensor): sigma[i] = ( P[0][i]*self.P_1[0]+P[1][i]*self.P_2[0] )/ self.noise return P,sigma # 猜测,感知节点也不知道PUS和SUS使用功率的具体情况,它所能测到的其实就是这个表达是的“值”,只不过我们在仿真的过程中是知道这个值诞生的原因的。 # 换言之,我们在构造DQN时,这种输入的state是可以很模糊的。在这里我们不需要知道PUS和SUs具体的情况,我们只需要知道当前固定频段上的噪声结果(包括了环境噪声和使用这个频段所有用户通信功率造成的噪声)。 # 再进一步考虑,我们可不可以设置这样一种并行的DQN?比如现在我们面临5个频段,在每个时刻,都在每个频段上去训练这个DQN网络,单个的DQN网络输出的结果是在这个频段上最优的许用功率,然后在这个DQN外面 # 再嵌套一个?优先级列表?选择器来决定使用哪个频段。从而把这个问题分解为了2个步骤,第一步先求每个频段的最优功率,第二步再选择最优频段 # 需要解决的问题: # (1)可否参考ADMM算法,分步解决需要解决的问题,细化更多的部分 # (2)分步在每个频段上做DQN时,环境的变化如何处理?换言之,这个MDP过程需要输出一个结果,但这个action结果是需要优先级选择器来决定的 # (3)是否存在分部DQN的东西? def ini(self): self.p_1 = self.P_1[random.randint(0,self.length_P_1-1)] self.p_2 = self.P_2[random.randint(0,self.length_P_2-1)] def ini_test(self): self.p_1_test = self.P_1[random.randint(0,self.length_P_1-1)] self.p_2_test = self.P_2[random.randint(0,self.length_P_2-1)] def frame_step(self, input_actions, policy, i): if i == True: if policy == 1: self.p_1 = self.update_p1_v1(self.p_2) if policy == 2: self.p_1 = self.update_p1_v2(self.p_1,self.p_2) action = np.flatnonzero(input_actions)[0] # Return indices that are non-zero in the flattened version of a. # 返回非零元素的下标。 self.p_2 = self.P_2[action] observation = self.compute_observation(self.p_1,self.p_2) reward = self.compute_reward(self.p_1,self.p_2) terminal = (reward==10) return observation,reward,terminal def frame_step_test(self, input_actions, policy, i): if i == True: if policy == 1: self.p_1_test = self.update_p1_v1(self.p_2_test) if policy == 2: self.p_1_test = self.update_p1_v2(self.p_1_test,self.p_2_test) action = np.flatnonzero(input_actions)[0] self.p_2_test = self.P_2[action] observation = self.compute_observation(self.p_1_test,self.p_2_test) reward = self.compute_reward(self.p_1_test,self.p_2_test) terminal = (reward==10) # 当reward==10时,作为terminal的标志。 return observation,reward,terminal def compute_observation(self,x,y): # 感知器观察到的环境状态,存储在observation数组中,输入是PU和SU正在使用的功率 observation = np.zeros((self.num_sensor)) for i in range(0,self.num_sensor): observation[i] = self.P[0][i] * x + self.P[1][i] * y + random.gauss(0,self.sigma[i]) #Pnrk表示目前的状态空间 if observation[i]<0: observation[i] =0 observation[i] = observation[i]*(10**7)#乘了10的七次方 return observation def compute_reward(self,x,y): success_1,success_2 = self.compute_SINR(x,y) reward = self.alpha * success_1 + (1-self.alpha) * success_2 if reward == 0.5: reward = 0 if reward == 1: reward = 10 return reward def update_p1_v1(self,y): #主用户的第一种更新方式 p_1_n = self.ita_1/((abs(self.h_11)**2)/((abs(self.h_21)**2)*y + self.sigma_sq_1)) v = [] for ind in range(self.length_P_1): v.append(max(p_1_n-self.P_1[ind],0)) p_1_new = self.P_1[v.index(min(v))] return p_1_new def update_p1_v2(self,x,y): #主用户的第二种更新方式 ind_p_1 = self.P_1.index(x) tSINR_1 = ((abs(self.h_11)**2)*x/((abs(self.h_21)**2)*y + self.sigma_sq_1)) tao = x * self.ita_1 / tSINR_1 if tao>=x and ind_p_1+1<=self.length_P_1-1 and tao<=self.P_1[ind_p_1+1] : x = self.P_1[ind_p_1+1] elif ind_p_1-1>=0 and tao<=self.P_1[ind_p_1-1] : x = self.P_1[ind_p_1-1] return x def compute_SINR(self,x,y): success_1 = ( (abs(self.h_11)**2)*x/((abs(self.h_21)**2)*y + self.sigma_sq_1)) >= self.ita_1 success_2 = ( (abs(self.h_22)**2)*y/((abs(self.h_12)**2)*x + self.sigma_sq_2)) >= self.ita_2 return success_1,success_2
from power_control import GameState from DQN import BrainDQN import numpy as np import matplotlib.pyplot as plt P_1 = [round(0.1*i/2.0,2) for i in range(1,9)] P_2 = [round(0.1*i/2.0,2) for i in range(1,9)] #round返回浮点数的四舍五入值 actions = len(P_2) #返回数组P_2的长度,这里的长度是9 Loss = [] Success = [] Fre = [] noise = 3 num_sensor = 10 # N policy = 2 # choose power change policy for PU, it should be 1(Multi-step) or 2(Single step) brain = BrainDQN(actions,num_sensor) #DQN模拟生成器,需要告知动作空间的个数、感知器的数量 com = GameState(P_1,P_2,noise,num_sensor)#一个环境模拟生成器,告知PU和SU的功率表,噪声,感知器的数量。 terminal =True recording = 100000 #迭代次数标记,这里需要迭代100000次,然后使用下面的100000-recording来表示当前正在迭代的次数 while(recording>0): #每次迭代都需要执行这个函数段 # initialization if(terminal ==True): com.ini() #环境模拟生成器的初始化 observation0, reward0, terminal = com.frame_step(np.zeros(actions),policy,False) brain.setInitState(observation0) # train train和test是分开的,train完以后,Q网络就训练完了,这时候可以直接投放到环境中去运作 action,recording = brain.getAction() #每次迭代都按照DQN包中的函数来得到本次迭代选择的action(返回值是一个数组,这个数组里面被赋值为1的action便是本次迭代选择的动作) nextObservation,reward,terminal = com.frame_step(action,policy,True) #取得了action列表,我们就要按照和这个action和环境互动了 loss = brain.setPerception(nextObservation,action,reward) #得到本次的loss值 # test 只有在500的整数倍时,才test一次。????那么攒够一定的次数才调参是怎样呢? if (recording+1)%500==0: Loss.append(loss) #append把元素添加到列表末尾 print("iteration : %d , loss : %f ." %(100000-recording, loss)) success = 0.0 fre = 0 num = 1000.0 for ind in range(1000): #每次test阶段,都使用1000个时间点,来得到把agent放到环境里时的实际情况,这里的run和最终的test不一样,这里只是验证当前训练步骤下能不能收敛(通过成功率、平均步数来衡量成功率) T = 0 com.ini_test() observation0_test, reward_test, terminal_test = com.frame_step_test(np.zeros(actions),policy,False) while (terminal_test !=True) and T<20: action_test = brain.getAction_test(observation0_test) observation0_test,reward_test,terminal_test = com.frame_step_test(action_test,policy,True) T +=1 if terminal_test==True: success +=1 fre +=T if success == 0: fre = 0 else: fre = fre/success success = success/num Success.append(success) Fre.append(fre) print("success : %f , step : %f ." %(success , fre)) plt.plot(Loss) #最后一次test时生成Loss的结果生成图片 plt.show() plt.plot(Success) plt.show() plt.plot(Fre) plt.show()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。