当前位置:   article > 正文

Python数据分析案例52——基于SSA-LSTM的风速预测(麻雀优化)

Python数据分析案例52——基于SSA-LSTM的风速预测(麻雀优化)

案例背景

又要开始更新时间序列水论文的系列的方法了,前面基于各种不同神经网络层,还有注意力机制做了一些缝合模型。
其实论文里面用的多的可能是优化算法和模态分解,这两个我还没出专门的例子,这几天正好出一个优化算法的例子来做一个时间序列模型的缝合版。
想看更多的发论文用的模型可以参考我数据分析案例之前的文章,或者关注我后面的文章。

其实优化算法在python里面的生态不如MATLAB,现有的包很少,所以都是现写的。我自己也有优化算法专栏,以后有机会都写上去。本次的Python版的麻雀算法就是手写的,网上基本没有。

本次就简单点,使用优化算法里面表现较好的麻雀优化算法,优化算法我也做过一些测试,虽然都是各有优势,但是从通用性和整体表现来看,麻雀优化算法表现是较好的,那些什么混沌麻雀,自适应麻雀也差不多,可能在特殊的情况下表现会好一些。什么,你问问为什么不用粒子群,退火,遗传这种算法?emmmm,你自己去找些函数试试就知道他们比麻雀算法差多少了。。。


数据介绍

本次数据集有两个csv,一个桥面风速,一个气象站风速。

一般来说,桥面风速是好测量的,气象站的风速是被认为是真实的风速,所以我们当前的用气象站的风速作为y,之前的桥面风速和气象站风速作为X。

当然,需要本次案例演示数据和全部代码文件的同学还是可以参考:风速预测


代码实现

导入包,深度学习的包有点多、

  1. import os
  2. import math
  3. import time
  4. import datetime
  5. import random as rn
  6. import numpy as np
  7. import pandas as pd
  8. import matplotlib.pyplot as plt
  9. %matplotlib inline
  10. plt.rcParams ['font.sans-serif'] ='SimHei' #显示中文
  11. plt.rcParams ['axes.unicode_minus']=False #显示负号
  12. from sklearn.model_selection import train_test_split
  13. from sklearn.preprocessing import MinMaxScaler,StandardScaler
  14. from sklearn.metrics import mean_absolute_error
  15. from sklearn.metrics import mean_squared_error,r2_score
  16. import tensorflow as tf
  17. import keras
  18. from keras.layers import Layer
  19. import keras.backend as K
  20. from keras.models import Model, Sequential
  21. from keras.layers import GRU, Dense,Conv1D, MaxPooling1D,GlobalMaxPooling1D,Embedding,Dropout,Flatten,SimpleRNN,LSTM
  22. from keras.callbacks import EarlyStopping
  23. #from tensorflow.keras import regularizers
  24. #from keras.utils.np_utils import to_categorical
  25. from tensorflow.keras import optimizers

读取数据,展示前五行

  1. data0=pd.concat([pd.read_excel('bridge2.xlsx').set_index('时间'),
  2. pd.read_excel('weather_station.xlsx').set_index('时间')],axis=1).sort_index().fillna(0)
  3. data0.head()

一行代码读取两个文件,并且合并,代码风格还是简洁优雅的。

注意想换成自己的数据集的话,要预测的y放在最后一列。

构建训练集和测试集

时间序列的预测一套滑动窗口,构建的函数如下

  1. def build_sequences(text, window_size=24):
  2. #text:list of capacity
  3. x, y = [],[]
  4. for i in range(len(text) - window_size):
  5. sequence = text[i:i+window_size]
  6. target = text[i+window_size]
  7. x.append(sequence)
  8. y.append(target)
  9. return np.array(x), np.array(y)
  10. def get_traintest(data,train_ratio=0.8,window_size=24):
  11. train_size=int(len(data)*train_ratio)
  12. train=data[:train_size]
  13. test=data[train_size-window_size:]
  14. X_train,y_train=build_sequences(train,window_size=window_size)
  15. X_test,y_test=build_sequences(test,window_size=window_size)
  16. return X_train,y_train[:,-1],X_test,y_test[:,-1]

