当前位置:   article > 正文

「实战案例:使用 LSTM 进行单变量时间序列预测(附Python完整代码)_lstm实际应用案例

lstm实际应用案例

LSTM介绍

        LSTM(长短时记忆网络)是一种特殊类型的循环神经网络(RNN),用于处理和预测时间序列数据。相比于传统的RNN,LSTM具有更好的处理长期依赖性和记忆能力。关键思想是引入了一个称为"记忆单元"的组件。记忆单元能够接收输入、输出和保留信息,以便在整个序列中传递和保存重要的信息。这种机制使得LSTM能够更好地处理长序列,避免梯度消失问题,并且具有较长的记忆。其结构包括三个主要的门控单元:输入门、遗忘门和输出门。输入门决定了多少输入信息应该被记忆,遗忘门决定了多少旧的记忆应该被遗忘,输出门决定了多少记忆应该输出给下一个时间步。训练过程通常使用反向传播算法和梯度下降法来更新网络参数。在训练过程中,LSTM可以通过学习时间序列数据中的模式和规律来预测未来的值。

单变量时间预测

数据是github上下载ETTh1.csv转换的,我用的是时间和OT列

预测结果图

数据集

        是从github上下载的油温数据ETTh1.csv,只用了里面的时间列和OT列,具体地址不记得了,需要数据集的和我说一下。

