赞
踩
RNN的基本结构:
图1
上图可以看到存在参数X、U、S、W、V、O,其中X为输入,U为输入层到隐藏层的参数矩阵,S表示隐藏层向量,V表示隐藏层到输出层的参数矩阵,O表示输出。抛开W,X->O的数据传播过程其实就是一个全连接神经网络的传播过程(X等价于输入,U等价于w41…;S等价于a部分;V等价于w84…部分;O等价于Y输出)。
图2
W的作用在循环训练过程中发挥作用,按照时间线可将训练过程展开为如下形式。神经网咯的输出Ot不仅与本时刻输入有关,还与上一时刻的隐藏层值决定,输入与输出之间的泛函形式如下:
Q
t
=
g
(
V
⋅
S
t
)
Q_t=g(V \sdot S_t)
Qt=g(V⋅St)
S
t
=
f
(
U
⋅
X
t
+
W
⋅
S
t
−
1
)
S_t=f(U \sdot X_t+W \sdot S_{t-1})
St=f(U⋅Xt+W⋅St−1)
图3
当句子很长之后,RNN会出现梯度消失的问题,LSTM为解决该问题应运而生,相关论文发表于1997年。
与RNN类似,LSTM在单组数据训练过程中,本质也是同一个网络在一个序列输入下的形式。只是由于输入数据为序列形式,因此按照时间展开训练过程,示意图如下。
图4
图5
接下来对上图4中的运行机制进行解释。可以看到图中训练过程仍以序列形式展现,实际上全程仍仅一个神经网络的参数在更新,就是图中绿色部分。绿色框内又分几个小模块,每个模块通过颜色和形状可以分辨其功能,具体对照表参考图5。
橙黄色矩形:神经网络层,即
w
T
x
+
b
w^Tx+b
wTx+b操作,区别在于使用不同的激活函数,三个
σ
\sigma
σ部分使用的是sigmoid函数,将数据压缩到[0,1]范围内;tanh()
部分使用的是双曲正切函数,可将数据归一化到[-1,1]区间。
浅粉色圆型:pointwise operation指的是矩阵按位操作,即两个维数相同的矩阵,同样位置的元素相乘或相加后放到新矩阵的同样位置上,示意过程如下。
Vector transfer:矩阵值传递
Concatenate:矩阵连接,两个矩阵不做任何运算,只是连接在一起,比如原来A矩阵10维,B矩阵5维,连接之后的C矩阵为15维。
**Copy:**矩阵赋值
图6
数据更新流程;
接下来更新
C
t
C_t
Ct,
上面的 g t g_t gt其实就是 C t ~C_t Ct,其他符号基本是一致的。可以看到,pytorch中, x t x_t xt与 h t − 1 h_{t-1} ht−1并没有拼接在一起,而是各自做了对应的运算,这其实就是使用了分块矩阵的技巧进行计算,结果理论上是一样的,不过这里有些不同的就是加了两个bias,因此计算偏置的参数需要乘2。
nn.lstm是继承nn.RNNBase,初始化定义如下:
class RNNBase(Module):
...
def __init__(self, mode, input_size, hidden_size,
num_layers=1, bias=True, batch_first=False,
dropout=0., bidirectional=False):
当batch_first设置为True使,输入数据集维度为(batch, seq_length, input_size),其中:
seq_len
表示文本长度,即输入序列的长度,比如输入为365天里每天的温度和湿度信息,则seq_length=365;input_size
:输入的特征维度,对应上一环节的温度和湿度,此值为2。当batch_first为True时,此时输出维度为(batch,seq_len,hidden_size*num_directions),hidden_size
表示隐藏层长度,num_directions
根据不同情境确定,普通LSTM该值为1, 双LSTM该值为2。
main.py
import torch import torch.nn as nn import numpy as np from trainer import * from LSTM import * data_dir="my.mat" # 初始化模型 input_size = 2 # 输入特征维度为1 hidden_size = 64 # 隐含层大小为64 hidden_num_layers=2 #隐藏层数 output_size = 1 # 输出特征维度为1 lstm_model = RNN(input_size, hidden_size, hidden_num_layers,output_size) # 设置训练参数 epoch = 1 batch_size = 1 trainer=Trainer(rnn_model,epoch=epoch,batch_size=batch_size,data_path=data_dir,lr=1e-3) train=1 test=1 if train: trainer.train() if test: trainer.test()
LSTM.py
import torch import torch.nn as nn class RNN(nn.Module): def __init__(self, input_size, hidden_size,hidden_num_layers, output_size): super(RNN, self).__init__() self.rnn = nn.LSTM(input_size, hidden_size,num_layers=hidden_num_layers,batch_first=True) self.fc = nn.Linear(hidden_size, output_size)#根据不同情境的输出维度,还需要添加全连接层进行维度转换。 def forward(self, x): out, _ = self.rnn(x) seq_len,batch_size,hidden_size=out.shape out=out.view(-1,hidden_size) out = self.fc(out) out=out.view(seq_len,batch_size,-1) return out class AverageMeter(object): """Computes and stores the average and current value""" def __init__(self): self.reset() def reset(self): self.val = 0 self.avg = 0 self.sum = 0 self.count = 0 self.vals=[] def update(self, val, n=1): self.val = val self.vals.append(val) self.sum += val * n self.count += n self.avg = self.sum / self.count
Datasets.py
import scipy.io as scio import torch import torch.nn as nn import numpy as np import torch.utils.data as Data from torch.utils.data import Dataset class Dataset_new(Dataset): def __init__(self) -> None: return @staticmethod def get_img_info(filename,time_size=100,train=False,test=False,val=False): load_mat = scio.loadmat(filename) feature1 = load_mat['temperture'] feature2 = load_mat['humidity'] tags = load_mat['weather'] data_info=[] if train: size=int(len(feature1)*0.8) for i in range(size): temp=np.array(feature1[i]) temp=temp.reshape(time_size,1) temp2=np.array(feature2[i]) temp2=temp.reshape(time_size,1) features=np.hstack((temp,temp2)) features=torch.Tensor(features) tag=tags[i] tag=np.array(tag) tag=torch.Tensor(tag).view(time_size,1) data_info.append((features,tag)) if test: size=int(len(feature1)*0.8) size2=int(len(feature1)*0.9) for i in range(size,size2): temp=np.array(feature1[i]) temp=temp.reshape(time_size,1) temp2=np.array(feature2[i]) temp2=temp.reshape(time_size,1) features=np.hstack((temp,temp2)) features=torch.Tensor(features) tag=tags[i] tag=np.array(tag) tag=torch.Tensor(tag).view(time_size,1) data_info.append((features,tag)) if val: size=int(len(feature1)*0.9) size2=int(len(feature1)) # print(size, size2) for i in range(size,size2): temp=np.array(feature1[i]) temp=temp.reshape(time_size,1) temp2=np.array(feature2[i]) temp2=temp.reshape(time_size,1) features=np.hstack((temp,temp2)) features=torch.Tensor(features) tag=tags[i] tag=np.array(tag) tag=torch.Tensor(tag).view(time_size,1) data_info.append((features,tag)) return data_info
Trainer.py
from RNN import * from dataset import * import torch import torch.nn as nn import numpy as np import matplotlib.pyplot as plt from torch.nn.modules import loss import torch.optim as optim from torch.optim.lr_scheduler import StepLR #用于动态调整任意优化算法的学习率,可定义step_size以及gamma值 from torch.utils.data import DataLoader import os import shutil import time from torch.utils.tensorboard import SummaryWriter def save_model(state,is_best=None,save_dir=None): last_model=os.path.join(save_dir,'last_model.pth') torch.save(state,last_model) if is_best: best_model=os.path.join(save_dir,'best_model.pth') shutil.copyfile(last_model,best_model)# 将last_model文件复制到best_model中 class Trainer(): def __init__(self,model,epoch,optimizer='Adam',batch_size=32,data_path="",lr=1e-3): self.model=model self.device=torch.device("cuda") self.criterion=nn.MSELoss(reduction="mean") self.model_name=self.model.__class__.__name__ self.epochs=epoch self.batch_size=batch_size if optimizer in ['Adam']: self.optimizer=optim.Adam(self.model.parameters(),lr=lr) if optimizer in ['SGD']: self.optimizer=optim.SGD(self.model.parameters(),lr=1e-3) if optimizer in ['RMSdrop']: self.optimizer=optim.RMSdrop(self.model.parameters(),lr=1e-3,alpha=0.99,eps=1e-08) if optimizer in ['Momentum']: self.optimizer=optim.SGD(self.model.parameters(),lr=1e-3,momentum=0.5) self.model=self.model.to(self.device) self.model.zero_grad() dataset=Dataset_new() self.train_data=dataset.get_img_info(data_path,time_size=100,train=True) #训练集 self.val_data=dataset.get_img_info(data_path,time_size=100,val=True) # 验证集 self.test_data=dataset.get_img_info(data_path,time_size=100,test=True) # 测试集 def _model_path(self): if not os.path.exists('./checkpoints/checkpoints'): os.mkdir('./checkpoints/checkpoints') path=os.path.join('./checkpoints/checkpoints',self.model_name) if not os.path.exists(path): os.mkdir(path) return path def train(self): # 利用batch size方法获取训练集和验证集 train_loader = DataLoader(self.train_data,batch_size=self.batch_size,shuffle=True) val_loader = DataLoader(self.val_data, batch_size=self.batch_size,shuffle=True) best_loss=1e10 val_loss=AverageMeter() for epoch in range(self.epochs): for batch_idx,(data,tags) in enumerate(train_loader): data=data.to(self.device) tags=tags.to(self.device) self.optimizer.zero_grad() output=self.model(data) loss=self.criterion(output,tags) loss.backward() self.optimizer.step() with torch.no_grad():# 验证精度 val_loss.reset() for x,(data,tags) in enumerate(val_loader): data=data.to(self.device) tags=tags.to(self.device) output=self.model(data) loss2=self.criterion(output,tags) val_loss.update(loss2.item()) is_best=(val_loss.avg<best_loss) best_loss=val_loss.avg if is_best else best_loss if is_best: state={ 'epoch':epoch, 'state_dict':self.model.state_dict(), 'best_loss':best_loss } save_model(state,is_best,save_dir=self._model_path()) # best_loss_epoch.update(best_loss)#这里需要再测试一下 if epoch%10==0: print("Epoch: ",epoch," Best loss: ",best_loss) def test(self,data_path='',number=0,training=1,flag=0): test_loader = DataLoader(self.test_data,batch_size=self.batch_size,shuffle=True) # 调用模型 path1='./checkpoints/checkpoints' model_name=self.model.__class__.__name__ model_path=os.path.join(path1,model_name,'best_model.pth') best_model=torch.load(model_path) self.model.load_state_dict(best_model['state_dict']) self.model=self.model.to(self.device) # # 测试 with torch.no_grad():# 验证精度 test_loss=AverageMeter() self.model.eval() for x,(data,tags) in enumerate(test_loader): data=data.to(self.device) tags=tags.to(self.device) output=self.model(data) loss=self.criterion(output,tags) test_loss.update(loss.item()) print('test loss average value: ',test_loss.avg)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。