当前位置:   article > 正文

【机器学习】应用深度Q网络(DQN)在Atari Breakout游戏中实现智能体

【机器学习】应用深度Q网络(DQN)在Atari Breakout游戏中实现智能体

1. 绪论

1.1 DQN是什么?

Deep Q-Learning,也被称为Deep Q-Network(DQN),是一种结合了深度学习和Q-Learning的强化学习算法。以下是关于Deep Q-Learning的详细解释:

  1. 背景介绍
    - 强化学习是一种机器学习方法,使智能体能够通过与环境互动来学习最佳行为。智能体在环境中执行动作,并接收奖励或惩罚作为反馈。
    - Q-Learning是一种基于值函数的强化学习算法,它通过学习一个状态-动作值函数(Q函数)来评估每个状态下采取每个动作的预期回报。

  2. Deep Q-Learning的核心思想
    - 使用深度神经网络(通常是卷积神经网络或多层感知机)作为函数逼近器,来估计动作-值函数(即Q函数)。
    - 网络的输入是环境的状态,输出是每个可能动作的预期奖励。

  3. 算法流程
    - 初始化Q网络和目标Q网络。
    - 智能体在环境中执行动作,并记录经验(包括状态、动作、奖励和下一个状态)。
    - 将这些经验存储起来,并在训练过程中使用经验回放机制来随机抽取一部分经验进行训练。
    - 使用随机梯度下降等优化算法来更新Q网络的参数,以最小化预测Q值与真实Q值之间的差距。
    - 定期或按照一定规则更新目标Q网络的参数,使其与Q网络的参数保持一致或接近。

  4. 特点与优势
    - 能够处理高维状态空间和复杂的决策问题。
    - 通过经验回放机制提高了训练的稳定性和效率。
    - 在许多领域(如游戏、机器人控制、自然语言处理等)都取得了突破性的成果。

  5. 关键技术
    - 经验回放:将历史的状态、动作、奖励等经验存储起来,并在训练过程中按一定规则采样,以提高训练的稳定性和效率。
    - 目标网络:修改网络的更新方式,例如不把刚学习到的网络权重马上用于后续的自益过程,以稳定训练过程。

  6. 应用领域
    - 游戏(如Atari游戏、围棋等)。
    - 机器人控制。
    - 自然语言处理。
    - 其他需要复杂决策的领域。

Deep Q-Learning是一种强大的机器学习算法,它结合了深度学习和强化学习的优势,为解决复杂决策问题提供了新的思路和方法。

1.2 Atari Breakout游戏概述

Atari Breakout游戏是一款经典的Arcade游戏,最初由Atari公司在1976年发布。这款游戏以其简单的游戏机制和富有挑战性的玩法而广受欢迎,成为了游戏历史上的一个里程碑。

游戏概述

  1. 游戏目标:玩家的目标是使用底部的挡板(paddle)来反弹一个球(ball),使其击碎屏幕顶部的砖块(bricks)。每当一个砖块被击中时,它会被消除并获得分数。当所有砖块都被消除时,游戏进入下一关,砖块排列会变得更加紧密,难度也会相应增加。

  2. 游戏机制
    - 挡板(Paddle):玩家控制的挡板位于屏幕底部,可以左右移动来反弹球。挡板的长度通常固定,但某些版本可能允许玩家通过收集特殊物品来增加挡板长度。
    - 球(Ball):球在屏幕上反弹,当球接触到挡板、砖块或屏幕边缘时,它会改变方向。如果球接触到屏幕底部而没有先接触挡板,则游戏结束。
    - 砖块(Bricks):砖块位于屏幕顶部,有多种颜色和形状。不同颜色的砖块可能需要被击中多次才能被消除。某些砖块被消除时可能会掉落特殊物品,如增加挡板长度或给予额外球。

  3. 操作方式:玩家通常使用两个按钮或操纵杆来控制挡板。一个按钮或操纵杆向左移动挡板,另一个向右移动。在某些版本中,玩家还可以使用额外的按钮来发射球或触发特殊效果。

  4. 关卡设计:随着游戏的进行,关卡难度逐渐增加。砖块排列变得更加紧密,需要更多的技巧和时间来消除所有砖块。某些关卡可能还包含障碍物或特殊效果,如移动挡板或改变球的速度。

  5. 游戏影响:Atari Breakout游戏对后来的游戏设计产生了深远影响。它启发了许多类似的砖块消除游戏,如Tetris(俄罗斯方块)和Puzzle Bobble(泡泡龙)。同时,这款游戏也展示了强化学习在解决复杂决策问题方面的潜力,成为了深度学习和强化学习研究中的常用基准测试之一。

