
Easy-to-Use LSTM Code (PyTorch)


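This article aims to get an LSTM up and running as quickly as possible, for example in mathematical modeling contests where time is short. The input may be one-dimensional data (single-variable forecasting) or two-dimensional data (several variables forecast at once). Confidence-interval computation is included. Jupyter is recommended: it keeps the trained state between cells, so the plotting code can be rewritten conveniently without retraining.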

Download link for the complete code

Data input API
    import numpy as np
    import Utils


    def data_basic():
        """2023 MCM Problem C: https://www.pancake2021.work/wp-content/uploads/Problem_C_Data_Wordle.xlsx"""
        date, data = Utils.openfile("Problem_C_Data_Wordle.xlsx", data_col=[1], date_col=0)
        return date, data


    def data01():
        # Test case: 2-D array, multiple variables
        data1 = np.sin(np.arange(300) * np.pi / 50) + np.random.randn(300) * 0.1
        data2 = np.sin(np.arange(300) * np.pi / 25) + np.random.randn(300) * 0.1 + 3
        return np.c_[data1[:, np.newaxis], data2[:, np.newaxis]]


    def data02():
        # Test case: 1-D array
        return np.sin(np.arange(300) * np.pi / 50) + np.random.randn(300) * 0.1


    def data03():
        # Test case: 2-D array, single variable
        return data02()[:, np.newaxis]

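The code ships with four test inputs; the generator functions above should be self-explanatory. To read data from a file, use

    Utils.openfile(file_location_name, data_col=[indexes of data], date_col=index-of-date)

Here file_location_name is the path to the file; its first row must hold the column names (otherwise adapt the code by hand). data_col selects the data columns and can be a list of column names or a list of column indexes. date_col is the index of the date column and can be None when there is no date.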
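As a rough illustration of the date handling: in the full code, openfile turns the date column into integer Unix timestamps (in seconds), and the plotting code later formats such integers back into date labels for the x axis. The sketch below reproduces that round trip with the standard library only. The helper names to_unix_seconds and to_label are made up for this example, and it works in UTC so the result does not depend on the local time zone (the original code formats with local time).

```python
from datetime import datetime, timezone
import time


def to_unix_seconds(day: str) -> int:
    """Parse a 'YYYY-MM-DD' string as midnight UTC and return Unix seconds."""
    parsed = datetime.strptime(day, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())


def to_label(seconds: int) -> str:
    """Format Unix seconds back into a 'YYYY-MM-DD' tick label (UTC)."""
    return time.strftime("%Y-%m-%d", time.gmtime(seconds))


dates = [to_unix_seconds(d) for d in ("2023-01-01", "2023-01-02", "2023-01-03")]
step = (dates[-1] - dates[0]) // (len(dates) - 1)  # spacing between samples
# Extend the axis by two future ticks, as is needed when plotting predictions
future = [dates[-1] + step * k for k in (1, 2)]
labels = [to_label(s) for s in dates + future]
```

Because the timestamps are plain integers with a constant spacing, the axis can be extended past the last observed date simply by adding multiples of that spacing.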

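Initializing the network
    window_size = 10
    lstm = CustomLSTM(data, window_size, scale=False)
    # lstm = CustomLSTM(data, window_size, date, scale=True)

window_size is the sliding-window size. By default a window of window_size points is used to predict the single next data point (predicting several points from one window is not implemented). date is optional; without it the plots use integer indexes on the x axis. scale controls whether MinMaxScaler is applied; it is worth trying training both ways.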
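The windowing can be sketched in a few lines of plain Python. This is only an illustration of the idea, not the article's implementation (which builds NumPy arrays and tensors); the function name make_windows is hypothetical.

```python
from typing import List, Sequence, Tuple


def make_windows(series: Sequence[float], window: int) -> Tuple[List[List[float]], List[float]]:
    """Return (inputs, targets): each input holds `window` consecutive values,
    and its target is the value that immediately follows."""
    inputs, targets = [], []
    for start in range(len(series) - window):
        inputs.append(list(series[start:start + window]))
        targets.append(series[start + window])
    return inputs, targets


series = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
x, y = make_windows(series, window=3)
```

A series of length N therefore yields N - window training pairs, which is why the predictions for existing data start at position window_size rather than at the first point.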

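    lstm.init_lstm(hidden=64, num_layers=1)

These are the hidden dimension of the network and the number of LSTM layers.

You can add or remove layers in the LSTM class in Model.py (removing layers is not recommended).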

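Training the network
    # max_batch: whether to use the whole data set as a single batch without splitting it
    _ = lstm.fit(num_epochs=2000, lr=0.01, max_batch=True, is_copy=False)
    result = lstm.fit(num_epochs=2000, lr=0.001, max_batch=True, is_copy=True)
    # result = lstm.fit(num_epochs=1000, batch_size=55, max_batch=False)

The template supports mini-batches, but for this LSTM they seem unnecessary: setting max_batch=True trains on the whole data set as one batch. num_epochs is the number of training epochs; lr is the learning rate; is_copy controls whether the returned result holds a deep copy of the trainer, so that it can be compared against a later retraining. The code does not perform a train/test split. A workable schedule is to train coarsely with a large lr first and then continue with a smaller lr.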
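For the mini-batch case, the full code splits the windows into batches of batch_size and passes the size of a leftover partial batch along, so that the model can trim its hidden state to match. A simplified sketch of that splitting, without shuffling and on plain lists rather than tensors:

```python
from typing import Iterator, List, Optional, Sequence, Tuple


def iterate_batches(items: Sequence[int], batch_size: int) -> Iterator[Tuple[List[int], Optional[int]]]:
    """Yield (batch, left): full batches carry left=None, and a final
    partial batch carries its own size in `left`."""
    n = len(items)
    for start in range(0, n - batch_size + 1, batch_size):
        yield list(items[start:start + batch_size]), None
    left = n % batch_size
    if left != 0:
        yield list(items[n - left:]), left


batches = list(iterate_batches(list(range(10)), batch_size=4))
# With batch_size equal to the data size (the max_batch=True case) there is one batch
full = list(iterate_batches(list(range(10)), batch_size=10))
```

With ten items and batch_size=4 this gives two full batches and one partial batch of two items; no data is dropped.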

Retraining with a different window size
    lstm.slice(20)
    # init_lstm does not take lr; pass the learning rate to fit instead
    lstm.init_lstm(hidden=64, num_layers=1)
    result = lstm.fit(num_epochs=500, lr=0.001, max_batch=True)
Predicting the data that has ground truth
    # Predict over all existing data; is_random chooses whether Dropout is enabled
    predict = result.predicted(is_random=False)
Summary
    # Print the summary
    r2, mse, rmse, mae, mape, smape = result.summary()
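The six scores follow the usual definitions. A minimal single-variable sketch in plain Python (the article's Utils functions compute the same quantities column by column with NumPy; the function name regression_metrics is made up for this example):

```python
import math
from typing import Dict, Sequence


def regression_metrics(y_pred: Sequence[float], y_true: Sequence[float]) -> Dict[str, float]:
    n = len(y_true)
    mean_true = sum(y_true) / n
    sq_err = sum((t - p) ** 2 for p, t in zip(y_pred, y_true))
    total = sum((t - mean_true) ** 2 for t in y_true)
    mse = sq_err / n
    return {
        "r2": 1 - sq_err / total,
        "mse": mse,
        "rmse": math.sqrt(mse),
        "mae": sum(abs(t - p) for p, t in zip(y_pred, y_true)) / n,
        # MAPE and SMAPE are percentages; MAPE is undefined when a true value is 0
        "mape": sum(abs((p - t) / t) for p, t in zip(y_pred, y_true)) / n * 100,
        "smape": 2.0 * sum(abs(p - t) / (abs(p) + abs(t)) for p, t in zip(y_pred, y_true)) / n * 100,
    }


scores = regression_metrics(y_pred=[1.0, 2.0, 4.0], y_true=[1.0, 2.0, 3.0])
```

Since the code does no train/test split, these scores describe the fit on the training data only.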
Predict
    # Predict the next n steps. is_random chooses whether Dropout is enabled (enable it for random sampling).
    # To use the plot functions, keep n unchanged; if you change n, rerun everything from this line onward.
    n = 60
    further_predict = result.predict(n, is_random=False)
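Multi-step prediction here is recursive: each predicted value is appended to the window and the oldest value is dropped, so later steps are predicted from earlier predictions. The sketch below shows the loop with a toy stand-in for the trained network; toy_model and recursive_forecast are hypothetical names.

```python
from typing import Callable, List, Sequence


def recursive_forecast(history: Sequence[float], window: int, n: int,
                       model: Callable[[List[float]], float]) -> List[float]:
    """Predict n steps ahead by feeding each prediction back into the window."""
    buffer = list(history[-window:])
    predictions = []
    for _ in range(n):
        next_value = model(buffer)
        predictions.append(next_value)
        buffer.append(next_value)  # the prediction becomes the newest input
        buffer.pop(0)              # drop the oldest value to keep the window size
    return predictions


def toy_model(window_values: List[float]) -> float:
    # Stand-in for the trained LSTM: continue the last observed difference
    return window_values[-1] + (window_values[-1] - window_values[-2])


forecast = recursive_forecast([1.0, 2.0, 3.0, 4.0], window=3, n=3, model=toy_model)
```

One consequence of this scheme is that errors can accumulate as n grows, because the model never sees new ground truth after the last observed point.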
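n-step prediction with a 1-alpha confidence interval for each variable
    # alpha = 0.05 gives the 0.95 confidence interval
    alpha = 0.05
    # Confidence interval of the forecast; the function caches its result
    # n: number of steps to predict; n_sample: number of samples
    conf = result.conf_int_sampled(n, alpha=alpha, n_sample=1000)
    further_predict_sampled = result.sampled_result
    # Confidence interval for the existing data (which has ground truth).
    # Use either of the two methods below; conf_int_predicted_sampled is recommended.
    # Standard method
    conf_predicted = result.conf_int_predicted(alpha)
    # Sampling method; the function caches its result
    conf_predicted_sampled = result.conf_int_predicted_sampled(alpha=alpha, n_sample=1000)
    predict_sampled = result.sampled_result_
    # The confidence intervals must be computed before calling plot_confidence
    # Every conf output is a 3-D ndarray indexed by: variable index, prediction index, (lower, mean, upper)

The sampling method runs the model n_sample times with the Dropout layer enabled and computes the confidence interval from those samples; the mean of the samples is also a more realistic representation of the prediction. The standard method uses the result of a single run.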
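For a single time step, the sampled interval amounts to a confidence interval for the mean of the n_sample draws. The sketch below illustrates this with the standard library only. Two things are assumptions of the example rather than the article's code: a normal quantile is swapped in for the Student t quantile that the full code takes from scipy by default, and Gaussian noise stands in for the Dropout-perturbed forward passes.

```python
import random
import statistics
from typing import List, Tuple


def sample_ci(samples: List[float], alpha: float = 0.05) -> Tuple[float, float, float]:
    """Return (lower, mean, upper) for the mean of `samples`, using a normal quantile."""
    n = len(samples)
    mean = statistics.fmean(samples)
    std_err = statistics.stdev(samples) / n ** 0.5
    z = statistics.NormalDist().inv_cdf(1 - alpha / 2)
    return mean - z * std_err, mean, mean + z * std_err


rng = random.Random(0)
# Stand-in for n_sample stochastic forward passes at one time step
draws = [2.0 + rng.gauss(0.0, 0.1) for _ in range(1000)]
lower, mean, upper = sample_ci(draws, alpha=0.05)
```

Note that this is an interval for the mean of the samples, so it narrows as n_sample grows; it is not a band expected to contain 95% of individual observations.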

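A bootstrap variant for the sampled confidence interval is also provided (Utils.bootstrap_ci_sampled, selectable through the method argument), but I personally do not recommend it.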
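For reference, a percentile bootstrap for the mean can be sketched as follows. This is a generic illustration with the standard library, not the article's Utils.bootstrap_ci_sampled; the function name bootstrap_ci, the seed parameter, and the simple index-based quantiles are choices made for this example.

```python
import random
import statistics
from typing import List, Tuple


def bootstrap_ci(samples: List[float], alpha: float = 0.05, n_bootstraps: int = 1000,
                 seed: int = 0) -> Tuple[float, float, float]:
    """Percentile bootstrap interval for the mean: (lower, mean, upper)."""
    rng = random.Random(seed)
    means = sorted(
        statistics.fmean(rng.choices(samples, k=len(samples)))  # resample with replacement
        for _ in range(n_bootstraps)
    )
    lower = means[int(n_bootstraps * alpha / 2)]
    upper = means[int(n_bootstraps * (1 - alpha / 2)) - 1]
    return lower, statistics.fmean(means), upper


rng = random.Random(1)
draws = [5.0 + rng.gauss(0.0, 0.2) for _ in range(200)]
lower, mean, upper = bootstrap_ci(draws)
```

Each bootstrap replicate is a resample of the same size as the original sample, and the interval is read off the sorted replicate means.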

Plotting
    result.plot_loss()
    result.plot()
    result.plot_sampled()
    # conf_predicted = None / conf_predicted_sampled / conf_predicted; None falls back to the conf_predicted_sampled data
    # index selects which variable to show; for a single variable use 0
    result.plot_confidence(index=0, alpha=0.05, conf_predicted=conf_predicted)
    result.plot_confidence(index=0, alpha=0.05, conf_predicted=conf_predicted_sampled)
    # plot_all combines all the functions above. Those cover the basic workflow;
    # plot_all lets you choose freely which data to draw.
    """
    names: Union[List[str], None]         name of each variable
    index: Union[List[int], None, int]    which variable indexes to draw; None means all
    conf_predicted = conf_predicted / conf_predicted_sampled / None    confidence interval for existing data
    conf_predict = conf / None            confidence interval for the forecast
    conf_alpha: float, e.g. 0.05
    result_ = predict / predict_sampled / None    predictions for existing data
    result = further_predict / further_predict_sampled / None    predictions for future data
    """
    result.plot_all(names=None, index=None, conf_predicted=conf_predicted, conf_predict=conf, conf_alpha=0.05, result=further_predict_sampled, result_=predict_sampled)

plot_loss() draws the loss curve. plot() draws the curves of all variables. plot_confidence(index) draws the confidence interval for the variable at position index.

You can override the plotting methods to make the figures look better
    # Override the plotting methods to restyle the figures
    from typing import Union

    from CustomModel import CustomLSTM
    from LSTMResult import LSTMResultBase


    class LSTMResult(LSTMResultBase):
        def __init__(self, resultLSTM: Union[CustomLSTM, LSTMResultBase] = None):
            super().__init__(resultLSTM)

        def plot_confidence(self, index=0, alpha=0.05, conf_predicted=None):
            super().plot_confidence(index, alpha, conf_predicted)

        def plot_loss(self):
            pass

        def plot(self, names=None):
            pass
Replotting
    # After overriding LSTMResult you can do the following
    result = LSTMResult(result)
    lstm = result.resultLSTM
    result.plot()
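The reason re-wrapping works without retraining is that the result object only holds a reference to the trainer and forwards unknown attributes to it. A stripped-down sketch of that pattern is shown below. All class names are stand-ins, and it uses __getattr__ (called only when normal lookup fails) where the full code overrides __getattribute__.

```python
class Trainer:
    """Stand-in for CustomLSTM: owns the data and training state."""

    def __init__(self):
        self.losses = [0.9, 0.4, 0.1]
        self.window = 10


class ResultBase:
    """Stand-in for LSTMResultBase: wraps a trainer or another result object."""

    def __init__(self, source):
        # Accept either a trainer or another result and share the same trainer
        self.trainer = source.trainer if isinstance(source, ResultBase) else source

    def __getattr__(self, name):
        # Called only when normal lookup fails: fall back to the wrapped trainer
        if name == "trainer":
            raise AttributeError(name)
        return getattr(self.trainer, name)

    def describe(self) -> str:
        return f"final loss {self.losses[-1]}"


class PrettyResult(ResultBase):
    def describe(self) -> str:  # overridden presentation, same underlying data
        return f"window={self.window}, " + super().describe()


base = ResultBase(Trainer())
pretty = PrettyResult(base)  # re-wrap without retraining
```

Both wrappers share one trainer instance, so redefining the subclass in a notebook cell and wrapping again only changes how results are presented.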
Saving the data and model
    # Save the model
    result.save()
Loading the saved data
    result = LSTMResultBase.load()
    lstm = result.resultLSTM
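In the full code, save and load are thin wrappers around pickle. A minimal round trip looks roughly like this; a plain dict stands in for the result object, and the helper names and temporary path are chosen for the example.

```python
import os
import pickle
import tempfile


def save(obj, name):
    with open(name, "wb") as f:
        pickle.dump(obj, f)


def load(name):
    with open(name, "rb") as f:
        return pickle.load(f)


state = {"window": 10, "losses": [0.9, 0.4, 0.1], "result": [[1.0, 2.0], [3.0, 4.0]]}
with tempfile.TemporaryDirectory() as folder:
    path = os.path.join(folder, "data")
    save(state, path)
    restored = load(path)
```

Keep in mind that pickle stores custom classes by reference, so the modules defining them must be importable when the file is loaded, and pickle files from untrusted sources should not be loaded.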

Full code

    """
    File : main.py
    Preferred Python IDE: Jupyter
    """
    import numpy as np
    from CustomModel import CustomLSTM
    import Utils


    def data_basic():
        """2023 MCM Problem C: https://www.pancake2021.work/wp-content/uploads/Problem_C_Data_Wordle.xlsx"""
        date, data = Utils.openfile("Problem_C_Data_Wordle.xlsx", data_col=[1], date_col=0)
        return date, data


    def data01():
        # Test case: 2-D array, multiple variables
        data1 = np.sin(np.arange(200) * np.pi / 50) + np.random.randn(200) * 0.1
        data2 = np.sin(np.arange(200) * np.pi / 25) + np.random.randn(200) * 0.1 + 3
        return np.c_[data1[:, np.newaxis], data2[:, np.newaxis]]


    def data02():
        # Test case: 1-D array
        return np.sin(np.arange(200) * np.pi / 50) + np.random.randn(200) * 0.1


    def data03():
        # Test case: 2-D array, single variable
        return data02()[:, np.newaxis]


    # Load the data. To use date, it must follow the Excel date format, or set the timestamps in your own code
    # date, data = data_basic()
    data = data01()

    # Initialize the network
    window_size = 10
    lstm = CustomLSTM(data, window_size, scale=False)
    # lstm = CustomLSTM(data, window_size, date, scale=True)
    lstm.init_lstm(hidden=64, num_layers=3)

    # Train the network
    # max_batch: whether to use the whole data set as a single batch without splitting it
    _ = lstm.fit(num_epochs=2000, lr=0.01, max_batch=True, is_copy=False)
    result = lstm.fit(num_epochs=2000, lr=0.001, max_batch=True, is_copy=True)
    # result = lstm.fit(num_epochs=1000, lr=0.01, batch_size=55, max_batch=False)

    # Retrain with a different window size
    # lstm.slice(20)
    # lstm.init_lstm(hidden=64, num_layers=1)
    # lstm.fit(num_epochs=50, lr=0.001, batch_size=40, max_batch=False)

    # Predict over all existing data
    predict = result.predicted(is_random=False)

    # Print the summary
    r2, mse, rmse, mae, mape, smape = result.summary()

    # Predict the next n steps
    n = 60
    further_predict = result.predict(n)

    # n-step prediction with a 1-alpha confidence interval for each variable
    alpha = 0.05
    conf_predicted = result.conf_int_predicted(alpha)
    conf_predicted_sampled = result.conf_int_predicted_sampled(alpha=alpha, n_sample=1000)
    conf = result.conf_int_sampled(n, alpha=alpha, n_sample=1000)

    # Plot
    result.plot_loss()
    result.plot()
    result.plot_confidence(index=0, alpha=0.05, conf_predicted=None)

    """
    File : CustomModel.py
    """
    import numpy as np
    import warnings
    import torch
    import torch.nn as nn
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.utils.validation import check_is_fitted, check_array
    from copy import deepcopy
    from Model import LSTM

    warnings.simplefilter("always")
    # gpu or cpu
    _device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


    # Adds a method that inverse-transforms a single column
    class MyScaler(MinMaxScaler):
        def __init__(self):
            super().__init__()

        def inverse_transform_col(self, x, n_col):
            check_is_fitted(self)
            # Note: recent scikit-learn releases rename force_all_finite to
            # ensure_all_finite; adjust the keyword if your version warns about it.
            x = check_array(
                x, copy=self.copy, dtype=(np.float64, np.float32, np.float16), force_all_finite="allow-nan"
            )
            x -= self.min_[n_col]
            x /= self.scale_[n_col]
            return x


    class CustomLSTM:
        X, Y, data, model, criterion, window, losses, scale = [None] * 8

        def __init__(self, data: np.array, window, date=None, scale=False):
            self.data = data
            self.input_dim = self.init_data()
            self.output_dim = self.input_dim
            self.date = date
            self.scale = scale
            # Whether to normalize the data
            if scale:
                self.scaler = MyScaler()
                self.data = self.scaler.fit_transform(self.data)
            self.slice(window)

        # data may be a 1-D or 2-D array; a 1-D array is forced into 2-D
        def init_data(self):
            assert (length := len(self.data.shape)) in [1, 2]
            if length == 1:
                self.data = self.data[:, np.newaxis]
            return len(self.data[0])

        # Deprecated: h0 and c0 are now adjusted instead, so no error is raised
        # Checks whether the total data size is divisible by the batch size
        def check_batch(self, batch_size):
            length = self.X.shape[0]
            # Make sure batch_size is not larger than the total length
            assert length >= batch_size
            if batch_size * (length // batch_size) != length:
                warnings.warn(f'Data size is {length} and batch size is {batch_size}; they do not divide evenly, '
                              f'so {(length % batch_size) / length * 100}% of the data would be lost', DeprecationWarning)

        # Reset the parameters
        def init_param(self):
            self.model, self.criterion, self.losses = [None] * 3

        # Slice the data with a window of size `window` to form the whole batch
        def slice(self, window):
            self.init_param()
            self.window = window
            x, y = [], []
            for i in range(len(self.data) - window):
                x.append(self.data[i:i + window])
                y.append(self.data[i + window])
            x = np.array(x)
            y = np.array(y)
            x = torch.from_numpy(x).float()  # (batch_size, sequence_length, input_size)
            y = torch.from_numpy(y).float()
            print(f"Data shapes: X = {x.shape}, Y = {y.shape}")
            self.X = x.to(_device)
            self.Y = y.to(_device)

        # Initialize the LSTM model
        def init_lstm(self, hidden=64, num_layers=1):
            self.init_param()
            self.model = LSTM(self.input_dim, hidden, self.output_dim, num_layers).to(_device)
            self.criterion = nn.MSELoss().to(_device)

        # Split the data into batches, optionally shuffled
        @staticmethod
        def iterate_batches(inputs, targets, batch_size, shuffle=True):
            assert len(inputs) == len(targets)
            indices = np.arange(n := len(inputs))
            if shuffle:
                np.random.shuffle(indices)
            for start_idx in range(0, len(inputs) - batch_size + 1, batch_size):
                excerpt = indices[start_idx:start_idx + batch_size]
                yield inputs[excerpt], targets[excerpt], None
            # Leftover samples form a final, smaller batch
            if (left := n % batch_size) != 0:
                excerpt = indices[-left:]
                yield inputs[excerpt], targets[excerpt], left

        # Start training
        def fit(self, num_epochs=100, lr=0.01, batch_size=128, max_batch=False, is_copy=False):
            losses = []
            avg_loss = 0
            if self.model is None:
                raise ValueError("Initialize the network with CustomLSTM.init_lstm first")
            optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
            self.model.train()
            if max_batch:
                batch_size = self.X.shape[0]
            else:
                pass
                # self.check_batch(batch_size)
            for epoch in range(num_epochs):
                loss_all = 0
                self.model.init_hidden(batch_size)
                index = 0
                for index, (batch_x, batch_y, left) in enumerate(CustomLSTM.iterate_batches(self.X, self.Y, batch_size, shuffle=False)):
                    optimizer.zero_grad()
                    outputs = self.model(batch_x, left)
                    loss = self.criterion(outputs, batch_y)
                    loss.backward()
                    optimizer.step()
                    loss_all += loss.detach().cpu()
                losses.append(loss_all / (index + 1))
                avg_loss += loss_all / (index + 1)
                if epoch % 20 == 0:
                    print('Epoch [{}/{}], Loss: {:.4f}, AVG Loss of 20 sample: {:.4f}'.format(epoch + 1, num_epochs, loss_all / (index + 1), avg_loss / 20))
                    avg_loss = 0
            if self.losses is not None:
                self.losses = self.losses + losses
            else:
                self.losses = losses
            self.model.init_hidden(1)
            # Imported here to avoid a circular import with LSTMResult.py
            from LSTMResult import LSTMResultBase
            return LSTMResultBase(deepcopy(self) if is_copy else self)

    """
    File : LSTMResult.py
    """
    import Utils
    import numpy as np
    import warnings
    import inspect
    import torch
    import matplotlib.pyplot as plt
    import pickle
    import time
    from functools import cache
    from RunTime import cal_time
    import CustomModel

    warnings.simplefilter("always")
    # gpu or cpu
    _device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


    class LSTMResultBase(object):
        # result: predictions for future data; result_: predictions for data that has ground truth
        # The sampled results are more convincing than the single-run results: they average out the
        # randomness of Dropout and are closer to reality, so prefer the sampled results.
        result, conf, result_, conf_predicted, conf_predicted_sampled, sampled_result, sampled_result_ = [None] * 7

        # Keep the original training data in memory
        def __init__(self, resultLSTM: CustomModel.CustomLSTM = None):
            if isinstance(resultLSTM, CustomModel.CustomLSTM):
                self.resultLSTM = resultLSTM
            elif isinstance(resultLSTM, LSTMResultBase):
                self.resultLSTM = resultLSTM.resultLSTM
                self.copy_param(resultLSTM)
            else:
                warnings.warn(f'Input type {type(resultLSTM)} is not supported; pass a CustomLSTM or LSTMResultBase', DeprecationWarning)
            self.__doc__ = resultLSTM.__doc__

        def copy_param(self, parent):
            self.result, self.conf, self.result_, self.conf_predicted, self.conf_predicted_sampled, self.sampled_result, self.sampled_result_ = \
                parent.result, parent.conf, parent.result_, parent.conf_predicted, parent.conf_predicted_sampled, parent.sampled_result, parent.sampled_result_

        # Look attributes up on the wrapped CustomLSTM instead of copying them
        def __getattribute__(self, item):
            get = lambda name: object.__getattribute__(self, name)
            try:
                return get(item)
            except AttributeError:
                pass
            try:
                results = get('resultLSTM')
            except AttributeError:
                raise ImportError("CustomLSTM load error")
            try:
                attribute = getattr(results, item)
                if inspect.ismethod(attribute):
                    warnings.warn(f"Calling {attribute} may fail; avoid calling CustomLSTM methods through the result object", DeprecationWarning)
                return attribute
            except AttributeError:
                raise AttributeError(f"{item} not in both CustomLSTM and LSTMResult")

        def __dir__(self):
            return [x for x in dir(self.resultLSTM) if not inspect.ismethod(getattr(self.resultLSTM, x))] + dir(LSTMResultBase)

        def __getstate__(self):
            return self.__dict__

        def __setstate__(self, dict_):
            self.__dict__.update(dict_)

        # Predict from window_size to the end of the data, for comparison with the true values
        @cal_time()
        def predicted(self, is_random=False):
            if self.model is None:
                raise ValueError("Train the model with CustomLSTM.fit first")
            if is_random:
                self.model.train()  # keeps Dropout active
            else:
                self.model.eval()
            self.model.init_hidden(len(self.X))
            with torch.no_grad():
                predicted = self.model(self.X).to(_device)
            predicted = predicted.detach().cpu().numpy()
            self.result_ = np.array(predicted)
            if self.scale:
                self.result_ = self.scaler.inverse_transform(self.result_)
            return self.result_

        # Predict the next n steps
        @cal_time()
        def predict(self, n, is_random=False):
            if self.model is None:
                raise ValueError("Train the model with CustomLSTM.fit first")
            if is_random:
                self.model.train()  # keeps Dropout active
            else:
                self.model.eval()
            self.model.init_hidden(1)
            _data = self.data[-self.window:, :].tolist()
            y = []
            with torch.no_grad():
                for _ in range(n):
                    x = torch.tensor(_data).float().unsqueeze(0).to(_device)
                    _result = self.model(x).detach().cpu().tolist()
                    y.append(_result[0])
                    # Feed the prediction back in and drop the oldest point
                    _data.append(_result[0])
                    _data.pop(0)
            self.result = np.array(y)
            if self.scale:
                self.result = self.scaler.inverse_transform(self.result)
            return self.result

        # Save the whole model
        def save(self, name="./data"):
            with open(name, 'wb') as f:
                pickle.dump(self, f)

        # Load the model
        @staticmethod
        def load(name="./data"):
            with open(name, "rb") as f:
                return pickle.load(f)

        # Standard confidence interval for the data that has ground truth
        def conf_int_predicted(self, alpha=0.05):
            if self.result_ is None:
                raise ValueError("Call LSTMResultBase.predicted first")
            y_pred = self.result_
            y_true = self.data[self.window:]
            n_param = self.data.shape[1]
            # variable index, number of points per variable, (lower, mean, upper)
            conf = np.zeros(shape=(n_param, len(y_true), 3))
            for i in range(n_param):
                conf[i] = Utils.ci(y_true[:, i], y_pred[:, i], alpha=alpha)
            self.conf_predicted = conf
            return conf

        # Sampled confidence interval for the data that has ground truth; this is the recommended method
        @cache
        def conf_int_predicted_sampled(self, alpha=0.05, n_sample=1000, method=Utils.sample_ci):
            result_ = np.copy(self.result_)
            n = self.data.shape[0] - self.window
            n_param = self.data.shape[1]
            # variable index, number of points per variable, (lower, mean, upper)
            conf = np.zeros(shape=(n_param, n, 3))
            sample = np.zeros(shape=(n_sample, n, n_param))
            for i in range(n_sample):
                print(f"[{i + 1}/{n_sample}] ", end="")
                sample[i] = self.predicted(is_random=True)
            for i in range(n_param):
                conf[i] = method(sample[:, :, i], alpha=alpha)
                # conf[i] = Utils.sample_ci(sample[:, :, i], alpha=alpha)
                # conf[i] = Utils.bootstrap_ci_sampled(sample[:, :, i], alpha=alpha)
            self.conf_predicted_sampled = conf
            self.sampled_result_ = conf[:, :, 1].reshape(n_param, n).T
            # Restore the single-run result overwritten by the sampling loop
            self.result_ = result_
            return conf

        # Confidence interval of the forecast, computed by sampling
        @cache
        def conf_int_sampled(self, n, alpha=0.05, n_sample=1000, method=Utils.sample_ci):
            result = np.copy(self.result)
            n_param = self.data.shape[1]
            sample = np.zeros(shape=(n_sample, n, n_param))
            conf = np.zeros(shape=(n_param, n, 3))
            for i in range(n_sample):
                print(f"[{i + 1}/{n_sample}] ", end="")
                sample[i] = self.predict(n, is_random=True)
            for i in range(n_param):
                conf[i] = method(sample[:, :, i], alpha=alpha)
                # conf[i] = Utils.sample_ci(sample[:, :, i], alpha=alpha)
                # conf[i] = Utils.bootstrap_ci_sampled(sample[:, :, i], alpha=alpha)
            self.conf = conf
            self.sampled_result = conf[:, :, 1].reshape(n_param, n).T
            # Restore the single-run result overwritten by the sampling loop
            self.result = result
            return conf

        # Draw the loss curve
        def plot_loss(self):
            if self.losses is None:
                raise ValueError("No losses recorded; train the model first")
            plt.figure(figsize=(12, 6))
            plt.plot(self.losses)
            plt.xlabel("Epoch")
            plt.ylabel("Loss")
            plt.show()

        def plot_all(self, names=None, index=None, conf_predicted=None, conf_predict=None, conf_alpha=None, result=None, result_=None):
            if names is None:
                names = [''] * len(self.data[0])
            x = np.arange(len(self.data))
            x_further = np.arange(len(self.data), len(self.data) + len(self.result))
            if index is None:
                index = range(len(self.data[0]))
            elif isinstance(index, int):
                index = [index]
            conf_name = '{} Confidence interval' if conf_alpha is None else '{} ' + f'{1 - conf_alpha} Confidence interval'
            plt.figure(figsize=(12, 6))
            for i in index:
                y_true = self.data[:, i: i + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, i:i + 1], i)
                plt.plot(x, y_true, label=f'{names[i]} True Values')
                if result_ is not None:
                    plt.plot(x[self.window:], result_[:, i:i + 1], label=f'{names[i]} Predictions')
                if result is not None:
                    plt.plot(x_further, result[:, i:i + 1], label=f"{names[i]} Further Predictions")
                if conf_predicted is not None:
                    plt.fill_between(x[self.window:], conf_predicted[i, :, 0], conf_predicted[i, :, 2], alpha=0.2, label=conf_name.format(names[i]))
                if conf_predict is not None:
                    plt.fill_between(x_further, conf_predict[i, :, 0], conf_predict[i, :, 2], alpha=0.2, label=conf_name.format(names[i]))
            self.process_x_label()
            plt.legend()
            plt.show()

        # The data may be 2-D (several variables predicted at once); names holds the name of each variable
        def plot(self, names=None):
            if self.result is None:
                raise ValueError("Call LSTMResultBase.predict first")
            if self.result_ is None:
                raise ValueError("Call LSTMResultBase.predicted first")
            if names is None:
                names = [''] * len(self.data[0])
            x = np.arange(len(self.data))
            x_further = np.arange(len(self.data), len(self.data) + len(self.result))
            plt.figure(figsize=(12, 6))
            for i in range(len(self.data[0])):
                y_true = self.data[:, i: i + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, i:i + 1], i)
                plt.plot(x, y_true, label=f'{names[i]} True Values')
                plt.plot(x[self.window:], self.result_[:, i:i + 1], label=f'{names[i]} Predictions')
                plt.plot(x_further, self.result[:, i:i + 1], label=f"{names[i]} Further Predictions")
            self.process_x_label()
            plt.legend()
            plt.show()

        def plot_sampled(self, names=None):
            if self.sampled_result is None:
                raise ValueError("Call LSTMResultBase.conf_int_sampled first")
            if self.sampled_result_ is None:
                raise ValueError("Call LSTMResultBase.conf_int_predicted_sampled first")
            if names is None:
                names = [''] * len(self.data[0])
            x = np.arange(len(self.data))
            x_further = np.arange(len(self.data), len(self.data) + len(self.sampled_result))
            plt.figure(figsize=(12, 6))
            for i in range(len(self.data[0])):
                y_true = self.data[:, i: i + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, i:i + 1], i)
                # Label each curve with its own variable name
                plt.plot(x, y_true, label=f'{names[i]} True Values')
                plt.plot(x[self.window:], self.sampled_result_[:, i:i + 1], label=f'{names[i]} Predictions')
                plt.plot(x_further, self.sampled_result[:, i:i + 1], label=f"{names[i]} Further Predictions")
            self.process_x_label()
            plt.legend()
            plt.show()

        # Show dates on the x axis
        def process_x_label(self):
            if self.date is None:
                return
            begin = self.date[0]
            end = self.date[-1]
            between = (end - begin) // (len(self.date) - 1)
            n = len(self.result)
            length = len(self.data) + n
            gap = length // 5
            # "%Y-%m-%d %H:%M:%S"
            plt.xticks(np.arange(0, length, gap), [time.strftime("%Y-%m-%d", time.localtime(i)) for i in range(begin, end + between * n + 1, between * gap)],
                       rotation=45)
            plt.tight_layout()

        # Draw the confidence interval of a single variable
        def plot_confidence(self, index=0, alpha=0.05, conf_predicted=None):
            if self.result is None:
                raise ValueError("Call LSTMResultBase.predict first")
            if self.result_ is None:
                raise ValueError("Call LSTMResultBase.predicted first")
            # Indexing an interval that was never computed (None) raises TypeError
            if conf_predicted is None:
                try:
                    conf_predicted = self.conf_predicted_sampled[index]
                except (TypeError, IndexError):
                    try:
                        conf_predicted = self.conf_predicted[index]
                    except (TypeError, IndexError):
                        raise ValueError("Call LSTMResultBase.conf_int_predicted_sampled or conf_int_predicted first")
            else:
                conf_predicted = conf_predicted[index]
            try:
                conf = self.conf[index]
            except (TypeError, IndexError):
                raise ValueError("Call LSTMResultBase.conf_int_sampled first")
            plt.figure(figsize=(12, 6))
            x = np.arange(len(self.data))
            n = len(self.result)
            x_further = np.arange(len(self.data), len(self.data) + n)
            y_true = self.data[:, index: index + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, index: index + 1], index)
            plt.plot(x, y_true, label='True Values')
            plt.plot(x[self.window:], conf_predicted[:, 1], label='Predictions')
            plt.plot(x_further, conf[:, 1], label="Further Predictions")
            plt.fill_between(x[self.window:], conf_predicted[:, 0], conf_predicted[:, 2], alpha=0.2, label=f'{1 - alpha} Confidence interval')
            plt.fill_between(x_further, conf[:, 0], conf[:, 2], alpha=0.2, label=f'{1 - alpha} Confidence interval')
            self.process_x_label()
            plt.legend()
            plt.show()

        # Print the summary: R^2 and the other evaluation scores
        def summary(self):
            if self.result_ is None:
                raise ValueError("Call LSTMResultBase.predicted first")
            data = self.data[self.window:, :]
            if self.scale:
                data = self.scaler.inverse_transform(data)
            print("==========Summary Begin===========")
            print("R2 =", score_r2 := Utils.r2(self.result_, data))
            print("MSE =", score_mse := Utils.mse(self.result_, data))
            print("RMSE =", score_rmse := np.sqrt(score_mse))
            print("MAE =", score_mae := Utils.mae(self.result_, data))
            print("MAPE =", score_mape := Utils.mape(self.result_, data, replace=self.scale))
            print("SMAPE =", score_smape := Utils.smape(self.result_, data))
            print(f"Average is {score_r2.mean()} {score_mse.mean()} {score_rmse.mean()} {score_mae.mean()} {score_mape.mean()} {score_smape.mean()}")
            print("===========Summary end============")
            return score_r2, score_mse, score_rmse, score_mae, score_mape, score_smape

    """
    File : Model.py
    """
    import torch
    import torch.nn as nn

    # gpu or cpu
    _device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


    # Define the LSTM model
    class LSTM(nn.Module):
        hidden = None

        def __init__(self, input_size, hidden_size, output_size, num_layers):
            super().__init__()
            self.hidden_size = hidden_size
            self.num_layers = num_layers
            # As batch_first=True, input: (batch_size, sequence_length, input_size)
            self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
            self.lstm2 = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
            self.out = nn.Sequential(
                nn.Linear(hidden_size, hidden_size),
                nn.ReLU(),
                nn.Dropout(0.5),
                nn.Linear(hidden_size, output_size)
            )
            # Optional: the next line can be commented out
            self.apply(LSTM.init_weights)

        def forward(self, x, left=None):
            self.lstm.flatten_parameters()
            self.lstm2.flatten_parameters()
            # Detach the hidden state so loss.backward does not fail;
            # for a partial batch, trim it to the first `left` entries
            hidden = [each.data for each in self.hidden] if left is None else [each.data[:, :left, :] for each in self.hidden]
            x, hidden = self.lstm(x, hidden)
            x, self.hidden = self.lstm2(x, hidden)
            # Keep only the output of the last time step
            x = x[:, -1, :]
            x = self.out(x)
            return x

        # (h0, c0)
        def init_hidden(self, batch_size):
            weight = next(self.parameters()).data
            self.hidden = (weight.new(self.num_layers, batch_size, self.hidden_size).zero_().to(_device),
                           weight.new(self.num_layers, batch_size, self.hidden_size).zero_().to(_device))

        @staticmethod
        def init_weights(m):
            if type(m) == nn.LSTM:
                for name, param in m.named_parameters():
                    if 'weight_ih' in name:
                        torch.nn.init.orthogonal_(param.data)
                    elif 'weight_hh' in name:
                        torch.nn.init.orthogonal_(param.data)
                    elif 'bias' in name:
                        param.data.fill_(0)
            elif type(m) == nn.Conv1d or type(m) == nn.Linear:
                torch.nn.init.orthogonal_(m.weight)
                m.bias.data.fill_(0)

    """
    File : RunTime.py
    """
    import time
    from functools import wraps


    # Custom decorator that reports a function's run time in ms/s/min/hour/day
    def cal_time(unit='s'):
        units = ['ms', 's', 'min', 'hour', 'day']
        times = [1000, 1, 1 / 60, 1 / 3600, 1 / 3600 / 24]
        try:
            time_process = times[units.index(unit.lower())]
        except Exception as e:
            raise e

        def input_func(func):
            @wraps(func)
            def wrap(*args, **kwargs):
                begin_time = time.time()
                ans = func(*args, **kwargs)
                print(func.__name__, f"took {(time.time() - begin_time) * time_process} {unit}")
                return ans

            return wrap

        return input_func

    """
    File : Utils.py
    """
    import numpy as np
    import pandas as pd
    import scipy.stats as st
    from typing import List, Optional, Tuple, Union


    # Confidence interval
    def ci(y_true: np.ndarray, y_pred: np.ndarray, alpha: float = 0.05, use_t: bool = True) -> np.ndarray:
        residuals = y_true - y_pred
        n = len(residuals)
        if use_t:
            dist = st.t
            df = n - 1
            t_value = dist.ppf(1 - alpha / 2, df)
        else:
            dist = st.norm
            t_value = dist.ppf(1 - alpha / 2)
        if y_true.ndim == 1:
            std_err = np.std(residuals, ddof=1) / np.sqrt(n)
        else:
            std_err = np.std(residuals, ddof=1, axis=0) / np.sqrt(n)
        err = t_value * std_err
        return np.c_[y_pred - err, y_pred, y_pred + err]


    # Confidence interval from samples
    def sample_ci(sample: np.ndarray, alpha: float = 0.05, use_t: bool = True) -> np.ndarray:
        n = len(sample)
        if use_t:
            dist = st.t
            df = n - 1
            t_value = dist.ppf(1 - alpha / 2, df)
        else:
            dist = st.norm
            t_value = dist.ppf(1 - alpha / 2)
        means = sample.mean(axis=0)
        stds = sample.std(axis=0, ddof=1)
        lower = means - t_value * stds / np.sqrt(n)
        upper = means + t_value * stds / np.sqrt(n)
        return np.c_[lower, means, upper]


    # Bootstrap confidence interval from samples; sample: (n_sample, n)
    def bootstrap_ci_sampled(sample: np.ndarray, alpha: float = 0.05, n_bootstraps: int = 1000) -> np.ndarray:
        n = len(sample[0])
        sample_size = len(sample)
        lower = np.zeros(shape=(n,))
        means = np.zeros(shape=(n,))
        upper = np.zeros(shape=(n,))
        for i in range(n):
            # Each column is one bootstrap resample of size sample_size
            bootstrap_sample = np.random.choice(sample[:, i], size=(sample_size, n_bootstraps), replace=True)
            # Average within each resample: one mean per bootstrap replicate
            bootstrap_means = np.mean(bootstrap_sample, axis=0)
            lower[i] = np.percentile(bootstrap_means, 100 * alpha / 2)
            upper[i] = np.percentile(bootstrap_means, 100 * (1 - alpha / 2))
            means[i] = np.mean(bootstrap_means)
        return np.c_[lower, means, upper]


    # All inputs are 2-D numpy arrays with one variable per column; each function returns a 1-D numpy array of scores
    def r2(y_pred, y_true):
        return 1 - ((y_pred - y_true) ** 2).sum(axis=0) / ((y_true.mean(axis=0) - y_true) ** 2).sum(axis=0)


    def mse(y_pred, y_true):
        return ((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred)


    def rmse(y_pred, y_true):
        return np.sqrt(((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred))


    def mae(y_pred, y_true):
        return (np.absolute(y_true - y_pred)).sum(axis=0) / len(y_true)


    def mape(y_pred, y_true, replace=False):
        # Guard against zeros in y_true
        if replace:
            y_true = np.copy(y_true)
            y_true[y_true == 0] = 1
        return (np.abs((y_pred - y_true) / y_true)).mean(axis=0) * 100


    def smape(y_pred, y_true):
        return 2.0 * (np.abs(y_pred - y_true) / (np.abs(y_pred) + np.abs(y_true))).mean(axis=0) * 100


    # Shortcut for opening a data file
    def openfile(name: str, data_col: Union[None, List[str], List[int]] = None,
                 date_col: Union[None, int] = None) -> Tuple[Optional[np.ndarray], np.ndarray]:
        file_type = name.split(".")[-1]
        if file_type == "csv":
            df = pd.read_csv(name, encoding='GBK')
        elif file_type == "xlsx" or file_type == "xls":
            df = pd.read_excel(name)
        else:
            raise TypeError(f"{name} is not a csv, xls or xlsx file")
        # df = df[["column name 1", "column name 2"]]
        if data_col is None:
            df_data = df
        elif all(isinstance(col, str) for col in data_col):
            # Column names: select by label
            df_data = df.loc[:, data_col]
        else:
            # Column indexes: select by position
            df_data = df.iloc[:, data_col]
        if date_col is None:
            # No date column: return None instead of failing on the conversion
            return None, np.array(df_data)
        # Datetime column (nanoseconds) -> Unix timestamp in seconds
        date = df.iloc[:, date_col].astype('int64') // 10 ** 9
        return np.array(date, dtype=int), np.array(df_data)
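
Changelog:

2023-4-10: fixed the bootstrap bug; improved dir(LSTMResultBase); added the customizable plot_all function; fixed warnings not being displayed; separated the sampled and single-run data; replaced the batch_size check with adjusting h0 and c0; moved predicted into the result base class; fixed data loss on save.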
