赞
踩
gym是许多强化学习框架都支持了一种常见RL环境规范,实现简单,需要重写的api很少也比较通用。
本文旨在给出一个简单的基于gym的自定义单智能体强化学习环境demo
首先给出一个gym环境类的基本框架,继承gym.Env类,所列出来的方法均为必须实现的方法。我们一个一个方法简单介绍,
import logging import gym import numpy as np from gym.utils import seeding import math logger = logging.getLogger(__name__) class TestEnv(gym.Env): def __init__(self): self.state = 0 def reset(self,seed, options,infos=False): self.state =0 def step(self, action): return state, reward, Done, Truncate,info def render(self): visual_demo()
“init”方法是实例化对象时会自动执行的代码,在这里面可以定义好RL的观察空间(如果和状态空间不同的话可以加上状态空间)、动作空间、以及折扣因子等等参数,必须重写。比如:
def __init__(self):
# 基本设置
self.observation_space = gym.spaces.Box(low=0, high=1, shape=(2, 2), dtype=np.float32) # 连续状态空间 2X2
self.action_space = gym.spaces.Discrete(2) # 离散一维动作空间 0,1
# self.action_space = gym.spaces.MultiDiscrete([ 5, 2, 2 ]) # 多维离散动作空间, 第一维有5个动作、第二维有2个动作..
self.state = None # state for rl
self.gamma = 1 # 折扣因子
# self.setting = Setting() # 实例化RL问题对象,可以将具体场景的RL问题相关设置打包放在Setting中,gym环境类只从里面取出状态、输入动作获得奖励。这样的话场景设置与rl设置可以比较清楚地分开。(仅个人编程习惯)
gym.spaces类给出了很多空间可供选择,这里动作状态空间实现上并无区别,要在初始化方法里明显指定这两个空间。
reset()方法也是必须重写的方法,它在每次RL一次任务结束时,会自动执行,将状态等相关设置再次初始化(因为init只会在实例化对象时执行一次,之后初始化就要在reset了),用于下次任务交互。
def reset(self,seed, options,infos=False):
self.state = [[1,1],[2,2]] # 重新初始化状态
self.reward = 0 # 重置奖励值
新的gym标准中reset()方法加入了一些新的参数,输入随机种子,options和infos一般用不到可以直接不写,自动会用默认值。
RL通过状态输出一个动作,这个动作就会自动输入到step()方法中,用以计算reward和下一步的观察状态,并判断任务是否结束。
def step(self, action): # 必须实现的函数,输入动作,输出下一步的状态、奖励、结束与否、其他可选info
tmp_reward = reward_func(action) # 根据环境自定义的reward值
self.reward += tmp_reward
return self.state, self.reward, done, truncated, info # done=True表示任务结束,truncated=True表示截断任务
这个方法自然也是必须重写的。
这个方法就不是必须的了,如果不重写的话,任务过程就不会进行可视化。如果有需要观测的内容,可以写在这,训练过程中每步动作都会自动执行render()方法。比如如果希望每次都能打印出当前的状态信息的话,可以写进去
def render(self):
print(self.state)
写好了自定义的RL环境后,还需要注册到安装好的gym库中,不然导入的时候是没有办法成功的。
首先需要在gym/envs文件目录下创建一个文件夹用于保存你的RL环境(./gymnasium/envs/MyEnv/mydemo.py),一般路径为./anaconda3/envs/RL/lib/python3.8/site-packages/gymnasium/envs。
在保存RL环境的文件夹下创建“__init__.py”文件用于链接python模块,./MyEnv/__init__.py,并在其中导入所写的RL环境类
from gymnasium.envs.MyEnv.mydemo import TestEnv
最后,在“gym/envs/__init__.py”文件中注册自己的RL环境,在里面加上:
register(
id='TestEnv-v0',
entry_point='gymnasium.envs.MyEnv:TestEnv',
max_episode_steps=1000,
)
其中id是自定义RL环境的名称,可以按照自己习惯来取,在创建环境时用的就是这个。entry_point是自定义环境类的模块路径,这个一定不能出错(包括保存RL环境的文件夹名和自定义RL环境类名)
只要注册时候注意不要漏掉步骤或者文件名打错,使用的时候就不会报错了,直接使用如下代码即可实例化一个自定义的RL环境了:
import gym
env = gym.make("TestEnv-v0") # 注意,这里用的是你注册环境时给的id名称
env.reset() # 可以使用它的方法来测试一下效果
env.step(action) # 可以自己输几个动作试试看状态和奖励会怎么样
这个给出完整的基于gym的自定义RL环境类的示例,一个简单的资源分配场景。
import logging import gym import numpy as np import math logger = logging.getLogger(__name__) class Setting(): def __init__(self): self.BaseStationLocation = [0,0] self.UserLocation = [[1,1],[2,2]] self.RbInfo = [0.01,0.02] # 按需自定义设置rb的相关信息 self.RbAllocation = [0,0] #记录rb分配状态,可以用分配完了当作任务结束的标志 self.bandwidth = 1 # 带宽 self.p_t = 1 # 发射功率 self.alpha = 2 # 路损因子 self.sigma = 0.00001 # 噪声的单边功率谱密度 self.reward = 0 def random_move(self): random = [[0.1,0.1],[-0.1,-0.1]] # 每次分配用户随机移动到新位置,具体用random包可以实现 self.UserLocation += random def reset(self): self.random_move() # 每次分配完成了,要重新开始时,在这里重置初始状态即可 def data_rate(self,rb_index,user_index): # 输入分配的资源块的索引、用户索引,计算返回速率 if self.RbAllocation[rb_index] == 1: return 0 # 当前资源块已经分过了,得不到更多的速率奖励了 else: self.RbAllocation[rb_index] = user_index + 1 # 表示rb_inex资源块分配给了用户user_index power = self.p_t /np.power(np.linalg.norm(self.BaseStationLocation-self.UserLocation[user_index]),self.alpha) rate = self.bandwidth*math.log(1+power/(self.sigma^2+self.RbInfo[rb_index])) return rate class MyEnv(gym.Env): def __init__(self): # 基本设置 self.observation_space = gym.spaces.Box(low=0, high=1, shape=(2, 2), dtype=np.float32) # 状态空间,只考虑用户位置的话,设成2*2 self.action_space = gym.spaces.Discrete(2) # 动作空间,这个是一维离散动作空间,即一次决定一个用户的分配0,1。 # self.action_space = gym.spaces.Box(low=0,high=1,shape=2,dtype=np.int8) 如果要一次决定多个分配,就用离散向量表示动作 self.gamma = 1 # 折扣因子 ,资源分配里不区分先后分配的重要性区别,设为1就行。直接不设也可以。 self.setting = Setting() # 实例化资源分配对象 def seed(self, seed=None): # 这个似乎gym更新后改了,这里不用管 self.np_random, seed = seeding.np_random(seed) return [seed] def step(self, action): # 必须实现的函数,输入动作,输出下一步的状态、奖励、结束与否、其他可选info # 根据输入action,映射成资源分配,这里可以假设动作就是分配当前资源块与否 # 然后计算指标与奖励, # 最后判断是否任务结束 rb_index = self.setting.RbAllocation.index(0) # 找到第一个未分配的资源块,进行分配 rate = self.setting.data_rate(rb_index,action) # action就是把当前资源块分配给哪个用户 self.reward += rate return self.state, self.reward, done, truncated, info if 0 not in self.setting.RbAllocation else self.state, reward, done, truncated, info # 依次返回下一时刻的状态,当前的奖励(如果只有分配完才有奖励的话,当前奖励就是0),任务是否结束(True or False),info(可选的返回信息) # self.state如果每次动作会改变状态的话,也在step方法里去改变 # 返回值的格式时由gym规范的,如果gym版本更新有改变这个规范的话,也要相应调整。 def reset(self,seed=None,options=None,infos=False): # 环境状态的初始化,在创建环境时会自动执行该函数,输出初始化的state,每一次任务结束,重新开始一次任务时也会执行 self.setting.reset() self.state = self.setting.UserLocation self.reward = 0 return self.state def render(self): # 这是一个可视化函数,如果不重写的话,训练过程就不会进行可视化。可以根据需要去设计 print(self.setting.RbAllocation) # 比如可以打印每一步step的分配情况
其他的场景根据需求去修改RL场景设置类Setting()以及RL参数与状态转移、奖励计算等细节即可。
本文旨在从头至尾的完成基于gym的RL环境自定义,主要分为实现gym标准的几个方法,注册自定义环境,及使用三部分。尽可能简化了所有的代码内容,只作为demo理解每个方法需要实现哪些功能,而不涉及具体如果和完成某个任务。不过只要理解了本文的内容,写出用于自己建模问题的RL环境就轻而易举了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。