This article aims to get an LSTM running quickly so it can be applied faster in mathematical-modeling contests. The input may be one-dimensional (single-variable forecasting) or two-dimensional (forecasting several variables at once), and confidence-interval computation is included. Jupyter is recommended, because it keeps the training state between cells, which makes rewriting the plotting code much more convenient.
- def data_basic():
- """2023美赛C:https://www.pancake2021.work/wp-content/uploads/Problem_C_Data_Wordle.xlsx"""
- date, data = Utils.openfile("Problem_C_Data_Wordle.xlsx", data_col=[1], date_col=0)
- return date, data
-
-
- def data01():
- # test data: 2-D array, multiple variables
- data1 = np.sin(np.arange(300) * np.pi / 50) + np.random.randn(300) * 0.1
- data2 = np.sin(np.arange(300) * np.pi / 25) + np.random.randn(300) * 0.1 + 3
- return np.c_[data1[:, np.newaxis], data2[:, np.newaxis]]
-
-
- def data02():
- # test data: 1-D array
- return np.sin(np.arange(300) * np.pi / 50) + np.random.randn(300) * 0.1
-
-
- def data03():
- # test data: 2-D array, single variable
- return data02()[:, np.newaxis]
The code contains four test samples; the data-generating functions are easy to follow. For convenient file loading you can use
Utils.openfile(file_location_name, data_col=[indexes of data], date_col=index-of-date)
where file_location_name is the path of the file; the file must have its column names in the first row, otherwise adapt the code by hand. data_col is the list of columns holding the data (the openfile shown below selects them with iloc, so positional indexes are the safe choice; column names only work if you adapt it). date_col is the column holding the date and can be None if there is no date.
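As a minimal sketch of a call, assuming a hypothetical file sales.xlsx whose first row holds the column names (only the call signature matches the template):
- # hypothetical file: column 0 is the date, columns 1 and 2 are the two series to forecast
- date, data = Utils.openfile("sales.xlsx", data_col=[1, 2], date_col=0)
- print(data.shape)  # (n_rows, 2): one column per selected variable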
- window_size = 10
- lstm = CustomLSTM(data, window_size, scale=False)
- # lstm = CustomLSTM(data, window_size, date, scale=True)
window_size is the sliding-window size for the seq2seq slicing: by default a window of window_size points predicts the single next point (predicting several points per window is not implemented). date is optional; without it, the plots simply use 1 to n on the x axis. scale controls whether a MinMaxScaler is applied; it is worth trying both settings during training.
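To make the windowing concrete, here is a tiny stand-alone sketch (plain NumPy, independent of the template) of the (X, Y) pairs that slice builds:
- # with w = 3, the series [0, 1, 2, 3, 4] becomes X = [[0, 1, 2], [1, 2, 3]] and Y = [3, 4]
- import numpy as np
- series = np.arange(5)
- w = 3
- X = np.array([series[i:i + w] for i in range(len(series) - w)])
- Y = np.array([series[i + w] for i in range(len(series) - w)])
- print(X.shape, Y.shape)  # (2, 3) (2,)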
lstm.init_lstm(hidden=64, num_layers=1)
These set the hidden dimension of the network and the number of LSTM layers.
You can add or remove layers yourself in the LSTM class in Model.py (removing layers is not recommended).
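As a sketch of what adding a layer might look like (the extra stage and the sizes below are my own illustration, not part of the template), the output head built in Model.py could be extended like this:
- import torch.nn as nn
- hidden_size, output_size = 64, 1  # illustrative sizes
- # same pattern as self.out in Model.py, with one extra Linear/ReLU/Dropout stage
- out_head = nn.Sequential(
- nn.Linear(hidden_size, hidden_size), nn.ReLU(), nn.Dropout(0.5),
- nn.Linear(hidden_size, hidden_size), nn.ReLU(), nn.Dropout(0.5),  # extra stage (illustrative)
- nn.Linear(hidden_size, output_size),
- )
- print(out_head)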
- # max_batch: use the whole data set as a single batch (no splitting)
- _ = lstm.fit(num_epochs=2000, lr=0.01, max_batch=True, is_copy=False)
- result = lstm.fit(num_epochs=2000, lr=0.001, max_batch=True, is_copy=True)
- # lstm.train(num_epochs=1000, batch_size=55, max_batch=False)
The template supports mini-batching, but for this LSTM it hardly seems necessary: just use max_batch, i.e. train on the whole data set as one batch. num_epochs is the number of epochs and lr the learning rate; is_copy says whether the returned result keeps a deep copy of the model, so you can compare results when you train again. The code does not do a train/test split; a practical recipe is to train coarsely with a large lr first and then continue with a smaller lr.
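If you do want a rough holdout check (a sketch only, not part of the template; it reuses the template's own CustomLSTM and Utils calls), hold back the tail of the series before building the model and compare the n-step forecast against it:
- # illustrative holdout: train on everything but the last 30 points, forecast them, compare
- holdout = 30
- train_data, test_data = data[:-holdout], data[-holdout:]
- lstm_ho = CustomLSTM(train_data, window_size, scale=False)
- lstm_ho.init_lstm(hidden=64, num_layers=1)
- result_ho = lstm_ho.fit(num_epochs=2000, lr=0.01, max_batch=True)
- forecast = result_ho.predict(holdout, is_random=False)
- print(Utils.rmse(forecast, test_data))  # per-variable RMSE on the holdout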
- lstm.slice(20)
- lstm.init_lstm(hidden=64, num_layers=1)
- result = lstm.fit(num_epochs=500, max_batch=True)
- # predict over all the existing data; is_random controls whether Dropout stays active
- predict = result.predicted(is_random=False)
- # print the summary
- r2, mse, rmse, mae, mape, smape = result.summary()
- # forecast the next n steps; is_random=True keeps Dropout active (random forecasts). To use the plot functions, keep n unchanged; to change n, re-run everything from this line onwards
- n = 60
- further_predict = result.predict(n, is_random=False)
- # alpha = 0.05 gives a 95% confidence interval
- alpha = 0.05
-
- # confidence interval of the forecast; the function caches its result
-
- # n: forecast n steps; n_sample: number of Monte-Carlo samples
- conf = result.conf_int_sampled(n, alpha=alpha, n_sample=1000)
- further_predict_sampled = result.sampled_result
-
- # confidence interval over the historical data (where true values exist). Pick one of the two methods below; conf_predicted_sampled is recommended
-
- # standard (residual-based) method
- conf_predicted = result.conf_int_predicted(alpha)
- # sampling method; the function caches its result
- conf_predicted_sampled = result.conf_int_predicted_sampled(alpha=alpha, n_sample=1000)
- predict_sampled = result.sampled_result_
-
- # the confidence intervals must be computed before calling plot_confidence
- # every returned conf is a 3-D ndarray indexed by (variable index, prediction index, [lower, mean, upper])
The sampling method means: run the network n_sample times with the Dropout layers active, treat the runs as samples, and compute the confidence interval from them; the sample mean also tends to be a more realistic point forecast than a single run. The plain method is simply the result of one run.
A bootstrap variant for the sampled confidence interval was added as well, but I personally do not recommend it.
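Conceptually, the sampled interval is built like this (a stand-alone sketch with a dummy stochastic predictor standing in for the Dropout forecasts, not the template's own code):
- # Monte-Carlo idea behind conf_int_sampled: repeat a stochastic forward pass,
- # stack the runs, then build a t-interval around the per-step sample mean
- import numpy as np
- import scipy.stats as st
- rng = np.random.default_rng(0)
- n_sample, n_steps = 1000, 60
- runs = 1.0 + 0.1 * rng.standard_normal((n_sample, n_steps))  # stand-in for dropout forecasts
- mean = runs.mean(axis=0)
- sem = runs.std(axis=0, ddof=1) / np.sqrt(n_sample)
- t = st.t.ppf(1 - 0.05 / 2, n_sample - 1)
- lower, upper = mean - t * sem, mean + t * sem  # same shape as a single forecast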
- result.plot_loss()
- result.plot()
- result.plot_sampled()
-
- # conf_predicted = None / conf_predicted_sampled / conf_predicted; by default the conf_predicted_sampled data is used
- # index selects which variable to plot; with a single variable just use 0
- result.plot_confidence(index=0, alpha=0.05, conf_predicted=conf_predicted)
- result.plot_confidence(index=0, alpha=0.05, conf_predicted=conf_predicted_sampled)
-
- # this function wraps up everything above; the steps above are the basic workflow, and this call lets you freely mix and match which pieces of data to plot
- """
- names: Union[List[str], None] 表示每个变量的名字
- index: Union[List[int], None, int] 表示绘制哪几个 index, None 即为全部
- conf_predicted = conf_predicted / conf_predicted_sampled / None 已有数据置信区间
- conf_predicte = conf / None 预测之后数据置信区间
- conf_alpha: float eg: 0.05
- result_ = predict / predict_sampled / None 对已有数据预测的数据
- result = further_predict / further_predict_sampled / None 对之后预测的数据
- """
-
- result.plot_all(names=None, index=None, conf_predicted=conf_predicted, conf_predict=conf, conf_alpha=0.05, result=further_predict_sampled, result_=predict_sampled)
plot_loss() draws the loss curve, plot() draws the curves of all variables, and plot_confidence(index) draws the confidence interval of the variable at that index.
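For the two-variable toy data from data01(), for example, passing your own labels (the names below are just illustrative) makes the legends readable:
- result.plot(names=["slow sine", "fast sine"])
- result.plot_confidence(index=1, alpha=0.05)  # CI of the second variable, using the cached intervals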
- # you can override the plotting methods to make the figures prettier
- class LSTMResult(LSTMResultBase):
- def __init__(self, resultLSTM: Union[CustomLSTM, LSTMResultBase] = None):
- super().__init__(resultLSTM)
-
- def plot_confidence(self, index=0, alpha=0.05, conf_predicted=None):
- super().plot_confidence(index, alpha, conf_predicted)
-
- def plot_loss(self):
- pass
-
- def plot(self, names=None):
- pass
- # once you have overridden LSTMResult, you can do the following
- result = LSTMResult(result)
- lstm = result.resultLSTM
-
- result.plot()
- # save the model
- result.save()
- result = LSTMResultBase.load()
- lstm = result.resultLSTM
- """
- File : main.py
- Preferred Python IDE: Jupyter
- """
-
- import numpy as np
- from CustomModel import CustomLSTM
- import Utils
-
-
- def data_basic():
- """2023美赛C:https://www.pancake2021.work/wp-content/uploads/Problem_C_Data_Wordle.xlsx"""
- date, data = Utils.openfile("Problem_C_Data_Wordle.xlsx", data_col=[1], date_col=0)
- return date, data
-
-
- def data01():
- # test data: 2-D array, multiple variables
- data1 = np.sin(np.arange(200) * np.pi / 50) + np.random.randn(200) * 0.1
- data2 = np.sin(np.arange(200) * np.pi / 25) + np.random.randn(200) * 0.1 + 3
- return np.c_[data1[:, np.newaxis], data2[:, np.newaxis]]
-
-
- def data02():
- # test data: 1-D array
- return np.sin(np.arange(200) * np.pi / 50) + np.random.randn(200) * 0.1
-
-
- def data03():
- # test data: 2-D array, single variable
- return data02()[:, np.newaxis]
-
-
- # load the data; to use date, the column must follow Excel's date format, or write your own code to build the timestamps
- # date, data = data_basic()
- data = data01()
-
- # initialise the network
- window_size = 10
- lstm = CustomLSTM(data, window_size, scale=False)
- # lstm = CustomLSTM(data, window_size, date, scale=True)
- lstm.init_lstm(hidden=64, num_layers=3)
-
- # train the network
- # max_batch: use the whole data set as a single batch (no splitting)
- _ = lstm.fit(num_epochs=2000, lr=0.01, max_batch=True, is_copy=False)
- result = lstm.fit(num_epochs=2000, lr=0.001, max_batch=True, is_copy=True)
- # result = lstm.fit(num_epochs=1000, lr=0.01, batch_size=55, max_batch=False)
-
- # change the window size and retrain
- # lstm.slice(20)
- # lstm.init_lstm(hidden=64, num_layers=1)
- # lstm.fit(num_epochs=50, batch_size=40, max_batch=False)
-
- # predict over all the existing data
- predict = result.predicted(is_random=False)
-
- # print the summary
- r2, mse, rmse, mae, mape, smape = result.summary()
-
- # forecast the next n steps
- n = 60
- further_predict = result.predict(n)
-
- # 1-alpha confidence intervals, per variable, for the fitted data and for the n forecast steps
- alpha = 0.05
- conf_predicted = result.conf_int_predicted(alpha)
- conf_predicted_sampled = result.conf_int_predicted_sampled(alpha=alpha, n_sample=1000)
- conf = result.conf_int_sampled(n, alpha=alpha, n_sample=1000)
-
- # plots
- result.plot_loss()
- result.plot()
- result.plot_confidence(index=0, alpha=0.05, conf_predicted=None)
- """
- File : CustomModel.py
- """
-
- import numpy as np
- import warnings
- import torch
- import torch.nn as nn
- from sklearn.preprocessing import MinMaxScaler
- from sklearn.utils.validation import check_is_fitted, check_array
- from copy import deepcopy
- from Model import LSTM
-
- warnings.simplefilter("always")
-
- # gpu or cpu
- _device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
-
-
- # MinMaxScaler extended with a per-column inverse transform
- class MyScaler(MinMaxScaler):
- def __init__(self):
- super().__init__()
-
- def inverse_transform_col(self, x, n_col):
- check_is_fitted(self)
- x = check_array(
- x, copy=self.copy, dtype=(np.float64, np.float32, np.float16), force_all_finite="allow-nan"
- )
- x -= self.min_[n_col]
- x /= self.scale_[n_col]
- return x
-
-
- class CustomLSTM:
- X, Y, data, model, criterion, window, losses, scale = [None] * 8
-
- def __init__(self, data: np.array, window, date=None, scale=False):
- self.data = data
- self.input_dim = self.init_data()
- self.output_dim = self.input_dim
- self.date = date
- self.scale = scale
- # optional normalisation
- if scale:
- self.scaler = MyScaler()
- self.data = self.scaler.fit_transform(self.data)
- self.slice(window)
-
- # data may be a 1-D or 2-D array; 1-D input is forced into 2-D
- def init_data(self):
- assert (length := len(self.data.shape)) in [1, 2]
- if length == 1:
- self.data = self.data[:, np.newaxis]
- return len(self.data[0])
-
- # the following function is deprecated; h0, c0 are now resized instead so nothing errors out
- # check whether the data size is divisible by the batch size
- def check_batch(self, batch_size):
- length = self.X.shape[0]
- # make sure batch_size is not larger than the data
- assert length >= batch_size
- if batch_size * (length // batch_size) != length:
- warnings.warn(f'data size {length} is not divisible by batch size {batch_size}; {(length % batch_size) / length * 100}% of the data would be dropped', DeprecationWarning)
-
- # reset parameters
- def init_param(self):
- self.model, self.criterion, self.losses = [None] * 3
-
- # slice the series with the given window size to build the full batch
- def slice(self, window):
- self.init_param()
- self.window = window
- x, y = [], []
- for i in range(len(self.data) - window):
- x.append(self.data[i:i + window])
- y.append(self.data[i + window])
- x = np.array(x)
- y = np.array(y)
- x = torch.from_numpy(x).float() # (batch_size, sequence_length, input_size)
- y = torch.from_numpy(y).float()
- print(f"数据格式: X = {x.shape}, Y = {y.shape}")
- self.X = x.to(_device)
- self.Y = y.to(_device)
-
- # initialise the LSTM model
- def init_lstm(self, hidden=64, num_layers=1):
- self.init_param()
- self.model = LSTM(self.input_dim, hidden, self.output_dim, num_layers).to(_device)
- self.criterion = nn.MSELoss().to(_device)
-
- # yield batches from the full data, optionally shuffled
- @staticmethod
- def iterate_batches(inputs, targets, batch_size, shuffle=True):
- assert len(inputs) == len(targets)
- indices = np.arange(n := len(inputs))
- if shuffle:
- np.random.shuffle(indices)
- for start_idx in range(0, len(inputs) - batch_size + 1, batch_size):
- excerpt = indices[start_idx:start_idx + batch_size]
- yield inputs[excerpt], targets[excerpt], None
- if (left := n % batch_size) != 0:
- excerpt = indices[-left:]
- yield inputs[excerpt], targets[excerpt], left
-
- # start training
- def fit(self, num_epochs=100, lr=0.01, batch_size=128, max_batch=False, is_copy=False):
- losses = []
- avg_loss = 0
- if self.model is None:
- raise ValueError("请先使用CustomLSTM.init_lstm初始化网络")
- optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
- self.model.train()
- if max_batch:
- batch_size = self.X.shape[0]
- else:
- pass
- # self.check_batch(batch_size)
- for epoch in range(num_epochs):
- loss_all = 0
- self.model.init_hidden(batch_size)
- index = 0
- for index, (batch_x, batch_y, left) in enumerate(CustomLSTM.iterate_batches(self.X, self.Y, batch_size, shuffle=False)):
- optimizer.zero_grad()
- outputs = self.model(batch_x, left)
- loss = self.criterion(outputs, batch_y)
- loss.backward()
- optimizer.step()
- loss_all += loss.detach().cpu()
- losses.append(loss_all / (index + 1))
- avg_loss += loss_all / (index + 1)
- if epoch % 20 == 0:
- print('Epoch [{}/{}], Loss: {:.4f}, AVG Loss of 20 sample: {:.4f}'.format(epoch + 1, num_epochs, loss_all / (index + 1), avg_loss / 20))
- avg_loss = 0
- if self.losses is not None:
- self.losses = self.losses + losses
- else:
- self.losses = losses
- self.model.init_hidden(1)
- from LSTMResult import LSTMResultBase
- return LSTMResultBase(deepcopy(self) if is_copy else self)
- """
- File : LSTMResult.py
- """
-
- import Utils
- import numpy as np
- import warnings
- import inspect
- import torch
- import matplotlib.pyplot as plt
- import pickle
- import time
- from functools import cache
- from RunTime import cal_time
- import CustomModel
-
- warnings.simplefilter("always")
-
- # gpu or cpu
- _device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
-
-
- class LSTMResultBase(object):
- # result holds the forecast beyond the data; result_ holds the predictions over the data with known true values
- # sampled_result is more convincing than result: it averages out the Dropout randomness and is closer to reality, so the sampled data is recommended
- result, conf, result_, conf_predicted, conf_predicted_sampled, sampled_result, sampled_result_ = [None] * 7
-
- # keep the original training object in memory
- def __init__(self, resultLSTM: CustomModel.CustomLSTM = None):
- if isinstance(resultLSTM, CustomModel.CustomLSTM):
- self.resultLSTM = resultLSTM
- elif isinstance(resultLSTM, LSTMResultBase):
- self.resultLSTM = resultLSTM.resultLSTM
- self.copy_param(resultLSTM)
- else:
- warnings.warn(f'input type {type(resultLSTM)} is not standard; please pass a CustomLSTM or LSTMResultBase', DeprecationWarning)
- self.__doc__ = resultLSTM.__doc__
-
- def copy_param(self, parent):
- self.result, self.conf, self.result_, self.conf_predicted, self.conf_predicted_sampled, self.sampled_result, self.sampled_result_ = \
- parent.result, parent.conf, parent.result_, parent.conf_predicted, parent.conf_predicted_sampled, parent.sampled_result, parent.sampled_result_
-
- # attribute lookup that falls through to the wrapped CustomLSTM (no copying)
- def __getattribute__(self, item):
- get = lambda name: object.__getattribute__(self, name)
-
- try:
- return get(item)
- except AttributeError:
- pass
-
- try:
- results = get('resultLSTM')
- except AttributeError:
- raise ImportError("CustomLSTM load error")
-
- try:
- attribute = getattr(results, item)
- if inspect.ismethod(attribute):
- warnings.warn(f"调用{attribute},可能出错,请尽量不要在此调用 CustomLSTM", DeprecationWarning)
- return attribute
- except AttributeError:
- AttributeError(f"{item} not in bot CustomLSTM or LSTMResult")
-
- def __dir__(self):
- return [x for x in dir(self.resultLSTM) if not inspect.ismethod(getattr(self.resultLSTM, x))] + dir(LSTMResultBase)
-
- def __getstate__(self):
- return self.__dict__
-
- def __setstate__(self, dict_):
- self.__dict__.update(dict_)
-
- # predict from window_size to the end of the data, so the result can be compared against the true values
- @cal_time()
- def predicted(self, is_random=False):
- if self.model is None:
- raise ValueError("请先使用CustomLSTM.train")
- if is_random:
- self.model.train()
- else:
- self.model.eval()
- self.model.init_hidden(len(self.X))
- with torch.no_grad():
- predicted = self.model(self.X).to(_device)
- predicted = predicted.detach().cpu().numpy()
- self.result_ = np.array(predicted)
- if self.scale:
- self.result_ = self.scaler.inverse_transform(self.result_)
- return self.result_
-
- # forecast the next n steps
- @cal_time()
- def predict(self, n, is_random=False):
- if self.model is None:
- raise ValueError("请先使用CustomLSTM.train")
- if is_random:
- self.model.train()
- else:
- self.model.eval()
- self.model.init_hidden(1)
- _data = self.data[-self.window:, :].tolist()
- y = []
- with torch.no_grad():
- for _ in range(n):
- x = torch.tensor(_data).float().unsqueeze(0).to(_device)
- _result = self.model(x).detach().cpu().tolist()
- y.append(_result[0])
- _data.append(_result[0])
- _data.pop(0)
- self.result = np.array(y)
- if self.scale:
- self.result = self.scaler.inverse_transform(self.result)
- return self.result
-
- # save the whole object
- def save(self, name="./data"):
- with open(name, 'wb') as f:
- pickle.dump(self, f)
-
- # load a saved object
- @staticmethod
- def load(name="./data"):
- with open(name, "rb") as f:
- return pickle.load(f)
-
- # standard (residual-based) CI over the data with known true values
- def conf_int_predicted(self, alpha=0.05):
- if self.result_ is None:
- raise ValueError("请先使用LSTMResultBase.predicted")
- y_pred = self.result_
- y_true = self.data[self.window:]
- n_param = self.data.shape[1]
- # (variable index, per-variable sample index, [lower, mean, upper])
- conf = np.zeros(shape=(n_param, len(y_true), 3))
- for i in range(n_param):
- conf[i] = Utils.ci(y_true[:, i], y_pred[:, i], alpha=alpha)
- self.conf_predicted = conf
- return conf
-
- # sampling-based CI over the data with known true values; this is the recommended method
- @cache
- def conf_int_predicted_sampled(self, alpha=0.05, n_sample=1000, method=Utils.sample_ci):
- result_ = np.copy(self.result_)
- n = self.data.shape[0] - self.window
- n_param = self.data.shape[1]
- # (variable index, per-variable sample index, [lower, mean, upper])
- conf = np.zeros(shape=(n_param, n, 3))
-
- sample = np.zeros(shape=(n_sample, n, n_param))
- for i in range(n_sample):
- print(f"[{i + 1}/{n_sample}] ", end="")
- sample[i] = self.predicted(is_random=True)
-
- for i in range(n_param):
- conf[i] = method(sample[:, :, i], alpha=alpha)
- # conf[i] = Utils.sample_ci(sample[:, :, i], alpha=alpha)
- # conf[i] = Utils.bootstrap_ci_sampled(sample[:, :, i], alpha=alpha)
-
- self.conf_predicted_sampled = conf
- self.sampled_result_ = conf[:, :, 1].reshape(n_param, n).T
- self.result_ = result_
- return conf
-
- # sampling-based CI for the forecast steps
- @cache
- def conf_int_sampled(self, n, alpha=0.05, n_sample=1000, method=Utils.sample_ci):
- result = np.copy(self.result)
- n_param = self.data.shape[1]
- sample = np.zeros(shape=(n_sample, n, n_param))
- conf = np.zeros(shape=(n_param, n, 3))
- for i in range(n_sample):
- print(f"[{i + 1}/{n_sample}] ", end="")
- sample[i] = self.predict(n, is_random=True)
-
- for i in range(n_param):
- conf[i] = method(sample[:, :, i], alpha=alpha)
- # conf[i] = Utils.sample_ci(sample[:, :, i], alpha=alpha)
- # conf[i] = Utils.bootstrap_ci_sampled(sample[:, :, i], alpha=alpha)
-
- self.conf = conf
- self.sampled_result = conf[:, :, 1].reshape(n_param, n).T
- self.result = result
- return conf
-
- # plot the loss curve
- def plot_loss(self):
- if self.losses is None:
- raise ValueError("loss 不存在,请先进行训练")
- plt.figure(figsize=(12, 6))
- plt.plot(self.losses)
- plt.xlabel("Epoch")
- plt.ylabel("Loss")
- plt.show()
-
- def plot_all(self, names=None, index=None, conf_predicted=None, conf_predict=None, conf_alpha=None, result=None, result_=None):
- if names is None:
- names = [''] * len(self.data[0])
- x = np.arange(len(self.data))
- x_further = np.arange(len(self.data), len(self.data) + len(self.result))
- if index is None:
- index = range(len(self.data[0]))
- elif isinstance(index, int):
- index = [index]
- conf_name = '{} Confidence interval' if conf_alpha is None else '{} ' + f'{1 - conf_alpha} Confidence interval'
- plt.figure(figsize=(12, 6))
- for i in index:
- y_true = self.data[:, i: i + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, i:i + 1], i)
- plt.plot(x, y_true, label=f'{names[i]} True Values')
- if result_ is not None:
- plt.plot(x[self.window:], result_[:, i:i + 1], label=f'{names[i]} Predictions')
- if result is not None:
- plt.plot(x_further, result[:, i:i + 1], label=f"{names[i]} Further Predictions")
- if conf_predicted is not None:
- plt.fill_between(x[self.window:], conf_predicted[i, :, 0], conf_predicted[i, :, 2], alpha=0.2, label=conf_name.format(names[i]))
- if conf_predict is not None:
- plt.fill_between(x_further, conf_predict[i, :, 0], conf_predict[i, :, 2], alpha=0.2, label=conf_name.format(names[i]))
-
- self.process_x_label()
- plt.legend()
- plt.show()
-
- # data may be 2-D (several variables forecast at once); names labels each variable
- def plot(self, names=None):
- if self.result is None:
- raise ValueError("请先使用LSTMResultBase.predict")
- if self.result_ is None:
- raise ValueError("请先使用LSTMResultBase.predicted")
- if names is None:
- names = [''] * len(self.data[0])
- x = np.arange(len(self.data))
- x_further = np.arange(len(self.data), len(self.data) + len(self.result))
-
- plt.figure(figsize=(12, 6))
- for i in range(len(self.data[0])):
- y_true = self.data[:, i: i + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, i:i + 1], i)
- plt.plot(x, y_true, label=f'{names[i]} True Values')
- plt.plot(x[self.window:], self.result_[:, i:i + 1], label=f'{names[i]} Predictions')
- plt.plot(x_further, self.result[:, i:i + 1], label=f"{names[i]} Further Predictions")
- self.process_x_label()
- plt.legend()
- plt.show()
-
- def plot_sampled(self, names=None):
- if self.sampled_result is None:
- raise ValueError("请先使用LSTMResultBase.conf_int_sampled")
- if self.sampled_result_ is None:
- raise ValueError("请先使用LSTMResultBase.conf_int_predicted_sampled")
- if names is None:
- names = [''] * len(self.data[0])
- x = np.arange(len(self.data))
- x_further = np.arange(len(self.data), len(self.data) + len(self.result))
-
- plt.figure(figsize=(12, 6))
- for i in range(len(self.data[0])):
- y_true = self.data[:, i: i + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, i:i + 1], i)
- plt.plot(x, y_true, label=f'{names[i]} True Values')
- plt.plot(x[self.window:], self.sampled_result_[:, i:i + 1], label=f'{names[i]} Predictions')
- plt.plot(x_further, self.sampled_result[:, i:i + 1], label=f"{names[i]} Further Predictions")
- self.process_x_label()
- plt.legend()
- plt.show()
-
- # turn the x axis into dates
- def process_x_label(self):
- if self.date is None:
- return
- begin = self.date[0]
- end = self.date[-1]
- between = (end - begin) // (len(self.date) - 1)
- n = len(self.result)
- length = len(self.data) + n
- gap = length // 5
- # "%Y-%m-%d %H:%M:%S"
- plt.xticks(np.arange(0, length, gap), [time.strftime("%Y-%m-%d", time.localtime(i)) for i in range(begin, end + between * n + 1, between * gap)],
- rotation=45)
- plt.tight_layout()
-
- # plot the confidence interval for a single variable
- def plot_confidence(self, index=0, alpha=0.05, conf_predicted=None):
- if self.result is None:
- raise ValueError("请先使用LSTMResultBase.predict")
- if self.result_ is None:
- raise ValueError("请先使用LSTMResultBase.predicted")
- if conf_predicted is None:
- try:
- conf_predicted = self.conf_predicted_sampled[index]
- except (TypeError, IndexError):
- try:
- conf_predicted = self.conf_predicted[index]
- except (TypeError, IndexError):
- raise ValueError("call LSTMResultBase.conf_int_predicted_sampled or conf_int_predicted first")
- else:
- conf_predicted = conf_predicted[index]
- try:
- conf = self.conf[index]
- except (TypeError, IndexError):
- raise ValueError("call LSTMResultBase.conf_int_sampled first")
-
- plt.figure(figsize=(12, 6))
- x = np.arange(len(self.data))
- n = len(self.result)
- x_further = np.arange(len(self.data), len(self.data) + n)
- y_true = self.data[:, index: index + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, index: index + 1], index)
-
- plt.plot(x, y_true, label='True Values')
- plt.plot(x[self.window:], conf_predicted[:, 1], label='Predictions')
- plt.plot(x_further, conf[:, 1], label="Further Predictions")
-
- plt.fill_between(x[self.window:], conf_predicted[:, 0], conf_predicted[:, 2], alpha=0.2, label=f'{1 - alpha} Confidence interval')
- plt.fill_between(x_further, conf[:, 0], conf[:, 2], alpha=0.2, label=f'{1 - alpha} Confidence interval')
-
- self.process_x_label()
- plt.legend()
- plt.show()
-
- # print the summary, i.e. R^2 and the other evaluation metrics
- def summary(self):
- if self.result_ is None:
- raise ValueError("请先使用LSTMResultBase.predicted")
- data = self.data[self.window:, :]
- if self.scale:
- data = self.scaler.inverse_transform(data)
- print("==========Summary Begin===========")
- print("R2 =", score_r2 := Utils.r2(self.result_, data))
- print("MSE =", score_mse := Utils.mse(self.result_, data))
- print("RMSE =", score_rmse := np.sqrt(score_mse))
- print("MAE =", score_mae := Utils.mae(self.result_, data))
- print("MAPE =", score_mape := Utils.mape(self.result_, data, replace=self.scale))
- print("SMAPE =", score_smape := Utils.smape(self.result_, data))
- print(f"Average is {score_r2.mean()} {score_mse.mean()} {score_rmse.mean()} {score_mae.mean()} {score_mape.mean()} {score_smape.mean()}")
- print("===========Summary end============")
- return score_r2, score_mse, score_rmse, score_mae, score_mape, score_smape
- """
- File : Model.py
- """
-
- import torch
- import torch.nn as nn
-
- # gpu or cpu
- _device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
-
-
- # LSTM model definition
- class LSTM(nn.Module):
- hidden = None
-
- def __init__(self, input_size, hidden_size, output_size, num_layers):
- super().__init__()
- self.hidden_size = hidden_size
- self.num_layers = num_layers
- # As batch_first=True, input: (batch_size, sequence_length, input_size)
- self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
- self.lstm2 = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
- self.out = nn.Sequential(
- nn.Linear(hidden_size, hidden_size),
- nn.ReLU(),
- nn.Dropout(0.5),
- nn.Linear(hidden_size, output_size)
- )
- # optional: the next line can be commented out
- self.apply(LSTM.init_weights)
-
- def forward(self, x, left=None):
- self.lstm.flatten_parameters()
- self.lstm2.flatten_parameters()
- # detach the hidden state so loss.backward does not error
- hidden = [each.data for each in self.hidden] if left is None else [each.data[:, :left, :] for each in self.hidden]
- x, hidden = self.lstm(x, hidden)
- x, self.hidden = self.lstm2(x, hidden)
- x = x[:, -1, :]
- x = self.out(x)
- return x
-
- # (h0, c0)
- def init_hidden(self, batch_size):
- weight = next(self.parameters()).data
- self.hidden = (weight.new(self.num_layers, batch_size, self.hidden_size).zero_().to(_device),
- weight.new(self.num_layers, batch_size, self.hidden_size).zero_().to(_device))
-
- @staticmethod
- def init_weights(m):
- if type(m) == nn.LSTM:
- for name, param in m.named_parameters():
- if 'weight_ih' in name:
- torch.nn.init.orthogonal_(param.data)
- elif 'weight_hh' in name:
- torch.nn.init.orthogonal_(param.data)
- elif 'bias' in name:
- param.data.fill_(0)
- elif type(m) == nn.Conv1d or type(m) == nn.Linear:
- torch.nn.init.orthogonal_(m.weight)
- m.bias.data.fill_(0)
- """
- File : RunTime.py
- """
-
- import time
- from functools import wraps
-
-
- # decorator that measures a function's runtime, with ms/s/min/hour/day units
- def cal_time(unit='s'):
- units = ['ms', 's', 'min', 'hour', 'day']
- times = [1000, 1, 1 / 60, 1 / 3600, 1 / 3600 / 24]
- try:
- time_process = times[units.index(unit.lower())]
- except Exception as e:
- raise e
-
- def input_func(func):
-
- @wraps(func)
- def wrap(*args, **kwargs):
- begin_time = time.time()
- ans = func(*args, **kwargs)
- print(func.__name__, f"用时 {(time.time() - begin_time) * time_process} {unit}")
- return ans
-
- return wrap
-
- return input_func
- """
- File : Utils.py
- """
-
- import numpy as np
- import pandas as pd
- import scipy.stats as st
- from typing import Union, List
-
-
- # confidence interval from residuals
-
- def ci(y_true: np.ndarray, y_pred: np.ndarray, alpha: float = 0.05, use_t: bool = True) -> np.ndarray:
- residuals = y_true - y_pred
- n = len(residuals)
- if use_t:
- dist = st.t
- df = n - 1
- t_value = dist.ppf(1 - alpha / 2, df)
- else:
- dist = st.norm
- t_value = dist.ppf(1 - alpha / 2)
- if y_true.ndim == 1:
- std_err = np.std(residuals, ddof=1) / np.sqrt(n)
- else:
- std_err = np.std(residuals, ddof=1, axis=0) / np.sqrt(n)
- err = t_value * std_err
- return np.c_[y_pred - err, y_pred, y_pred + err]
-
-
- # confidence interval from Monte-Carlo samples
-
- def sample_ci(sample: np.ndarray, alpha: float = 0.05, use_t: bool = True) -> np.ndarray:
- n = len(sample)
- if use_t:
- dist = st.t
- df = n - 1
- t_value = dist.ppf(1 - alpha / 2, df)
- else:
- dist = st.norm
- t_value = dist.ppf(1 - alpha / 2)
- means = sample.mean(axis=0)
- stds = sample.std(axis=0, ddof=1)
- lower = means - t_value * stds / np.sqrt(n)
- upper = means + t_value * stds / np.sqrt(n)
- return np.c_[lower, means, upper]
-
-
- # bootstrap confidence interval from samples; sample: (n_sample, n)
-
- def bootstrap_ci_sampled(sample: np.ndarray, alpha: float = 0.05, n_bootstraps: int = 1000) -> np.ndarray:
- n = len(sample[0])
- sample_size = len(sample)
- lower = np.zeros(shape=(n,))
- means = np.zeros(shape=(n,))
- upper = np.zeros(shape=(n,))
- for i in range(n):
- bootstrap_sample = np.random.choice(sample[:, i], size=(sample_size, n_bootstraps), replace=True)
- bootstrap_means = np.mean(bootstrap_sample, axis=1)
- lower[i] = np.percentile(bootstrap_means, 100 * alpha / 2)
- upper[i] = np.percentile(bootstrap_means, 100 * (1 - alpha / 2))
- means[i] = np.mean(bootstrap_means)
- return np.c_[lower, means, upper]
-
-
- # inputs are 2-D numpy arrays with one variable per column; outputs are 1-D numpy arrays of per-column scores
-
- def r2(y_pred, y_true):
- return 1 - ((y_pred - y_true) ** 2).sum(axis=0) / ((y_true.mean(axis=0) - y_true) ** 2).sum(axis=0)
-
-
- def mse(y_pred, y_true):
- return ((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred)
-
-
- def rmse(y_pred, y_true):
- return np.sqrt(((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred))
-
-
- def mae(y_pred, y_true):
- return (np.absolute(y_true - y_pred)).sum(axis=0) / len(y_true)
-
-
- def mape(y_pred, y_true, replace=False):
- # guard against zeros in y_true
- if replace:
- y_true = np.copy(y_true)
- y_true[y_true == 0] = 1
- return (np.abs((y_pred - y_true) / y_true)).mean(axis=0) * 100
-
-
- def smape(y_pred, y_true):
- return 2.0 * (np.abs(y_pred - y_true) / (np.abs(y_pred) + np.abs(y_true))).mean(axis=0) * 100
-
-
- # convenience file loader
-
- def openfile(name: str, data_col: Union[None, List[str], List[int]] = None, date_col: Union[None, int] = None) -> (np.ndarray, np.ndarray):
- file_type = name.split(".")[-1]
- if file_type == "csv":
- df = pd.read_csv(name, encoding='GBK')
- elif file_type == "xlsx" or file_type == "xls":
- df = pd.read_excel(name)
- else:
- raise TypeError(f"{name} 类型不是 csv, xls, xlsx")
- # df = df[["列名字1", "列名字2"]]
- df_data = df.iloc[:, data_col] if data_col is not None else df
- date = df.iloc[:, date_col].astype('int64') // 1e9 if date_col is not None else None
- # when there is no date column, return None instead of trying to cast None to int
- return (np.array(date, dtype=int) if date is not None else None), np.array(df_data)
2023-04-10: fixed the bootstrap bug; improved dir(LSTMResultBase); added the plot_all customisation function; fixed warnings not being shown; separated the sampled and single-run data; replaced the batch_size check with resizing h0, c0; moved predicted into the result base class; fixed the data loss on save.