Atari Breakout游戏以其简单的游戏机制和富有挑战性的玩法成为了一款经典的游戏作品,并对后来的游戏设计和机器学习研究产生了重要影响。

1.3 智能体概念

智能体(Agent)是一个在人工智能、计算机科学、机器人学、社会学、经济学和认知科学等领域中广泛使用的概念。它通常指的是一个具有自主行为能力的实体,能够在特定环境中感知信息、进行决策并采取行动以实现其目标。

在人工智能领域,智能体通常指能够自主执行任务的软件或硬件系统。这些智能体通常具备以下特性:

  1. 自主性:智能体能够在没有直接人类干预的情况下运行,并根据其内部状态和感知到的环境信息自主决策和行动。
  2. 感知能力:智能体能够感知其所在环境的信息,如通过传感器获取外部数据,或通过分析其他智能体的行为来了解环境状态。
  3. 决策能力:智能体能够基于其感知到的信息和内部状态,通过一定的决策算法(如规则系统、机器学习模型等)来选择合适的行动。
  4. 行动能力:智能体能够执行决策产生的行动,这些行动可能包括移动、操作物体、与其他智能体交互等。
  5. 目标导向性:智能体通常具有明确的目标或任务,其决策和行动都是为了实现这些目标。

智能体可以应用于各种领域,如:

  • 机器人技术:智能体可以作为机器人的大脑,控制机器人的运动和交互行为。
  • 智能系统:智能体可以用于构建智能家居、智能交通等智能系统,通过感知环境和用户需求来提供智能化的服务。
  • 电子商务:智能体可以作为虚拟助手或智能客服,为用户提供个性化的购物建议、解决用户问题等服务。
  • 社交网络:智能体可以模拟人类行为,参与社交网络中的互动和交流。

在强化学习中,智能体通常指与环境进行交互并学习最优策略的学习者。智能体通过与环境进行交互(如执行动作、接收奖励等)来收集数据,并利用这些数据来更新其策略,以便在未来更好地适应环境并实现其目标。

2. 智能体的训练过程

2.1 设置

以下代码是深度Q学习(DQN)在Atari Breakout游戏中实现的设置部分。它主要负责配置训练环境和参数:

  1. 设置后端引擎:代码首先设置了Keras的后端引擎为TensorFlow,确保深度学习模型能够在TensorFlow框架上运行。

  2. 导入所需的库:接着,代码导入了Keras、Gymnasium、NumPy和TensorFlow等库,这些库提供了实现深度Q学习所需的基础功能。

  3. 配置参数:定义了一系列控制训练过程的参数,包括:
    - seed:确保实验的可重复性。
    - gamma:折扣因子,用于计算未来奖励的当前价值。
    - epsilon:贪婪参数,用于平衡探索和利用。
    - epsilon_minepsilon_max:贪婪参数的最小和最大值,用于控制epsilon随时间的衰减。
    - epsilon_interval:用于计算epsilon衰减的速率。
    - batch_size:从重放缓冲区中抽取的样本批量大小。
    - max_steps_per_episode:每个剧集的最大步数。
    - max_episodes:训练过程中的最大剧集数。

  4. 创建和配置Atari环境:使用Gymnasium库创建了Atari Breakout环境,并对其进行了预处理,比如帧率转换、屏幕裁剪等。此外,还使用了FrameStack来堆叠最近的几个帧,为智能体提供更丰富的状态信息,并设置了环境的随机种子以确保结果的可重复性。

import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
from keras import layers

import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing, FrameStack
import numpy as np
import tensorflow as tf

