当前位置:   article > 正文

基于深度强化学习的DQN模型实现自动玩俄罗斯方块游戏(附详细代码讲解)_俄罗斯方块 强化学习

俄罗斯方块 强化学习

一、DQN(Deep Q-Network)方法概述

        DQN(Deep Q-Network)是一种强化学习方法,通过结合Q-learning算法和深度神经网络来解决强化学习问题。它是深度强化学习的里程碑之一,由DeepMind在2013年提出,被广泛应用于各种复杂的强化学习任务。DQN方法的概述如下:

1.强化学习问题:在强化学习中,智能体与环境进行交互,通过观察环境的状态并采取动作,来最大化累积奖励。智能体在环境中移动并与之交互,不断学习并优化策略,以在不同的状态下选择最优动作。

2.Q-learning算法:Q-learning是一种经典的强化学习算法,用于学习最优Q函数。Q函数表示在给定状态下采取某个动作的预期累积奖励值。Q-learning使用迭代更新的方式逼近最优Q函数,其核心思想是使用贝尔曼方程来更新Q值。贝尔曼方程表示当前状态下采取动作的Q值可以通过下一个状态的最大Q值和立即奖励来进行递归更新。

3.DQN的创新点:DQN的创新之处在于使用深度神经网络来逼近Q函数。传统的Q-learning方法使用表格存储Q值,但在大型状态空间问题中,表格变得不可行。DQN通过使用神经网络来表示Q函数,将状态作为输入,输出对应于每个动作的Q值,从而可以对大型状态空间进行近似求解。

4.Experience Replay(经验回放):DQN使用经验回放技术来存储智能体的经验,包括状态、动作、奖励和下一个状态。在训练过程中,DQN从经验回放缓冲区中随机抽样,以打破数据之间的关联性,从而更有效地使用经验数据进行训练。

5.Target Network(目标网络):为了稳定训练过程,DQN引入了目标网络。在训练过程中,有两个神经网络:一个是用于选择动作的主网络,另一个是用于计算目标Q值的目标网络。目标网络的参数比主网络的参数更新更慢,这有助于减少训练中的目标Q值估计的波动性。

6.Double Q-learning:DQN还采用了Double Q-learning的思想,用于更准确地估计Q值。在目标网络和主网络中分别选择最大动作,并结合它们的Q值来更新目标Q值。

        DQN方法的训练过程是迭代的,通过反复与环境交互、更新神经网络权重和优化策略,使得智能体逐渐学习到最优的Q函数,并从中得到最佳决策策略。DQN在很多复杂的强化学习任务中取得了显著的成功,并为后续深度强化学习算法的发展奠定了基础。

二、强化学习代码实现及训练过程

        本文将实现一个使用DQN算法和深度神经网络的强化学习代理,通过逼近Q函数来优化决策策略。代理以单一分数来表示每个状态的预期得分,并使用神经网络训练来逼近这些Q值。在训练过程中,代理通过经验回放和目标网络来稳定训练,并通过探索与利用策略优化决策能力。

        首先,我们基于DQN实现一个DQNAgent类,该类实现了一个基于深度强化学习的代理,使用DQN(Deep Q-Network)算法来解决强化学习问题。代码逻辑功能概述如下:

1.这个代理使用DQN算法来学习最优的决策策略。
2.代理的目标是找到所有可能状态的最佳最终状态的组合,而不是传统方法中找到特定状态的最佳动作。
3.通过使用深度神经网络来逼近Q函数,代理可以处理大型状态空间的问题。
4.代码中使用经验回放技术和目标网络来优化训练过程,提高稳定性和效率。

