赞
踩
看到网上的代码一个个都要收费,于是自己写了个 LSTM 分享一下。新手写的代码,如有问题请轻喷……
主程序(文件名可任意命名):
- import torch
- import time
- import pandas as pd
- import numpy as np
- import torch.nn as nn
- from sklearn.preprocessing import MinMaxScaler
-
- from func import setup_seed, sliding_window, cmpt_error # 这部分自己写的函数
-
-
- # LSTM
class LSTM(nn.Module):
    """Single-layer LSTM followed by a linear read-out of the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        output_size: Number of values predicted per sample.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        # batch_first=True so inputs are read as (batch, time_step, feature);
        # nn.LSTM's default layout is (time_step, batch, feature).
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)  # single layer by default
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map a (batch, time_step, input_size) tensor to (batch, output_size)."""
        # nn.LSTM returns (output, (h_n, c_n)); only the per-step outputs are needed.
        out, _ = self.lstm(x)
        # Feed the hidden output of the final time step to the linear layer.
        return self.fc(out[:, -1, :])
-
-
# Seed the random number generators to make results reproducible (disabled by default).
# setup_seed(10)

# Load the data set; the "timestamp" column is parsed as datetimes.
df = pd.read_csv("data.csv", parse_dates=["timestamp"])

# Split point: rows with timestamp <= boundary_date form the training set,
# later rows form the test set (the author notes the data spans 2017-2021).
boundary_date = pd.to_datetime("2020-12-31 23:00:00")
mask = df["timestamp"] <= boundary_date
# Drop the first column (presumably the timestamp itself -- confirm against data.csv).
train = df.loc[mask].iloc[:, 1:]

# Min-max scaling. Both scalers are fitted on training rows only:
#   scaler       -- all value columns, used to normalise the whole data set;
#   scaler_train -- first value column only, used later to undo the scaling
#                   of predictions and targets.
scaler = MinMaxScaler().fit(train)
scaler_train = MinMaxScaler().fit(train.iloc[:, :1])
normalized_data = scaler.transform(df.iloc[:, 1:])

# Basic window parameters.
time_step = 30  # number of past rows fed to the model per sample
forecast_step = 1  # how many steps ahead the target lies
feature_size = 6  # number of input features

# Build the training and test samples with a sliding window.
train_input, train_output, test_input, test_output = sliding_window(
    normalized_data,
    len(train),
    time_step,
    forecast_step,
    feature_size,
    sample_feature_compression=False,
)
-
# Network dimensions.
input_dim = train_input.shape[2]  # features per time step
output_dim = 1
hidden_dim = 20  # tuned by trial and error

# NumPy arrays default to float64 while torch defaults to float32; make float64
# the torch default so model parameters match the input tensors.
# (torch.set_default_tensor_type is deprecated in favour of set_default_dtype.)
torch.set_default_dtype(torch.float64)

# Run on the GPU when one is available.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Convert the NumPy arrays to tensors on the chosen device.
train_inputs_tensor = torch.from_numpy(train_input).to(device)
labels = torch.from_numpy(train_output).to(device)
test_inputs_tensor = torch.from_numpy(test_input).to(device)

# Optimisation hyper-parameters.
epochs = 5000  # number of training iterations
learning_rate = 0.003

# Accumulators across repeated runs, so errors can be averaged/compared.
train_prediction_set = []
prediction_set = []
error = []
start = time.perf_counter()  # start of the timed section
multi_times = 1  # number of independent training runs
for times in range(multi_times):
    # Fresh model per run: input_dim inputs, hidden_dim hidden units, output_dim outputs.
    model = LSTM(input_dim, hidden_dim, output_dim).to(device)
    if times == 0:
        print(model)  # show the network structure once
    # Adam optimiser over the model parameters with the configured learning rate.
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.MSELoss()  # loss function

    # Full-batch training.
    for epoch in range(epochs):
        optimizer.zero_grad()  # clear gradients from the previous iteration
        train_outputs_tensor = model(train_inputs_tensor)  # forward pass
        loss = criterion(train_outputs_tensor, labels)
        loss.backward()  # back-propagation
        optimizer.step()  # weight update
        # Report the loss every 100 epochs.
        if (epoch + 1) % 100 == 0:
            print(f'epoch {epoch + 1}, loss {loss.item()}')

    # Training-set predictions are those of the last epoch's forward pass.
    train_predicted = train_outputs_tensor.detach().cpu().numpy()

    # Predict the whole test set; gradients are not needed for inference.
    # The full (samples, 1) output is kept so it lines up with test_output.
    with torch.no_grad():
        predicted = model(test_inputs_tensor).cpu().numpy()

    # Undo the min-max scaling.
    train_predicted = scaler_train.inverse_transform(train_predicted)  # training-set predictions
    predicted = scaler_train.inverse_transform(predicted)  # test-set predictions
    target = scaler_train.inverse_transform(test_output)  # test-set targets
    # Error metrics for this run.
    error.append(cmpt_error(predicted, target))
    # Keep this run's predictions.
    train_prediction_set.append(train_predicted)
    prediction_set.append(predicted)

end = time.perf_counter()  # end of the timed section
runTime = end - start
print("Run time: ", runTime)
-
# Metric rows, in the order cmpt_error returns them. Row positions below are
# derived from this list so adding a metric only requires editing it here.
metric_names = ['MAE', 'RMSE', 'NRMSE']
metric_count = len(metric_names)
run_labels = list(range(1, multi_times + 1))

