赞
踩
GRU是LSTM网络的一种效果很好的变体,它较LSTM网络的结构更加简单,而且效果也很好,因此也是当前非常流形的一种网络。GRU既然是LSTM的变体,因此也是可以解决RNN网络中的长依赖问题。
在LSTM中引入了三个门函数:输入门、遗忘门和输出门来控制输入值、记忆值和输出值。而在GRU模型中只有两个门:分别是更新门和重置门。具体结构如下图所示:
图中的update gate和reset gate分别表示更新门和重置门。
更新门的作用类似于LSTM的遗忘和输入门。它决定前一时刻的状态信息哪些被丢弃和哪些被更新新信息,更新门的值越大说明前一时刻的状态信息带入越多。
重置门控制前一状态有多少信息被写入到当前的候选集
h
t
~
\tilde{h_{t}}
ht~上,重置门越小,前一状态的信息被写入的越少。
详见下图:
GRU使用门控机制学习长期依赖关系的基本思想和 LSTM 一致,但还是有一些关键区别:
1、GRU 有两个门(重置门与更新门),而 LSTM 有三个门(输入门、遗忘门和输出门)。
2、GRU 并不会控制并保留内部记忆(c_t),且没有 LSTM 中的输出门。
3、LSTM 中的输入与遗忘门对应于 GRU 的更新门,重置门直接作用于前面的隐藏状态。
4、在计算输出时并不应用二阶非线性。
由于 GRU 参数更少,收敛速度更快,因此其实际花费时间要少很多,这可以大大加速了我们的迭代过程。 而从表现上讲,二者之间孰优孰劣并没有定论,这要依据具体的任务和数据集而定,而实际上,二者之间的 performance 差距往往并不大,远没有调参所带来的效果明显,我通常的做法,首先选择GRU作为基本的单元,因为其收敛速度快,可以加速试验进程,快速迭代,而我认为快速迭代这一特点很重要。如果实现没其余优化技巧,才会尝试将 GRU 换为 LSTM,看看有没有变化。
数据的下载地址:http://quotes.money.163.com/trade/lsjysj_zhishu_000001.html
首先选择查询数据的范围,然后选择下载数据:
点击下载数据后,弹出个框,这个框里可以看到起止日期,数据的列,默认勾选全部,然后单击下载。
将下载后的数据放到工程的根目录下面,然后做一些处理。
第一步 读入csv文件,编码方式设置为gbk。
第二步 将日期列转为“年-月-日”这样的格式。
第三步 根据日期排序,数据默认是从降序,改为升序。
最后 保存到“sh.csv”中。
代码如下:
import pandas as pd
df = pd.read_csv("000001.csv", encoding='gbk')
df["日期"] = pd.to_datetime(df["日期"], format="%Y-%m-%d")
df.sort_values(by=["日期"], ascending=True)
df = df.sort_values(by=["日期"], ascending=True)
df.to_csv("sh.csv", index=False, sep=',')
到这里数据集制作完成了
generate_df_affect_by_n_days函数,通过一个序列来生成一个矩阵(用于处理时序的数据)。就是把当天的前n天作为参数,当天的数据作为label。
readData中的文件名为:sh.csv ,参数column,代表选用的数据,本次预测只用了一列数据,列名就是column,参数n就是之前模型中所说的n,代表column前n天的数据。train_end表示的是后面多少个数据作为测试集。
import pandas as pd import matplotlib.pyplot as plt import datetime import torch import torch.nn as nn import numpy as np from torch.utils.data import Dataset, DataLoader def generate_df_affect_by_n_days(series, n, index=False): if len(series) <= n: raise Exception("The Length of series is %d, while affect by (n=%d)." % (len(series), n)) df = pd.DataFrame() for i in range(n): df['c%d' % i] = series.tolist()[i:-(n - i)] df['y'] = series.tolist()[n:] if index: df.index = series.index[n:] return df def readData(column='收盘价', n=30, all_too=True, index=False, train_end=-300): df = pd.read_csv("sh.csv", index_col=0, encoding='utf-8') df.fillna(0, inplace=True) df.replace(to_replace="None", value=0) del df["股票代码"] del df["名称"] df.index = list(map(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d").date(), df.index)) df_column = df[column].copy() df_column_train, df_column_test = df_column[:train_end], df_column[train_end - n:] df_generate_from_df_column_train = generate_df_affect_by_n_days(df_column_train, n, index=index) print(df_generate_from_df_column_train) if all_too: return df_generate_from_df_column_train, df_column, df.index.tolist() return df_generate_from_df_column_train
模型见RNN类,采用GRU+全连接。隐藏层设置64,GRU的输出是64维的,所以设置全连接也是64。
TrainSet是数据读取类,将输出数据的最后一列做标签,前面的列做输入,类的写法是Pytorch的固定模式。
class RNN(nn.Module): def __init__(self, input_size): super(RNN, self).__init__() self.rnn = nn.GRU( input_size=input_size, hidden_size=64, num_layers=1, batch_first=True, ) self.out = nn.Sequential( nn.Linear(64, 1), ) self.hidden = None def forward(self, x): r_out, self.hidden = self.rnn(x) # None 表示 hidden state 会用全0的 state out = self.out(r_out) return out class TrainSet(Dataset): def __init__(self, data): # 定义好 image 的路径 self.data, self.label = data[:, :-1].float(), data[:, -1].float() def __getitem__(self, index): return self.data[index], self.label[index] def __len__(self): return len(self.data)
n为模型中的n
LR是模型的学习率
EPOCH是多次循环
train_end这个在之前的数据集中有提到。(注意是负数)
n = 30
LR = 0.0001
EPOCH = 100
train_end = -500
loss选用mse
预测的数据选择“收盘价”
n = 10 LR = 0.0001 EPOCH = 100 train_end = -500 # 数据集建立 df, df_all, df_index = readData('收盘价', n=n, train_end=train_end) df_all = np.array(df_all.tolist()) plt.plot(df_index, df_all, label='real-data') df_numpy = np.array(df) df_numpy_mean = np.mean(df_numpy) df_numpy_std = np.std(df_numpy) df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std df_tensor = torch.Tensor(df_numpy) trainset = TrainSet(df_tensor) trainloader = DataLoader(trainset, batch_size=16, shuffle=True) rnn = RNN(n) optimizer = torch.optim.Adam(rnn.parameters(), lr=LR) # optimize all cnn parameters loss_func = nn.MSELoss() for step in range(EPOCH): for tx, ty in trainloader: output = rnn(torch.unsqueeze(tx, dim=0)) loss = loss_func(torch.squeeze(output), ty) optimizer.zero_grad() # clear gradients for this training step loss.backward() # back propagation, compute gradients optimizer.step() print(step, loss) if step % 10: torch.save(rnn, 'rnn.pkl') torch.save(rnn, 'rnn.pkl') generate_data_train = [] generate_data_test = [] test_index = len(df_all) + train_end df_all_normal = (df_all - df_numpy_mean) / df_numpy_std df_all_normal_tensor = torch.Tensor(df_all_normal) for i in range(n, len(df_all)): x = df_all_normal_tensor[i - n:i] x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0) y = rnn(x) if i < test_index: generate_data_train.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean) else: generate_data_test.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean) plt.plot(df_index[n:train_end], generate_data_train, label='generate_train') plt.plot(df_index[train_end:], generate_data_test, label='generate_test') plt.legend() plt.show() plt.cla() plt.plot(df_index[train_end:-400], df_all[train_end:-400], label='real-data') plt.plot(df_index[train_end:-400], generate_data_test[:-400], label='generate_test') plt.legend() plt.show()
本实例数据集使用上证的收盘价,构建时序数据集,使用GRU对数据预测,从结果上来看,走势有一定的滞后性,由于只是用了价格预测价格,还是太简单了,可以考虑加入更多的特征去优化这一算法。
import pandas as pd import matplotlib.pyplot as plt import datetime import torch import torch.nn as nn import numpy as np from torch.utils.data import Dataset, DataLoader import os os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE' def generate_df_affect_by_n_days(series, n, index=False): if len(series) <= n: raise Exception("The Length of series is %d, while affect by (n=%d)." % (len(series), n)) df = pd.DataFrame() for i in range(n): df['c%d' % i] = series.tolist()[i:-(n - i)] df['y'] = series.tolist()[n:] if index: df.index = series.index[n:] return df def readData(column='收盘价', n=30, all_too=True, index=False, train_end=-300): df = pd.read_csv("sh.csv", index_col=0, encoding='utf-8') df.fillna(0, inplace=True) df.replace(to_replace="None", value=0) del df["股票代码"] del df["名称"] df.index = list(map(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d").date(), df.index)) df_column = df[column].copy() df_column_train, df_column_test = df_column[:train_end], df_column[train_end - n:] df_generate_from_df_column_train = generate_df_affect_by_n_days(df_column_train, n, index=index) print(df_generate_from_df_column_train) if all_too: return df_generate_from_df_column_train, df_column, df.index.tolist() return df_generate_from_df_column_train class RNN(nn.Module): def __init__(self, input_size): super(RNN, self).__init__() self.rnn = nn.GRU( input_size=input_size, hidden_size=64, num_layers=1, batch_first=True, ) self.out = nn.Sequential( nn.Linear(64, 1), ) self.hidden = None def forward(self, x): r_out, self.hidden = self.rnn(x) # None 表示 hidden state 会用全0的 state out = self.out(r_out) return out class TrainSet(Dataset): def __init__(self, data): # 定义好 image 的路径 self.data, self.label = data[:, :-1].float(), data[:, -1].float() def __getitem__(self, index): return self.data[index], self.label[index] def __len__(self): return len(self.data) n = 10 LR = 0.0001 EPOCH = 100 train_end = -500 # 数据集建立 df, df_all, df_index = readData('收盘价', n=n, train_end=train_end) df_all = np.array(df_all.tolist()) plt.plot(df_index, df_all, label='real-data') df_numpy = np.array(df) df_numpy_mean = np.mean(df_numpy) df_numpy_std = np.std(df_numpy) df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std df_tensor = torch.Tensor(df_numpy) trainset = TrainSet(df_tensor) trainloader = DataLoader(trainset, batch_size=64, shuffle=True) rnn = RNN(n) optimizer = torch.optim.Adam(rnn.parameters(), lr=LR) # optimize all cnn parameters loss_func = nn.MSELoss() for step in range(EPOCH): for tx, ty in trainloader: output = rnn(torch.unsqueeze(tx, dim=0)) loss = loss_func(torch.squeeze(output), ty) optimizer.zero_grad() # clear gradients for this training step loss.backward() # back propagation, compute gradients optimizer.step() print(step, loss) if step % 10: torch.save(rnn, 'rnn.pkl') torch.save(rnn, 'rnn.pkl') generate_data_train = [] generate_data_test = [] test_index = len(df_all) + train_end df_all_normal = (df_all - df_numpy_mean) / df_numpy_std df_all_normal_tensor = torch.Tensor(df_all_normal) for i in range(n, len(df_all)): x = df_all_normal_tensor[i - n:i] x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0) y = rnn(x) if i < test_index: generate_data_train.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean) else: generate_data_test.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean) plt.plot(df_index[n:train_end], generate_data_train, label='generate_train') plt.plot(df_index[train_end:], generate_data_test, label='generate_test') plt.legend() plt.show() plt.cla() plt.plot(df_index[train_end:-400], df_all[train_end:-400], label='real-data') plt.plot(df_index[train_end:-400], generate_data_test[:-400], label='generate_test') plt.legend() plt.show()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。