完整代码

  1. import csv
  2. import numpy as np
  3. import pandas as pd
  4. import matplotlib.pyplot as plt
  5. import seaborn as sns
  6. from sklearn.preprocessing import MinMaxScaler
  7. from sklearn.metrics import r2_score
  8. import tensorflow as tf
  9. from tensorflow.python.keras import Sequential, layers, utils
  10. def predict_next(model, sample, epoch=20):
  11. temp1 = list(sample[:, 0])
  12. for i in range(epoch):
  13. sample = sample.reshape(1, x_Seq_len, 1)
  14. pred = model.predict(sample)
  15. value = pred.tolist()[0][0]
  16. temp1.append(value)
  17. sample = np.array(temp1[i + 1 : i + x_Seq_len + 1])
  18. return temp1
  19. def create_new_dataset(dataset, seq_len=12):
  20. """基于原始数据集构造新的序列特征数据集
  21. Params:
  22. dataset : 原始数据集
  23. seq_len : 序列长度(时间跨度)
  24. Returns:
  25. X, y
  26. """
  27. X = [] # 初始特征数据集为空列表
  28. y = [] # 初始标签数据集为空列表,y标签为样本的下一个点,即预测点
  29. start = 0 # 初始位置
  30. end = dataset.shape[0] - seq_len # 截止位置,dataset.shape[0]就是有多少条
  31. for i in range(start, end): # for循环构造特征数据集
  32. sample = dataset[i : i + seq_len] # 基于时间跨度seq_len创建样本
  33. label = dataset[i + seq_len] # 创建sample对应的标签
  34. X.append(sample) # 保存sample
  35. y.append(label) # 保存label
  36. # 返回特征数据集和标签集
  37. return np.array(X), np.array(y)
  38. def split_dataset(X, y, train_ratio=0.8):
  39. """基于X和y,切分为train和test
  40. Params:
  41. X : 特征数据集
  42. y : 标签数据集
  43. train_ratio : 训练集占X的比例
  44. Returns:
  45. X_train, X_test, y_train, y_test
  46. """
  47. X_len = len(X) # 特征数据集X的样本数量
  48. train_data_len = int(X_len * train_ratio) # 训练集的样本数量
  49. X_train = X[:train_data_len] # 训练集
  50. y_train = y[:train_data_len] # 训练标签集
  51. X_test = X[train_data_len:] # 测试集
  52. y_test = y[train_data_len:] # 测试集标签集
  53. # 返回值
  54. return X_train, X_test, y_train, y_test
  55. # 功能函数:基于新的X_train, X_test, y_train, y_test创建批数据(batch dataset)
  56. def create_batch_data(X, y, batch_size=32, data_type=1):
  57. """基于训练集和测试集,创建批数据
  58. Params:
  59. X : 特征数据集
  60. y : 标签数据集
  61. batch_size : batch的大小,即一个数据块里面有几个样本
  62. data_type : 数据集类型(测试集表示1,训练集表示2)
  63. Returns:
  64. train_batch_data 或 test_batch_data
  65. """
  66. if data_type == 1: # 测试集
  67. dataset = tf.data.Dataset.from_tensor_slices(
  68. (tf.constant(X), tf.constant(y))
  69. ) # 封装X和y,成为tensor类型
  70. test_batch_data = dataset.batch(batch_size) # 构造批数据
  71. # 返回
  72. return test_batch_data
  73. else: # 训练集
  74. dataset = tf.data.Dataset.from_tensor_slices(
  75. (tf.constant(X), tf.constant(y))
  76. ) # 封装X和y,成为tensor类型
  77. train_batch_data = dataset.cache().shuffle(1000).batch(batch_size) # 构造批数据
  78. # 返回
  79. return train_batch_data
  80. if __name__ == "__main__":
  81. plt.rcParams["font.sans-serif"] = ["SimHei"] # 显示中文标签
  82. plt.rcParams["axes.unicode_minus"] = False # 解决负数问题
  83. x_Seq_len = 32 # 16
  84. # dataset = pd.read_csv("transform\data\data_pre_all_new.csv")
  85. dataset = pd.read_csv("transform\ETTh1.csv")
  86. dataset = dataset.iloc[:, [0, 7]]
  87. dataset.columns = ["date", "OT"]
  88. dataset["date"] = pd.to_datetime(
  89. dataset["date"],
  90. format="%Y-%m-%d %H:%M:%S",
  91. # dataset["date"],
  92. # format="%Y-%m-%d",
  93. )
  94. aa = dataset.date
  95. dataset.index = dataset.date # 将其索引变为时间
  96. dataset.drop(columns="date", axis=1, inplace=True)
  97. plt.figure()
  98. plt.plot(dataset)
  99. plt.show()
  100. """
  101. 数据清洗
  102. """
  103. # 缺失值处理
  104. # 查看是否有缺失值
  105. print(dataset.info()) # 无缺失值
  106. # print(dataset[dataset.isnull()==False])#无
  107. # dataset['总有功功率(kw)']=dataset['总有功功率(kw)'].fillna(0) 对缺失值填值处理
  108. # dataset1=dataset[dataset['总有功功率(kw)'].notnull()] 剔除存在缺失值的数据,自己选择一直缺失值处理的方法
  109. # 异常值处理
  110. # dataset = dataset.reset_index(drop=True)
  111. # dataset = pd.concat([aa, dataset], axis=1)
  112. # print(dataset) # (17420, 2) 索引
  113. # exit()
  114. """
  115. 箱型图查看
  116. """
  117. f, ax = plt.subplots()
  118. sns.boxplot(y="OT", data=dataset, ax=ax)
  119. plt.show()
  120. s = dataset.describe()
  121. # 基本统计量,存在异常值的将其筛选出来进行处理,可以用中位数填值或者众数填值,方法任选,这里没有异常值就没有处理
  122. q1 = s.loc["25%"]
  123. q3 = s.loc["75%"]
  124. iqr = q3 - q1 # 分位差
  125. mi = q1 - 1.5 * iqr # 下限,低于这个为异常值
  126. ma = q3 + 1.5 * iqr # 上限,高于这个为异常值
  127. # print(dataset)
  128. # print(dataset.shape) #(17420, 1)
  129. # 寻找异常点,获得异常点索引值,删除索引值所在行数据
  130. dataset = dataset.drop(
  131. index=(
  132. dataset[
  133. ((dataset.OT) > float(ma.values)) | ((dataset.OT) < float(mi.values))
  134. ].index
  135. )
  136. )
  137. # print(dataset.shape) # (16949, 1)
  138. # print(dataset)
  139. # 筛选异常值
  140. # 归一化处理,均值为0,方差为1
  141. scaler = MinMaxScaler()
  142. dataset["OT"] = scaler.fit_transform(dataset["OT"].values.reshape(-1, 1))
  143. # 将归一化的数据保持
  144. with open("data.csv", "w", encoding="utf-8", newline="") as f:
  145. w = csv.writer(f)
  146. w.writerow(dataset["OT"])
  147. # 归一化后的绘图
  148. dataset["OT"].plot()
  149. plt.show()
  150. """
  151. 特征提取(特征工程)
  152. """
  153. dataset_new = dataset
  154. # X为特征数据集,y为标签数据集
  155. X, y = create_new_dataset(dataset_new.values, seq_len=x_Seq_len)
  156. # X_train为数据训练集,X_test为数据测试集,y_train为标签训练集,y_test为标签测试集合
  157. X_train, X_test, y_train, y_test = split_dataset(X, y)
  158. # 基于新的X_train, X_test, y_train, y_test创建批数据(batch dataset)
  159. # 测试批数据
  160. test_batch_dataset = create_batch_data(X_test, y_test, batch_size=24, data_type=1)
  161. # 训练批数据
  162. train_batch_dataset = create_batch_data(
  163. X_train, y_train, batch_size=24, data_type=2
  164. )
  165. """
  166. 构建模型
  167. """
  168. model = Sequential(
  169. [layers.LSTM(8, input_shape=(x_Seq_len, 1)), layers.Dense(1)]
  170. ) # 8
  171. # 定义 checkpoint,保存权重文件
  172. file_path = "best_checkpoint.hdf5" # 将数据加载到内存
  173. checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
  174. filepath=file_path,
  175. monitor="loss",
  176. mode="min",
  177. save_best_only=True,
  178. save_weights_only=True,
  179. )
  180. """
  181. 编译运行预测
  182. """
  183. # 模型编译
  184. model.compile(optimizer="adam", loss="mae")
  185. # 模型训练(次数200
  186. history = model.fit(
  187. train_batch_dataset,
  188. epochs=100, # 100
  189. validation_data=test_batch_dataset,
  190. callbacks=[checkpoint_callback],
  191. )
  192. # 显示 train loss 和 val loss
  193. plt.figure()
  194. plt.plot(history.history["loss"], label="train loss")
  195. plt.plot(history.history["val_loss"], label="val loss")
  196. plt.title("LOSS")
  197. plt.xlabel("Epochs")
  198. plt.ylabel("Loss")
  199. plt.legend(loc="best")
  200. plt.show()
  201. # 模型验证
  202. test_pred = model.predict(X_test, verbose=1)
  203. plt.figure()
  204. d1 = plt.plot(y_test, label="True")
  205. d2 = plt.plot(test_pred, label="pred")
  206. plt.legend([d1, d2], labels=["True", "pred"])
  207. plt.show()
  208. # 计算r2
  209. score = r2_score(y_test, test_pred)
  210. print("r^2 的值: ", score)
  211. # 绘制test中前100个点的真值与预测值
  212. y_true = y_test # 真实值
  213. y_pred = test_pred # 预测值
  214. fig, axes = plt.subplots(2, 1)
  215. ax0 = axes[0].plot(y_true, marker="o", color="red", label="true")
  216. ax1 = axes[1].plot(y_pred, marker="*", color="blue", label="pred")
  217. plt.show()
  218. """
  219. 模型测试
  220. """
  221. # 选择test中的最后一个样本
  222. sample = X_test[-1]
  223. sample = sample.reshape(1, sample.shape[0], 1)
  224. # 模型预测
  225. sample_pred = model.predict(sample) # predict()预测标签值
  226. ture_data = X_test[-1] # 真实test的最后20个数据点
  227. # 预测后48个点
  228. preds = predict_next(model, ture_data, 48)
  229. # 绘图
  230. plt.figure()
  231. plt.plot(preds, color="yellow", label="Prediction")
  232. plt.plot(ture_data, color="blue", label="Truth")
  233. plt.xlabel("Epochs")
  234. plt.ylabel("Value")
  235. plt.legend(loc="best")
  236. plt.show()
  237. relative_error = 0
  238. """模型精确度计算"""
  239. for i in range(len(y_pred)):
  240. relative_error += (abs(y_pred[i] - y_true[i]) / y_true[i]) ** 2
  241. acc = 1 - np.sqrt(relative_error / len(y_pred))
  242. print(f"模型的测试准确率为:", acc)

  

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/1015878
推荐阅读
相关标签
  

闽ICP备14008679号