赞
踩
单特征:数据集中只包含2列,时间列+价格列,仅利用价格来预测价格
目录
3、创建配置类,将LSTM的各个超参数声明为变量,便于后续使用
自建数据集--【load.xlsx】。包含2列:
实现基于时间序列的单特征价格预测
- # 字符串前的r表示一个"原始字符串",raw string
- # 文件路径中包含多个反斜杠。如果我们不使用原始字符串(即不使用r前缀),那么Python会尝试解析\U、\N等作为转义序列,这会导致错误
- data = pd.read_excel(r'E:\load.xlsx', index_col='date')
- # print(data)
- prices = data['price'].values
- # print(prices)
打印data:
打印prices:
- scaler = MinMaxScaler(feature_range=(0, 1))
- scaled_prices = scaler.fit_transform(prices.reshape(-1, 1)) # 二维数组
- # print(scaled_prices)
打印归一化后的价格数据:
- class Config():
- timestep = 7 # 时间步长,滑动窗口大小
- feature_size = 1 # 每个步长对应的特征数量,这里只使用1维,每天的价格数据
- batch_size = 1 # 批次大小
- output_size = 1 # 单输出任务,输出层为1,预测未来1天的价格
- hidden_size = 128 # 隐藏层大小
- num_layers = 1 # lstm的层数
- learning_rate = 0.0001 # 学习率
- epochs = 500 # 迭代轮数
- model_name = 'lstm' # 模型名
- best_loss = 0 # 记录损失
- config = Config()
'运行
- # 创建时间序列数据
- X, y = [], []
- for i in range(len(scaled_prices) - config.timestep):
- # 从当前索引i开始,取sequence_length个连续的价格数据点,并将其作为特征添加到列表 X 中。
- X.append(scaled_prices[i: i + config.timestep])
- # 将紧接着这sequence_length个时间点的下一个价格数据点作为目标添加到列表y中。
- y.append(scaled_prices[i + config.timestep])
- X = np.array(X)
- y = np.array(y)
打印特征数据:
打印标签数据:
- # 确定测试集的大小
- test_size = int(len(scaled_prices) * 0.1)
-
- # 为了确保训练集和测试集的划分不会破坏时间序列的连续性,我们需要从时间序列的开头开始划分训练集。
- X_train = X[:-test_size]
- y_train = y[:-test_size]
- X_test = X[-test_size:]
- y_test = y[-test_size:]
- X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
- y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
- X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
- y_test_tensor = torch.tensor(y_test, dtype=torch.float32)
打印X_train_tensor:
打印y_train_tensor:
- # 形成训练数据集
- train_data = TensorDataset(X_train_tensor, y_train_tensor)
- test_data = TensorDataset(X_test_tensor, y_test_tensor)
-
- # 将数据加载成迭代器
- train_loader = torch.utils.data.DataLoader(train_data, batch_size=config.batch_size, shuffle=False)
- test_loader = torch.utils.data.DataLoader(test_data, batch_size=config.batch_size, shuffle=False)
打印迭代器:
(1)配置LSTM网络的基本超参数[特征大小、隐藏层大小、网络层数、输出层大小]
(2)定义网络最后的全连接层[fc]
(3)定义神经网络模型的前向传播过程[forward( )]
①获取输入数据的批量大小
②初始化隐藏层状态
③LSTM运算
④获取LSTM网络的最后一个时间步的输出,LSTM网络的输出形状:(batch_size, sequence_length, hidden_size)
⑤输出经过全连接层,产生一个值
- class LSTM(nn.Module):
- # __init__方法用于初始化模型的各种层和参数
- def __init__(self, feature_size, hidden_size, num_layers, output_size):
- super(LSTM, self).__init__()
- self.hidden_size = hidden_size
- self.num_layers = num_layers
- # 参数batch_first=True意味着输入张量的形状应该是[batch_size, sequence_length, input_size]
- self.lstm = nn.LSTM(feature_size, hidden_size, num_layers, batch_first=True)
- # input_size 是全连接层的输入特征数,output_size 是输出特征数
- self.fc = nn.Linear(hidden_size, output_size)
-
- # forward方法定义了输入数据通过模型各层的前向传播过程。
- # 模型的forward方法被自动调用,并且X_train_batch作为参数传递给这个方法。
- def forward(self, x, hidden=None):
- # 获取输入数据x的第一维度的大小,即批量大小(batch size)
- # x的形状通常是 (batch_size, sequence_length, feature_size)
- batch_size = x.shape[0]
-
- # 初始化隐藏层状态
- # 如果隐藏状态hidden没有被提供(即为 None),则使用全零张量来初始化LSTM的隐藏状态h_0和细胞状态c_0
- if hidden is None:
- h_0 = x.data.new(self.num_layers, batch_size, self.hidden_size).fill_(0).float()
- c_0 = x.data.new(self.num_layers, batch_size, self.hidden_size).fill_(0).float()
- else:
- h_0, c_0 = hidden
-
- # LSTM运算
- # 将输入数据 x 和初始化的隐藏状态 (h_0, c_0) 传递给LSTM层,进行前向计算。
- # LSTM层的输出包含两部分:输出序列lstm_output 和 最终的隐藏状态
- # LSTM层在其内部单元中使用了sigmoid和tanh激活函数来处理输入和循环连接
- # self.lstm(x, (h_0, c_0))时,LSTM层内部的计算已经包括了激活函数的应用
- lstm_output, (h_0, c_0) = self.lstm(x, (h_0, c_0))
-
- # 取最后一个时间步的输出
- # lstm_output 的形状是 (batch_size, sequence_length, hidden_size)
- # ::第一个维度(批量维度)取全部。
- # -1:第二个维度(序列长度维度)取最后一个元素。在 Python 中,-1 索引表示从末尾开始计数的第一个元素。
- # ::第三个维度(隐藏层大小维度)取全部。
- final_output = lstm_output[:, -1, :]
-
- # 全连接层
- # 回归任务的目标是预测一个连续值,而不是进行分类,因此输出层应该能够直接产生实数范围的输出
- # 在PyTorch中,如果你不指定激活函数,nn.Linear层默认就是线性的,即它不会对其输出应用任何非线性函数
- output = self.fc(final_output)
-
- return output
(1)通过创建的LSTM类传入参数
(2)定义损失函数
(3)定义梯度下降优化器
- # 创建了一个LSTM模型的实例
- model = LSTM(config.feature_size, config.hidden_size, config.num_layers, config.output_size)
- # 损失函数
- loss_function = nn.MSELoss()
- # 创建一个Adam优化器的实例,用于在训练过程中更新模型的权重
- optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)
(1)调用train()方法模型训练
(2)利用训练数据集进行模型预测
(3)计算预测值和真实值之间的误差
(4)利用误差反向传播
(5)使用优化器更新权重
(1)调用eval()方法模型验证
(2)传入测试数据集传入模型,得出测试数据集的预测值
(3)计算预测值和真实值之间的误差
(4)累加损失
(5)利用 损失累加值 和 测试集长度 计算平均损失
(6)将 测试集的预测值 和 真实值 组合成数组
(7)将预测值 和 真实值 反归一化,转化为真实值
为什么要将模型的输出或目标变量转换为原始尺度呢? (1)解释性:当模型的输出被缩放或标准化时,它们可能不再具有直观的解释性。 例如,如果目标变量是股票价格,并且您使用 MinMaxScaler 将其缩放到 [0, 1] 范围内,那么模型输出的数值就不再代表实际的股票价格。 通过逆变换,您可以将输出重新映射到其原始尺度,从而更容易地解释和沟通结果。 (2)性能评估:在评估模型性能时,通常需要使用与原始数据尺度相同的指标。 例如,如果您要计算均方误差(MSE)或平均绝对误差(MAE),则需要在原始尺度上进行这些计算,以便正确评估预测误差的大小。 (3)可视化:当您想要将模型的预测结果与实际数据一起可视化时,使用原始尺度可以确保图表和图形具有正确的轴标签和比例,从而更容易进行视觉比较。
- # 训练模型
- for epoch in range(config.epochs):
- # train()函数是torch.nn.Module类的一个方法
- model.train()
- train_loss_sum = 0.0
- # 形成进度条
- train_bar = tqdm(train_loader)
- for data in train_bar:
- X_train_batch, y_train_batch = data
- # 这行代码清除了之前优化步骤中累积的所有梯度。由于PyTorch在反向传播时默认会累积梯度,所以在每个新的优化步骤开始之前,需要手动清除它们。
- optimizer.zero_grad()
- # 通过模型进行前向传播以获取预测值(y_train_pred)
- y_train_pred = model(X_train_batch)
- # 预测值和真实值(y_train_batch)之间的损失。
- loss = loss_function(y_train_pred, y_train_batch.view(-1))
- # 进行反向传播。它计算损失相对于模型参数的梯度,这些梯度之后将被用于更新模型的权重。
- loss.backward()
- # 根据之前计算的梯度来更新模型的权重。具体来说,它使用之前定义的Adam优化器来调整模型的参数以最小化损失。
- optimizer.step()
- train_bar.desc = "train epoch[{} / {}] loss:{:.3f}".format(epoch + 1, config.epochs, loss)
-
- # 当你想要使用已经训练好的模型来预测新的、未见过的数据时,你需要确保模型处于评估模式
- # 在每个epoch结束后,,模型验证
- # model.eval()会改变模型中特定层的行为,如Dropout和BatchNorm层,使它们在推理时保持固定
- # 在训练模式中,这些层可能会引入随机性或根据训练数据的统计信息进行归一化;而在评估模式中,它们会保持确定性行为,以确保输出的一致性
- model.eval()
- test_loss_sum = 0.0
- y_test_preds = [] # 初始化一个空列表来保存所有预测结果
- y_test_true = [] # 初始化一个空列表来保存所有真实值
- test_bar = tqdm(test_loader)
- # 使用了torch.no_grad()上下文管理器来确保在验证过程中不计算梯度,因为不需要更新模型的权重,从而节省内存
- with torch.no_grad():
- for data in test_bar:
- X_test_batch, y_test_batch = data
- # 将测试数据X_test传递给模型,并接收模型的预测输出
- # 由于梯度计算被禁用,这个前向传播过程会更快,并且不会消耗额外的资源来存储梯度信息。
- y_test_pred = model(X_test_batch) # 获取当前批次的预测结果
- # 通常用于累积测试损失(test loss)
- test_loss = loss_function(y_test_pred, y_test_batch.view(-1))
- # test_loss.item()将PyTorch张量(tensor)中的单个值(在这种情况下是一个标量,即损失值)转换为Python数字(通常是浮点数)
- # 在评估模型性能时,能够计算整个测试集的平均损失,而不仅仅是单个批次或样本的损失
- # 通过累积所有批次或样本的损失,并在最后除以总的批次数或样本数,你可以得到一个更准确的模型性能估计
- test_loss_sum += test_loss.item() # 累加损失
- y_test_preds.append(y_test_pred.detach().numpy()) # 将预测结果追加到列表中
- y_test_true.append(y_test_batch.detach().numpy()) # 将真实值追加到列表中
- test_loss_avg = test_loss_sum / len(test_loader) # 计算平均测试损失
- y_test_preds= np.concatenate(y_test_preds, axis=0) # 将所有批次的预测结果连接成一个数组
- y_test_true = np.concatenate(y_test_true, axis=0) # 将所有批次的真实值连接成一个数组
-
- # 反归一化预测值和真实值
- # scaler.inverse_transform(...):使用之前拟合的 MinMaxScaler 将缩放后的预测值转换回其原始尺度
- # 以便在原始尺度上评估模型性能
- y_test_preds_unnormalized = scaler.inverse_transform(y_test_preds)
- y_test_true_unnormalized = scaler.inverse_transform(y_test_true.reshape(-1, 1))
-
- if test_loss_avg < config.best_loss:
- config.best_loss = test_loss_avg
- print("训练结束")
- # 绘制真实值和预测值的对比图
- # 创建一个新的图形窗口,并设置其大小为宽10英寸、高5英寸。
- plt.figure(figsize=(10, 5))
- # 绘制真实值(y_test)的折线图,并为其添加标签 "True Values"。
- plt.plot(y_test_true_unnormalized, label='True Values')
- # 绘制预测值(predicted)的折线图。这条线被标记为 "Predicted Values"。
- plt.plot(y_test_preds_unnormalized, label='Predictions')
- plt.title('Comparison of True Values and Predictions')
- plt.xlabel('Time Steps')
- plt.ylabel('Prices')
- plt.legend()
- plt.show()
- import pandas as pd
- import numpy as np
- import torch
- from sklearn.preprocessing import MinMaxScaler
- from torch import nn
- from torch.utils.data import TensorDataset
- from tqdm import tqdm
- import matplotlib.pyplot as plt
-
- class Config():
- timestep = 1 # 时间步长,滑动窗口大小
- feature_size = 1 # 每个步长对应的特征数量,这里只使用1维,每天的价格数据
- batch_size = 1 # 批次大小
- output_size = 1 # 单输出任务,输出层为1
- hidden_size = 128 # 隐藏层大小
- num_layers = 1 # bilstm的层数
- learning_rate = 0.0001 # 学习率
- epochs = 500 # 迭代轮数
- model_name = 'lstm' # 模型名
- best_loss = 0 # 记录损失
- config = Config()
-
- qy_data = pd.read_excel(r'C:\Users\Desktop\load.xlsx', index_col='date')
- prices = qy_data['price'].values
-
- scaler = MinMaxScaler()
- scaled_prices = scaler.fit_transform(prices.reshape(-1, 1))
-
- # 创建时间序列数据
- X, y = [], []
- for i in range(len(scaled_prices) - config.timestep):
- X.append(scaled_prices[i: i + config.timestep])
- y.append(scaled_prices[i + config.timestep])
- X = np.array(X)
- y = np.array(y)
-
- # 确定测试集的大小
- test_size = int(len(scaled_prices) * 0.1)
-
- # 创建训练和测试数据
- X_train, y_train = [], []
- X_test, y_test = [], []
-
- # 构建训练数据
- for i in range(len(scaled_prices) - config.timestep - test_size):
- X_train.append(scaled_prices[i:i + config.timestep])
- y_train.append(scaled_prices[i + config.timestep])
-
- # 构建测试数据(取时间序列末尾的数据)
- for i in range(len(scaled_prices) - config.timestep - test_size, len(scaled_prices) - config.timestep):
- X_test.append(scaled_prices[i:i + config.timestep])
- y_test.append(scaled_prices[i + config.timestep])
-
- # 将列表转换为NumPy数组
- X_train = np.array(X_train)
- y_train = np.array(y_train)
- X_test = np.array(X_test)
- y_test = np.array(y_test)
-
- # 将数据转换为PyTorch张量
- X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
- y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
- X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
- y_test_tensor = torch.tensor(y_test, dtype=torch.float32)
-
- # 形成训练数据集
- train_data = TensorDataset(X_train_tensor, y_train_tensor)
- test_data = TensorDataset(X_test_tensor, y_test_tensor)
-
- # 将数据加载成迭代器
- train_loader = torch.utils.data.DataLoader(train_data, batch_size=config.batch_size, shuffle=False)
- test_loader = torch.utils.data.DataLoader(test_data, batch_size=config.batch_size, shuffle=False)
-
- # 定义LSTM网络
- class LSTM(nn.Module):
- # __init__方法用于初始化模型的各种层和参数
- def __init__(self, feature_size, hidden_size, num_layers, output_size):
- super(LSTM, self).__init__()
- self.hidden_size = hidden_size
- self.num_layers = num_layers
- self.lstm = nn.LSTM(feature_size, hidden_size, num_layers, batch_first=True)
- self.fc = nn.Linear(hidden_size, output_size)
-
- # forward方法定义了输入数据通过模型各层的前向传播过程。
- # 模型的forward方法被自动调用,并且X_train_batch作为参数传递给这个方法。
- def forward(self, x, hidden=None):
- batch_size = x.shape[0]
-
- if hidden is None:
- h_0 = x.data.new(self.num_layers, batch_size, self.hidden_size).fill_(0).float()
- c_0 = x.data.new(self.num_layers, batch_size, self.hidden_size).fill_(0).float()
- else:
- h_0, c_0 = hidden
-
- lstm_output, (h_0, c_0) = self.lstm(x, (h_0, c_0))
-
- lstm_output = self.dropout(lstm_output)
-
- final_output = lstm_output[:, -1, :]
-
- output = self.fc(final_output)
-
- return output
-
-
- # 这行代码创建了一个LSTM模型的实例
- model = LSTM(config.feature_size, config.hidden_size, config.num_layers, config.output_size, config.dropout)
- loss_function = nn.MSELoss()
- optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)
-
- # 训练模型
- for epoch in range(config.epochs):
- model.train()
- train_loss_sum = 0.0
- # 形成进度条
- train_bar = tqdm(train_loader)
- for data in train_bar:
- X_train_batch, y_train_batch = data
- optimizer.zero_grad()
- y_train_pred = model(X_train_batch)
- loss = loss_function(y_train_pred, y_train_batch.view(-1))
- loss.backward()
- optimizer.step()
- train_bar.desc = "train epoch[{} / {}] loss:{:.3f}".format(epoch + 1, config.epochs, loss)
-
- model.eval()
- test_loss_sum = 0.0
- y_test_preds = [] # 初始化一个空列表来保存所有预测结果
- y_test_true = [] # 初始化一个空列表来保存所有真实值
- test_bar = tqdm(test_loader)
- with torch.no_grad():
- for data in test_bar:
- X_test_batch, y_test_batch = data
- y_test_pred = model(X_test_batch) # 获取当前批次的预测结果
- test_loss = loss_function(y_test_pred, y_test_batch.view(-1))
- test_loss_sum += test_loss.item() # 累加损失
- y_test_preds.append(y_test_pred.detach().numpy()) # 将预测结果追加到列表中
- y_test_true.append(y_test_batch.detach().numpy()) # 将真实值追加到列表中
- test_loss_avg = test_loss_sum / len(test_loader) # 计算平均测试损失
- y_test_preds= np.concatenate(y_test_preds, axis=0) # 将所有批次的预测结果连接成一个数组
- y_test_true = np.concatenate(y_test_true, axis=0) # 将所有批次的真实值连接成一个数组
-
- # 反归一化预测值和真实值
- # scaler.inverse_transform(...):使用之前拟合的 MinMaxScaler 将缩放后的预测值转换回其原始尺度。
- # 以便在原始尺度上评估模型性能
- y_test_preds_unnormalized = scaler.inverse_transform(y_test_preds)
- y_test_true_unnormalized = scaler.inverse_transform(y_test_true.reshape(-1, 1))
-
-
- if test_loss_avg < config.best_loss:
- config.best_loss = test_loss_avg
- torch.save(model.state_dict(), config.save_path)
- print("训练结束")
-
- plt.figure(figsize=(10, 5))
- plt.plot(y_test_true_unnormalized, label='True Values')
- plt.plot(y_test_preds_unnormalized, label='Predictions')
- plt.title('Comparison of True Values and Predictions')
- plt.xlabel('Time Steps')
- plt.ylabel('Prices')
- plt.legend()
- plt.show()
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。