# 整个设置的配置参数
seed = 42
gamma = 0.99  # 过去奖励的折扣因子
epsilon = 1.0  # 贪婪参数epsilon
epsilon_min = 0.1  # 最小贪婪参数epsilon
epsilon_max = 1.0  # 最大贪婪参数epsilon
epsilon_interval = (
    epsilon_max - epsilon_min
)  # 减少采取随机行动机会的速率
batch_size = 32  # 从重放缓冲区取出的批量大小
max_steps_per_episode = 10000
max_episodes = 10  # 限制训练剧集,如果小于1,则会运行直到解决

# 使用Atari环境
# 指定`render_mode`参数以在弹出窗口中显示智能体的尝试。
env = gym.make("BreakoutNoFrameskip-v4")  # , render_mode="human")
# 环境预处理
env = AtariPreprocessing(env)
# 堆叠四帧
env = FrameStack(env, 4)
env.seed(seed)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

2.2 实现深度Q网络

代码定义了用于深度Q学习(DQN)的神经网络模型,并且创建了两个模型:一个用于预测Q值,另一个作为目标模型用于稳定预测未来奖励:

  1. 定义动作数量
    num_actions = 4 表示在Atari Breakout游戏中智能体在每个状态下可以采取的行动数量是4个。

  2. 创建Q网络模型
    create_q_model() 函数定义了DQN的神经网络结构。这个网络结构是根据DeepMind的论文中定义的网络来构建的。网络由以下几层组成:
    - Lambda层:用于将输入张量的维度重新排列,以适应卷积层的输入需求。
    - 卷积层:三个Conv2D层依次堆叠,用于从游戏屏幕的帧中提取特征。第一个卷积层使用32个过滤器,大小为8x8,步长为4,激活函数为ReLU。第二个和第三个卷积层使用64个过滤器,分别具有不同的大小和步长,激活函数也是ReLU。
    - Flatten层:将卷积层的输出扁平化为一维数组,以便输入到密集层。
    - Dense层:第一个全连接层有512个单元,激活函数为ReLU。最后一个全连接层的单元数量等于可能的动作数(即4),激活函数为线性,输出即为每个动作的Q值。

  3. 实例化模型
    model = create_q_model() 创建了第一个模型实例,用于预测在给定状态下每个动作的Q值。

  4. 创建目标模型
    model_target = create_q_model() 创建了第二个模型实例,即目标模型。目标模型的权重会在训练过程中定期更新,以保持预测未来奖励的稳定性。这种技术有助于减少训练过程中的不稳定性。

总的来说,这段代码为深度Q学习算法提供了一个神经网络模型,该模型能够根据当前状态预测采取每个可能动作的预期回报(Q值)。通过训练这个模型,智能体可以学习在Atari Breakout游戏中如何做出最优决策。

这个网络学习Q表的一个近似值,Q表是智能体将采取的动作与状态之间的映射。对于每一个状态,我们都有四个可以采取的行动。环境提供状态,而行动是通过选择输出层预测的四个Q值中较大的一个来选择的。

num_actions = 4
def create_q_model():
    # 由Deepmind论文定义的网络
    return keras.Sequential(
        [
            layers.Lambda(
                lambda tensor: keras.ops.transpose(tensor, [0, 2, 3, 1]),
                output_shape=(84, 84, 4),
                input_shape=(4, 84, 84),
            ),
            # 屏幕帧上的卷积
            layers.Conv2D(32, 8, strides=4, activation="relu", input_shape=(4, 84, 84)),
            layers.Conv2D(64, 4, strides=2, activation="relu"),
            layers.Conv2D(64, 3, strides=1, activation="relu"),
            layers.Flatten(),
            layers.Dense(512, activation="relu"),
            layers.Dense(num_actions, activation="linear"),
        ]
    )

# 第一个模型用于预测Q值,这些Q值用于采取行动。
model = create_q_model()
# 构建一个目标模型,用于预测未来奖励。
# 目标模型的权重每10000步更新一次,因此在计算Q值的损失时,目标Q值是稳定的。
model_target = create_q_model()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

2.3 训练