对x和y进行标准化

  1. data=data0.to_numpy()
  2. scaler = MinMaxScaler()
  3. scaler = scaler.fit(data[:,:-1])
  4. X=scaler.transform(data[:,:-1])
  5. y_scaler = MinMaxScaler()
  6. y_scaler = y_scaler.fit(data[:,-1].reshape(-1,1))
  7. y=y_scaler.transform(data[:,-1].reshape(-1,1))

划分训练集和测试集

  1. train_ratio=0.8 #训练集比例
  2. window_size=64 #滑动窗口大小,即循环神经网络的时间步长
  3. X_train,y_train,X_test,y_test=get_traintest(np.c_[X,y],window_size=window_size,train_ratio=train_ratio)
  4. print(X_train.shape,y_train.shape,X_test.shape,y_test.shape)

数据可视化

  1. y_test = y_scaler.inverse_transform(y_test.reshape(-1,1))
  2. test_size=int(len(data)*(1-train_ratio))
  3. plt.figure(figsize=(10,5),dpi=256)
  4. plt.plot(data0.index[:-test_size],data0.iloc[:,-1].iloc[:-test_size],label='Train',color='#FA9905')
  5. plt.plot(data0.index[-test_size:],data0.iloc[:,-1].iloc[-(test_size):],label='Test',color='#FB8498',linestyle='dashed')
  6. plt.legend()
  7. plt.ylabel('Predict Series',fontsize=16)
  8. plt.xlabel('Time',fontsize=16)
  9. plt.show()

训练函数的准备

下面继续自定义函数,评价指标

  1. def set_my_seed():
  2. os.environ['PYTHONHASHSEED'] = '0'
  3. np.random.seed(1)
  4. rn.seed(12345)
  5. tf.random.set_seed(123)
  6. def evaluation(y_test, y_predict):
  7. mae = mean_absolute_error(y_test, y_predict)
  8. mse = mean_squared_error(y_test, y_predict)
  9. rmse = np.sqrt(mean_squared_error(y_test, y_predict))
  10. mape=(abs(y_predict -y_test)/ y_test).mean()
  11. #r_2=r2_score(y_test, y_predict)
  12. return mse, rmse, mae, mape #r_2

我们使用回归问题常用的mse, rmse, mae, mape作为预测效果的评价指标。

自定义注意力机制的类

  1. class AttentionLayer(Layer): #自定义注意力层
  2. def __init__(self, **kwargs):
  3. super(AttentionLayer, self).__init__(**kwargs)
  4. def build(self, input_shape):
  5. self.W = self.add_weight(name='attention_weight',
  6. shape=(input_shape[-1], input_shape[-1]),
  7. initializer='random_normal',
  8. trainable=True)
  9. self.b = self.add_weight(name='attention_bias',
  10. shape=(input_shape[1], input_shape[-1]),
  11. initializer='zeros',
  12. trainable=True)
  13. super(AttentionLayer, self).build(input_shape)
  14. def call(self, x):
  15. # Applying a simpler attention mechanism
  16. e = K.tanh(K.dot(x, self.W) + self.b)
  17. a = K.softmax(e, axis=1)
  18. output = x * a
  19. return output
  20. def compute_output_shape(self, input_shape):
  21. return input_shape

