Table of Contents
1. Example 1: the official example, lstm_stateful.py
2. Example 2: a stateful LSTM in Keras for electricity consumption prediction
A stateful RNN can maintain state across batches during training: the state values computed on the current batch of training data are used as the initial hidden state for the next batch. Keras RNNs are stateless by default, so this must be enabled explicitly. stateful means that, in addition to state flowing between the time steps inside each sample, the state (c, h) is also carried over from one sample to the next; stateless means state only flows within a sample.

This lets the model learn temporal dependencies between the input samples themselves, which suits long-sequence prediction where it matters to the model which sample comes first and which comes second.

Advantage: a smaller network, or less training time.

Disadvantage: the network must be trained with a batch size that reflects the periodicity of the data, and the state must be reset after every epoch.

stateless LSTM:

By default, fit() shuffles the input samples, so each sample is effectively independent, with no before/after relationship between them; this suits inputs whose samples are unrelated.
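To make the contrast concrete, here is a minimal sketch of how the two variants are declared in Keras (layer sizes and shapes are arbitrary placeholders, not values from the examples below):

```python
from keras.models import Sequential
from keras.layers import LSTM, Dense

# Stateless (the default): every batch starts from a fresh zero (c, h) state
stateless = Sequential()
stateless.add(LSTM(32, input_shape=(20, 1)))   # (timesteps, features)
stateless.add(Dense(1))

# Stateful: the (c, h) left over at the end of one batch seeds the next batch,
# so the batch size must be fixed up front via batch_input_shape
stateful = Sequential()
stateful.add(LSTM(32, batch_input_shape=(8, 20, 1), stateful=True))
stateful.add(Dense(1))
```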
When using a stateful RNN, it is assumed that:
- all batches contain the same number of samples;
- if X1 and X2 are successive batches, then X2[i] is the follow-up sequence of X1[i], for every i.

To use statefulness in an RNN, you need to:
- specify the batch size explicitly, by passing batch_input_shape=(batch_size, timesteps, input_dim) to the first layer of the model;
- set stateful=True in the RNN layer(s);
- pass shuffle=False when calling fit(), so the samples keep their order, as in the sketch after this list.

To reset the accumulated state:
- call model.reset_states() to reset the states of all layers in the model;
- call layer.reset_states() to reset the states of one specific stateful RNN layer.
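Putting the three points together, a minimal training-loop sketch looks roughly like this (batch_size, timesteps, num_epochs, X and y are placeholder names with dummy values, not from the examples below):

```python
import numpy as np
from keras.models import Sequential
from keras.layers import LSTM, Dense

batch_size, timesteps, num_epochs = 32, 20, 10       # placeholder values
X = np.random.rand(batch_size * 10, timesteps, 1)    # dummy in-order data
y = np.random.rand(batch_size * 10, 1)

# batch_input_shape fixes the batch size; stateful=True enables state carry-over
model = Sequential()
model.add(LSTM(16, batch_input_shape=(batch_size, timesteps, 1), stateful=True))
model.add(Dense(1))
model.compile(loss='mse', optimizer='adam')

for epoch in range(num_epochs):
    # shuffle=False keeps samples in order, so batch t+1 really follows batch t
    model.fit(X, y, batch_size=batch_size, epochs=1, shuffle=False)
    # discard the accumulated (c, h) before the next pass over the data
    model.reset_states()
```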
So what does the "stateless" in a Keras stateless LSTM actually refer to?

Note that the stateful discussed in this article is specific to Keras: it means the memory cell state is carried over between batches. It does not refer to the state passing described in the LSTM paper, i.e. the input and forget gates and the c, h values moving between time steps within one and the same sequence.
Assume our input X is a 3-D tensor with shape = (nb_samples, timesteps, input_dim): each row is one sample, and every sample is a short sequence. X[i] denotes the i-th sample in the input tensor. We can ignore strides and the like for now.
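For instance, a toy X with nb_samples=4, timesteps=3, input_dim=1 can be built and indexed like this (values chosen purely for illustration):

```python
import numpy as np

nb_samples, timesteps, input_dim = 4, 3, 1
series = np.arange(nb_samples * timesteps, dtype=float)

# Each sample is one short sequence of `timesteps` consecutive values
X = series.reshape(nb_samples, timesteps, input_dim)
print(X.shape)        # (4, 3, 1)
print(X[0].ravel())   # first sample:  [0. 1. 2.]
print(X[1].ravel())   # second sample: [3. 4. 5.]
```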
In the default stateless mode, Keras re-initializes the LSTM's memory state (meaning c and h, not the weights w) at the start of each short sequence (= sample) during training, i.e. it effectively calls model.reset_states().
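As a quick sanity check, this difference can be observed directly (a sketch with untrained weights and arbitrary sizes): a stateless model returns identical outputs for identical inputs on repeated calls, because the state is zeroed in between, while a stateful layer keeps accumulating (c, h) until reset_states() is called.

```python
import numpy as np
from keras.models import Sequential
from keras.layers import LSTM

x = np.ones((1, 5, 1))  # one sample, 5 time steps, 1 feature

# Stateless: the second call starts again from the zero state
stateless = Sequential()
stateless.add(LSTM(4, input_shape=(5, 1)))
print(stateless.predict(x))
print(stateless.predict(x))  # identical to the first call

# Stateful: the second call starts from the (c, h) left by the first
stateful = Sequential()
stateful.add(LSTM(4, batch_input_shape=(1, 5, 1), stateful=True))
print(stateful.predict(x))
print(stateful.predict(x))   # differs from the first call
stateful.reset_states()      # back to the zero state
```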
https://github.com/keras-team/keras/blob/master/examples/lstm_stateful.py
Dataset: ElectricityLoadDiagrams20112014, downloaded from the UCI Machine Learning Repository. The code is as follows:
```python
from __future__ import division, print_function
from keras.layers import Dense, LSTM
from keras.models import Sequential
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
import numpy as np
import math
import os

DATA_DIR = "../data"

data = np.load(os.path.join(DATA_DIR, "LD_250.npy"))

# Toggle between the stateless and stateful variants
STATELESS = False

NUM_TIMESTEPS = 20
HIDDEN_SIZE = 10
BATCH_SIZE = 96   # 24 hours (15-minute intervals)
NUM_EPOCHS = 5

# Scale the data into the range (0, 1)
data = data.reshape(-1, 1)
scaler = MinMaxScaler(feature_range=(0, 1), copy=False)
data = scaler.fit_transform(data)

# Build sliding windows: each row of X holds NUM_TIMESTEPS consecutive values,
# and Y holds the value that follows the window
X = np.zeros((data.shape[0], NUM_TIMESTEPS))
Y = np.zeros((data.shape[0], 1))
for i in range(len(data) - NUM_TIMESTEPS - 1):
    X[i] = data[i:i + NUM_TIMESTEPS].T
    Y[i] = data[i + NUM_TIMESTEPS + 1]

# Reshape X into the 3-D tensor the LSTM expects: (samples, timesteps, features).
# expand_dims adds a dimension at axis=2, giving (data.shape[0], NUM_TIMESTEPS, 1);
# the last dimension is the single feature.
X = np.expand_dims(X, axis=2)

# Train/test split
sp = int(0.7 * len(data))
Xtrain, Xtest, Ytrain, Ytest = X[0:sp], X[sp:], Y[0:sp], Y[sp:]
print(Xtrain.shape, Xtest.shape, Ytrain.shape, Ytest.shape)

if STATELESS:
    # stateless model
    model = Sequential()
    model.add(LSTM(HIDDEN_SIZE, input_shape=(NUM_TIMESTEPS, 1),
                   return_sequences=False))
    model.add(Dense(1))
else:
    # stateful model: the batch size must be fixed via batch_input_shape
    model = Sequential()
    model.add(LSTM(HIDDEN_SIZE, stateful=True,
                   batch_input_shape=(BATCH_SIZE, NUM_TIMESTEPS, 1),
                   return_sequences=False))
    model.add(Dense(1))

model.compile(loss="mean_squared_error", optimizer="adam",
              metrics=["mean_squared_error"])

if STATELESS:
    model.fit(Xtrain, Ytrain, epochs=NUM_EPOCHS, batch_size=BATCH_SIZE,
              validation_data=(Xtest, Ytest),
              shuffle=False)
else:
    # Truncate the train and test sets to a multiple of BATCH_SIZE
    train_size = (Xtrain.shape[0] // BATCH_SIZE) * BATCH_SIZE
    test_size = (Xtest.shape[0] // BATCH_SIZE) * BATCH_SIZE
    Xtrain, Ytrain = Xtrain[0:train_size], Ytrain[0:train_size]
    Xtest, Ytest = Xtest[0:test_size], Ytest[0:test_size]
    print(Xtrain.shape, Xtest.shape, Ytrain.shape, Ytest.shape)
    # Train one epoch at a time, resetting the accumulated state between epochs
    for i in range(NUM_EPOCHS):
        print("Epoch {:d}/{:d}".format(i + 1, NUM_EPOCHS))
        model.fit(Xtrain, Ytrain, batch_size=BATCH_SIZE, epochs=1,
                  validation_data=(Xtest, Ytest),
                  shuffle=False)
        model.reset_states()

score, _ = model.evaluate(Xtest, Ytest, batch_size=BATCH_SIZE)
rmse = math.sqrt(score)
print("\nMSE: {:.3f}, RMSE: {:.3f}".format(score, rmse))

pre = model.predict(Xtest, batch_size=BATCH_SIZE)

plt.figure(figsize=(12, 5))
plt.plot(pre[:1000], 'r', label='predictions')
plt.plot(Ytest[:1000], label='origin')
plt.legend()
plt.show()
```
Result:
The code for this hands-on example is on GitHub:
https://github.com/youyuge34/Cosine_Stateful_Lstm/blob/master/Stateful_Lstm_Keras.ipynb
```python
from __future__ import print_function

import numpy as np
import matplotlib.pyplot as plt

from keras.models import Sequential
from keras.layers import Dense, LSTM, Dropout

# Create the dataset: 10 full cosine periods sampled at 1000 points
dataset = np.cos(np.arange(1000) * (20 * np.pi / 1000))
plt.plot(dataset)
plt.title('cos')
plt.show()

# Convert the series into supervised-learning format:
# each input is a sliding window of look_back values,
# the target is the value right after the window
def create_dataset(dataset, look_back=1):
    dataX, dataY = [], []
    for i in range(len(dataset) - look_back):
        dataX.append(dataset[i:(i + look_back)])
        dataY.append(dataset[i + look_back])
    return np.array(dataX), np.array(dataY)


look_back = 40

# Normalize so y lies in the range (0, 1)
dataset = (dataset + 1) / 2.

# Split into train and test sets
train_size = int(len(dataset) * 0.8)
test_size = len(dataset) - train_size
train, test = dataset[:train_size], dataset[train_size:]

trainX, trainY = create_dataset(train, look_back)
testX, testY = create_dataset(test, look_back)
print(trainX.shape)
print(trainY.shape)

# Baseline: a plain feed-forward network on the flattened windows
model4 = Sequential()
model4.add(Dense(units=32, input_dim=look_back, activation="relu"))
model4.add(Dropout(0.3))
for i in range(2):
    model4.add(Dense(units=32, activation="relu"))
    model4.add(Dropout(0.3))
model4.add(Dense(1))
model4.compile(loss='mse', optimizer='adagrad')
model4.fit(trainX, trainY, epochs=400, batch_size=32, verbose=0)

# Roll the window forward: start from the last training window and
# feed each prediction back in as the newest input value
x = np.hstack((trainX[-1][1:], trainY[-1]))
preds = []
pred_num = 500
for i in np.arange(pred_num):
    pred = model4.predict(x.reshape((1, -1)))
    preds.append(pred.squeeze())
    x = np.append(x[1:], pred)

plt.figure(figsize=(12, 5))
plt.plot(np.arange(pred_num), np.array(preds), 'r', label='predictions')
cos_y = (np.cos(np.arange(pred_num) * (20 * np.pi / 1000)) + 1) / 2.
plt.plot(np.arange(pred_num), cos_y, label='origin')
plt.legend()
plt.show()
```
Prediction result:
```python
# Predict with a sliding window of length 20
look_back = 20

# Recreate and normalize the series so y lies in (0, 1)
dataset = np.cos(np.arange(1000) * (20 * np.pi / 1000))
dataset = (dataset + 1) / 2.

# Split into train and test sets
train_size = int(len(dataset) * 0.8)
test_size = len(dataset) - train_size
train, test = dataset[:train_size], dataset[train_size:]

trainX, trainY = create_dataset(train, look_back)
testX, testY = create_dataset(test, look_back)

# Reshape to (samples, timesteps, features) for the LSTM
trainX = np.reshape(trainX, (trainX.shape[0], trainX.shape[1], 1))
testX = np.reshape(testX, (testX.shape[0], testX.shape[1], 1))
'''
trainX.shape = (780, 20, 1)
testX.shape = (180, 20, 1)
trainY.shape = (780,)
testY.shape = (180,)
'''

batch_size = 1

# Build the model: stateless LSTM
model = Sequential()
model.add(LSTM(32, input_shape=(look_back, 1)))
model.add(Dropout(0.2))
model.add(Dense(1))

model.compile(loss='mse', optimizer='adam')
model.fit(trainX, trainY, batch_size=batch_size, epochs=30, verbose=2)

# Predict: take the last training window and roll it forward,
# feeding each prediction back in as the newest value
x = np.vstack((trainX[-1][1:], trainY[-1]))
preds = []
pred_num = 500
for i in np.arange(pred_num):
    pred = model.predict(x.reshape((1, -1, 1)), batch_size=batch_size)
    preds.append(pred.squeeze())
    x = np.vstack((x[1:], pred))

plt.figure(figsize=(12, 5))
plt.plot(np.arange(pred_num), np.array(preds), 'r', label='predictions')
cos_y = (np.cos(np.arange(pred_num) * (20 * np.pi / 1000)) + 1) / 2.
plt.plot(np.arange(pred_num), cos_y, label='origin')
plt.legend()
plt.show()
```
Output:
```python
# Stateful LSTM network: the batch size is fixed via batch_input_shape
batch_size = 1
model2 = Sequential()
model2.add(LSTM(32, batch_input_shape=(batch_size, look_back, 1), stateful=True))
model2.add(Dropout(0.2))
model2.add(Dense(1))
model2.compile(loss='mse', optimizer='adam')
# Train one epoch at a time without shuffling, resetting the state between epochs
for i in range(30):
    model2.fit(trainX, trainY, epochs=1, batch_size=batch_size, shuffle=False)
    model2.reset_states()

# Roll the last training window forward to predict the future
x = np.vstack((trainX[-1][1:], trainY[-1]))
preds = []
pred_num = 500
for i in np.arange(pred_num):
    pred = model2.predict(x.reshape((1, -1, 1)), batch_size=batch_size)
    preds.append(pred.squeeze())
    x = np.vstack((x[1:], pred))

plt.figure(figsize=(12, 5))
plt.plot(np.arange(pred_num), np.array(preds), 'r', label='predictions')
cos_y = (np.cos(np.arange(pred_num) * (20 * np.pi / 1000)) + 1) / 2.
plt.plot(np.arange(pred_num), cos_y, label='origin')
plt.legend()
plt.show()
```
Output:
```python
look_back = 40
dataset = np.cos(np.arange(1000) * (20 * np.pi / 1000))
# Normalize so y lies in the range (0, 1)
dataset = (dataset + 1) / 2.

# Split into train and test sets
train_size = int(len(dataset) * 0.8)
test_size = len(dataset) - train_size
train, test = dataset[:train_size], dataset[train_size:]

trainX, trainY = create_dataset(train, look_back)
testX, testY = create_dataset(test, look_back)

trainX = np.reshape(trainX, (trainX.shape[0], trainX.shape[1], 1))
testX = np.reshape(testX, (testX.shape[0], testX.shape[1], 1))

# Build the model: two stacked stateful LSTM layers; the first must
# return its full output sequence to feed the second
batch_size = 1
model3 = Sequential()
model3.add(LSTM(32, batch_input_shape=(batch_size, look_back, 1),
                stateful=True, return_sequences=True))
model3.add(Dropout(0.3))
model3.add(LSTM(32, stateful=True))
model3.add(Dropout(0.3))
model3.add(Dense(1))
model3.compile(loss='mean_squared_error', optimizer='adam')
for i in range(100):
    model3.fit(trainX, trainY, epochs=1, batch_size=batch_size, verbose=0, shuffle=False)
    model3.reset_states()

# Predict by rolling the last training window forward
x = np.vstack((trainX[-1][1:], trainY[-1]))
preds = []
pred_num = 500
for i in np.arange(pred_num):
    pred = model3.predict(x.reshape((1, -1, 1)), batch_size=batch_size)
    preds.append(pred.squeeze())
    x = np.vstack((x[1:], pred))

plt.figure(figsize=(12, 5))
plt.plot(np.arange(pred_num), np.array(preds), 'r', label='predictions')
cos_y = (np.cos(np.arange(pred_num) * (20 * np.pi / 1000)) + 1) / 2.
plt.plot(np.arange(pred_num), cos_y, label='origin')
plt.legend()
plt.show()
```
Output:
The stateful model therefore produces slightly better results than the stateless one.

Warning: never use a stateful LSTM until you are familiar with what it actually does.