后面的代码展示了深度Q学习(DQN)算法的核心训练循环,包括了优化器的设置、经验重放机制、探索与利用的平衡策略、神经网络的训练过程以及目标网络的更新:

  1. 优化器设置

    • 使用Adam优化器代替了DeepMind论文中的RMSProp,因为Adam优化器在训练时间上有所改善。
    • 设置学习率为0.00025,并限制梯度的最大值为1.0(clipnorm=1.0),以避免梯度爆炸问题。
  2. 经验重放缓冲区

    • 定义了多个列表来存储历史数据,包括行动、状态、下一个状态、奖励、结束标志和剧集奖励。
  3. 训练参数

    • epsilon_random_frames:在训练初期采取随机行动的帧数,以促进探索。
    • epsilon_greedy_frames:探索期的总帧数。
    • max_memory_length:重放缓冲区的最大长度,超过这个长度会开始删除旧数据。
    • update_after_actions:每进行几次动作后更新一次模型。
    • update_target_network:每隔多少帧更新一次目标网络的权重。
    • loss_function:使用Huber损失函数,有助于提高训练的稳定性。
  4. 训练循环

    • 使用while True开始一个无限循环,这将一直运行直到满足停止条件。
  5. 环境重置与状态初始化

    • 在每个剧集开始时,重置环境并初始化状态。
  6. 探索与利用

    • 使用epsilon-greedy策略来平衡探索和利用。在训练初期,智能体更倾向于探索(随机选择行动),随着训练的进行,逐渐增加利用已知信息采取最佳行动的比例。
  7. 环境交互

    • 根据当前策略(探索或利用)选择行动,并在环境中执行这个行动,获取下一个状态、奖励和是否结束的信号。
  8. 更新epsilon值

    • 逐渐减少采取随机行动的概率,使得智能体从探索转向利用。
  9. 存储数据到重放缓冲区

    • 将当前的行动、状态、下一个状态、奖励和结束信号存储到对应的历史列表中。
  10. 神经网络训练
    - 达到一定条件后(每update_after_actions步或批量大小超过batch_size),从重放缓冲区中随机采样数据进行训练。
    - 使用目标网络预测未来奖励,计算Q值损失,并进行反向传播更新模型权重。

  11. 目标网络更新
    - 每隔update_target_network帧,将目标网络的权重更新为与主网络相同,以保持预测的稳定性。

  12. 日志记录与停止条件
    - 定期记录当前的运行奖励,并在满足解决条件(运行奖励超过40)或达到最大剧集数时停止训练。

整个训练过程涉及到智能体与环境的交互、经验的存储与重放、神经网络的更新以及目标网络的同步,这些都是深度Q学习算法的关键组成部分。

# 在Deepmind论文中他们使用RMSProp,但后来Adam优化器改善了训练时间
optimizer = keras.optimizers.Adam(learning_rate=0.00025, clipnorm=1.0)

# 经验重放缓冲区
action_history = []
state_history = []
state_next_history = []
rewards_history = []
done_history = []
episode_reward_history = []
running_reward = 0
episode_count = 0
frame_count = 0
# 采取随机行动并观察输出的帧数
epsilon_random_frames = 50000
# 探索的帧数
epsilon_greedy_frames = 1000000.0
# 最大重放长度
# 注意:Deepmind论文建议1000000,但这会导致内存问题
max_memory_length = 100000
# 每4个动作后训练模型
update_after_actions = 4
# 更新目标网络的频率
update_target_network = 10000
# 使用huber损失函数以提高稳定性
loss_function = keras.losses.Huber()

