This article walks through the project's source code, with the explanations written as inline code comments.
Unlike CNNs, RNNs have a clear advantage when processing sequential data and are typically applied in natural language processing. LSTM is a special kind of RNN; the referenced article 《如何从RNN起步,一步一步通俗理解LSTM》 explains the principles of RNNs and LSTMs clearly.
To summarise briefly: in the LSTM cell shown above, the input $x_t$ is transformed and then combined, through several gated activation functions, with the hidden state $h_{t-1}$ passed in from the previous step, producing the next hidden state $h_t$. The figure below gives the general update formulas; the referenced article explains them in detail, while this post focuses on reading the project code.
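Since the formula figure itself is not reproduced here, the standard LSTM update equations (the same gate structure the referenced article derives) are, for reference:

$$
\begin{aligned}
f_t &= \sigma\left(W_f \cdot [h_{t-1}, x_t] + b_f\right) \\
i_t &= \sigma\left(W_i \cdot [h_{t-1}, x_t] + b_i\right) \\
\tilde{C}_t &= \tanh\left(W_C \cdot [h_{t-1}, x_t] + b_C\right) \\
C_t &= f_t \odot C_{t-1} + i_t \odot \tilde{C}_t \\
o_t &= \sigma\left(W_o \cdot [h_{t-1}, x_t] + b_o\right) \\
h_t &= o_t \odot \tanh(C_t)
\end{aligned}
$$

where $\sigma$ is the sigmoid function, $\odot$ is element-wise multiplication, and $C_t$ is the cell state carried alongside $h_t$.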
The source code comes from GitHub and the framework is Keras. First, the project's task:
The figure below shows the LSTM model. The input layer has shape (49, 1): 49 vectors are fed in at a time, each of length 1. These 49 vectors correspond to $x_1, x_2, x_3, \ldots, x_{49}$ in the figure above.
The second part is the LSTM layer: the 49 input vectors pass through the LSTM and 49 vectors come out, each of length 100. As shown below, each $y_t$ has length 100.
After a Dropout layer and two more LSTM layers, only the last output vector is kept, i.e. $y_{49}$, as shown below:
The project comes ready to run: once the environment is set up, run.py can be executed directly, and stepping through it in a debugger is a good way to examine the code in more detail.
The figure below shows the stock dataset; mainly close (the closing price) and Volume (the trading volume) are used. The next figure shows the sinewave dataset of sine-wave values.
In this project, the model configuration is kept in a JSON file. The config file is shown below, with explanatory annotations after each field (the annotations must be removed for the file to be valid JSON):
{
    "data": {
        "filename": "sinewave.csv",     // dataset file name
        "columns": [
            "sinewave"                  // header names of the columns to read; for stock prediction, read `close` and `Volume`
        ],
        "sequence_length": 50,          // length of each sequence window fed to the model
        "train_test_split": 0.85,       // ratio for splitting the dataset into training and test sets
        "normalise": false              // whether to normalise each window
    },
    "training": {
        "epochs": 2,                    // number of training epochs
        "batch_size": 32                // batch size: 32 windows of sequence length 50 per training step
    },
    "model": {
        "loss": "mse",                  // use MSE as the loss function
        "optimizer": "adam",
        "save_dir": "saved_models",     // where to save trained models
        "layers": [
            {
                "type": "lstm",         // first LSTM block
                "neurons": 100,         // 100 units: turns the 49 length-1 input vectors described above into 49 length-100 vectors
                "input_timesteps": 49,  // length of the input sequence
                "input_dim": 1,         // dimension of each input vector
                "return_seq": true      // return all 49 output vectors
            },
            {
                "type": "dropout",
                "rate": 0.2
            },
            {
                "type": "lstm",         // second LSTM block
                "neurons": 100,
                "return_seq": true
            },
            {
                "type": "lstm",         // third LSTM block
                "neurons": 100,
                "return_seq": false     // return only the last vector; the data is now [batch_size, 100]
            },
            {
                "type": "dropout",
                "rate": 0.2
            },
            {
                "type": "dense",        // fully connected layer
                "neurons": 1,
                "activation": "linear"  // maps the last output vector from dimension 100 to 1: the predicted next value
            }
        ]
    }
}
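For readers who would rather see the network written out directly than assembled from JSON, below is a minimal hand-written Keras sketch of what this config describes (an illustration using standard Keras layers, not code from the project):

from keras.models import Sequential
from keras.layers import LSTM, Dropout, Dense

# Hand-written equivalent of the config above (a sketch for illustration).
model = Sequential()
model.add(LSTM(100, input_shape=(49, 1), return_sequences=True))  # 49 timesteps, 1 feature
model.add(Dropout(0.2))
model.add(LSTM(100, return_sequences=True))
model.add(LSTM(100, return_sequences=False))  # keep only the last output vector
model.add(Dropout(0.2))
model.add(Dense(1, activation='linear'))      # 100 -> 1: the predicted next value
model.compile(loss='mse', optimizer='adam')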
The loaded dataset is shown below:
The model takes the first 49 values of a window of the time series as input and outputs one value; the MSE loss between this output and the 50th value is computed and backpropagated to train the model. Once training is complete, prediction works by sliding the window: values 1-50 yield the 51st value, values 2-51 yield the 52nd, and so on, so from 50 input values the model can generate the next stretch of 50 values.
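A tiny numpy sketch of this windowing scheme, using a synthetic series (names like `series` are illustrative, not from the project):

import numpy as np

series = np.arange(100, dtype=float)  # stand-in for one column of the dataset
seq_len = 50                          # matches "sequence_length" in the config

# slide a length-50 window over the series, one step at a time
windows = np.array([series[i:i+seq_len] for i in range(len(series) - seq_len)])
x = windows[:, :-1]                   # first 49 values of each window: model input
y = windows[:, -1]                    # 50th value of each window: training target
print(x.shape, y.shape)               # (50, 49) (50,)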
The following code loads and preprocesses the data; it lives in the main() function of run.py:
configs = json.load(open('config.json', 'r'))  # open the config file
# create the model save directory if it does not exist
if not os.path.exists(configs['model']['save_dir']): os.makedirs(configs['model']['save_dir'])
# load the data
data = DataLoader(
    os.path.join('data', configs['data']['filename']),
    configs['data']['train_test_split'],
    configs['data']['columns']
)
# build the training set
x, y = data.get_train_data(
    seq_len=configs['data']['sequence_length'],
    normalise=configs['data']['normalise']
)
import math
import numpy as np
import pandas as pd

class DataLoader():
    """A class for loading and transforming data for the lstm model"""

    def __init__(self, filename, split, cols):
        # read the CSV file
        dataframe = pd.read_csv(filename)
        # index at which to split into training and test sets
        i_split = int(len(dataframe) * split)
        # training split
        self.data_train = dataframe.get(cols).values[:i_split]
        # test split
        self.data_test = dataframe.get(cols).values[i_split:]
        # lengths of the two splits
        self.len_train = len(self.data_train)
        self.len_test = len(self.data_test)
        self.len_train_windows = None

    def get_train_data(self, seq_len, normalise):
        '''
        Create x, y train data windows
        Warning: batch method, not generative, make sure you have enough memory to
        load data, otherwise use generate_training_window() method.
        '''
        # lists to collect the windows
        data_x = []
        data_y = []
        for i in range(self.len_train - seq_len):
            # slide over the training data one step at a time: windows [1-50], [2-51], ...
            x, y = self._next_window(i, seq_len, normalise)
            # store each window, repeating until the data is exhausted
            data_x.append(x)
            data_y.append(y)
        # convert the prepared windows to numpy arrays and return them
        return np.array(data_x), np.array(data_y)

    def _next_window(self, i, seq_len, normalise):
        '''Generates the next data window from the given index location i'''
        # take seq_len values starting at i as the sliding window, e.g. sliding from [1-50] to [2-51]
        window = self.data_train[i:i+seq_len]
        # optionally normalise the values inside this window
        window = self.normalise_windows(window, single_window=True)[0] if normalise else window
        # the last value of the window is the ground truth y for the loss; the first 49 values are the input x
        x = window[:-1]
        y = window[-1, [0]]
        return x, y
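As a quick sanity check (assuming the sinewave config above, with a single column), the shapes returned by get_train_data should be:

x, y = data.get_train_data(seq_len=50, normalise=False)
print(x.shape)  # (num_windows, 49, 1): 49 timesteps, 1 feature, matching the LSTM input_shape
print(y.shape)  # (num_windows, 1): the 50th value of each window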
model = Model()
model.build_model(configs)
Stepping into Model(), the network is assembled by the build_model method of the Model class:
import os
import math
import numpy as np
import datetime as dt
from numpy import newaxis
from core.utils import Timer
from keras.layers import Dense, Activation, Dropout, LSTM
from keras.models import Sequential, load_model
from keras.callbacks import EarlyStopping, ModelCheckpoint

class Model():
    """A class for building and inferencing an lstm model"""

    def __init__(self):
        # a Sequential container: layers added to it are executed in order
        self.model = Sequential()

    def load_model(self, filepath):
        print('[Model] Loading model from file %s' % filepath)
        # used at prediction time: load trained weights into the model
        self.model = load_model(filepath)

    def build_model(self, configs):
        # time how long building the model takes
        timer = Timer()
        timer.start()
        # iterate over the layers listed in the config file
        for layer in configs['model']['layers']:
            # number of units
            neurons = layer['neurons'] if 'neurons' in layer else None
            # dropout rate
            dropout_rate = layer['rate'] if 'rate' in layer else None
            # activation function
            activation = layer['activation'] if 'activation' in layer else None
            # whether to return the full output sequence
            return_seq = layer['return_seq'] if 'return_seq' in layer else None
            # length of the input sequence
            input_timesteps = layer['input_timesteps'] if 'input_timesteps' in layer else None
            # dimension of each input vector
            input_dim = layer['input_dim'] if 'input_dim' in layer else None
            # add a fully connected layer
            if layer['type'] == 'dense':
                self.model.add(Dense(neurons, activation=activation))
            # add an LSTM layer, writing the configured parameters into Keras's built-in LSTM module
            if layer['type'] == 'lstm':
                self.model.add(LSTM(neurons, input_shape=(input_timesteps, input_dim), return_sequences=return_seq))
            # add a dropout layer
            if layer['type'] == 'dropout':
                self.model.add(Dropout(dropout_rate))
        # set the loss function and the optimisation strategy
        self.model.compile(loss=configs['model']['loss'], optimizer=configs['model']['optimizer'])
        print('[Model] Model Compiled')
        # model built; stop the timer
        timer.stop()
        # returning the model here is only needed for plotting the architecture diagram and can be removed
        return self.model
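To inspect the assembled network without the plot_model step, you can also print a textual summary via the standard Keras API, for example:

model = Model()
model.build_model(configs)
# prints each layer and its output shape; the first LSTM should report (None, 49, 100)
model.model.summary()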
The model is trained via model.train():
'''
# in-memory training: use this method if memory is sufficient
model.train(
    x,
    y,
    epochs = configs['training']['epochs'],
    batch_size = configs['training']['batch_size'],
    save_dir = configs['model']['save_dir']
)
'''
# If memory is tight, use generator-based training instead. The method above loads the whole
# dataset into memory at once and reads batches from there; the method below loads only part
# of the data at a time, fetching more as each part is consumed.
# out-of memory generative training
# steps_per_epoch = math.ceil((data.len_train - configs['data']['sequence_length']) / configs['training']['batch_size'])
# model.train_generator(
#     data_gen=data.generate_train_batch(
#         seq_len=configs['data']['sequence_length'],
#         batch_size=configs['training']['batch_size'],
#         normalise=configs['data']['normalise']
#     ),
#     epochs=configs['training']['epochs'],
#     batch_size=configs['training']['batch_size'],
#     steps_per_epoch=steps_per_epoch,
#     save_dir=configs['model']['save_dir']
# )
model.train(
    x,  # input data
    y,  # ground-truth values for the loss
    epochs=configs['training']['epochs'],          # number of epochs
    batch_size=configs['training']['batch_size'],
    save_dir=configs['model']['save_dir']          # save directory
)
Below is model.train():
def train(self, x, y, epochs, batch_size, save_dir):
    # time the training run
    timer = Timer()
    timer.start()
    print('[Model] Training Started')
    print('[Model] %s epochs, %s batch size' % (epochs, batch_size))
    # path where the trained weights will be saved
    save_fname = os.path.join(save_dir, '%s-e%s.h5' % (dt.datetime.now().strftime('%d%m%Y-%H%M%S'), str(epochs)))
    # callbacks controlling the training strategy
    callbacks = [
        # stop early if the validation loss has not improved for 10 epochs
        EarlyStopping(monitor='val_loss', patience=10),
        # checkpoint only the weights with the best validation loss
        ModelCheckpoint(filepath=save_fname, monitor='val_loss', save_best_only=True)
    ]
    # train the model
    self.model.fit(
        x,
        y,
        epochs=epochs,
        batch_size=batch_size,
        callbacks=callbacks
    )
    # save the final weights
    self.model.save(save_fname)
    print('[Model] Training Completed. Model saved as %s' % save_fname)
    # training finished; stop the timer
    timer.stop()
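One thing worth noting: fit() is called here without any validation data, so the val_loss monitored by EarlyStopping and ModelCheckpoint never exists, and Keras will warn and skip both callbacks. A minimal fix (a sketch, not part of the original project) is to hold out part of the training windows:

# train the model, holding out part of the windows so that val_loss is computed
self.model.fit(
    x,
    y,
    epochs=epochs,
    batch_size=batch_size,
    callbacks=callbacks,
    validation_split=0.1  # assumption: 10% is an arbitrary illustrative choice
)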
After training, the test set is built and fed to the model for prediction, and the predictions are plotted:
# build the test data
x_test, y_test = data.get_test_data(
    seq_len=configs['data']['sequence_length'],
    normalise=configs['data']['normalise']
)

def get_test_data(self, seq_len, normalise):
    '''
    Create x, y test data windows
    Warning: batch method, not generative, make sure you have enough memory to
    load data, otherwise reduce size of the training split.
    '''
    # build the sliding windows
    data_windows = []
    for i in range(self.len_test - seq_len):
        # slide over the test data one step at a time: windows [1-50], [2-51], ...
        data_windows.append(self.data_test[i:i+seq_len])
    # convert to a float numpy array for easier computation
    data_windows = np.array(data_windows).astype(float)
    # optionally normalise the windows
    data_windows = self.normalise_windows(data_windows, single_window=False) if normalise else data_windows
    # the first 49 values of each window are the input x, the last value is y
    x = data_windows[:, :-1]
    y = data_windows[:, -1, [0]]
    return x, y
The two figures below show the test set. Note that y_test is only used later for the plotted comparison; it plays no role in prediction itself.
# model prediction
predictions = model.predict_sequences_multiple(x_test, configs['data']['sequence_length'], configs['data']['sequence_length'])
# predictions = model.predict_sequence_full(x_test, configs['data']['sequence_length'])
# predictions = model.predict_point_by_point(x_test)
# visual comparison
plot_results_multiple(predictions, y_test, configs['data']['sequence_length'])
# plot_results(predictions, y_test)
def predict_sequences_multiple(self, data, window_size, prediction_len):
    # Predict sequence of 50 steps before shifting prediction run forward by 50 steps
    print('[Model] Predicting Sequences Multiple...')
    # list holding the predicted sequences
    prediction_seqs = []
    # one run of prediction_len steps per chunk of the test data; i counts the runs
    for i in range(int(len(data)/prediction_len)):
        # current sliding window
        curr_frame = data[i*prediction_len]
        # predictions for this run
        predicted = []
        # feed the current window in, predict one value, and store it in predicted;
        # then drop the oldest value from the window and append the new prediction at the end;
        # repeat prediction_len times to obtain prediction_len predicted values
        for j in range(prediction_len):
            predicted.append(self.model.predict(curr_frame[newaxis,:,:])[0,0])
            curr_frame = curr_frame[1:]
            curr_frame = np.insert(curr_frame, [window_size-2], predicted[-1], axis=0)
        # store this run's predicted sequence
        prediction_seqs.append(predicted)
    # return the predictions
    return prediction_seqs
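To make the window bookkeeping in the inner loop concrete, here is the shift step in isolation (a toy numpy sketch; with window_size = 50 the frame holds 49 timesteps, and inserting at index window_size - 2 = 48 on a 48-row array appends at the end):

import numpy as np

window_size = 50
curr_frame = np.arange(49, dtype=float).reshape(49, 1)  # 49 timesteps, 1 feature
new_value = 99.0                                        # stand-in for a model prediction

curr_frame = curr_frame[1:]                             # drop the oldest timestep -> 48 rows
curr_frame = np.insert(curr_frame, [window_size - 2], new_value, axis=0)
print(curr_frame.shape, curr_frame[-1])                 # (49, 1) [99.]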
The figure below illustrates the prediction results, produced by:
plot_results_multiple(predictions, y_test, configs['data']['sequence_length'])
def plot_results_multiple(predicted_data, true_data, prediction_len):
    # create a white-background figure
    fig = plt.figure(facecolor='white')
    # add axes
    ax = fig.add_subplot(111)
    # plot the ground-truth data
    ax.plot(true_data, label='True Data')
    # Pad the list of predictions to shift each run to its correct start position
    for i, data in enumerate(predicted_data):
        padding = [None for p in range((i) * prediction_len)]
        plt.plot(padding + data, label='Prediction')
    plt.legend()
    plt.show()
The prediction results on the sine data are shown below.
The stock prediction results follow; as the figure shows, they are clearly poor:
Main script, run.py:
__author__ = "Jakob Aungiers"
__copyright__ = "Jakob Aungiers 2018"
__version__ = "2.0.0"
__license__ = "MIT"

import os
import json
import time
import math
import matplotlib.pyplot as plt
from core.data_processor import DataLoader
from core.model import Model
from keras.utils import plot_model

def plot_results(predicted_data, true_data):
    fig = plt.figure(facecolor='white')
    ax = fig.add_subplot(111)
    ax.plot(true_data, label='True Data')
    plt.plot(predicted_data, label='Prediction')
    plt.legend()
    plt.show()

def plot_results_multiple(predicted_data, true_data, prediction_len):
    # create a white-background figure
    fig = plt.figure(facecolor='white')
    # add axes
    ax = fig.add_subplot(111)
    # plot the ground-truth data
    ax.plot(true_data, label='True Data')
    # Pad the list of predictions to shift each run to its correct start position
    for i, data in enumerate(predicted_data):
        padding = [None for p in range((i) * prediction_len)]
        plt.plot(padding + data, label='Prediction')
    plt.legend()
    plt.show()

def main():
    configs = json.load(open('config.json', 'r'))  # open the config file
    # create the model save directory if it does not exist
    if not os.path.exists(configs['model']['save_dir']): os.makedirs(configs['model']['save_dir'])
    # load the data
    data = DataLoader(
        os.path.join('data', configs['data']['filename']),
        configs['data']['train_test_split'],
        configs['data']['columns']
    )
    model = Model()
    mymodel = model.build_model(configs)
    # plot_model(mymodel, to_file="model.png", show_shapes=True)
    # build the training set
    x, y = data.get_train_data(
        seq_len=configs['data']['sequence_length'],
        normalise=configs['data']['normalise']
    )
    '''
    # in-memory training: use this method if memory is sufficient
    model.train(
        x,
        y,
        epochs = configs['training']['epochs'],
        batch_size = configs['training']['batch_size'],
        save_dir = configs['model']['save_dir']
    )
    '''
    # If memory is tight, use generator-based training: the method above loads the whole dataset
    # into memory at once, while the method below loads only part of the data at a time.
    # out-of memory generative training
    # steps_per_epoch = math.ceil((data.len_train - configs['data']['sequence_length']) / configs['training']['batch_size'])
    # model.train_generator(
    #     data_gen=data.generate_train_batch(
    #         seq_len=configs['data']['sequence_length'],
    #         batch_size=configs['training']['batch_size'],
    #         normalise=configs['data']['normalise']
    #     ),
    #     epochs=configs['training']['epochs'],
    #     batch_size=configs['training']['batch_size'],
    #     steps_per_epoch=steps_per_epoch,
    #     save_dir=configs['model']['save_dir']
    # )
    model.train(
        x,  # input data
        y,  # ground-truth values for the loss
        epochs=configs['training']['epochs'],          # number of epochs
        batch_size=configs['training']['batch_size'],
        save_dir=configs['model']['save_dir']          # save directory
    )
    # build the test data
    x_test, y_test = data.get_test_data(
        seq_len=configs['data']['sequence_length'],
        normalise=configs['data']['normalise']
    )
    import numpy as np
    # x_data = np.concatenate((x_test[0,:,:], y_test), 0)
    # model prediction
    predictions = model.predict_sequences_multiple(x_test, configs['data']['sequence_length'], configs['data']['sequence_length'])
    # predictions = model.predict_sequence_full(x_test, configs['data']['sequence_length'])
    # predictions = model.predict_point_by_point(x_test)
    # visual comparison
    plot_results_multiple(predictions, y_test, configs['data']['sequence_length'])
    # plot_results(predictions, y_test)

if __name__ == '__main__':
    main()
Data loading and preprocessing code, data_processor.py:
import math
import numpy as np
import pandas as pd

class DataLoader():
    """A class for loading and transforming data for the lstm model"""

    def __init__(self, filename, split, cols):
        # read the CSV file
        dataframe = pd.read_csv(filename)
        # index at which to split into training and test sets
        i_split = int(len(dataframe) * split)
        # training split
        self.data_train = dataframe.get(cols).values[:i_split]
        # test split
        self.data_test = dataframe.get(cols).values[i_split:]
        # lengths of the two splits
        self.len_train = len(self.data_train)
        self.len_test = len(self.data_test)
        self.len_train_windows = None

    def get_test_data(self, seq_len, normalise):
        '''
        Create x, y test data windows
        Warning: batch method, not generative, make sure you have enough memory to
        load data, otherwise reduce size of the training split.
        '''
        # build the sliding windows
        data_windows = []
        for i in range(self.len_test - seq_len):
            # slide over the test data one step at a time: windows [1-50], [2-51], ...
            data_windows.append(self.data_test[i:i+seq_len])
        # convert to a float numpy array for easier computation
        data_windows = np.array(data_windows).astype(float)
        # optionally normalise the windows
        data_windows = self.normalise_windows(data_windows, single_window=False) if normalise else data_windows
        # split each window into inputs and target
        x = data_windows[:, :-1]
        y = data_windows[:, -1, [0]]
        return x, y

    def get_train_data(self, seq_len, normalise):
        '''
        Create x, y train data windows
        Warning: batch method, not generative, make sure you have enough memory to
        load data, otherwise use generate_training_window() method.
        '''
        # lists to collect the windows
        data_x = []
        data_y = []
        for i in range(self.len_train - seq_len):
            # slide over the training data one step at a time: windows [1-50], [2-51], ...
            x, y = self._next_window(i, seq_len, normalise)
            # store each window, repeating until the data is exhausted
            data_x.append(x)
            data_y.append(y)
        # convert the prepared windows to numpy arrays and return them
        return np.array(data_x), np.array(data_y)

    def generate_train_batch(self, seq_len, batch_size, normalise):
        '''Yield a generator of training data from filename on given list of cols split for train/test'''
        i = 0
        while i < (self.len_train - seq_len):
            x_batch = []
            y_batch = []
            for b in range(batch_size):
                if i >= (self.len_train - seq_len):
                    # stop-condition for a smaller final batch if data doesn't divide evenly
                    yield np.array(x_batch), np.array(y_batch)
                    i = 0
                x, y = self._next_window(i, seq_len, normalise)
                x_batch.append(x)
                y_batch.append(y)
                i += 1
            yield np.array(x_batch), np.array(y_batch)

    def _next_window(self, i, seq_len, normalise):
        '''Generates the next data window from the given index location i'''
        # take seq_len values starting at i as the sliding window, e.g. sliding from [1-50] to [2-51]
        window = self.data_train[i:i+seq_len]
        # optionally normalise the values inside this window
        window = self.normalise_windows(window, single_window=True)[0] if normalise else window
        # the last value of the window is the ground truth y; the first 49 values are the input x
        x = window[:-1]
        y = window[-1, [0]]
        return x, y

    def normalise_windows(self, window_data, single_window=False):
        '''Normalise window with a base value of zero'''
        normalised_data = []
        window_data = [window_data] if single_window else window_data
        for window in window_data:
            normalised_window = []
            for col_i in range(window.shape[1]):
                # rebase each column against its first value
                normalised_col = [((float(p) / float(window[0, col_i])) - 1) for p in window[:, col_i]]
                normalised_window.append(normalised_col)
            normalised_window = np.array(normalised_window).T  # reshape and transpose array back into original multidimensional format
            normalised_data.append(normalised_window)
        return np.array(normalised_data)
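In formula form, each window is rebased against its first value $p_0$:

$$n_i = \frac{p_i}{p_0} - 1, \qquad p_i = p_0\,(n_i + 1)$$

The second form is the inverse transform, useful if you want to map predictions back to the original scale; when normalisation is enabled, the project plots directly in the normalised space.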
Model code, model.py:
import os
import math
import numpy as np
import datetime as dt
from numpy import newaxis
from core.utils import Timer
from keras.layers import Dense, Activation, Dropout, LSTM
from keras.models import Sequential, load_model
from keras.callbacks import EarlyStopping, ModelCheckpoint

class Model():
    """A class for building and inferencing an lstm model"""

    def __init__(self):
        # a Sequential container: layers added to it are executed in order
        self.model = Sequential()

    def load_model(self, filepath):
        print('[Model] Loading model from file %s' % filepath)
        # used at prediction time: load trained weights into the model
        self.model = load_model(filepath)

    def build_model(self, configs):
        # time how long building the model takes
        timer = Timer()
        timer.start()
        # iterate over the layers listed in the config file
        for layer in configs['model']['layers']:
            # number of units
            neurons = layer['neurons'] if 'neurons' in layer else None
            # dropout rate
            dropout_rate = layer['rate'] if 'rate' in layer else None
            # activation function
            activation = layer['activation'] if 'activation' in layer else None
            # whether to return the full output sequence
            return_seq = layer['return_seq'] if 'return_seq' in layer else None
            # length of the input sequence
            input_timesteps = layer['input_timesteps'] if 'input_timesteps' in layer else None
            # dimension of each input vector
            input_dim = layer['input_dim'] if 'input_dim' in layer else None
            # add a fully connected layer
            if layer['type'] == 'dense':
                self.model.add(Dense(neurons, activation=activation))
            # add an LSTM layer with the configured parameters
            if layer['type'] == 'lstm':
                self.model.add(LSTM(neurons, input_shape=(input_timesteps, input_dim), return_sequences=return_seq))
            # add a dropout layer
            if layer['type'] == 'dropout':
                self.model.add(Dropout(dropout_rate))
        # set the loss function and the optimisation strategy
        self.model.compile(loss=configs['model']['loss'], optimizer=configs['model']['optimizer'])
        print('[Model] Model Compiled')
        # model built; stop the timer
        timer.stop()
        # returning the model is only needed for plotting the architecture diagram and can be removed
        return self.model

    def train(self, x, y, epochs, batch_size, save_dir):
        # time the training run
        timer = Timer()
        timer.start()
        print('[Model] Training Started')
        print('[Model] %s epochs, %s batch size' % (epochs, batch_size))
        # path where the trained weights will be saved
        save_fname = os.path.join(save_dir, '%s-e%s.h5' % (dt.datetime.now().strftime('%d%m%Y-%H%M%S'), str(epochs)))
        # callbacks controlling the training strategy
        callbacks = [
            # stop early if the validation loss has not improved for 10 epochs
            EarlyStopping(monitor='val_loss', patience=10),
            # checkpoint only the weights with the best validation loss
            ModelCheckpoint(filepath=save_fname, monitor='val_loss', save_best_only=True)
        ]
        # train the model
        self.model.fit(
            x,
            y,
            epochs=epochs,
            batch_size=batch_size,
            callbacks=callbacks
        )
        # save the final weights
        self.model.save(save_fname)
        print('[Model] Training Completed. Model saved as %s' % save_fname)
        # training finished; stop the timer
        timer.stop()

    def train_generator(self, data_gen, epochs, batch_size, steps_per_epoch, save_dir):
        timer = Timer()
        timer.start()
        print('[Model] Training Started')
        print('[Model] %s epochs, %s batch size, %s batches per epoch' % (epochs, batch_size, steps_per_epoch))
        save_fname = os.path.join(save_dir, '%s-e%s.h5' % (dt.datetime.now().strftime('%d%m%Y-%H%M%S'), str(epochs)))
        callbacks = [
            # checkpoint every epoch, keeping only the best weights
            EarlyStopping(monitor='val_loss', patience=50),
            ModelCheckpoint(filepath=save_fname, monitor='loss', save_best_only=True)
        ]
        self.model.fit_generator(
            data_gen,
            steps_per_epoch=steps_per_epoch,
            epochs=epochs,
            callbacks=callbacks,
            workers=1
        )
        print('[Model] Training Completed. Model saved as %s' % save_fname)
        timer.stop()

    def predict_point_by_point(self, data):
        # Predict each timestep given the last sequence of true data, in effect only predicting 1 step ahead each time
        print('[Model] Predicting Point-by-Point...')
        predicted = self.model.predict(data)
        predicted = np.reshape(predicted, (predicted.size,))
        return predicted

    def predict_sequences_multiple(self, data, window_size, prediction_len):
        # Predict sequence of 50 steps before shifting prediction run forward by 50 steps
        print('[Model] Predicting Sequences Multiple...')
        # list holding the predicted sequences
        prediction_seqs = []
        # one run of prediction_len steps per chunk of the test data
        for i in range(int(len(data)/prediction_len)):
            # current sliding window
            curr_frame = data[i*prediction_len]
            # predictions for this run
            predicted = []
            # feed the window in, predict one value, store it; then drop the oldest value
            # and append the new prediction at the end of the window; repeat prediction_len times
            for j in range(prediction_len):
                predicted.append(self.model.predict(curr_frame[newaxis,:,:])[0,0])
                curr_frame = curr_frame[1:]
                curr_frame = np.insert(curr_frame, [window_size-2], predicted[-1], axis=0)
            # store this run's predicted sequence
            prediction_seqs.append(predicted)
        # return the predictions
        return prediction_seqs

    def predict_sequence_full(self, data, window_size):
        # Shift the window by 1 new prediction each time, re-run predictions on new window
        print('[Model] Predicting Sequences Full...')
        curr_frame = data[0]
        predicted = []
        for i in range(len(data)):
            predicted.append(self.model.predict(curr_frame[newaxis,:,:])[0,0])
            curr_frame = curr_frame[1:]
            curr_frame = np.insert(curr_frame, [window_size-2], predicted[-1], axis=0)
        return predicted