自定义模型的构建

  1. def build_model(X_train,mode='LSTM',hidden_dim=[32,16]):
  2. set_my_seed()
  3. model = Sequential()
  4. if mode=='MLP':
  5. model.add(Dense(hidden_dim[0],activation='relu',input_shape=(X_train.shape[-2],X_train.shape[-1])))
  6. model.add(Flatten())
  7. model.add(Dense(hidden_dim[1],activation='relu'))
  8. elif mode=='LSTM':
  9. # LSTM
  10. model.add(LSTM(hidden_dim[0],return_sequences=True, input_shape=(X_train.shape[-2],X_train.shape[-1])))#
  11. model.add(LSTM(hidden_dim[1]))
  12. #model.add(Flatten())
  13. #model.add(Dense(hidden_dim[1], activation='relu'))
  14. elif mode=='GRU':
  15. #GRU
  16. model.add(GRU(hidden_dim[0],return_sequences=True, input_shape=(X_train.shape[-2],X_train.shape[-1])))
  17. model.add(GRU(hidden_dim[1]))
  18. elif mode == 'Attention-LSTM':
  19. model.add(LSTM(hidden_dim[0], return_sequences=True, input_shape=(X_train.shape[-2], X_train.shape[-1])))
  20. model.add(AttentionLayer())
  21. #model.add(LSTM(hidden_dim[1]))#, return_sequences=False
  22. model.add(Flatten())
  23. model.add(Dense(hidden_dim[1], activation='relu'))
  24. #model.add(Dense(4, activation='relu'))
  25. elif mode=='SSA-LSTM':
  26. # LSTM
  27. model.add(LSTM(hidden_dim[0],input_shape=(X_train.shape[-2],X_train.shape[-1])))#return_sequences=True,
  28. model.add(Dense(hidden_dim[1], activation='relu'))
  29. model.add(Dense(1))
  30. model.compile(optimizer='Adam', loss='mse',metrics=[tf.keras.metrics.RootMeanSquaredError(),"mape","mae"])
  31. return model

自定义画损失图函数和预测对比函数

  1. def plot_loss(hist,imfname=''):
  2. plt.subplots(1,4,figsize=(16,2))
  3. for i,key in enumerate(hist.history.keys()):
  4. n=int(str('14')+str(i+1))
  5. plt.subplot(n)
  6. plt.plot(hist.history[key], 'k', label=f'Training {key}')
  7. plt.title(f'{imfname} Training {key}')
  8. plt.xlabel('Epochs')
  9. plt.ylabel(key)
  10. plt.legend()
  11. plt.tight_layout()
  12. plt.show()
  13. def plot_fit(y_test, y_pred):
  14. plt.figure(figsize=(4,2))
  15. plt.plot(y_test, color="red", label="actual")
  16. plt.plot(y_pred, color="blue", label="predict")
  17. plt.title(f"拟合值和真实值对比")
  18. plt.xlabel("Time")
  19. plt.ylabel('power')
  20. plt.legend()
  21. plt.show()

可能有的小伙伴觉得看不懂了,没关系,我都是高度的封装,不需要知道每个函数里面的细节,大概知道他们是做什么的就行。因为下面要把他们全部打包为训练函数,改一下参数就可以使用不同的模型,很方便,

  1. df_eval_all=pd.DataFrame(columns=['MSE','RMSE','MAE','MAPE'])
  2. df_preds_all=pd.DataFrame()
  3. def train_fuc(mode='LSTM',batch_size=64,epochs=30,hidden_dim=[32,16],verbose=0,show_loss=True,show_fit=True):
  4. #构建模型
  5. s = time.time()
  6. set_my_seed()
  7. model=build_model(X_train=X_train,mode=mode,hidden_dim=hidden_dim)
  8. earlystop = EarlyStopping(monitor='loss', min_delta=0, patience=5)
  9. hist=model.fit(X_train, y_train,batch_size=batch_size,epochs=epochs,callbacks=[earlystop],verbose=verbose)
  10. if show_loss:
  11. plot_loss(hist)
  12. #预测
  13. y_pred = model.predict(X_test)
  14. y_pred = y_scaler.inverse_transform(y_pred)
  15. #print(f'真实y的形状:{y_test.shape},预测y的形状:{y_pred.shape}')
  16. if show_fit:
  17. plot_fit(y_test, y_pred)
  18. e=time.time()
  19. print(f"运行时间为{round(e-s,3)}")
  20. df_preds_all[mode]=y_pred.reshape(-1,)
  21. s=list(evaluation(y_test, y_pred))
  22. df_eval_all.loc[f'{mode}',:]=s
  23. s=[round(i,3) for i in s]
  24. print(f'{mode}的预测效果为:MSE:{s[0]},RMSE:{s[1]},MAE:{s[2]},MAPE:{s[3]}')
  25. print("=======================================运行结束==========================================")
  26. return s[0]