while True:
    observation, _ = env.reset()
    state = np.array(observation)
    episode_reward = 0

    for timestep in range(1, max_steps_per_episode):
        frame_count += 1

        # 使用epsilon-greedy进行探索
        if frame_count < epsilon_random_frames or epsilon > np.random.rand(1)[0]:
            # 采取随机行动
            action = np.random.choice(num_actions)
        else:
            # 预测行动Q值
            # 从环境状态
            state_tensor = keras.ops.convert_to_tensor(state)
            state_tensor = keras.ops.expand_dims(state_tensor, 0)
            action_probs = model(state_tensor, training=False)
            # 采取最佳行动
            action = keras.ops.argmax(action_probs[0]).numpy()

        # 减少采取随机行动的概率
        epsilon -= epsilon_interval / epsilon_greedy_frames
        epsilon = max(epsilon, epsilon_min)

        # 在我们的环境里应用采样的行动
        state_next, reward, done, _, _ = env.step(action)
        state_next = np.array(state_next)

        episode_reward += reward

        # 在重放缓冲区中保存行动和状态
        action_history.append(action)
        state_history.append(state)
        state_next_history.append(state_next)
        done_history.append(done)
        rewards_history.append(reward)
        state = state_next

        # 每第四帧更新一次,并且一旦批量大小超过32
        if frame_count % update_after_actions == 0 and len(done_history) > batch_size:
            # 获取重放缓冲区的样本索引
            indices = np.random.choice(range(len(done_history)), size=batch_size)

            # 使用列表推导式从重放缓冲区中采样
            state_sample = np.array([state_history[i] for i in indices])
            state_next_sample = np.array([state_next_history[i] for i in indices])
            rewards_sample = [rewards_history[i] for i in indices]
            action_sample = [action_history[i] for i in indices]
            done_sample = keras.ops.convert_to_tensor(
                [float(done_history[i]) for i in indices]
            )

            # 构建采样未来状态的更新Q值
            # 使用目标模型以提高稳定性
            future_rewards = model_target.predict(state_next_sample)
            # Q值 = 奖励 + 折扣因子 * 预期未来奖励
            updated_q_values = rewards_sample + gamma * keras.ops.amax(
                future_rewards, axis=1
            )

            # 如果是最后一帧,将最后值设为-1
            updated_q_values = updated_q_values * (1 - done_sample) - done_sample

            # 创建一个掩码,以便我们只在更新的Q值上计算损失
            masks = keras.ops.one_hot(action_sample, num_actions)

            with tf.GradientTape() as tape:
                # 在状态和更新的Q值上训练模型
                q_values = model(state_sample)

                # 应用掩码到Q值上,以获得采取行动的Q值
                q_action = keras.ops.sum(keras.ops.multiply(q_values, masks), axis=1)
                # 计算新Q值和旧Q值之间的损失
                loss = loss_function(updated_q_values, q_action)

            # 反向传播
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if frame_count % update_target_network == 0:
            # 使用新权重更新目标网络
            model_target.set_weights(model.get_weights())
            # 记录详情
            template = "running reward: {:.2f} at episode {}, frame count {}"
            print(template.format(running_reward, episode_count, frame_count))

        # 限制状态和奖励历史
        if len(rewards_history) > max_memory_length:
            del rewards_history[:1]
            del state_history[:1]
            del state_next_history[:1]
            del action_history[:1]
            del done_history[:1]

        if done:
            break

    # 更新运行奖励以检查解决条件
    episode_reward_history.append(episode_reward)
    if len(episode_reward_history) > 100:
        del episode_reward_history[:1]
    running_reward = np.mean(episode_reward_history)

    episode_count += 1

    if running_reward > 40:  # 认为任务解决的条件
        print("Solved at episode {}!".format(episode_count))
        break

    if (
        max_episodes > 0 and episode_count >= max_episodes
    ):  # 达到最大剧集数
        print("Stopped at episode {}!".format(episode_count))
        break
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142

3. 示例源代码

"""
## Setup
"""

import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
from keras import layers

import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing, FrameStack
import numpy as np
import tensorflow as tf

# Configuration parameters for the whole setup
seed = 42
gamma = 0.99  # Discount factor for past rewards
epsilon = 1.0  # Epsilon greedy parameter
epsilon_min = 0.1  # Minimum epsilon greedy parameter
epsilon_max = 1.0  # Maximum epsilon greedy parameter
epsilon_interval = (
    epsilon_max - epsilon_min
)  # Rate at which to reduce chance of random action being taken
batch_size = 32  # Size of batch taken from replay buffer
max_steps_per_episode = 10000
max_episodes = 10  # Limit training episodes, will run until solved if smaller than 1

