当前位置:   article > 正文

【强化学习】demo系列——基于gym自定义RL环境_使用gym搭建自己的强化学习环境

使用gym搭建自己的强化学习环境


前言

gym是许多强化学习框架都支持了一种常见RL环境规范,实现简单,需要重写的api很少也比较通用。

本文旨在给出一个简单的基于gym的自定义单智能体强化学习环境demo


一、Gym环境类

1.1 gym.Env类框架

首先给出一个gym环境类的基本框架,继承gym.Env类,所列出来的方法均为必须实现的方法。我们一个一个方法简单介绍,

import logging
import gym
import numpy as np
from gym.utils import seeding
import math

logger = logging.getLogger(__name__)

class TestEnv(gym.Env):

    def __init__(self):
        self.state = 0
    def reset(self,seed, options,infos=False):
    	self.state =0
    def step(self, action):   
        return state, reward, Done, Truncate,info
	def render(self):
		visual_demo()
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

1.2 初始化方法"__init__"

init”方法是实例化对象时会自动执行的代码,在这里面可以定义好RL的观察空间(如果和状态空间不同的话可以加上状态空间)、动作空间、以及折扣因子等等参数,必须重写。比如:

 def __init__(self):
        # 基本设置
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(2, 2), dtype=np.float32)  # 连续状态空间 2X2
        self.action_space = gym.spaces.Discrete(2)    # 离散一维动作空间 0,1
        # self.action_space = gym.spaces.MultiDiscrete([ 5, 2, 2 ]) # 多维离散动作空间, 第一维有5个动作、第二维有2个动作..
        self.state = None # state for rl
        self.gamma = 1  # 折扣因子 
        # self.setting = Setting() # 实例化RL问题对象,可以将具体场景的RL问题相关设置打包放在Setting中,gym环境类只从里面取出状态、输入动作获得奖励。这样的话场景设置与rl设置可以比较清楚地分开。(仅个人编程习惯)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

gym.spaces类给出了很多空间可供选择,这里动作状态空间实现上并无区别,要在初始化方法里明显指定这两个空间。

1.3 重置方法reset()

reset()方法也是必须重写的方法,它在每次RL一次任务结束时,会自动执行,将状态等相关设置再次初始化(因为init只会在实例化对象时执行一次,之后初始化就要在reset了),用于下次任务交互。

  def reset(self,seed, options,infos=False):
        self.state = [[1,1],[2,2]] # 重新初始化状态 
        self.reward = 0  # 重置奖励值
  
  • 1
  • 2
  • 3
  • 4

新的gym标准中reset()方法加入了一些新的参数,输入随机种子,options和infos一般用不到可以直接不写,自动会用默认值。

1.4 每步迭代方法step()

RL通过状态输出一个动作,这个动作就会自动输入到step()方法中,用以计算reward和下一步的观察状态,并判断任务是否结束。

    def step(self, action):    # 必须实现的函数,输入动作,输出下一步的状态、奖励、结束与否、其他可选info
        tmp_reward = reward_func(action) # 根据环境自定义的reward值
        self.reward += tmp_reward
        return self.state, self.reward, done,  truncated, info # done=True表示任务结束,truncated=True表示截断任务
  • 1
  • 2
  • 3
  • 4

这个方法自然也是必须重写的。

1.5 可视化任务过程方法render()

这个方法就不是必须的了,如果不重写的话,任务过程就不会进行可视化。如果有需要观测的内容,可以写在这,训练过程中每步动作都会自动执行render()方法。比如如果希望每次都能打印出当前的状态信息的话,可以写进去

def render(self):
		print(self.state)
  • 1
  • 2

二、注册自定义环境到gym库中

写好了自定义的RL环境后,还需要注册到安装好的gym库中,不然导入的时候是没有办法成功的。

首先需要在gym/envs文件目录下创建一个文件夹用于保存你的RL环境(./gymnasium/envs/MyEnv/mydemo.py),一般路径为./anaconda3/envs/RL/lib/python3.8/site-packages/gymnasium/envs。

在保存RL环境的文件夹下创建“__init__.py”文件用于链接python模块,./MyEnv/__init__.py,并在其中导入所写的RL环境类

from gymnasium.envs.MyEnv.mydemo import TestEnv
  • 1

最后,在“gym/envs/__init__.py”文件中注册自己的RL环境,在里面加上:

register(
    id='TestEnv-v0',
    entry_point='gymnasium.envs.MyEnv:TestEnv',
    max_episode_steps=1000,
)
  • 1
  • 2
  • 3
  • 4
  • 5

其中id是自定义RL环境的名称,可以按照自己习惯来取,在创建环境时用的就是这个。entry_point是自定义环境类的模块路径,这个一定不能出错(包括保存RL环境的文件夹名和自定义RL环境类名)

三、使用自定义RL环境

只要注册时候注意不要漏掉步骤或者文件名打错,使用的时候就不会报错了,直接使用如下代码即可实例化一个自定义的RL环境了:

import gym
env = gym.make("TestEnv-v0")  # 注意,这里用的是你注册环境时给的id名称
env.reset() # 可以使用它的方法来测试一下效果
env.step(action) # 可以自己输几个动作试试看状态和奖励会怎么样
  • 1
  • 2
  • 3
  • 4

四、代码示例

这个给出完整的基于gym的自定义RL环境类的示例,一个简单的资源分配场景。

import logging
import gym
import numpy as np
import math

logger = logging.getLogger(__name__)

class Setting():
        def __init__(self):
            self.BaseStationLocation = [0,0]
            self.UserLocation = [[1,1],[2,2]]
            self.RbInfo = [0.01,0.02]   # 按需自定义设置rb的相关信息
            self.RbAllocation = [0,0] #记录rb分配状态,可以用分配完了当作任务结束的标志
            self.bandwidth = 1 # 带宽
            self.p_t = 1  # 发射功率
            self.alpha = 2 # 路损因子
            self.sigma = 0.00001 # 噪声的单边功率谱密度
            self.reward = 0
        def random_move(self):
            random = [[0.1,0.1],[-0.1,-0.1]] # 每次分配用户随机移动到新位置,具体用random包可以实现
            self.UserLocation += random

        def reset(self):
            self.random_move() # 每次分配完成了,要重新开始时,在这里重置初始状态即可
        def data_rate(self,rb_index,user_index):  # 输入分配的资源块的索引、用户索引,计算返回速率
            if self.RbAllocation[rb_index] == 1:
                return 0  # 当前资源块已经分过了,得不到更多的速率奖励了
            else:
                self.RbAllocation[rb_index] = user_index + 1 # 表示rb_inex资源块分配给了用户user_index
            power = self.p_t /np.power(np.linalg.norm(self.BaseStationLocation-self.UserLocation[user_index]),self.alpha)
            rate = self.bandwidth*math.log(1+power/(self.sigma^2+self.RbInfo[rb_index]))
            return rate

class MyEnv(gym.Env):

    def __init__(self):
        # 基本设置
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(2, 2), dtype=np.float32)  # 状态空间,只考虑用户位置的话,设成2*2
        self.action_space = gym.spaces.Discrete(2)    # 动作空间,这个是一维离散动作空间,即一次决定一个用户的分配0,1。
        # self.action_space = gym.spaces.Box(low=0,high=1,shape=2,dtype=np.int8) 如果要一次决定多个分配,就用离散向量表示动作
        self.gamma = 1  # 折扣因子 ,资源分配里不区分先后分配的重要性区别,设为1就行。直接不设也可以。
        self.setting = Setting() # 实例化资源分配对象

   
    def seed(self, seed=None):  # 这个似乎gym更新后改了,这里不用管
        self.np_random, seed = seeding.np_random(seed)
        return [seed]


    def step(self, action):    # 必须实现的函数,输入动作,输出下一步的状态、奖励、结束与否、其他可选info
        # 根据输入action,映射成资源分配,这里可以假设动作就是分配当前资源块与否
        # 然后计算指标与奖励,
        # 最后判断是否任务结束
        rb_index = self.setting.RbAllocation.index(0) # 找到第一个未分配的资源块,进行分配
        rate = self.setting.data_rate(rb_index,action) # action就是把当前资源块分配给哪个用户
        self.reward += rate
        return self.state, self.reward, done, truncated, info if 0 not in self.setting.RbAllocation else self.state, reward, done, truncated, info
      # 依次返回下一时刻的状态,当前的奖励(如果只有分配完才有奖励的话,当前奖励就是0),任务是否结束(True or False),info(可选的返回信息)
      # self.state如果每次动作会改变状态的话,也在step方法里去改变
      # 返回值的格式时由gym规范的,如果gym版本更新有改变这个规范的话,也要相应调整。


    def reset(self,seed=None,options=None,infos=False):
        # 环境状态的初始化,在创建环境时会自动执行该函数,输出初始化的state,每一次任务结束,重新开始一次任务时也会执行
        self.setting.reset()  
        self.state = self.setting.UserLocation
        self.reward = 0
        return self.state

    def render(self): # 这是一个可视化函数,如果不重写的话,训练过程就不会进行可视化。可以根据需要去设计
        print(self.setting.RbAllocation)  # 比如可以打印每一步step的分配情况


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

其他的场景根据需求去修改RL场景设置类Setting()以及RL参数与状态转移、奖励计算等细节即可。


总结

本文旨在从头至尾的完成基于gym的RL环境自定义,主要分为实现gym标准的几个方法,注册自定义环境,及使用三部分。尽可能简化了所有的代码内容,只作为demo理解每个方法需要实现哪些功能,而不涉及具体如果和完成某个任务。不过只要理解了本文的内容,写出用于自己建模问题的RL环境就轻而易举了。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/722645
推荐阅读
相关标签
  

闽ICP备14008679号