所有的函数都准备完了,下面初始化参数,开始准备训练模型

  1. window_size=64
  2. batch_size=64
  3. epochs=30
  4. hidden_dim=[32,16]
  5. verbose=0
  6. show_fit=True
  7. show_loss=True
  8. mode='LSTM'  #MLP,GRU

MLP模型训练

train_fuc(mode='MLP',batch_size=batch_size,epochs=epochs,hidden_dim=hidden_dim,verbose=1)

可以看到这个训练函数运行完后,可以清晰的看到每个训练轮的损失,损失的变化图,预测的效果对比图,还有评价指标的计算结果。

换模型也很便捷,只需要该mode这一个参数就行。

GRU模型训练

修改一下mode就行,其他参数你可以改也可以不改

train_fuc(mode='GRU',batch_size=batch_size,epochs=epochs,hidden_dim=hidden_dim,verbose=1)

LSTM训练

train_fuc(mode='LSTM',batch_size=batch_size,epochs=epochs,hidden_dim=hidden_dim,verbose=1)

Attention-LSTM模型的训练

train_fuc(mode='Attention-LSTM',batch_size=batch_size,epochs=epochs,hidden_dim=hidden_dim,verbose=1)

好像加了注意力机制的效果只变好了一点点。


麻雀搜索优化算法(SSA)