# Use the Atari environment
# Specify the `render_mode` parameter to show the attempts of the agent in a pop up window.
env = gym.make("BreakoutNoFrameskip-v4")  # , render_mode="human")
# Environment preprocessing
env = AtariPreprocessing(env)
# Stack four frames
env = FrameStack(env, 4)
env.seed(seed)
"""
## Implement the Deep Q-Network

This network learns an approximation of the Q-table, which is a mapping between
the states and actions that an agent will take. For every state we'll have four
actions, that can be taken. The environment provides the state, and the action
is chosen by selecting the larger of the four Q-values predicted in the output layer.

"""

num_actions = 4


def create_q_model():
    # Network defined by the Deepmind paper
    return keras.Sequential(
        [
            layers.Lambda(
                lambda tensor: keras.ops.transpose(tensor, [0, 2, 3, 1]),
                output_shape=(84, 84, 4),
                input_shape=(4, 84, 84),
            ),
            # Convolutions on the frames on the screen
            layers.Conv2D(32, 8, strides=4, activation="relu", input_shape=(4, 84, 84)),
            layers.Conv2D(64, 4, strides=2, activation="relu"),
            layers.Conv2D(64, 3, strides=1, activation="relu"),
            layers.Flatten(),
            layers.Dense(512, activation="relu"),
            layers.Dense(num_actions, activation="linear"),
        ]
    )


# The first model makes the predictions for Q-values which are used to
# make a action.
model = create_q_model()
# Build a target model for the prediction of future rewards.
# The weights of a target model get updated every 10000 steps thus when the
# loss between the Q-values is calculated the target Q-value is stable.
model_target = create_q_model()


"""
## Train
"""
# In the Deepmind paper they use RMSProp however then Adam optimizer
# improves training time
optimizer = keras.optimizers.Adam(learning_rate=0.00025, clipnorm=1.0)

# Experience replay buffers
action_history = []
state_history = []
state_next_history = []
rewards_history = []
done_history = []
episode_reward_history = []
running_reward = 0
episode_count = 0
frame_count = 0
# Number of frames to take random action and observe output
epsilon_random_frames = 50000
# Number of frames for exploration
epsilon_greedy_frames = 1000000.0
# Maximum replay length
# Note: The Deepmind paper suggests 1000000 however this causes memory issues
max_memory_length = 100000
# Train the model after 4 actions
update_after_actions = 4
# How often to update the target network
update_target_network = 10000
# Using huber loss for stability
loss_function = keras.losses.Huber()

