```python
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
from collections import deque


class DQNAgent:
    def __init__(self, state_size, action_size, memory_size=100000, batch_size=32,
                 gamma=0.99, epsilon=1.0, epsilon_decay=0.995, lr=0.001):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=memory_size)   # replay buffer
        self.batch_size = batch_size
        self.gamma = gamma                        # discount factor
        self.epsilon = epsilon                    # exploration rate
        self.epsilon_decay = epsilon_decay
        self.lr = lr
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.q_network = self.build_model().to(self.device)
        self.target_network = self.build_model().to(self.device)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)

    def build_model(self):
        model = nn.Sequential(
            nn.Linear(self.state_size, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, self.action_size)
        )
        return model

    def update_target_network(self):
        # copy the online network's weights into the target network
        self.target_network.load_state_dict(self.q_network.state_dict())

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def choose_action(self, state):
        # epsilon-greedy action selection
        if np.random.rand() < self.epsilon:
            return np.random.choice(self.action_size)
        state = torch.from_numpy(state).float().to(self.device)
        with torch.no_grad():
            q_values = self.q_network(state)
        return q_values.argmax().item()

    def learn(self):
        if len(self.memory) < self.batch_size:
            return
        minibatch = random.sample(self.memory, self.batch_size)
        states = torch.from_numpy(np.vstack([x[0] for x in minibatch])).float().to(self.device)
        actions = torch.from_numpy(np.array([x[1] for x in minibatch])).long().to(self.device)
        rewards = torch.from_numpy(np.array([x[2] for x in minibatch])).float().to(self.device)
        next_states = torch.from_numpy(np.vstack([x[3] for x in minibatch])).float().to(self.device)
        dones = torch.from_numpy(np.array([x[4] for x in minibatch]).astype(np.uint8)).float().to(self.device)

        # Q(s, a) for the actions actually taken
        q_values = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        # max_a' Q_target(s', a'), zeroed out for terminal transitions
        next_q_values = self.target_network(next_states).max(1)[0]
        expected_q_values = rewards + (1 - dones) * self.gamma * next_q_values

        loss = F.mse_loss(q_values, expected_q_values.detach())
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_epsilon(self):
        self.epsilon = max(self.epsilon * self.epsilon_decay, 0.01)

    def save_model(self, filename):
        torch.save(self.q_network.state_dict(), filename)

    def load_model(self, filename):
        self.q_network.load_state_dict(torch.load(filename))


def train(env, agent, episodes, max_steps):
    scores = deque(maxlen=100)
    for i_episode in range(1, episodes + 1):
        state = env.reset()   # classic Gym API: reset() returns only the observation
        score = 0
        for t in range(max_steps):
            action = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            score += reward
            agent.learn()
            if done:
                break
        scores.append(score)
        agent.update_epsilon()
        if i_episode % 100 == 0:
            print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores)), end="")
        if np.mean(scores) >= 200:
            print('\nEnvironment solved in {} episodes!\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores)))
            agent.save_model('dqn.pth')
            break
        if i_episode % 20 == 0:
            agent.update_target_network()
    return scores


env = gym.make('CartPole-v1')
agent = DQNAgent(state_size=env.observation_space.shape[0], action_size=env.action_space.n)
scores = train(env, agent, episodes=1000, max_steps=1000)
```
In the code above, the train() function trains the agent in the given OpenAI Gym environment using the supplied DQNAgent object. When training ends, it returns the deque holding the scores of the most recent 100 episodes; the running average printed during training is the mean of this window. The target network is synchronized with the online network every fixed number of episodes (20 here), and the exploration rate ε is decayed at the specified rate after each episode.
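As a quick sanity check on the exploration schedule, the snippet below (not part of the original code; the helper name is made up for illustration) computes how many episodes it takes for ε to decay from 1.0 down to its 0.01 floor with the default per-episode decay of 0.995:

```python
import math

# Hypothetical helper: number of episodes until epsilon decays from `start`
# down to `floor`, given a multiplicative per-episode `decay`.
def episodes_until_floor(start=1.0, floor=0.01, decay=0.995):
    return math.ceil(math.log(floor / start) / math.log(decay))

print(episodes_until_floor())  # about 919 episodes with the defaults used above
```

So with the defaults, the agent keeps some exploration for most of the 1000-episode budget.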
This trains an agent on the CartPole-v1 environment for 1000 episodes, with each episode capped at 1000 steps. If the average score over the last 100 episodes reaches 200, training stops early and the current model weights are saved to a file named dqn.pth.
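Once dqn.pth exists, the trained policy can be checked with a purely greedy rollout. The sketch below is one possible way to do this, assuming the same DQNAgent class and the classic Gym step API used above; the evaluation episode count and variable names are illustrative, not part of the original code.

```python
# Evaluation sketch: reload the saved weights and act greedily (epsilon = 0).
eval_env = gym.make('CartPole-v1')
eval_agent = DQNAgent(state_size=eval_env.observation_space.shape[0],
                      action_size=eval_env.action_space.n)
eval_agent.load_model('dqn.pth')
eval_agent.epsilon = 0.0          # disable exploration during evaluation

for episode in range(5):          # number of evaluation episodes is arbitrary
    state = eval_env.reset()
    total_reward, done = 0, False
    while not done:
        action = eval_agent.choose_action(state)
        state, reward, done, _ = eval_env.step(action)
        total_reward += reward
    print('evaluation episode {}: return {}'.format(episode, total_reward))
```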
The same kind of agent can also be implemented with TensorFlow/Keras:

```python
import tensorflow as tf
import numpy as np
import random


class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = []                 # replay buffer
        self.gamma = 0.95                # discount factor
        self.epsilon = 1.0               # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        model = tf.keras.models.Sequential()
        model.add(tf.keras.layers.Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(tf.keras.layers.Dense(24, activation='relu'))
        model.add(tf.keras.layers.Dense(self.action_size, activation='linear'))
        model.compile(loss='mse',
                      optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        # epsilon-greedy action selection
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])

    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = reward + self.gamma * np.amax(self.model.predict(next_state)[0])
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        self.model.load_weights(name)

    def save(self, name):
        self.model.save_weights(name)
```
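Unlike the PyTorch version, this Keras agent has no separate target network and replays one transition at a time, so the per-sample predict/fit calls become the bottleneck. Below is an illustrative vectorized variant of replay() (not part of the original code) that keeps the same update rule but runs predict and fit once per minibatch; it assumes the class fields and the numpy/random imports already defined above.

```python
# Illustrative alternative to DQNAgent.replay: build the whole minibatch as
# arrays so predict/fit each run once per replay call instead of once per sample.
def replay_batched(self, batch_size):
    minibatch = random.sample(self.memory, batch_size)
    states = np.vstack([m[0] for m in minibatch])
    actions = np.array([m[1] for m in minibatch])
    rewards = np.array([m[2] for m in minibatch], dtype=np.float32)
    next_states = np.vstack([m[3] for m in minibatch])
    dones = np.array([m[4] for m in minibatch], dtype=np.float32)

    targets = self.model.predict(states, verbose=0)
    next_q = np.amax(self.model.predict(next_states, verbose=0), axis=1)
    # Bellman target, with the bootstrap term masked out on terminal transitions
    targets[np.arange(batch_size), actions] = rewards + (1.0 - dones) * self.gamma * next_q

    self.model.fit(states, targets, epochs=1, verbose=0)
    if self.epsilon > self.epsilon_min:
        self.epsilon *= self.epsilon_decay
```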
Example run:
```python
import gym
import numpy as np

# Set up the environment and the agent
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)

# Train the agent
batch_size = 32
num_episodes = 1000
for e in range(num_episodes):
    state = env.reset()                        # classic Gym API
    state = np.reshape(state, [1, state_size])
    score = 0
    done = False
    while not done:
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, state_size])
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        score += reward
    print("episode: {}/{}, score: {}".format(e, num_episodes, score))
    if len(agent.memory) > batch_size:
        agent.replay(batch_size)
```
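After the loop finishes, the agent's save() and load() helpers can persist and restore the weights. The follow-up below is only a sketch: the filename and the single greedy test episode are arbitrary choices, not part of the original example.

```python
# Persist the weights, restore them into a fresh agent, and run one greedy episode.
agent.save('cartpole-dqn.weights.h5')

test_agent = DQNAgent(state_size, action_size)
test_agent.load('cartpole-dqn.weights.h5')
test_agent.epsilon = 0.0              # act greedily during the test

state = np.reshape(env.reset(), [1, state_size])
done, score = False, 0
while not done:
    action = test_agent.act(state)
    state, reward, done, _ = env.step(action)
    state = np.reshape(state, [1, state_size])
    score += reward
print("greedy test episode score:", score)
```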