# Arrange results as one column per run.
train_prediction_set = np.array(train_prediction_set)[:, :, 0].T
prediction_set = np.array(prediction_set)[:, :, 0].T
error = np.array(error).T
# Training-set predictions first, test-set predictions below them.
prediction_set = np.vstack([train_prediction_set, prediction_set])
# Stack errors on top of the predictions so both are reordered together.
error_prediction = pd.DataFrame(np.vstack([error, prediction_set]))
# Order the runs (columns) by ascending NRMSE.
error_prediction = error_prediction.sort_values(by=metric_names.index('NRMSE'), axis=1)

# Split the sorted frame back into predictions and errors.
prediction_set = pd.DataFrame(np.array(error_prediction.iloc[metric_count:, :]), columns=run_labels)
error = pd.DataFrame(np.array(error_prediction.iloc[:metric_count, :]), columns=run_labels,
                     index=metric_names)

# Write both sheets through a single writer instead of writing the file and
# then reopening it in append mode.
with pd.ExcelWriter('LSTM.xlsx', engine='openpyxl') as writer:
    prediction_set.to_excel(writer, index=False, sheet_name='LSTM')
    error.to_excel(writer, sheet_name='error')
文件名:error_calculation.py
- import math
-
-
def mae(predicted, target):
    """Return the mean absolute error (MAE) between predictions and targets."""
    deviation = target - predicted
    return abs(deviation).mean()
-
-
def mse(predicted, target):
    """Return the mean squared error (MSE) between predictions and targets."""
    residual = target - predicted
    return (residual * residual).mean()
-
-
def rmse(predicted, target):
    """Return the root mean squared error (RMSE): the square root of the MSE."""
    mean_squared = mse(predicted, target)
    return math.sqrt(mean_squared)
-
-
def nrmse(predicted, target):
    """Return the normalised RMSE: RMSE divided by the range (max - min) of the targets."""
    value_range = target.max() - target.min()
    return rmse(predicted, target) / value_range
文件名:func.py
- import torch
- import random
- import numpy as np
- import pandas as pd
-
- from error_calculation import mae, rmse, nrmse
-
-
def setup_seed(seed):
    """Seed Python, NumPy and PyTorch (CPU and all CUDA devices) with the same value."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # Left disabled by the author: reported to matter little for accuracy while
    # noticeably slowing execution.
    # torch.backends.cudnn.deterministic = True
-
-
def sliding_window(normalized_data, train_length, time_step, forecast_step, feature_size=1,
                   sample_feature_compression=True):
    """Cut a normalised 2-D series into sliding-window samples and split them.

    Each sample takes ``time_step`` consecutive rows (first ``feature_size``
    columns) as input; its target is column 0 of the row ``forecast_step``
    steps after the end of that window.

    Args:
        normalized_data: 2-D array, one row per time stamp, one column per feature.
        train_length: Number of leading rows that belong to the training period.
        time_step: Window length (rows of history per sample).
        forecast_step: How many steps ahead of the window the target lies (>= 1).
        feature_size: Number of leading columns used as input features.
        sample_feature_compression: If True each input is flattened to a vector,
            feature by feature; otherwise it stays ``(time_step, feature_size)``.

    Returns:
        ``[train_input, train_output, test_input, test_output]``; outputs have
        shape ``(samples, 1)``. A sample is a training sample only when its
        target row lies inside the first ``train_length`` rows.

    Raises:
        IndexError: If ``feature_size`` exceeds the number of columns.
    """
    data = np.asarray(normalized_data)
    # Distance from a window's first row to its target row.
    target_offset = time_step + forecast_step - 1
    sample_count = len(data) - target_offset
    if sample_count > 0 and feature_size > data.shape[1]:
        raise IndexError("feature_size exceeds the number of columns in normalized_data")

    inputs = []
    outputs = []
    for start in range(sample_count):
        window = data[start:start + time_step, :feature_size]
        if sample_feature_compression:
            # Feature-major flattening: all steps of feature 0, then feature 1, ...
            inputs.append(window.T.reshape(-1))
        else:
            inputs.append(window)
        # Honour forecast_step when picking the target row (column 0 is the target).
        outputs.append(data[start + target_offset][0])
    inputs = np.array(inputs)
    outputs = np.array(outputs).reshape(-1, 1)

    # Number of samples whose target falls inside the training period; clamped
    # so a short training period cannot turn into a negative slice index.
    split = max(train_length - target_offset, 0)
    return [inputs[:split], outputs[:split], inputs[split:], outputs[split:]]
-
-
def cmpt_error(predicted, target):
    """Print predictions next to targets, print the error metrics and return them.

    Returns:
        ``[MAE, RMSE, NRMSE]`` for the given predictions and targets.
    """
    # Side-by-side table of predicted and target values.
    side_by_side = np.hstack((predicted, target))
    print(pd.DataFrame(side_by_side, columns=['预测值', '目标值']))

    # Compute the three metrics, then report them.
    metrics = [mae(predicted, target), rmse(predicted, target), nrmse(predicted, target)]
    mae1, rmse1, nrmse1 = metrics
    print('预测MAE误差:', mae1)
    print('预测RMSE误差:', rmse1)
    print(f'预测NRMSE误差:{"%.2f" % (nrmse1 * 100)}%')

    return metrics
把以上文件放在同一个文件夹中,运行主程序即可。不过光伏输出功率每日的变化较大,预测误差也不小。data.csv 已上传,设定为 0 积分下载,不确定大家能否免费下载。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。