while True:
    observation, _ = env.reset()
    state = np.array(observation)
    episode_reward = 0

    for timestep in range(1, max_steps_per_episode):
        frame_count += 1

        # Use epsilon-greedy for exploration
        if frame_count < epsilon_random_frames or epsilon > np.random.rand(1)[0]:
            # Take random action
            action = np.random.choice(num_actions)
        else:
            # Predict action Q-values
            # From environment state
            state_tensor = keras.ops.convert_to_tensor(state)
            state_tensor = keras.ops.expand_dims(state_tensor, 0)
            action_probs = model(state_tensor, training=False)
            # Take best action
            action = keras.ops.argmax(action_probs[0]).numpy()

        # Decay probability of taking random action
        epsilon -= epsilon_interval / epsilon_greedy_frames
        epsilon = max(epsilon, epsilon_min)

        # Apply the sampled action in our environment
        state_next, reward, done, _, _ = env.step(action)
        state_next = np.array(state_next)

        episode_reward += reward

        # Save actions and states in replay buffer
        action_history.append(action)
        state_history.append(state)
        state_next_history.append(state_next)
        done_history.append(done)
        rewards_history.append(reward)
        state = state_next

        # Update every fourth frame and once batch size is over 32
        if frame_count % update_after_actions == 0 and len(done_history) > batch_size:
            # Get indices of samples for replay buffers
            indices = np.random.choice(range(len(done_history)), size=batch_size)

            # Using list comprehension to sample from replay buffer
            state_sample = np.array([state_history[i] for i in indices])
            state_next_sample = np.array([state_next_history[i] for i in indices])
            rewards_sample = [rewards_history[i] for i in indices]
            action_sample = [action_history[i] for i in indices]
            done_sample = keras.ops.convert_to_tensor(
                [float(done_history[i]) for i in indices]
            )

            # Build the updated Q-values for the sampled future states
            # Use the target model for stability
            future_rewards = model_target.predict(state_next_sample)
            # Q value = reward + discount factor * expected future reward
            updated_q_values = rewards_sample + gamma * keras.ops.amax(
                future_rewards, axis=1
            )

            # If final frame set the last value to -1
            updated_q_values = updated_q_values * (1 - done_sample) - done_sample

            # Create a mask so we only calculate loss on the updated Q-values
            masks = keras.ops.one_hot(action_sample, num_actions)

            with tf.GradientTape() as tape:
                # Train the model on the states and updated Q-values
                q_values = model(state_sample)

                # Apply the masks to the Q-values to get the Q-value for action taken
                q_action = keras.ops.sum(keras.ops.multiply(q_values, masks), axis=1)
                # Calculate loss between new Q-value and old Q-value
                loss = loss_function(updated_q_values, q_action)

            # Backpropagation
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if frame_count % update_target_network == 0:
            # update the the target network with new weights
            model_target.set_weights(model.get_weights())
            # Log details
            template = "running reward: {:.2f} at episode {}, frame count {}"
            print(template.format(running_reward, episode_count, frame_count))

        # Limit the state and reward history
        if len(rewards_history) > max_memory_length:
            del rewards_history[:1]
            del state_history[:1]
            del state_next_history[:1]
            del action_history[:1]
            del done_history[:1]

        if done:
            break

    # Update running reward to check condition for solving
    episode_reward_history.append(episode_reward)
    if len(episode_reward_history) > 100:
        del episode_reward_history[:1]
    running_reward = np.mean(episode_reward_history)

    episode_count += 1

    if running_reward > 40:  # Condition to consider the task solved
        print("Solved at episode {}!".format(episode_count))
        break

    if (
        max_episodes > 0 and episode_count >= max_episodes
    ):  # Maximum number of episodes reached
        print("Stopped at episode {}!".format(episode_count))
        break

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226

4. 总结

深度Q学习(DQN)是一种创新的强化学习算法,它成功地结合了深度学习与Q-Learning的优势,以处理高维状态空间和复杂的决策问题。DQN的核心在于使用深度神经网络来近似Q函数,该网络接受环境状态作为输入,并输出每个可能动作的预期奖励。通过经验回放和目标网络技术,DQN提高了训练过程的稳定性和效率,这些技术分别用于通过随机采样历史经验来减少数据相关性,并稳定学习过程中的预测。

Atari Breakout是一款具有里程碑意义的经典Arcade游戏,自1976年发布以来,以其简单的游戏机制和挑战性玩法广受欢迎。在游戏中,玩家的目标是操控一个挡板来反弹球,击碎屏幕顶部的砖块。随着游戏的进行,关卡难度逐渐增加,砖块排列变得更为紧密,这为强化学习算法提供了一个理想的测试平台。

智能体是人工智能领域中的一个核心概念,指能够在特定环境中自主感知、决策和行动以实现目标的实体。智能体具备自主性、感知能力、决策能力、行动能力和目标导向性等关键特性,广泛应用于机器人技术、智能系统、电子商务和社交网络等多个领域。

在DQN的训练过程中,首先需要进行设置,包括选择合适的后端引擎、导入所需的库、配置训练参数和准备Atari Breakout环境。接着,实现深度Q网络,定义神经网络结构并创建Q网络和目标网络。最后,在训练循环中,智能体与环境进行交互,采取动作并记录经验,然后使用经验回放机制和优化算法来更新Q网络的参数,定期更新目标网络以保持预测的稳定性。

通过这样的训练过程,智能体能够学习如何在Atari Breakout游戏中有效地击碎砖块,提高其在游戏中的表现。DQN算法的成功实施展示了其在解决复杂决策问题方面的巨大潜力,为机器学习领域带来了新的研究思路和方法。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/706071
推荐阅读
相关标签
  

闽ICP备14008679号