这里直接写上SSA的源代码,python版本的网上几乎是没有的

  1. import numpy as np
  2. import random
  3. import copy
  4. from matplotlib import pyplot as plt
  5. from mpl_toolkits.mplot3d import Axes3D
  6. ''' 种群初始化函数 '''
  7. def initial(pop, dim, ub, lb):
  8. X = np.zeros([pop, dim])
  9. for i in range(pop):
  10. for j in range(dim):
  11. X[i, j] = random.random()*(ub[j] - lb[j]) + lb[j]
  12. return X,lb,ub
  13. '''边界检查函数'''
  14. def BorderCheck(X,ub,lb,pop,dim):
  15. for i in range(pop):
  16. for j in range(dim):
  17. if X[i,j]>ub[j]:
  18. X[i,j] = ub[j]
  19. elif X[i,j]<lb[j]:
  20. X[i,j] = lb[j]
  21. return X
  22. '''计算适应度函数'''
  23. def CaculateFitness(X,fun):
  24. pop = X.shape[0]
  25. fitness = np.zeros([pop, 1])
  26. for i in range(pop):
  27. fitness[i] = fun(X[i, :])
  28. return fitness
  29. '''适应度排序'''
  30. def SortFitness(Fit):
  31. fitness = np.sort(Fit, axis=0)
  32. index = np.argsort(Fit, axis=0)
  33. return fitness,index
  34. '''根据适应度对位置进行排序'''
  35. def SortPosition(X,index):
  36. Xnew = np.zeros(X.shape)
  37. for i in range(X.shape[0]):
  38. Xnew[i,:] = X[index[i],:]
  39. return Xnew
  40. '''麻雀发现者更新'''
  41. def PDUpdate(X,PDNumber,ST,Max_iter,dim):
  42. X_new = copy.copy(X)
  43. R2 = random.random()
  44. for j in range(PDNumber):
  45. if R2<ST:
  46. X_new[j,:] = X[j,:]*np.exp(-j/(np.random.random()*Max_iter))
  47. else:
  48. X_new[j,:] = X[j,:] + np.random.randn()*np.ones([1,dim])
  49. return X_new
  50. '''麻雀加入者更新'''
  51. def JDUpdate(X,PDNumber,pop,dim):
  52. X_new = copy.copy(X)
  53. for j in range(PDNumber+1,pop):
  54. if j>(pop - PDNumber)/2 + PDNumber:
  55. X_new[j,:]= np.random.randn()*np.exp((X[-1,:] - X[j,:])/j**2)
  56. else:
  57. #产生-1,1的随机数
  58. A = np.ones([dim,1])
  59. for a in range(dim):
  60. if(random.random()>0.5):
  61. A[a]=-1
  62. AA = np.dot(A,np.linalg.inv(np.dot(A.T,A)))
  63. X_new[j,:]= X[1,:] + np.abs(X[j,:] - X[1,:])*AA.T
  64. return X_new
  65. '''危险更新'''
  66. def SDUpdate(X,pop,SDNumber,fitness,BestF):
  67. X_new = copy.copy(X)
  68. Temp = range(pop)
  69. RandIndex = random.sample(Temp, pop)
  70. SDchooseIndex = RandIndex[0:SDNumber]
  71. for j in range(SDNumber):
  72. if fitness[SDchooseIndex[j]]>BestF:
  73. X_new[SDchooseIndex[j],:] = X[0,:] + np.random.randn()*np.abs(X[SDchooseIndex[j],:] - X[1,:])
  74. elif fitness[SDchooseIndex[j]] == BestF:
  75. K = 2*random.random() - 1
  76. X_new[SDchooseIndex[j],:] = X[SDchooseIndex[j],:] + K*(np.abs( X[SDchooseIndex[j],:] - X[-1,:])/(fitness[SDchooseIndex[j]] - fitness[-1] + 10E-8))
  77. return X_new
  78. '''麻雀搜索算法'''
  79. def SSA(pop,dim,lb,ub,Max_iter,fun):
  80. ST = 0.6 #预警值
  81. PD = 0.7 #发现者的比列,剩下的是加入者
  82. SD = 0.2 #意识到有危险麻雀的比重
  83. PDNumber = int(pop*PD) #发现者数量
  84. SDNumber = int(pop*SD) #意识到有危险麻雀数量
  85. X,lb,ub = initial(pop, dim, ub, lb) #初始化种群
  86. fitness = CaculateFitness(X,fun) #计算适应度值
  87. fitness,sortIndex = SortFitness(fitness) #对适应度值排序
  88. X = SortPosition(X,sortIndex) #种群排序
  89. GbestScore = copy.copy(fitness[0])
  90. GbestPositon = np.zeros([1,dim])
  91. GbestPositon[0,:] = copy.copy(X[0,:])
  92. Curve = np.zeros([Max_iter,1])
  93. for i in range(Max_iter):
  94. BestF = fitness[0]
  95. X = PDUpdate(X,PDNumber,ST,Max_iter,dim)#发现者更新
  96. X = JDUpdate(X,PDNumber,pop,dim) #加入者更新
  97. X = SDUpdate(X,pop,SDNumber,fitness,BestF) #危险更新
  98. X = BorderCheck(X,ub,lb,pop,dim) #边界检测
  99. fitness = CaculateFitness(X,fun) #计算适应度值
  100. fitness,sortIndex = SortFitness(fitness) #对适应度值排序
  101. X = SortPosition(X,sortIndex) #种群排序
  102. if(fitness[0]<=GbestScore): #更新全局最优
  103. GbestScore = copy.copy(fitness[0])
  104. GbestPositon[0,:] = copy.copy(X[0,:])
  105. Curve[i] = GbestScore
  106. return GbestScore,GbestPositon,Curve

这个模块可以写外面,从工程的角度来看放在一个py里面导入是最合理的。但是我们这是案例,考虑到简洁和易学性,所以我们都放在一个文件里面了。

优化算法定义完成后,定义目标函数

  1. #import SSA
  2. def fobj(X):
  3. s=train_fuc(mode='SSA-LSTM',batch_size=int(X[0]),epochs=int(X[1]),hidden_dim=[int(X[2]),int(X[3])],verbose=0,show_loss=False,show_fit=False)
  4. return s

 进行优化算法的训练:

GbestScore1,GbestPositon1,Curve1 = SSA(pop=2,dim=4,lb=[8,20,30,12],ub=[40,40,80,42],Max_iter=2,fun=fobj) 

 我这里由于时间问题,我种群数量pop只用了2个,一般是30个,迭代次数一般是100-200次,我就改了2次,因为新电脑的TensorFlow不支持GPU加速,算的太慢了.......就没去搜索那么多次,就填了个较小的数字做演示好了。

打印最优的参数解和最优的适应度值

  1. print('最优适应度值:',GbestScore1)
  2. GbestPositon1=[int(i)for i in GbestPositon1[0]]
  3. print('最优解为:',GbestPositon1)

 带入最优解:

  1. train_fuc(mode='SSA-LSTM',batch_size=GbestPositon1[0],epochs=GbestPositon1[1],
  2. hidden_dim=[GbestPositon1[2],GbestPositon1[3]],show_loss=True,show_fit=True,verbose=1)

虽然没搜索几次,但是这个效果还是不错的。


查看评价指标对比

好了,所有的模型都训练和预测了,评价指标都算完了,我们当然想对比了,我前面写训练函数都已经留了一手,预测的结果和效果都存下来 了,和我一样一步步运行下来的可以直接查看预测效果。

df_eval_all

还是不直观,画图吧

  1. bar_width = 0.4
  2. colors=['c', 'orange','g', 'tomato','b', 'm', 'y', 'lime', 'k','orange','pink','grey','tan']
  3. fig, ax = plt.subplots(2,2,figsize=(8,6),dpi=128)
  4. for i,col in enumerate(df_eval_all.columns):
  5. n=int(str('22')+str(i+1))
  6. plt.subplot(n)
  7. df_col=df_eval_all[col]
  8. m =np.arange(len(df_col))
  9. plt.bar(x=m,height=df_col.to_numpy(),width=bar_width,color=colors)#
  10. #plt.xlabel('Methods',fontsize=12)
  11. names=df_col.index
  12. plt.xticks(range(len(df_col)),names,fontsize=10)
  13. plt.xticks(rotation=40)
  14. plt.ylabel(col,fontsize=14)
  15. plt.tight_layout()
  16. #plt.savefig('柱状图.jpg',dpi=512)
  17. plt.show()

可以清楚地看见,SSA-lstm的效果最好,其次是GRU,然后是LSTM和attention-lstm。

所以说优化算法还是有效的,

继续画图他们的预测效果对比图:

  1. plt.figure(figsize=(10,5),dpi=256)
  2. for i,col in enumerate(df_preds_all[['MLP','GRU','LSTM','Attention-LSTM','SSA-LSTM']].columns):
  3. plt.plot(data0.index[-test_size-1:],df_preds_all[col],label=col) # ,color=colors[i]
  4. plt.plot(data0.index[-test_size-1:],y_test.reshape(-1,),label='Actual',color='k',linestyle=':',lw=2)
  5. plt.legend()
  6. plt.ylabel('wind',fontsize=16)
  7. plt.xlabel('Date',fontsize=16)
  8. #plt.savefig('点估计线对比.jpg',dpi=256)
  9. plt.show()

也可以从这个图清楚的看到预测效果对比


总结

在这个案例里面的,SSA-LSTM效果好于GRU好于LSTM和attention-LSTM,说明优化算的效果是可以的,当然同学们还有时间可以用SSA-GRU,SSA-attention-LSTM都去试试,,看谁的效果好。模型修改就该buildmodel这个函数,很简单的,搭积木,要什么层就写什么层的名字就行。
画个数据也是很容易实验的。

这是优化算法+神经网络的方法啦, 修改不同的优化算法就用自己自定义的算法替换就行,我后面的优化算法的专栏可能也会更新的,最近也有粉丝问问能不能出一个VMD或者CEEMDAN这些模态分解的对比,有时间我都写出来,可以关注我后面的文章。


创作不易,看官觉得写得还不错的话点个关注和赞吧,本人会持续更新python数据分析领域的代码文章~(需要定制类似的代码可私信)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/849199
推荐阅读
相关标签
  

闽ICP备14008679号