实现的代码如下:

  1. class DQNAgent:
  2. def __init__(self, state_size, mem_size=10000, discount=0.95, epsilon=1, epsilon_min=0, epsilon_stop_episode=500,
  3. n_neurons=[32, 32], activations=('relu', 'relu', 'linear'), loss='mse', optimizer='adam',
  4. replay_start_size=None):
  5. # 初始化DQNAgent代理
  6. assert len(activations) == len(n_neurons) + 1
  7. self.state_size = state_size
  8. self.memory = deque(maxlen=mem_size)
  9. self.discount = discount
  10. self.epsilon = epsilon
  11. self.epsilon_min = epsilon_min
  12. self.epsilon_decay = (self.epsilon - self.epsilon_min) / epsilon_stop_episode
  13. self.n_neurons = n_neurons
  14. self.activations = activations
  15. self.loss = loss
  16. self.optimizer = optimizer
  17. if not replay_start_size:
  18. replay_start_size = mem_size / 2
  19. self.replay_start_size = replay_start_size
  20. self.model = self.build_model()
  21. # 创建一个深度神经网络模型
  22. def build_model(self) -> Model:
  23. model = Sequential()
  24. model.add(Dense(self.n_neurons[0], input_dim=self.state_size, activation=self.activations[0]))
  25. for i in range(1, len(self.n_neurons)):
  26. model.add(Dense(self.n_neurons[i], activation=self.activations[i]))
  27. model.add(Dense(1, activation=self.activations[-1]))
  28. model.compile(loss=self.loss, optimizer=self.optimizer)
  29. return model
  30. # 将动作过程添加到经验回放缓冲区中
  31. def add_to_memory(self, current_state, next_state, reward, done):
  32. self.memory.append((current_state, next_state, reward, done))
  33. # 为某个动作分配一个随机得分
  34. def random_value(self):
  35. return random.random()
  36. # 预测给定状态的得分
  37. def predict_value(self, state: np.ndarray) -> float:
  38. return self.model.predict(state)[0]
  39. # 返回给定状态的预期得分
  40. def act(self, state):
  41. state = np.reshape(state, [1, self.state_size])
  42. if random.random() <= self.epsilon:
  43. return self.random_value()
  44. else:
  45. return self.predict_value(state)
  46. # 返回给定状态集合中的最佳状态
  47. def best_state(self, states):
  48. max_value = None
  49. best_state = None
  50. if random.random() <= self.epsilon:
  51. return random.choice(list(states))
  52. else:
  53. for state in states:
  54. value = self.predict_value(np.reshape(state, [1, self.state_size]))
  55. if not max_value or value > max_value:
  56. max_value = value
  57. best_state = state
  58. return best_state
  59. # 训练神经网络模型
  60. def train(self, batch_size=32, epochs=3):
  61. n = len(self.memory)
  62. if n >= self.replay_start_size and n >= batch_size:
  63. batch = random.sample(self.memory, batch_size)
  64. # 获取下一个状态的预期得分
  65. next_states = np.array([x[1] for x in batch])
  66. next_qs = [x[0] for x in self.model.predict(next_states)]
  67. x = []
  68. y = []
  69. # 构建训练数据的输入输出结构
  70. for i, (state, _, reward, done) in enumerate(batch):
  71. if not done:
  72. new_q = reward + self.discount * next_qs[i] # 更新预期得分(Q值)
  73. else:
  74. new_q = reward
  75. x.append(state)
  76. y.append(new_q)
  77. # 使用训练数据拟合模型
  78. self.model.fit(np.array(x), np.array(y), batch_size=batch_size, epochs=epochs, verbose=0)
  79. # 更新探索变量
  80. if self.epsilon > self.epsilon_min:
  81. self.epsilon -= self.epsilon_decay

代码中DQNAgent类:

state_size:输入域(状态空间)的大小。
mem_size:回放缓冲区的大小。
discount:未来奖励相对于即时奖励的重要性(折扣因子)[0,1]。
epsilon:开始时的探索概率(以给定概率执行随机动作)。
epsilon_min:当代理停止减少探索概率时的最小值。
epsilon_stop_episode:代理停止减少探索变量的回合数。
n_neurons:每个隐藏层中神经元的数量的列表。
activations:每个隐藏层和输出层中使用的激活函数的列表。
loss:用于训练神经网络的损失函数。
optimizer:用于训练神经网络的优化器。
build_model():方法用于创建和编译基于指定结构(神经元数量和激活函数)的Keras神经网络模型。
add_to_memory():方法用于将一个经验元组(当前状态、下一个状态、奖励、完成标志)添加到回放缓冲器中。
random_value():方法返回一个随机值(用于在探索时为某个动作分配随机分数)。
predict_value():方法接收一个状态作为输入,并预测该状态的预期分数,使用训练好的神经网络。
act():方法根据当前状态选择一个动作(值)。它根据探索概率(epsilon)决定是进行探索(随机动作)还是利用(预测动作)。
best_state():方法接收一个状态集合,并根据预测的最高分数返回最佳状态。它可以根据一定概率(epsilon)进行探索或利用最佳预测状态。
train():方法用于训练神经网络,使用从回放缓冲器中抽样的经验。它使用Q-learning的更新规则来调整神经网络权重,以更好地逼近Q函数。

        将写好的俄罗斯方块游戏和DQNAgent类的强化学习策略进行结合,通过训练和评估在Tetris游戏中的性能,以寻找最佳策略来玩这个游戏。实现的过程如下:

        1.定义了run_model函数,用于训练和评估DQN代理的性能。在每个回合(episode)中,代理在Tetris游戏环境中执行动作,并收集游戏得分。代码实现如下:

  1. def run_model(dir_name, episodes=100, render=False):
  2. env = Tetris()
  3. epsilon_stop_episode = 1500
  4. mem_size = 20000
  5. discount = 0.95
  6. replay_start_size = 2000
  7. n_neurons = [32, 32]
  8. activations = ['relu', 'relu', 'linear']
  9. agent = DQNAgent(env.get_state_size(), n_neurons=n_neurons, activations=activations,
  10. epsilon_stop_episode=epsilon_stop_episode, mem_size=mem_size, discount=discount,
  11. replay_start_size=replay_start_size)
  12. model_path = '../checkpoints/' + dir_name + '/model.hdf'
  13. agent.model = load_model(model_path)
  14. agent.epsilon = 0
  15. scores = []
  16. for episode in range(episodes):
  17. env.reset()
  18. game_over = False
  19. while not game_over:
  20. next_states = env.get_next_states()
  21. best_state = agent.best_state(next_states.values())
  22. # find the action, that corresponds to the best state
  23. best_action = None
  24. for action, state in next_states.items():
  25. if state == best_state:
  26. best_action = action
  27. break
  28. _, game_over = env.hard_drop([best_action[0], 0], best_action[1], render)
  29. scores.append(env.score)
  30. print(f'episode {episode} => {env.score}')
  31. return scores

        2.定义run_model_helper函数,用于运行多个训练过程并评估它们的性能。该函数加载预先训练好的模型,并在每个目录下执行run_model函数,输出训练得分的最大值和对应目录的名称。代码如下:

  1. def run_model_helper(episodes=128, render=False):
  2. dirs = [name for name in os.listdir('../checkpoints') if os.path.isdir(os.path.join('../checkpoints', name))]
  3. dirs.sort(reverse=True)
  4. dirs = [dirs[0]]
  5. max_scores = []
  6. for directory in dirs:
  7. print(f"Evaluating dir '{directory}'")
  8. scores = run_model(directory, episodes, render)
  9. max_scores.append((directory, max(scores)))
  10. max_scores.sort(key=lambda t: t[1], reverse=True)
  11. for k, v in max_scores:
  12. print(f"{v}\t{k}")

         训练过程如下:

 三、实现自动玩俄罗斯方块游戏

        首先实现一个简单的俄罗斯方块基本游戏,实现的逻辑过程如下:

  1. 定义游戏地图和方块的常量:在游戏中,定义了游戏地图的大小、方块的大小、颜色等常量,并存储在相应的类属性中。

  2. 初始化游戏状态:在Tetris类的构造函数中,初始化了游戏的各种状态,包括当前方块位置、旋转角度、游戏得分等,以及游戏地图和下一个方块的预览板状态。

  3. 重置游戏状态:通过reset方法,重置游戏状态,重新开始一局游戏。重置后,将清空游戏地图,生成新的随机方块,并更新下一个方块的预览板。

  4. 方块的旋转与移动:游戏中的方块可以通过W键进行顺时针旋转,A键向右移动一列,S键向下移动一行,D键向左移动一列。这些操作在游戏中通过调整当前方块的位置和旋转角度来实现。

  5. 方块的硬降:在游戏中,通过按下空格键,可以将当前方块快速降落到底部。为了实现这个功能,游戏会不断检测方块是否可以继续向下移动,直到无法移动为止。

  6. 方块的落地和消除:当方块无法再继续向下移动时,将方块固定在游戏地图上,并判断是否有可消除的行。如果有,则消除行并增加玩家得分。

  7. 下一个方块的预览:游戏界面上会显示下一个方块的预览板,让玩家提前了解下一个方块的形状。

  8. 游戏结束判断:游戏判断是否结束的条件是当前方块在初始位置无法继续向下移动。

  9. 获取游戏状态特征:游戏通过一系列特征函数来提取当前游戏状态的特征,例如已消除行数、空洞数量、高度差等,用于在强化学习中作为输入来训练智能体。

  10. 图形渲染:游戏界面使用cv2库进行图形渲染,将游戏状态以图形化形式显示在屏幕上。

该游戏可以实现自己玩,控制方法为:

        W - 将方块顺时针旋转90度
        A - 将方块向右移动一列
        S - 将方块向下移动一行
        D - 将方块向左移动一列
        空格键 - 快速落下方块
        ESC - 退出游戏

自己玩游戏的过程如下:

AI方法玩游戏的过程如下:(非常快) 

 

全部代码链接:

https://download.csdn.net/download/weixin_40651515/88114773

运行配置环境:tensorflow==1.14.0 tensorboard==1.14.0 keras==2.2.4 opencv==4.7.0.72 numpy==1.21.6 pillow==5.4.1 tqdm==4.31.1等

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/427255
推荐阅读
相关标签
  

闽ICP备14008679号