当前位置:   article > 正文

LSTM实例代码解读-正弦数据预测、股票预测-pytorch_lstm+attention的pytorch股票预测代码

lstm+attention的pytorch股票预测代码

导读

参考文章
源码
本文以代码注释的形式对源码进行解读。
在这里插入图片描述
区别于CNN网络的RNN网络对于序列数据处理具有明显的优势性,通常应用在自然语言处理领域,LSTM是特殊的RNN网络,参考文章《如何从RNN起步,一步一步通俗理解LSTM》把RNN与LSTM原理讲述清楚。
如图所示简单总结,LSTM计算单元如上图,输入序列 x t x_t xt经过变化后与前序传来的 h t − 1 h_{t-1} ht1进行几个激活函数的组合计算。得到下一个隐状态 h t h_t ht值,下图为变换计算通式。具体去看推荐的参考文章,里面讲的很清楚,本文重点在LSTM项目代码的解读。
在这里插入图片描述
源码来自github,框架用的Keras,先说一下这个项目的任务:

  • 正弦函数的预测
  • 股票预测
    具体操作为,将正弦函数或股票的数据划分为50个一组的数据集,然后将数据集输入到训练好的模型,训练完毕后,通过模型输入新的一组50个数据,预测出下一组50个数据的值。

下图为LSTM模型示意图,模型的输入层尺寸为(49,1)一次输入49个向量,每个向量长度为1,这里的49个向量,相当于上图的 x 1 , x 2 , x 3 , . . . . . . , x 49 x_1,x_2,x_3,......,x_{49} x1,x2,x3,......,x49
在这里插入图片描述
2部分就是LSTM层,输入的49个向量经过LSTM输出49个向量每个向量长度为100,如下图所示,每个 y t y_t yt的长度为100。
在这里插入图片描述
然后经过Droppout再经过两次LSTM后将输出的最后一个向量取出,也就是取出 y 49 y_{49} y49,如下图所示:在这里插入图片描述

项目简读

该项目是配置好的,打开项目,在配置好环境后,可直接运行run.py,通过Debug可以更为细致的查看代码。
在这里插入图片描述

下图是股票的数据集,主要用到close闭盘价格跟Volume成交量
在这里插入图片描述
下图为sinewave的正弦数据的数据集。
在这里插入图片描述

模型搭建跟数据预处理

该项目中,将模型控制写在json文件中:
在这里插入图片描述
模型文件如下:

{
	"data": {
		"filename": "sinewave.csv",   数据集的文件名
		"columns": [
			"sinewave"				数据集文件中的要读取的列文件对应的第一行的名字,股票预测时候读取`close`跟`Volume`
		],
		"sequence_length": 50,		输入模型的序列长度
		"train_test_split": 0.85,   将数据集分为训练集跟测试集,划分的比例
		"normalise": false			是否要将向量标准化
	},
	"training": {
		"epochs": 2,				训练的迭代次数
		"batch_size":32				每个批次的数据批量,一次训练丢32组序列长度为50的数据到模型里
	},
	"model": {
		"loss": "mse",				损失函数用MSE
		"optimizer": "adam",		
		"save_dir": "saved_models",模型保存路径
		"layers": [
			{
				"type": "lstm",		 第一个lstm模块
				"neurons": 100,		 神经元100个,对应文章前面所述的将50个长度为1的序列向量,处理为50个长度为100的向量
				"input_timesteps": 49,输入序列数据的长度
				"input_dim": 1,		  输入序数据的向量维度
				"return_seq": true	  是否将输出的所有50个序列向量都返回
			},
			{
				"type": "dropout",
				"rate": 0.2
			},
			{
				"type": "lstm",		第二个lstm模块
				"neurons": 100,
				"return_seq": true
			},
			{
				"type": "lstm",		第三个lstm模块
				"neurons": 100,
				"return_seq": false 只返回最后一个向量,这时候数据为【batc_size,100},
			{
				"type": "dropout",
				"rate": 0.2
			},
			{
				"type": "dense", 全连接层
				"neurons": 1,	
				"activation": "linear" 通过全连接层将最后一个输出序列的维度从100->1 即为预测的第51个值
			}
		]
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

加载后的数据集如下图所示:
在这里插入图片描述
在这里插入图片描述

模型输入时序序列的前49个值,经过模型后输出一个值,这个值与第50个值进行MSE损失求解,然后反向梯度传播。训练模型。模型训练完毕后。模型预测时,通过1-50获取第51个值,2-51获取52的值,以此类推,通过50个输入数据,可以获取到下一段的50个数据。

数据加载

下列代码为数据加载跟预处理
这部分代码在run.pymain函数里

    configs = json.load(open('config.json', 'r')) #打开对应的配置文件
    #创建模型保存路径
    if not os.path.exists(configs['model']['save_dir']): os.makedirs(configs['model']['save_dir'])
    #数据加载
    data = DataLoader(
        os.path.join('data', configs['data']['filename']),
        configs['data']['train_test_split'],
        configs['data']['columns']
    )
    #数据集获取
    x, y = data.get_train_data(
        seq_len=configs['data']['sequence_length'],
        normalise=configs['data']['normalise']
    )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
import math
import numpy as np
import pandas as pd

class DataLoader():
    """A class for loading and transforming data for the lstm model"""

    def __init__(self, filename, split, cols):
        #打开CSV文件
        dataframe = pd.read_csv(filename)
        #划分训练集跟测试集
        i_split = int(len(dataframe) * split)
        #获取训练集
        self.data_train = dataframe.get(cols).values[:i_split]
        #获取测试集
        self.data_test  = dataframe.get(cols).values[i_split:]
        #获取数据集长度
        self.len_train  = len(self.data_train)
        self.len_test   = len(self.data_test)
        self.len_train_windows = None

    def get_train_data(self, seq_len, normalise):
        '''
        Create x, y train data windows
        Warning: batch method, not generative, make sure you have enough memory to
        load data, otherwise use generate_training_window() method.
        '''
        #创建两个装数据集的列表
        data_x = []
        data_y = []
        for i in range(self.len_train - seq_len):
            #从第一个数据遍历到最后一组数,每组按【1-50】,【2-51】,...,【:len_train - seq_len】划分
            x, y = self._next_window(i, seq_len, normalise)
            #将取出的数据存入列表,并且重复操作,直到全部取完。
            data_x.append(x)
            data_y.append(y)
        #将制备好的数据集转为numpy格式后返回
        return np.array(data_x), np.array(data_y)


    def _next_window(self, i, seq_len, normalise):
        '''Generates the next data window from the given index location i'''
        #获取规定的序列长度的数据,将其作为滑动窗口。如从【1-50】滑动到【2-51】
        window = self.data_train[i:i+seq_len]
        #是否对这个窗口内的数据进行标准化
        window = self.normalise_windows(window, single_window=True)[0] if normalise else window
        #滑动窗口里最后1数据作为损失函数的真实数据也就是y,前49个数据作为输入数据,也就是x
        x = window[:-1]
        y = window[-1, [0]]
        return x, y

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

模型搭建

	 model = Model()
	 model.build_model(configs)
  • 1
  • 2

进入模型Model(),通过Model()类中的build_model搭建模型

import os
import math
import numpy as np
import datetime as dt
from numpy import newaxis
from core.utils import Timer
from keras.layers import Dense, Activation, Dropout, LSTM
from keras.models import Sequential, load_model
from keras.callbacks import EarlyStopping, ModelCheckpoint

class Model():
	"""A class for an building and inferencing an lstm model"""

	def __init__(self):
	#构建一个序列化的container,可以把想要在神经网络中添加的操作都放进去,按顺序进行执行。
		self.model = Sequential()

	def load_model(self, filepath):
		print('[Model] Loading model from file %s' % filepath)
		#预测阶段用,将训练好的模型权值加载到模型中
		self.model = load_model(filepath)

	def build_model(self, configs):
		#统计模型加载时间
		timer = Timer()
		timer.start()
		#读取配置文件的每一层
		for layer in configs['model']['layers']:
			#读取神经元数量
			neurons = layer['neurons'] if 'neurons' in layer else None
			#读取droput的值
			dropout_rate = layer['rate'] if 'rate' in layer else None
			#读取激活函数
			activation = layer['activation'] if 'activation' in layer else None
			#读取是否返回全部序列
			return_seq = layer['return_seq'] if 'return_seq' in layer else None
			#读取输入序列长度
			input_timesteps = layer['input_timesteps'] if 'input_timesteps' in layer else None
			#读取输入序列向量的维度
			input_dim = layer['input_dim'] if 'input_dim' in layer else None
			#判断是否是全连接层,将其添加到模型中
			if layer['type'] == 'dense':
				self.model.add(Dense(neurons, activation=activation))
			#判断是否是lstm,通过Keras里写好的LSTM模块将配置的参数写入到模型中
			if layer['type'] == 'lstm':
				self.model.add(LSTM(neurons, input_shape=(input_timesteps, input_dim), return_sequences=return_seq))
			#进行droput
			if layer['type'] == 'dropout':
				self.model.add(Dropout(dropout_rate))
		#为模型设置损失函数,梯度传播策略
		self.model.compile(loss=configs['model']['loss'], optimizer=configs['model']['optimizer'])
		
		print('[Model] Model Compiled')
		#模型加载完毕,结束时间记录
		timer.stop()
		#这里返回模型是为了看模型的结构示意图做的步骤,可删掉
		return self.model
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

模型训练

通过model.train()对模型进行训练

  '''
    #如果内存足够的话,使用这个方法
	# in-memory training
	model.train(
		x,
		y,
		epochs = configs['training']['epochs'],
		batch_size = configs['training']['batch_size'],
		save_dir = configs['model']['save_dir']
	)
	'''
    #如果内存不足,用生成式训练,上面的方法是一次性将数据集全部放到内存里,然后从内存里取读取数据进行训练,
    #下面这个方法是先读取部分数据到内存里,用完了再从内存里读取数据。
    # out-of memory generative training
    # steps_per_epoch = math.ceil((data.len_train - configs['data']['sequence_length']) / configs['training']['batch_size'])
    # model.train_generator(
    #     data_gen=data.generate_train_batch(
    #         seq_len=configs['data']['sequence_length'],
    #         batch_size=configs['training']['batch_size'],
    #         normalise=configs['data']['normalise']
    #     ),
    #     epochs=configs['training']['epochs'],
    #     batch_size=configs['training']['batch_size'],
    #     steps_per_epoch=steps_per_epoch,
    #     save_dir=configs['model']['save_dir']
    # )
    model.train(
        x,                                          #输入数据
        y,                                          #求损失函数时候的真实值
        epochs=configs['training']['epochs'],       #迭代次数
        batch_size=configs['training']['batch_size'],
        save_dir=configs['model']['save_dir']       #保存路径
    )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

下列是 model.train()

	def train(self, x, y, epochs, batch_size, save_dir):
		#训练计时
		timer = Timer()
		timer.start()
		print('[Model] Training Started')
		print('[Model] %s epochs, %s batch size' % (epochs, batch_size))
		#训练权重的保存路径
		save_fname = os.path.join(save_dir, '%s-e%s.h5' % (dt.datetime.now().strftime('%d%m%Y-%H%M%S'), str(epochs)))
		#回调函数,控制训练策略用的
		callbacks = [
			#是否提前停止训练,判断依据,验证集的损失值,如果10轮没有降低就停止训练
			EarlyStopping(monitor='val_loss', patience=10),
			#是否保存最后的一轮训练权重
			ModelCheckpoint(filepath=save_fname, monitor='val_loss', save_best_only=True)
		]
		#对模型进行训练
		self.model.fit(
			x,
			y,
			epochs=epochs,
			batch_size=batch_size,
			callbacks=callbacks
		)
		#模型权重保存
		self.model.save(save_fname)

		print('[Model] Training Completed. Model saved as %s' % save_fname)
		#训练结束 统计时间
		timer.stop()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

模型预测

训练完毕后,将测试集获取好后,对模型进行预测,然后将预测结果画出来

    #获取测试数据
    x_test, y_test = data.get_test_data(
        seq_len=configs['data']['sequence_length'],
        normalise=configs['data']['normalise']
    )
  • 1
  • 2
  • 3
  • 4
  • 5
    def get_test_data(self, seq_len, normalise):
        '''
        Create x, y test data windows
        Warning: batch method, not generative, make sure you have enough memory to
        load data, otherwise reduce size of the training split.
        '''
        #建立数据滑动窗口
        data_windows = []
        for i in range(self.len_test - seq_len):
            # 从第一个数据遍历到最后一组数,每组按【1-50】,【2-51】,...,【:len_train - seq_len】划分
            data_windows.append(self.data_test[i:i+seq_len])
        #将数据转为浮点型的numpy格式,方便计算
        data_windows = np.array(data_windows).astype(float)
        #判断是否要标准化
        data_windows = self.normalise_windows(data_windows, single_window=False) if normalise else data_windows
        #将取出的数据存入列表,并且重复操作,直到全部取完。
        x = data_windows[:, :-1]
        y = data_windows[:, -1, [0]]
        return x,y
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

下两张图是测试数据集

在这里插入图片描述
这里的y_test只用在项目后续的画图可视化对比上,预测的时候不用这个数据。

在这里插入图片描述

    #模型预测
    predictions = model.predict_sequences_multiple(x_test, configs['data']['sequence_length'], configs['data']['sequence_length'])
    # predictions = model.predict_sequence_full(x_test, configs['data']['sequence_length'])
    # predictions = model.predict_point_by_point(x_test)
    #可视化比较
    plot_results_multiple(predictions, y_test, configs['data']['sequence_length'])
    # plot_results(predictions, y_test)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
	def predict_sequences_multiple(self, data, window_size, prediction_len):
		#Predict sequence of 50 steps before shifting prediction run forward by 50 steps
		print('[Model] Predicting Sequences Multiple...')
		#预测结果存放的列表
		prediction_seqs = []
		#已知预测要用的输入序列的长度,要预测几次,这里的i就是预测几次的意思
		for i in range(int(len(data)/prediction_len)):
			#当前的滑动窗口
			curr_frame = data[i*prediction_len]
			#预测存放结果
			predicted = []
			#输入prediction_len长度的序列,预测得到一个值,将这个值存到predicted中
			#然后将所有数据向又挪一个位置,再将predicted里刚添加的数据放到当前数据的最后一位
			#然后再进行下一次的预测,反复prediction_len次,最后得到prediction_len个预测结果
			for j in range(prediction_len):
				predicted.append(self.model.predict(curr_frame[newaxis,:,:])[0,0])
				curr_frame = curr_frame[1:]
				curr_frame = np.insert(curr_frame, [window_size-2], predicted[-1], axis=0)
			#将这一次的预测序列添加到存放列表里。
			prediction_seqs.append(predicted)
		#返回预测结果
		return prediction_seqs
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

下图为预测结果示意图
在这里插入图片描述

可视化对比

    plot_results_multiple(predictions, y_test, configs['data']['sequence_length'])
  • 1
def plot_results_multiple(predicted_data, true_data, prediction_len):
    #画个白底图
    fig = plt.figure(facecolor='white')
    #添加坐标轴
    ax = fig.add_subplot(111)
    #将真实数据画到图里
    ax.plot(true_data, label='True Data')
	# Pad the list of predictions to shift it in the graph to it's correct start
    #将预测结果画到图里
    for i, data in enumerate(predicted_data):
        padding = [None for p in range((i) * prediction_len)]
        plt.plot(padding + data, label='Prediction')
        plt.legend()
    plt.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

正弦数据的预测结果如下图:
在这里插入图片描述
股票的预测结果如下图,可以看出效果很差:

在这里插入图片描述

完整代码

主代码run.py

__author__ = "Jakob Aungiers"
__copyright__ = "Jakob Aungiers 2018"
__version__ = "2.0.0"
__license__ = "MIT"

import os
import json
import time
import math
import matplotlib.pyplot as plt
from core.data_processor import DataLoader
from core.model import Model
from keras.utils import plot_model


def plot_results(predicted_data, true_data):
    fig = plt.figure(facecolor='white')
    ax = fig.add_subplot(111)
    ax.plot(true_data, label='True Data')
    plt.plot(predicted_data, label='Prediction')
    plt.legend()
    plt.show()


def plot_results_multiple(predicted_data, true_data, prediction_len):
    #画个白底图
    fig = plt.figure(facecolor='white')
    #添加坐标轴
    ax = fig.add_subplot(111)
    #将真实数据画到图里
    ax.plot(true_data, label='True Data')
	# Pad the list of predictions to shift it in the graph to it's correct start
    #将预测结果画到图里
    for i, data in enumerate(predicted_data):
        padding = [None for p in range((i) * prediction_len)]
        plt.plot(padding + data, label='Prediction')
        plt.legend()
    plt.show()


def main():

    configs = json.load(open('config.json', 'r')) #打开对应的配置文件
    #创建模型保存路径
    if not os.path.exists(configs['model']['save_dir']): os.makedirs(configs['model']['save_dir'])
    #数据加载
    data = DataLoader(
        os.path.join('data', configs['data']['filename']),
        configs['data']['train_test_split'],
        configs['data']['columns']
    )
    model = Model()
    mymodel =  model.build_model(configs)

    # plot_model(mymodel,to_file="model.png",show_shapes=True)

    #数据集获取
    x, y = data.get_train_data(
        seq_len=configs['data']['sequence_length'],
        normalise=configs['data']['normalise']
    )

    '''
    #如果内存足够的话,使用这个方法
	# in-memory training
	model.train(
		x,
		y,
		epochs = configs['training']['epochs'],
		batch_size = configs['training']['batch_size'],
		save_dir = configs['model']['save_dir']
	)
	'''
    #如果内存不足,用生成式训练,上面的方法是一次性将数据集全部放到内存里,然后从内存里取读取数据进行训练,
    #下面这个方法是先读取部分数据到内存里,用完了再从内存里读取数据。
    # out-of memory generative training
    # steps_per_epoch = math.ceil((data.len_train - configs['data']['sequence_length']) / configs['training']['batch_size'])
    # model.train_generator(
    #     data_gen=data.generate_train_batch(
    #         seq_len=configs['data']['sequence_length'],
    #         batch_size=configs['training']['batch_size'],
    #         normalise=configs['data']['normalise']
    #     ),
    #     epochs=configs['training']['epochs'],
    #     batch_size=configs['training']['batch_size'],
    #     steps_per_epoch=steps_per_epoch,
    #     save_dir=configs['model']['save_dir']
    # )

    model.train(
        x,                                          #输入数据
        y,                                          #求损失函数时候的真实值
        epochs=configs['training']['epochs'],       #迭代次数
        batch_size=configs['training']['batch_size'],
        save_dir=configs['model']['save_dir']       #保存路径
    )

    #获取测试数据
    x_test, y_test = data.get_test_data(
        seq_len=configs['data']['sequence_length'],
        normalise=configs['data']['normalise']
    )

    import numpy as np
    # x_data = np.concatenate((x_test[0,:,:],y_test),0)
    #模型预测
    predictions = model.predict_sequences_multiple(x_test, configs['data']['sequence_length'], configs['data']['sequence_length'])
    # predictions = model.predict_sequence_full(x_test, configs['data']['sequence_length'])
    # predictions = model.predict_point_by_point(x_test)
    #可视化比较
    plot_results_multiple(predictions, y_test, configs['data']['sequence_length'])
    # plot_results(predictions, y_test)


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116

数据加载跟处理代码data_processor.py

import math
import numpy as np
import pandas as pd

class DataLoader():
    """A class for loading and transforming data for the lstm model"""

    def __init__(self, filename, split, cols):
        #打开CSV文件
        dataframe = pd.read_csv(filename)
        #划分训练集跟测试集
        i_split = int(len(dataframe) * split)
        #获取训练集
        self.data_train = dataframe.get(cols).values[:i_split]
        #获取测试集
        self.data_test  = dataframe.get(cols).values[i_split:]
        #获取数据集长度
        self.len_train  = len(self.data_train)
        self.len_test   = len(self.data_test)
        self.len_train_windows = None

    def get_test_data(self, seq_len, normalise):
        '''
        Create x, y test data windows
        Warning: batch method, not generative, make sure you have enough memory to
        load data, otherwise reduce size of the training split.
        '''
        #建立数据滑动窗口
        data_windows = []
        for i in range(self.len_test - seq_len):
            # 从第一个数据遍历到最后一组数,每组按【1-50】,【2-51】,...,【:len_train - seq_len】划分
            data_windows.append(self.data_test[i:i+seq_len])
        #将数据转为浮点型的numpy格式,方便计算
        data_windows = np.array(data_windows).astype(float)
        #判断是否要标准化
        data_windows = self.normalise_windows(data_windows, single_window=False) if normalise else data_windows
        #将取出的数据存入列表,并且重复操作,直到全部取完。
        x = data_windows[:, :-1]
        y = data_windows[:, -1, [0]]
        return x,y

    def get_train_data(self, seq_len, normalise):
        '''
        Create x, y train data windows
        Warning: batch method, not generative, make sure you have enough memory to
        load data, otherwise use generate_training_window() method.
        '''
        #创建两个装数据集的列表
        data_x = []
        data_y = []
        for i in range(self.len_train - seq_len):
            #从第一个数据遍历到最后一组数,每组按【1-50】,【2-51】,...,【:len_train - seq_len】划分
            x, y = self._next_window(i, seq_len, normalise)
            #将取出的数据存入列表,并且重复操作,直到全部取完。
            data_x.append(x)
            data_y.append(y)
        #将制备好的数据集转为numpy格式后返回
        return np.array(data_x), np.array(data_y)

    def generate_train_batch(self, seq_len, batch_size, normalise):
        '''Yield a generator of training data from filename on given list of cols split for train/test'''
        i = 0
        while i < (self.len_train - seq_len):
            x_batch = []
            y_batch = []
            for b in range(batch_size):
                if i >= (self.len_train - seq_len):
                    # stop-condition for a smaller final batch if data doesn't divide evenly
                    yield np.array(x_batch), np.array(y_batch)
                    i = 0
                x, y = self._next_window(i, seq_len, normalise)
                x_batch.append(x)
                y_batch.append(y)
                i += 1
            yield np.array(x_batch), np.array(y_batch)

    def _next_window(self, i, seq_len, normalise):
        '''Generates the next data window from the given index location i'''
        #获取规定的序列长度的数据,将其作为滑动窗口。如从【1-50】滑动到【2-51】
        window = self.data_train[i:i+seq_len]
        #是否对这个窗口内的数据进行标准化
        window = self.normalise_windows(window, single_window=True)[0] if normalise else window
        #滑动窗口里最后1数据作为损失函数的真实数据也就是y,前49个数据作为输入数据,也就是x
        x = window[:-1]
        y = window[-1, [0]]
        return x, y

    def normalise_windows(self, window_data, single_window=False):
        '''Normalise window with a base value of zero'''
        normalised_data = []
        window_data = [window_data] if single_window else window_data
        for window in window_data:
            normalised_window = []
            for col_i in range(window.shape[1]):
                normalised_col = [((float(p) / float(window[0, col_i])) - 1) for p in window[:, col_i]]
                normalised_window.append(normalised_col)
            normalised_window = np.array(normalised_window).T # reshape and transpose array back into original multidimensional format
            normalised_data.append(normalised_window)
        return np.array(normalised_data)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99

模型代码model.py

import os
import math
import numpy as np
import datetime as dt
from numpy import newaxis
from core.utils import Timer
from keras.layers import Dense, Activation, Dropout, LSTM
from keras.models import Sequential, load_model
from keras.callbacks import EarlyStopping, ModelCheckpoint

class Model():
	"""A class for an building and inferencing an lstm model"""

	def __init__(self):
		#构建一个序列化的container,可以把想要在神经网络中添加的操作都放进去,按顺序进行执行。
		self.model = Sequential()

	def load_model(self, filepath):
		print('[Model] Loading model from file %s' % filepath)
		#预测阶段用,将训练好的模型权值加载到模型中
		self.model = load_model(filepath)

	def build_model(self, configs):
		#统计模型加载时间
		timer = Timer()
		timer.start()
		#读取配置文件的每一层
		for layer in configs['model']['layers']:
			#读取神经元数量
			neurons = layer['neurons'] if 'neurons' in layer else None
			#读取droput的值
			dropout_rate = layer['rate'] if 'rate' in layer else None
			#读取激活函数
			activation = layer['activation'] if 'activation' in layer else None
			#读取是否返回全部序列
			return_seq = layer['return_seq'] if 'return_seq' in layer else None
			#读取输入序列长度
			input_timesteps = layer['input_timesteps'] if 'input_timesteps' in layer else None
			#读取输入序列向量的维度
			input_dim = layer['input_dim'] if 'input_dim' in layer else None
			#判断是否是全连接层,将其添加到模型中
			if layer['type'] == 'dense':
				self.model.add(Dense(neurons, activation=activation))
			#判断是否是lstm,通过Keras里写好的LSTM模块将配置的参数写入到模型中
			if layer['type'] == 'lstm':
				self.model.add(LSTM(neurons, input_shape=(input_timesteps, input_dim), return_sequences=return_seq))
			#进行droput
			if layer['type'] == 'dropout':
				self.model.add(Dropout(dropout_rate))
		#为模型设置损失函数,梯度传播策略
		self.model.compile(loss=configs['model']['loss'], optimizer=configs['model']['optimizer'])

		print('[Model] Model Compiled')
		#模型加载完毕,结束时间记录
		timer.stop()
		#这里返回模型是为了看模型的结构示意图做的步骤,可删掉
		return self.model

	def train(self, x, y, epochs, batch_size, save_dir):
		#训练计时
		timer = Timer()
		timer.start()
		print('[Model] Training Started')
		print('[Model] %s epochs, %s batch size' % (epochs, batch_size))
		#训练权重的保存路径
		save_fname = os.path.join(save_dir, '%s-e%s.h5' % (dt.datetime.now().strftime('%d%m%Y-%H%M%S'), str(epochs)))
		#回调函数,控制训练策略用的
		callbacks = [
			#是否提前停止训练,判断依据,验证集的损失值,如果10轮没有降低就停止训练
			EarlyStopping(monitor='val_loss', patience=10),
			#是否保存最后的一轮训练权重
			ModelCheckpoint(filepath=save_fname, monitor='val_loss', save_best_only=True)
		]
		#对模型进行训练
		self.model.fit(
			x,
			y,
			epochs=epochs,
			batch_size=batch_size,
			callbacks=callbacks
		)
		#模型权重保存
		self.model.save(save_fname)

		print('[Model] Training Completed. Model saved as %s' % save_fname)
		#训练结束 统计时间
		timer.stop()

	def train_generator(self, data_gen, epochs, batch_size, steps_per_epoch, save_dir):
		timer = Timer()
		timer.start()
		print('[Model] Training Started')
		print('[Model] %s epochs, %s batch size, %s batches per epoch' % (epochs, batch_size, steps_per_epoch))
		
		save_fname = os.path.join(save_dir, '%s-e%s.h5' % (dt.datetime.now().strftime('%d%m%Y-%H%M%S'), str(epochs)))
		callbacks = [
			#每个epcho都进行保存 只保存最好的那一个
			EarlyStopping(monitor='val_loss', patience=50),
			ModelCheckpoint(filepath=save_fname, monitor='loss', save_best_only=True)
		]
		self.model.fit_generator(
			data_gen,
			steps_per_epoch=steps_per_epoch,
			epochs=epochs,
			callbacks=callbacks,
			workers=1
		)
		
		print('[Model] Training Completed. Model saved as %s' % save_fname)
		timer.stop()

	def predict_point_by_point(self, data):
		#Predict each timestep given the last sequence of true data, in effect only predicting 1 step ahead each time
		print('[Model] Predicting Point-by-Point...')
		predicted = self.model.predict(data)
		predicted = np.reshape(predicted, (predicted.size,))
		return predicted

	def predict_sequences_multiple(self, data, window_size, prediction_len):
		#Predict sequence of 50 steps before shifting prediction run forward by 50 steps
		print('[Model] Predicting Sequences Multiple...')
		#预测结果存放的列表
		prediction_seqs = []
		#已知预测要用的输入序列的长度,要预测几次,这里的i就是预测几次的意思
		for i in range(int(len(data)/prediction_len)):
			#当前的滑动窗口
			curr_frame = data[i*prediction_len]
			#预测存放结果
			predicted = []
			#输入prediction_len长度的序列,预测得到一个值,将这个值存到predicted中
			#然后将所有数据向又挪一个位置,再将predicted里刚添加的数据放到当前数据的最后一位
			#然后再进行下一次的预测,反复prediction_len次,最后得到prediction_len个预测结果
			for j in range(prediction_len):
				predicted.append(self.model.predict(curr_frame[newaxis,:,:])[0,0])
				curr_frame = curr_frame[1:]
				curr_frame = np.insert(curr_frame, [window_size-2], predicted[-1], axis=0)
			#将这一次的预测序列添加到存放列表里。
			prediction_seqs.append(predicted)
		#返回预测结果
		return prediction_seqs

	def predict_sequence_full(self, data, window_size):
		#Shift the window by 1 new prediction each time, re-run predictions on new window
		print('[Model] Predicting Sequences Full...')
		curr_frame = data[0]
		predicted = []
		for i in range(len(data)):
			predicted.append(self.model.predict(curr_frame[newaxis,:,:])[0,0])
			curr_frame = curr_frame[1:]
			curr_frame = np.insert(curr_frame, [window_size-2], predicted[-1], axis=0)
		return predicted
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/167942
推荐阅读
相关标签
  

闽ICP备14008679号