赞
踩
最近一直在做类ChatGPT项目的部署 微调,关注比较多的是两个:一个LLaMA,一个ChatGLM,会发现有不少模型是基于这两个模型去做微调的,说到微调,那具体怎么微调呢,因此又详细了解了一下微调代码,发现微调LLM时一般都会用到Hugging face实现的Transformers库的Trainer类
从而发现,如果大家想从零复现ChatGPT,便得从实现Transformer开始,因此便开启了本文:从零实现Transformer的简易版与强大版:从300多行到3000多行,主要分为两个大部分
且本文的代码解读与其他代码解读最大的不同是:会对出现在本文的每一行代码都加以注释、解释、说明,甚至对每行代码中的变量都会做解释/说明
总之,一如既往的保持对初学者的足够友好,让即便没有太多背景知识的也能顺畅理解本文
transformer强大到什么程度呢,基本是17年之后绝大部分有影响力模型的基础架构都基于的transformer(比如,这里有200来个,包括且不限于基于decode的GPT、基于encode的BERT、基于encode-decode的T5等等)
通过博客内的这篇文章《Transformer通俗笔记:从Word2Vec、Seq2Seq逐步理解到GPT、BERT》,我们已经详细了解了transformer的原理(如果忘了,建议必复习下再看本文,当然,如果你实在不想跳转,就只想呆在本文,也行,我努力..)
如果把上图中的各种细节也显示出来,则如下大图所示(此大图来源于七月在线NLP11里倪老师讲的Transformer模型源码解读,positional encoding、多头等没画)
具体说来,是一个典型的编码器-解码器架构
- # 定义一个基于 nn.Module 的编码器-解码器类
- class EncoderDecoder(nn.Module):
-
- # 初始化方法,接收编码器、解码器、源嵌入、目标嵌入和生成器作为参数
- def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
- # 调用 nn.Module 的初始化方法
- super(EncoderDecoder, self).__init__()
-
- self.encoder = encoder # 将传入的编码器实例保存为类属性
- self.decoder = decoder # 将传入的解码器实例保存为类属性
- self.src_embed = src_embed # 将传入的源嵌入实例保存为类属性
- self.tgt_embed = tgt_embed # 将传入的目标嵌入实例保存为类属性
- self.generator = generator # 将传入的生成器实例保存为类属性
-
- # 前向传播方法,接收源序列、目标序列和它们的掩码作为参数
- def forward(self, src, tgt, src_mask, tgt_mask):
- # 对源序列进行编码,并将编码结果与掩码传递给解码器进行解码
- return self.decode(self.encode(src, src_mask), src_mask,
- tgt, tgt_mask)
-
- # 编码方法,接收源序列和掩码作为参数
- def encode(self, src, src_mask):
- # 将源序列进行嵌入,然后将嵌入后的序列和源序列掩码传给编码器
- return self.encoder(self.src_embed(src), src_mask)
-
- # 解码方法,接收编码器输出(memory)、源序列掩码、目标序列和目标序列掩码作为参数
- def decode(self, memory, src_mask, tgt, tgt_mask):
- # 将目标序列进行嵌入,然后将嵌入后的序列、编码器输出、源序列掩码和目标序列掩码传给解码器
- return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)
-
- # 定义一个基于 nn.Module 的生成器类
- class Generator(nn.Module):
-
- # 初始化方法,接收模型维度(d_model)和词汇表大小(vocab)作为参数
- def __init__(self, d_model, vocab):
- # 调用 nn.Module 的初始化方法
- super(Generator, self).__init__()
- # 定义一个线性层,将模型的输出维度映射到词汇表大小
- self.proj = nn.Linear(d_model, vocab)
-
- # 前向传播方法,接收输入 x
- def forward(self, x):
- # 将输入 x 传入线性层,然后对输出应用 log-softmax 激活函数(在最后一个维度上)
- return F.log_softmax(self.proj(x), dim=-1)
考虑到Hugging face实现的Transformers库虽然功能强大,但3000多行,对于初次实现的初学者来说,理解难度比较大,因此,咱们一步步结合对应的原理来逐行编码实现一个简易版的transformer
为了方便后面代码的编写,先引入一些库
- import numpy as np # 导入NumPy库,用于进行矩阵运算和数据处理
- import torch # 导入PyTorch库,用于构建神经网络及相关操作
- import torch.nn as nn # 导入PyTorch神经网络模块,用于构建神经网络层
- import torch.nn.functional as F # 导入PyTorch神经网络函数库,用于激活函数、损失函数等
- import math, copy, time # 导入数学库、复制库和时间库,用于各种数学计算、复制操作和计时
- from torch.autograd import Variable # 从PyTorch自动微分库中导入Variable类,用于构建自动微分计算图
- import matplotlib.pyplot as plt # 导入Matplotlib的pyplot模块,用于绘制图表和可视化
- import seaborn # 导入Seaborn库,用于绘制统计图形和美化图表
- seaborn.set_context(context="talk") # 设置Seaborn的上下文环境,设置图表的尺寸和标签字体大小等
- %matplotlib inline # IPython魔术命令,使Matplotlib绘制的图形直接显示在Notebook内
对于模型来说,每一句话比如“七月的服务真好,答疑的速度很快”,在模型中都是一个词向量,但如果每句话都临时抱佛脚去生成对应的词向量,则处理起来无疑会费时费力,所以在实际应用中,我们会事先预训练好各种embedding矩阵,这些embedding矩阵包含常用领域常用单词的向量化表示,且提前做好分词
维度1 | 维度2 | 维度3 | 维度4 | ... | 维度512 | |
教育 | ||||||
机构 | ||||||
在线 | ||||||
课程 | ||||||
.. | ||||||
服务 | ||||||
答疑 | ||||||
老师 |
从而当模型接收到“七月的服务真好,答疑的速度很快”这句输入时,便可以从对应的embedding矩阵里查找对应的词向量,最终把整句输入转换成对应的向量表示
这部分的代码 可以如下表示
- # 定义一个名为Embeddings的类,继承自PyTorch的nn.Module类
- class Embeddings(nn.Module):
- # 初始化Embeddings类
- def __init__(self, d_model, vocab):
- # 调用父类nn.Module的初始化方法
- super(Embeddings, self).__init__()
- # 创建一个词嵌入层,参数为词汇表大小和词嵌入维度
- self.lut = nn.Embedding(vocab, d_model)
- # 将词嵌入维度保存为类属性
- self.d_model = d_model
-
- # 定义前向传播方法
- def forward(self, x):
- # 通过词嵌入层将输入的单词编码为向量,并乘以词嵌入维度的平方根进行缩放
- return self.lut(x) * math.sqrt(self.d_model)
关于位置编码的通透理解,请参阅此文《一文通透位置编码:从标准位置编码到旋转位置编码RoPE》
最终,再通过下面这两行代码完美实现位置编码
- # 使用正弦和余弦函数生成位置编码,对于d_model的偶数索引,使用正弦函数;对于奇数索引,使用余弦函数。
- pe[:, 0::2] = torch.sin(position * div_term)
- pe[:, 1::2] = torch.cos(position * div_term)
从下图可知,经过「embedding + 位置编码」得到的输入,会乘以「三个权重矩阵: 」得到查询向量Q、键向量K、值向量V(你可以简单粗暴的理解为弄出来了三个分身)
举个例子,针对「我想吃酸菜鱼」这句话,经过embedding + 位置编码后,可得(注:可以512维,也可以是768维,但由于transformer论文中作者设置的512维,所以除了这个酸菜鱼的例子暂为768维外,其他地方均统一为512维)
然后乘以三个权重矩阵得
为此,我们可以先创建4个相同的线性层,每个线性层都具有 d_model 的输入维度和 d_model 的输出维度
self.linears = clones(nn.Linear(d_model, d_model), 4)
前三个线性层分别用于对 Q向量、K向量、V向量进行线性变换(至于这第4个线性层在随后的第3点)
我们聚焦下transformer论文中原图的这部分,可知,输入通过embedding+位置编码后,先后做以下两个步骤
- attention = self.attention(query, key, value, mask)
- output = self.dropout(self.norm1(attention + query))
这个相加具体是怎么个相加法呢?事实上,Add代表的Residual Connection(残差连接),是为了解决多层神经网络训练困难的问题,通过将前一层的信息无差的传递到下一层,可以有效的仅关注差异部分,这一方法之前在图像处理结构如ResNet等中常常用到 具体编码时通过 SublayerConnection 函数实现此功能 - """一个残差连接(residual connection),后面跟着一个层归一化(layer normalization)操作"""
- class SublayerConnection(nn.Module):
- # 初始化函数,接收size(层的维度大小)和dropout(dropout率)作为输入参数
- def __init__(self, size, dropout):
- super(SublayerConnection, self).__init__() # 调用父类nn.Module的构造函数
- self.norm = LayerNorm(size) # 定义一个层归一化(Layer Normalization)操作,使用size作为输入维度
- self.dropout = nn.Dropout(dropout) # 定义一个dropout层
-
- # 定义前向传播函数,输入参数x是输入张量,sublayer是待执行的子层操作
- def forward(self, x, sublayer):
- # 将残差连接应用于任何具有相同大小的子层
- # 首先对输入x进行层归一化,然后执行子层操作(如self-attention或前馈神经网络)
- # 接着应用dropout,最后将结果与原始输入x相加。
- return x + self.dropout(sublayer(self.norm(x)))
而Norm则代表了Layer Normalization,通过对层的激活值的归一化,可以加速模型的训练过程,使其更快的收敛,编码时用 LayerNorm 函数实现 - """构建一个层归一化(layernorm)模块"""
- class LayerNorm(nn.Module):
- # 初始化函数,接收features(特征维度大小)和eps(防止除以零的微小值)作为输入参数
- def __init__(self, features, eps=1e-6):
- super(LayerNorm, self).__init__() # 调用父类nn.Module的构造函数
- self.a_2 = nn.Parameter(torch.ones(features)) # 定义一个大小为features的一维张量,初始化为全1,并将其设置为可训练参数
- self.b_2 = nn.Parameter(torch.zeros(features)) # 定义一个大小为features的一维张量,初始化为全0,并将其设置为可训练参数
- self.eps = eps # 将防止除以零的微小值eps保存为类实例的属性
-
- # 定义前向传播函数,输入参数x是输入张量
- def forward(self, x):
- mean = x.mean(-1, keepdim=True) # 计算输入x在最后一个维度上的均值,保持输出结果的维度
- std = x.std(-1, keepdim=True) # 计算输入x在最后一个维度上的标准差,保持输出结果的维度
- # 对输入x进行层归一化,使用可训练参数a_2和b_2进行缩放和偏移,最后返回归一化后的结果
- return self.a_2 * (x - mean) / (std + self.eps) + self.b_2
- forward = self.feed_forward(output)
- block_output = self.dropout(self.norm2(forward + output))
- return block_output
最终这个编码器层代码可以完整的写为
- """编码器(Encoder)由自注意力(self-attention)层和前馈神经网络(feed forward)层组成"""
- class EncoderLayer(nn.Module):
- # 初始化函数,接收size(层的维度大小)、self_attn(自注意力层实例)
- # feed_forward(前馈神经网络实例)和dropout(dropout率)作为输入参数
- def __init__(self, size, self_attn, feed_forward, dropout):
- super(EncoderLayer, self).__init__() # 调用父类nn.Module的构造函数
- self.self_attn = self_attn # 将自注意力层实例保存为类实例的属性
- self.feed_forward = feed_forward # 将前馈神经网络实例保存为类实例的属性
-
- # 创建两个具有相同参数的SublayerConnection实例(用于残差连接和层归一化)
- self.sublayer = clones(SublayerConnection(size, dropout), 2)
- self.size = size # 将层的维度大小保存为类实例的属性
-
- def forward(self, x, mask):
- # 先对输入x进行自注意力操作
- # 然后将结果传递给第一个SublayerConnection实例(包括残差连接和层归一化)
- x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
-
- # 将上一步的输出传递给前馈神经网络
- # 然后将结果传递给第二个SublayerConnection实例(包括残差连接和层归一化),最后返回结果
- return self.sublayer[1](x, self.feed_forward)
接下来,先看下缩放点积注意力(Scaled Dot-Product Attention)的整体实现步骤
- # torch.matmul是PyTorch库提供的矩阵乘法函数
- # 具体操作即是将第一个矩阵的每一行与第二个矩阵的每一列进行点积(对应元素相乘并求和),得到新矩阵的每个元素
- scores = torch.matmul(query, key.transpose(-2, -1)) \
- / math.sqrt(d_k)
- # 对 scores 进行 softmax 操作,得到注意力权重 p_attn
- p_attn = F.softmax(scores, dim = -1)
- # 用注意力权重 p_attn 对 value 向量进行加权求和,得到最终的输出
- return torch.matmul(p_attn, value), p_attn
同样的方法,也可以计算出,如下图8所示, b2就是拿q2去对其他的key做attention,最后再与其他的value值相乘取weighted sum得到,最终每个单词都包含了上下文相关单词的语义信息,不再只是attention计算之前,每个单词只有它自己的信息,和上下文没有关联
另外,这里面还有一点值得注意的是,可能有同学疑问:当我们计算x1与x2、x3、x4的相似度之后,x2会再与x1、x3、x4再依次计算一遍相似度,这两个过程中,前者算过了x1和x2的相似度,后者则再算一遍x2与x1的相似度,这不是重复计算么?其实不然,这是两码事,原因很简单,正如你喜欢一个人 你会觉得她对你很重要,但那个人不一定喜欢你 她不会觉得你对她有多重要..
最终,Scaled Dot-Product Attention这部分对应的完整代码可以写为
- '''计算“缩放点积注意力'''
- # query, key, value 是输入的向量组
- # mask 用于遮掩某些位置,防止计算注意力
- # dropout 用于添加随机性,有助于防止过拟合
- def attention(query, key, value, mask=None, dropout=None):
-
- d_k = query.size(-1) # 获取 query 向量的最后一个维度的大小,即词嵌入的维度
-
- # 计算 query 和 key 的点积,并对结果进行缩放,以减少梯度消失或爆炸的可能性
- scores = torch.matmul(query, key.transpose(-2, -1)) \
- / math.sqrt(d_k)
-
- # 如果提供了 mask,根据 mask 对 scores 进行遮掩
- # 遮掩的具体方法就是设为一个很大的负数比如-1e9,从而softmax后 对应概率基本为0
- if mask is not None:
- scores = scores.masked_fill(mask == 0, -1e9)
-
- # 对 scores 进行 softmax 操作,得到注意力权重 p_attn
- p_attn = F.softmax(scores, dim = -1)
-
- # 如果提供了 dropout,对注意力权重 p_attn 进行 dropout 操作
- if dropout is not None:
- p_attn = dropout(p_attn)
-
- # 用注意力权重 p_attn 对 value 向量进行加权求和,得到最终的输出
- return torch.matmul(p_attn, value), p_attn
先看2个头的例子,依然还是通过生成对应的三个矩阵、、,然后这三个矩阵再各自乘以两个转移矩阵得到对应的分矩阵,如
至于同理,也生成对应的6个分矩阵、、、、、
接下来编码时,分两步
如果是8个头呢,计算步骤上也是一样的,只是从2个头变化到8个头而已,最终把每个头得到的结果直接concat,最后经过一个linear变换,得到最终的输出,整体如下所示
这部分Multi-Head Attention的代码可以写为
- '''代码来自nlp.seas.harvard.edu,我针对每一行代码、甚至每行代码中的部分变量都做了详细的注释/解读'''
- class MultiHeadedAttention(nn.Module):
- # 输入模型的大小(d_model)和注意力头的数量(h)
- def __init__(self, h, d_model, dropout=0.1):
- super(MultiHeadedAttention, self).__init__()
- assert d_model % h == 0 # 确保 d_model 可以被 h 整除
-
- # 我们假设 d_v(值向量的维度)总是等于 d_k(键向量的维度)
- self.d_k = d_model // h # 计算每个注意力头的维度
- self.h = h # 保存注意力头的数量
- self.linears = clones(nn.Linear(d_model, d_model), 4) # 上文解释过的四个线性层
- self.attn = None # 初始化注意力权重为 None
- self.dropout = nn.Dropout(p=dropout) # 定义 dropout 层
-
- # 实现多头注意力的前向传播
- def forward(self, query, key, value, mask=None):
- if mask is not None:
- # 对所有 h 个头应用相同的 mask
- mask = mask.unsqueeze(1)
- nbatches = query.size(0) # 获取 batch 的大小
-
- # 1) 批量执行从 d_model 到 h x d_k 的线性投影
- query, key, value = \
- [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
- for l, x in zip(self.linears, (query, key, value))]
-
- # 2) 在批量投影的向量上应用注意力
- # 具体方法是调用上面实现Scaled Dot-Product Attention的attention函数
- x, self.attn = attention(query, key, value, mask=mask,
- dropout=self.dropout)
-
- # 3) 使用 view 函数进行“拼接concat”,然后做下Linear变换
- x = x.transpose(1, 2).contiguous() \
- .view(nbatches, -1, self.h * self.d_k)
- return self.linears[-1](x) # 返回多头注意力的输出
在上文,咱们逐一编码实现了embedding、位置编码、缩放点积/多头注意力,以及Add和Norm,整个编码器部分还剩最后一个模块,即下图框里的Feed Forward Network(简称FFN)
其中包括两个线性变换:维度上先扩大后缩小,最终输入和输出的维数为,内层的维度为,过程中使用ReLU作为激活函数
虽然线性变换在不同位置上是相同的,但它们在层与层之间使用不同的参数,相当于使用了两个内核大小为1的卷积
这部分的代码可以如下编写
- ‘’‘定义一个名为PositionwiseFeedForward的类,继承自nn.Module’‘’
- class PositionwiseFeedForward(nn.Module):
- # 文档字符串:实现FFN方程
- # 初始化方法,接受三个参数:d_model,d_ff和dropout(默认值为0.1)
- def __init__(self, d_model, d_ff, dropout=0.1):
- # 调用父类nn.Module的初始化方法
- super(PositionwiseFeedForward, self).__init__()
- self.w_1 = nn.Linear(d_model, d_ff) # 定义一个全连接层,输入维度为d_model,输出维度为d_ff
- self.w_2 = nn.Linear(d_ff, d_model) # 定义一个全连接层,输入维度为d_ff,输出维度为d_model
- self.dropout = nn.Dropout(dropout) # 定义一个dropout层,dropout概率为传入的dropout参数
-
- # 定义前向传播方法,接受一个输入参数x
- def forward(self, x):
- # 将输入x通过第一个全连接层w_1后,经过ReLU激活函数,再通过dropout层,最后通过第二个全连接层w_2,返回最终结果
- return self.w_2(self.dropout(F.relu(self.w_1(x))))
N可以等于6或其他数值
- class Encoder(nn.Module): # 定义一个名为Encoder的类,它继承了nn.Module类
- # 一个具有N层堆叠的核心编码器
- # 初始化方法,接受两个参数:layer(编码器层的类型)和N(编码器层的数量)
- def __init__(self, layer, N):
- super(Encoder, self).__init__() # 调用父类nn.Module的初始化方法
- self.layers = clones(layer, N) # 创建N个编码器层的副本,并将其赋值给实例变量self.layers
- self.norm = LayerNorm(layer.size) # 创建一个LayerNorm层,并将其赋值给实例变量self.norm
-
- # 定义前向传播方法,接受两个参数:x(输入数据)和mask(掩码)
- def forward(self, x, mask):
- # 文档字符串:解释本方法的功能是将输入(及其掩码)依次传递给每一层
- for layer in self.layers: # 遍历self.layers中的每一个编码器层
- x = layer(x, mask) # 将输入x和mask传递给当前编码器层,并将输出结果赋值给x
- return self.norm(x) # 对最终的输出x应用LayerNorm层,并将结果返回
其中的clone函数的代码为
- def clones(module, N):
- "Produce N identical layers."
- return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
咱们再回顾下transformer的整个模型架构,特别是解码器的部分,毕竟BERT外,GPT等很有影响力的模型都用的transformer decode结构
从底至上,
由于在第一部分介绍过了embedding、positional encoding、FFN、Add&Norm、linear、softmax、multi-head attention,故本部分只重点介绍下Masked Multi-Head Self-attention
本过程和第一部分介绍的Multi-Head self-attention基本一致,区别在于加了个mask机制
整个解码器架构的代码可以如下编写『有一点值得注意的是,如下文代码中所述
- # 定义DecoderLayer类,继承自PyTorch的nn.Module类
- class DecoderLayer(nn.Module):
- # 初始化方法,接收五个参数:size, self_attn, src_attn, feed_forward, dropout
- # 调用父类nn.Module的初始化方法
- def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
- super(DecoderLayer, self).__init__()
-
- # 将size赋值给实例变量self.size
- self.size = size
-
- # 将self_attn赋值给实例变量self.self_attn
- self.self_attn = self_attn
-
- # 将src_attn赋值给实例变量self.src_attn
- self.src_attn = src_attn
-
- # 将feed_forward赋值给实例变量self.feed_forward
- self.feed_forward = feed_forward
-
- # 使用SublayerConnection类创建三个子层,并存储到实例变量self.sublayer中
- self.sublayer = clones(SublayerConnection(size, dropout), 3)
-
- # 定义前向传播方法,接收四个参数:x, memory, src_mask, tgt_mask
- def forward(self, x, memory, src_mask, tgt_mask):
-
- # 将memory赋值给局部变量m
- m = memory
-
- # 对输入x执行自注意力计算并进行第一个子层的处理
- x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
-
- # 对输入x执行源注意力计算并进行第二个子层的处理
- x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask))
-
- # 对输入x执行前馈神经网络计算并进行第三个子层的处理,然后返回结果
- return self.sublayer[2](x, self.feed_forward)
且Decoder也是由N=6个相同层组成
- class Decoder(nn.Module):
- "Generic N layer decoder with masking."
- def __init__(self, layer, N):
- super(Decoder, self).__init__()
- self.layers = clones(layer, N)
- self.norm = LayerNorm(layer.size)
-
- def forward(self, x, memory, src_mask, tgt_mask):
- for layer in self.layers:
- x = layer(x, memory, src_mask, tgt_mask)
- return self.norm(x)
最终,整个transformer完整模型的整体封装代码为
- def make_model(src_vocab, tgt_vocab, N=6,
- d_model=512, d_ff=2048, h=8, dropout=0.1):
- "Helper: Construct a model from hyperparameters."
- c = copy.deepcopy
- attn = MultiHeadedAttention(h, d_model)
- ff = PositionwiseFeedForward(d_model, d_ff, dropout)
- position = PositionalEncoding(d_model, dropout)
- model = EncoderDecoder(
- Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),
- Decoder(DecoderLayer(d_model, c(attn), c(attn),
- c(ff), dropout), N),
- nn.Sequential(Embeddings(d_model, src_vocab), c(position)),
- nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)),
- Generator(d_model, tgt_vocab))
-
- # This was important from their code.
- # Initialize parameters with Glorot / fan_avg.
- for p in model.parameters():
- if p.dim() > 1:
- nn.init.xavier_uniform(p)
- return model
-
- # Small example model.
- tmp_model = make_model(10, 10, 2)
- None
当我们把编码器和解码器组合到一起后,看下它两是如何一块协作的
需要注意的是
具体实现时,先创建批次和掩码
- class Batch:
- def __init__(self, src, trg=None, pad=0):
- self.src = src # 输入数据源(通常为源语言)
- self.src_mask = (src != pad).unsqueeze(-2) # 创建源语言的掩码,用于忽略填充部分
- if trg is not None: # 如果目标语言数据存在
- self.trg = trg[:, :-1] # 目标语言数据,去掉最后一个词
- self.trg_y = trg[:, 1:] # 目标语言数据,去掉第一个词
- self.trg_mask = \
- self.make_std_mask(self.trg, pad) # 创建目标语言的掩码,用于忽略填充部分和未来词汇
- self.ntokens = (self.trg_y != pad).data.sum() # 计算目标语言中非填充词的数量
-
- @staticmethod
- def make_std_mask(tgt, pad):
- "Create a mask to hide padding and future words."
- tgt_mask = (tgt != pad).unsqueeze(-2) # 创建目标语言的掩码,用于忽略填充部分
- tgt_mask = tgt_mask & Variable(
- subsequent_mask(tgt.size(-1)).type_as(tgt_mask.data)) # 使用子掩码屏蔽未来词汇
- return tgt_mask # 返回完整的目标语言掩码
其中,subsequent_mask的实现如下所示
- def subsequent_mask(size):
- "Mask out subsequent positions."
- attn_shape = (1, size, size)
- subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')
- return torch.from_numpy(subsequent_mask) == 0
接下来,我们创建一个通用的训练和得分函数来跟踪损失。我们传入一个通用的损失计算函数,它也处理参数更新
- def run_epoch(data_iter, model, loss_compute):
- start = time.time() # 记录当前时间
- total_tokens = 0 # 初始化总tokens计数
- total_loss = 0 # 初始化总损失
- tokens = 0 # 初始化tokens计数
-
- # 遍历数据集中的每个批次
- for i, batch in enumerate(data_iter):
- # 对每个批次进行前向传播
- out = model.forward(batch.src, batch.trg,
- batch.src_mask, batch.trg_mask)
-
- # 计算每个批次的损失
- loss = loss_compute(out, batch.trg_y, batch.ntokens)
-
- # 累加损失
- total_loss += loss
- total_tokens += batch.ntokens # 累加tokens
- tokens += batch.ntokens # 累加tokens
-
- # 每50个批次进行一次日志记录
- if i % 50 == 1:
- elapsed = time.time() - start # 计算已用时间
-
- # 输出当前批次,损失和每秒处理的tokens
- print("Epoch Step: %d Loss: %f Tokens per Sec: %f" %
- (i, loss / batch.ntokens, tokens / elapsed))
- start = time.time() # 重置开始时间
- tokens = 0 # 重置tokens计数
-
- return total_loss / total_tokens # 返回平均损失
下面这段代码定义了一个名为 SimpleLossCompute 的类,实现了简单的损失计算和训练函数
- # 定义 SimpleLossCompute 类,实现简单的损失计算和训练函数
- class SimpleLossCompute:
- # 初始化 SimpleLossCompute 类的实例
- def __init__(self, generator, criterion, opt=None):
- self.generator = generator # 生成器,用于预测输出
- self.criterion = criterion # 损失函数,如交叉熵损失
- self.opt = opt # 优化器,如 Adam
-
- # 定义调用 SimpleLossCompute 类实例时的操作
- def __call__(self, x, y, norm):
- x = self.generator(x) # 生成预测输出
- # 计算损失,这里需要将预测输出和目标输出转换为合适的形状
- loss = self.criterion(x.contiguous().view(-1, x.size(-1)),
- y.contiguous().view(-1)) / norm
- loss.backward() # 计算梯度
- if self.opt is not None: # 如果提供了优化器
- self.opt.step() # 更新模型参数
- self.opt.optimizer.zero_grad() # 清空梯度缓存
- return loss.data[0] * norm # 返回损失值乘以规范化因子(实际损失值)
优化器(optimizer)经常用于在训练过程中更新模型参数以最小化损失函数,而Adam(Adaptive Moment Estimation)是一种常用的优化器,它结合了两种传统优化算法的优点:Momentum和RMSprop
为了通俗易懂地理解Adam,可以将其比作一个赛车手。训练模型就像是找到一辆赛车在赛道上的最佳行驶速度和路径,以达到最快的速度并取得优异的成绩。在这个过程中,速度的调整(即学习率)非常重要
首先,Adam像Momentum一样,具有动量效应。这意味着赛车手(模型)会积累动量,使其在下坡时更快,而在上坡时减速。这有助于模型更快地穿越平坦区域,并避免在最低点附近摆动
其次,Adam像RMSprop一样,会自适应地调整每个参数的学习率。在我们的赛车比喻中,这就像赛车手会针对每个轮胎的摩擦系数(赛道状况)做出相应的速度调整。这有助于模型更快地收敛到最优解
总之,Adam可以自动调整学习率,并具有动量效应。总的来说,它能帮助我们的“赛车手”在不同的赛道状况下更快地找到最佳行驶速度和路径,从而更快地训练出高效的模型
transformer原始论文便选择的Adam作为优化器,其参数为,和,根据以下公式,我们在训练过程中改变了学习率:
在预热中随步数线性地增加学习速率,并且此后与步数的反平方根成比例地减小它,设置预热步数为4000
我们来看下具体的编码实现。下面这段代码定义了一个名为 NoamOpt 的类,实现了一种自适应学习率调整策略,该策略在训练 Transformer 模型时常用。在训练的前几个步骤(预热期)中,学习率会线性增长,之后学习率会随着步数的增加而逐渐降低。这种策略有助于模型在训练初期更快地收敛,同时在训练后期保持较低的学习率,有利于模型的稳定训练。
- # 定义 NoamOpt 类,实现自适应学习率调整策略
- class NoamOpt:
- # 初始化 NoamOpt 类的实例
- def __init__(self, model_size, factor, warmup, optimizer):
- self.optimizer = optimizer # 优化器对象(如 Adam)
- self._step = 0 # 记录优化步数
- self.warmup = warmup # 预热步数
- self.factor = factor # 缩放因子
- self.model_size = model_size # 模型维度大小
- self._rate = 0 # 初始学习率
-
- # 更新模型参数和学习率
- def step(self):
- self._step += 1 # 优化步数加 1
- rate = self.rate() # 计算当前学习率
- for p in self.optimizer.param_groups: # 更新优化器中的学习率
- p['lr'] = rate
- self._rate = rate # 存储当前学习率
- self.optimizer.step() # 更新模型参数
-
- # 计算当前步数的学习率
- def rate(self, step=None):
- if step is None: # 如果未提供步数,使用当前步数
- step = self._step
- return self.factor * \
- (self.model_size ** (-0.5) * # 计算学习率公式中的模型维度项
- min(step ** (-0.5), step * self.warmup ** (-1.5))) # 计算学习率公式中的最小值项
-
- # 定义用于获取 NoamOpt 类实例的函数
- def get_std_opt(model):
- return NoamOpt(model.src_embed[0].d_model, 2, 4000,
- torch.optim.Adam(model.parameters(), lr=0, betas=(0.9, 0.98), eps=1e-9))
最后总结一下Transformer的影响力
目前绝大部分有影响力的大模型基本都基于transformer的架构 (这个页面底部可以看到基于transformer的200多个有影响力的模型),既然基于transformer便得实现transformer
然要分析这么一个大库是不容易的,如下图所示,包括分词等等各种功能
且光trainer.py(https://github.com/huggingface/transformers/blob/main/src/transformers/trainer.py)这一个项目文件的实现就有3858行
然后定义class Trainer,逐一实现了如下函数
硬件配置:代码首先判断是否需要将模型放置在特定的设备(如 GPU 或 CPU)上。一些特殊情况,如使用了模型并行、深度学习库DeepSpeed、完全bf16或fp16评估、数据并行处理和完全分片的数据并行处理,都会对这个决定产生影响。
数据预处理:然后,代码会创建一个用于数据处理的 data_collator,这个 data_collator 会根据是否有分词器(tokenizer)来选择默认的数据整理器。这个整理器将在训练和验证过程中用于整理数据。
优化器与学习率调度器:然后,代码检查了优化器和学习率调度器是否已经设置,并在必要时进行了一些配置。在这里,还进行了一些错误检查,以防模型和优化器参数不在同一个设备上,或者优化器与使用的并行处理库(如Fairscale、Deepspeed或PyTorch FSDP)不兼容。
回调函数:最后,代码初始化了一些默认的回调函数,并在需要时创建了一个远程仓库的克隆和输出目录。这些回调函数将在训练过程中的不同时间点被调用,可以用来做一些自定义的操作,比如在每个 epoch 结束后保存模型。
混合精度设置:代码首先检查是否需要使用混合精度训练(即使用 fp16 或 bf16)。如果需要,根据后端类型(例如 "cuda_amp" 或 "cpu_amp"),选择正确的混合精度训练策略。在这里,也进行了一些错误检查,以防混合精度训练与使用的并行处理库(如SageMaker Model Parallelism)不兼容。
标签平滑:然后,代码检查是否需要使用标签平滑(一种常见的防止过拟合的技巧),并在需要时设置相应的对象。
训练器状态和控制:接下来,代码初始化了训练器的状态和控制对象,这两个对象将在训练过程中用于跟踪训练的进展和控制训练的流程。
其他设置:最后,代码还进行了一些其他的设置,比如初始化内存跟踪器,设置训练批次的大小,以及处理一些特定的训练参数(如 "torch_compile")
- # 获取训练采样器
- def _get_train_sampler(self) -> Optional[torch.utils.data.Sampler]:
- if self.train_dataset is None or not has_length(self.train_dataset): # 如果没有训练数据集或训练数据集没有长度,返回None
- return None
-
- # 创建采样器
- if self.args.group_by_length: # 如果参数设定了按长度分组
- if is_datasets_available() and isinstance(self.train_dataset, datasets.Dataset): # 如果有datasets库并且训练数据集是datasets.Dataset的实例
- lengths = (
- self.train_dataset[self.args.length_column_name]
- if self.args.length_column_name in self.train_dataset.column_names
- else None
- ) # 如果训练数据集中有长度列名,获取长度,否则长度为None
- else:
- lengths = None # 否则,长度为None
- model_input_name = self.tokenizer.model_input_names[0] if self.tokenizer is not None else None # 获取模型输入名称
- return LengthGroupedSampler( # 返回长度分组采样器
- self.args.train_batch_size * self.args.gradient_accumulation_steps,
- dataset=self.train_dataset,
- lengths=lengths,
- model_input_name=model_input_name,
- )
-
- else:
- return RandomSampler(self.train_dataset) # 否则,返回随机采样器
- # 获取训练数据的 DataLoader
- def get_train_dataloader(self) -> DataLoader:
- """
- 返回训练[`~torch.utils.data.DataLoader`]。
- 如果`train_dataset`未实现`__len__`,将不使用采样器,
- 否则,使用适应于分布式训练的随机采样器。
- 如果想注入一些自定义行为,可以在子类中重写此方法。
- """
- # 如果训练集为空,则抛出 ValueError
- if self.train_dataset is None:
- raise ValueError("Trainer: training requires a train_dataset.")
-
- # 创建训练数据集和数据整理器
- train_dataset = self.train_dataset
- data_collator = self.data_collator
- # 如果训练集是数据集的实例,移除未使用的列
- if is_datasets_available() and isinstance(train_dataset, datasets.Dataset):
- train_dataset = self._remove_unused_columns(train_dataset, description="training")
- # 否则,使用数据整理器移除未使用的列
- else:
- data_collator = self._get_collator_with_removed_columns(data_collator, description="training")
-
- # 定义 DataLoader 参数
- dataloader_params = {
- "batch_size": self._train_batch_size,
- "collate_fn": data_collator,
- "num_workers": self.args.dataloader_num_workers,
- "pin_memory": self.args.dataloader_pin_memory,
- }
-
- # 如果训练集不是迭代的数据集,设定采样器和其他参数
- if not isinstance(train_dataset, torch.utils.data.IterableDataset):
- dataloader_params["sampler"] = self._get_train_sampler()
- dataloader_params["drop_last"] = self.args.dataloader_drop_last
- dataloader_params["worker_init_fn"] = seed_worker
-
- # 返回由 accelerator 处理过的 DataLoader
- return self.accelerator.prepare(DataLoader(train_dataset, **dataloader_params))
- # 获取评估数据的采样器
- def _get_eval_sampler(self, eval_dataset: Dataset) -> Optional[torch.utils.data.Sampler]:
- # 废弃的代码
- if self.args.use_legacy_prediction_loop:
- # 如果是在TPU上运行,返回 SequentialDistributedSampler
- if is_torch_tpu_available():
- return SequentialDistributedSampler(
- eval_dataset, num_replicas=xm.xrt_world_size(), rank=xm.get_ordinal()
- )
- # 如果是在Sagemaker多处理器环境中运行,返回SequentialDistributedSampler
- elif is_sagemaker_mp_enabled():
- return SequentialDistributedSampler(
- eval_dataset,
- num_replicas=smp.dp_size(),
- rank=smp.dp_rank(),
- batch_size=self.args.per_device_eval_batch_size,
- )
- # 其他情况下,返回顺序采样器
- else:
- return SequentialSampler(eval_dataset)
-
- # 如果是单机环境,返回顺序采样器;否则,返回 None
- if self.args.world_size <= 1:
- return SequentialSampler(eval_dataset)
- else:
- return None
- # 获取评估数据的 DataLoader
- def get_eval_dataloader(self, eval_dataset: Optional[Dataset] = None) -> DataLoader:
- """
- 返回评估[`~torch.utils.data.DataLoader`]。
- 如果想注入一些自定义行为,可以在子类中重写此方法。
- Args:
- eval_dataset (`torch.utils.data.Dataset`, *optional*):
- 如果提供,将覆盖`self.eval_dataset`。如果它是一个[`~datasets.Dataset`],自动删除模型的`forward()`
- 方法不接受的列。必须实现`__len__`。
- """
- # 如果评估集为空,则抛出 ValueError
- if eval_dataset is None and self.eval_dataset is None:
- raise ValueError("Trainer: evaluation requires an eval_dataset.")
- # 创建评估数据集和数据整理器
- eval_dataset = eval_dataset if eval_dataset is not None else self.eval_dataset
- data_collator = self.data_collator
-
- # 如果评估集是数据集的实例,移除未使用的列
- if is_datasets_available() and isinstance(eval_dataset, datasets.Dataset):
- eval_dataset = self._remove_unused_columns(eval_dataset, description="evaluation")
- # 否则,使用数据整理器移除未使用的列
- else:
- data_collator = self._get_collator_with_removed_columns(data_collator, description="evaluation")
-
- # 定义 DataLoader 参数
- dataloader_params = {
- "batch_size": self.args.eval_batch_size,
- "collate_fn": data_collator,
- "num_workers": self.args.dataloader_num_workers,
- "pin_memory": self.args.dataloader_pin_memory,
- }
-
- # 如果评估集不是迭代的数据集,设定采样器和其他参数
- if not isinstance(eval_dataset, torch.utils.data.IterableDataset):
- dataloader_params["sampler"] = self._get_eval_sampler(eval_dataset)
- dataloader_params["drop_last"] = self.args.dataloader_drop_last
-
- # 返回由 accelerator 处理过的 DataLoader
- return self.accelerator.prepare(DataLoader(eval_dataset, **dataloader_params))
- def get_test_dataloader(self, test_dataset: Dataset) -> DataLoader:
- """
- 返回测试集的数据加载器 [`~torch.utils.data.DataLoader`]
- 如果需要插入一些自定义行为,可以在子类中重写此方法
- Args:
- test_dataset (`torch.utils.data.Dataset`, *optional*):
- 要使用的测试数据集。如果它是一个 [`~datasets.Dataset`],则自动删除 `model.forward()` 方法不接受的列。它必须实现 `__len__`
- """
- data_collator = self.data_collator # 获取数据处理器
-
- # 如果datasets库可用且test_dataset是datasets.Dataset类型,移除不必要的列
- if is_datasets_available() and isinstance(test_dataset, datasets.Dataset):
- test_dataset = self._remove_unused_columns(test_dataset, description="test")
- else:
- data_collator = self._get_collator_with_removed_columns(data_collator, description="test")
-
- # 定义数据加载器参数
- dataloader_params = {
- "batch_size": self.args.eval_batch_size, # 批大小
- "collate_fn": data_collator, # 数据处理函数
- "num_workers": self.args.dataloader_num_workers, # 工作线程数量
- "pin_memory": self.args.dataloader_pin_memory, # 是否将数据加载器的数据放在固定的内存区域
- }
-
- # 如果test_dataset不是可迭代数据集,添加采样器和drop_last参数
- if not isinstance(test_dataset, torch.utils.data.IterableDataset):
- dataloader_params["sampler"] = self._get_eval_sampler(test_dataset) # 添加采样器
- dataloader_params["drop_last"] = self.args.dataloader_drop_last # 是否丢弃最后不完整的批次
-
- # 返回加速器准备好的数据加载器
- return self.accelerator.prepare(DataLoader(test_dataset, **dataloader_params))
- def create_optimizer_and_scheduler(self, num_training_steps: int):
- """
- 设置优化器和学习率调度器
- 我们提供一个合理的默认值,工作得很好。如果你想使用其他的,你可以在Trainer的init中通过`optimizers`传递一个元组,或者在子类中重写此方法(或`create_optimizer`和/或`create_scheduler`)。
- """
- self.create_optimizer() # 创建优化器
- # 如果SageMaker版本大于等于1.10且启用了fp16,解包优化器
- if IS_SAGEMAKER_MP_POST_1_10 and smp.state.cfg.fp16:
- optimizer = self.optimizer.optimizer
- else:
- optimizer = self.optimizer
- self.create_scheduler(num_training_steps=num_training_steps, optimizer=optimizer) # 创建学习率调度器
- def create_optimizer(self):
- """
- 设置优化器。
- 我们提供一个合理的默认值,工作得很好。如果你想使用其他的,你可以在Trainer的init中通过`optimizers`传递一个元组,或者在子类中重写此方法。
- """
- # 根据是否启用了SageMaker模型并行,选择不同的模型
- opt_model = self.model_wrapped if is_sagemaker_mp_enabled() else self.model
-
- # 如果优化器为空,初始化一个新的优化器
- if self.optimizer is None:
- # 获取待优化参数,并区分是否需要权重衰减
- decay_parameters = get_parameter_names(opt_model, ALL_LAYERNORM_LAYERS)
- decay_parameters = [name for name in decay_parameters if "bias" not in name]
- optimizer_grouped_parameters = [
- {
- "params": [
- p for n, p in opt_model.named_parameters() if (n in decay_parameters and p.requires_grad)
- ],
- "weight_decay": self.args.weight_decay, # 权重衰减
- },
- {
- "params": [
- p for n, p in opt_model.named_parameters() if (n not in decay_parameters and p.requires_grad)
- ],
- "weight_decay": 0.0, # 不需要权重衰减
- },
- ]
-
- # 获取优化器类和参数
- optimizer_cls, optimizer_kwargs = Trainer.get_optimizer_cls_and_kwargs(self.args)
-
- # 如果启用了简单的分片DDP,使用OSS作为优化器,否则使用获取的优化器
- if self.sharded_ddp == ShardedDDPOption.SIMPLE:
- self.optimizer = OSS(
- params=optimizer_grouped_parameters,
- optim=optimizer_cls,
- **optimizer_kwargs,
- )
- else:
- self.optimizer = optimizer_cls(optimizer_grouped_parameters, **optimizer_kwargs)
- if optimizer_cls.__name__ == "Adam8bit":
- import bitsandbytes
-
- manager = bitsandbytes.optim.GlobalOptimManager.get_instance()
-
- skipped = 0
- for module in opt_model.modules():
- if isinstance(module, nn.Embedding):
- skipped += sum({p.data_ptr(): p.numel() for p in module.parameters()}.values())
- logger.info(f"skipped {module}: {skipped/2**20}M params")
- manager.register_module_override(module, "weight", {"optim_bits": 32})
- logger.debug(f"bitsandbytes: will optimize {module} in fp32")
- logger.info(f"skipped: {skipped/2**20}M params")
-
- # 如果启用了SageMaker模型并行,使用SageMaker的分布式优化器
- if is_sagemaker_mp_enabled():
- self.optimizer = smp.DistributedOptimizer(self.optimizer)
-
- return self.optimizer
根据提供的参数,选择并配置合适的优化器,以便在模型训练中使用
- # 定义创建学习率调度器的函数
- def create_scheduler(self, num_training_steps: int, optimizer: torch.optim.Optimizer = None):
- """
- 设置调度器。训练器的优化器必须在调用此方法之前已经设置好,或者作为参数传递。
- Args:
- num_training_steps (int): 要进行的训练步数。
- """
- # 如果调度器还未设置
- if self.lr_scheduler is None:
- # 使用 get_scheduler 函数创建调度器
- self.lr_scheduler = get_scheduler(
- self.args.lr_scheduler_type,
- optimizer=self.optimizer if optimizer is None else optimizer,
- num_warmup_steps=self.args.get_warmup_steps(num_training_steps),
- num_training_steps=num_training_steps,
- )
- # 返回创建的学习率调度器
- return self.lr_scheduler
- """
- 主要训练入口
- """
- def train(
- self,
- # 可选参数,接收字符串或布尔类型,代表从哪个检查点恢复训练
- resume_from_checkpoint: Optional[Union[str, bool]] = None,
- # 可选参数,接收Optuna的Trial实例或者包含超参数的字典
- trial: Union["optuna.Trial", Dict[str, Any]] = None,
- # 可选参数,接收一个字符串列表,代表在模型输出中需要忽略的键值
- ignore_keys_for_eval: Optional[List[str]] = None,
- **kwargs, # 接收其他关键字参数,用于隐藏已弃用的参数
- ):
-
- # 如果resume_from_checkpoint为False,将其设置为None
- if resume_from_checkpoint is False:
- resume_from_checkpoint = None
-
- # 内存指标 - 必须尽早设置
- self._memory_tracker.start()
-
- args = self.args
-
- # 设置训练状态为True
- self.is_in_train = True
-
- # do_train可能未设置,但仍然可能调用.train(),所以下面的操作是为了避免这种情况
- if (args.fp16_full_eval or args.bf16_full_eval) and not args.do_train:
- self._move_model_to_device(self.model, args.device)
-
- # 如果关键字参数中包含model_path
- if "model_path" in kwargs:
- # 将model_path的值赋给resume_from_checkpoint并在kwargs中删除model_path
- resume_from_checkpoint = kwargs.pop("model_path")
- warnings.warn(
- "`model_path` is deprecated and will be removed in a future version. Use `resume_from_checkpoint` "
- "instead.", # 发出关于model_path将在未来版本中删除的警告
- FutureWarning,
- )
-
- # 如果还有未处理的关键字参数
- if len(kwargs) > 0:
- raise TypeError(f"train() received got unexpected keyword arguments: {', '.join(list(kwargs.keys()))}.") # 抛出类型错误
-
- # 这可能会改变随机种子,因此需要先运行
- self._hp_search_setup(trial)
- self._train_batch_size = self.args.train_batch_size # 设置训练批次大小
-
- # 重载模型
- model_reloaded = False
- if self.model_init is not None: # 如果模型初始化方法存在
- # 在实例化模型时,必须先设置随机种子
- enable_full_determinism(self.args.seed) if self.args.full_determinism else set_seed(self.args.seed)
-
- # 使用试验的超参数初始化模型
- self.model = self.call_model_init(trial)
-
- # 将模型重载标记设置为True
- model_reloaded = True
-
- # 重新初始化优化器和调度器
- self.optimizer, self.lr_scheduler = None, None
-
- # 加载可能存在的模型检查点
- # 如果resume_from_checkpoint是bool类型且值为True
- if isinstance(resume_from_checkpoint, bool) and resume_from_checkpoint:
- # 从输出目录中获取最新的检查点
- resume_from_checkpoint = get_last_checkpoint(args.output_dir)
-
- # 如果没有找到有效的检查点
- if resume_from_checkpoint is None:
- raise ValueError(f"No valid checkpoint found in output directory ({args.output_dir})") # 抛出值错误
-
- # 如果resume_from_checkpoint不为None,并且SageMaker MP和DeepSpeed没有启用
- if resume_from_checkpoint is not None and not is_sagemaker_mp_enabled() and not self.is_deepspeed_enabled:
- # 从检查点恢复模型
- self._load_from_checkpoint(resume_from_checkpoint)
-
- # 如果模型已经重载,将其放在正确的设备上并更新self.model_wrapped
- if model_reloaded:
- if self.place_model_on_device:
- self._move_model_to_device(self.model, args.device)
- self.model_wrapped = self.model
-
- # 查找可执行的批次大小
- inner_training_loop = find_executable_batch_size(
- self._inner_training_loop, self._train_batch_size, args.auto_find_batch_size
- )
-
- # 进行内部训练循环
- return inner_training_loop(
- args=args,
- resume_from_checkpoint=resume_from_checkpoint,
- trial=trial,
- ignore_keys_for_eval=ignore_keys_for_eval,
- )
首先,代码计算了每个epoch中的训练步骤数量(steps_in_epoch
),这可以是数据加载器的长度,或者是最大步数乘以梯度累积步数。
然后,它会处理开始新的训练epoch,包括可能的从检查点恢复训练的步骤。
代码遍历了每个训练步骤,每个步骤接收输入数据,并进行以下操作:
在每个epoch结束时,代码处理epoch的结束,可能会记录、保存和评估模型,检查是否有配置的TPU,并决定是否应该停止整个训练
这个函数主要执行的是在训练过程中的日志记录、模型评估和模型保存的操作。主要步骤包括:
一个训练步骤的实现,它涵盖了一个批量数据的前向和后向传播
- # `training_step`函数表示训练过程中的一步操作,涵盖了模型的前向和后向传播
- def training_step(self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]]) -> torch.Tensor:
-
- # 将模型设置为训练模式,这对于某些层(如Dropout或BatchNorm)的行为有影响,因为它们在训练和评估阶段的行为是不同的
- model.train()
-
- # 调用一个辅助方法准备模型的输入,具体的实现取决于模型的需求
- inputs = self._prepare_inputs(inputs)
-
- # 如果启用了 SageMaker Model Parallelism,则使用 `smp_forward_backward` 在多个 GPU 上执行前向和后向操作
- # 然后减小损失,并将其从计算图中分离
- if is_sagemaker_mp_enabled():
- loss_mb = smp_forward_backward(model, inputs, self.args.gradient_accumulation_steps)
- return loss_mb.reduce_mean().detach().to(self.args.device)
-
- # 计算损失值
- with self.compute_loss_context_manager():
- loss = self.compute_loss(model, inputs)
-
- # 如果使用的 GPU 数量大于 1,则对损失值取平均,以处理多 GPU 并行训练
- if self.args.n_gpu > 1:
- loss = loss.mean() # mean() to average on multi-gpu parallel training
-
- # 根据是否进行梯度缩放,选择不同的后向传播方式
- if self.do_grad_scaling:
- self.scaler.scale(loss).backward() # 使用梯度缩放进行后向传播,可以防止在混合精度训练中出现梯度下溢
- elif self.use_apex:
- with amp.scale_loss(loss, self.optimizer) as scaled_loss: # 如果使用了APEX工具进行混合精度训练,则需要对损失进行缩放后再进行后向传播
- scaled_loss.backward()
- else:
- self.accelerator.backward(loss) # 使用加速器进行后向传播,适用于没有使用梯度缩放和APEX的情况
-
- # 返回损失值,如果设置了梯度累积步骤,则需要将损失值除以梯度累积步骤数
- return loss.detach() / self.args.gradient_accumulation_steps
计算损失
- # `compute_loss`函数用于计算模型的损失值
- def compute_loss(self, model, inputs, return_outputs=False):
-
- # 如果存在标签平滑处理器且输入中有标签,则将标签从输入中移除
- if self.label_smoother is not None and "labels" in inputs:
- labels = inputs.pop("labels")
- else:
- labels = None
-
- # 使用模型进行前向传播,得到输出
- outputs = model(**inputs)
-
- # 如果存在之前的状态信息,保存它
- # TODO: 这部分需要在未来进行清理和优化
- if self.args.past_index >= 0:
- self._past = outputs[self.args.past_index]
-
- # 如果标签存在,使用标签平滑处理器计算损失
- if labels is not None:
- if unwrap_model(model)._get_name() in MODEL_FOR_CAUSAL_LM_MAPPING_NAMES.values():
- loss = self.label_smoother(outputs, labels, shift_labels=True)
- else:
- loss = self.label_smoother(outputs, labels)
- else:
- # 如果输出是一个字典,但并未包含损失,那么抛出错误
- if isinstance(outputs, dict) and "loss" not in outputs:
- raise ValueError(
- "The model did not return a loss from the inputs, only the following keys: "
- f"{','.join(outputs.keys())}. For reference, the inputs it received are {','.join(inputs.keys())}."
- )
- # 我们并未直接使用.outputs,因为模型可能返回的是元组,而非ModelOutput
- loss = outputs["loss"] if isinstance(outputs, dict) else outputs[0]
-
- # 如果`return_outputs`为真,返回损失和输出;否则只返回损失
- return (loss, outputs) if return_outputs else loss
- def predict(
- self, test_dataset: Dataset, ignore_keys: Optional[List[str]] = None, metric_key_prefix: str = "test"
- ) -> PredictionOutput:
-
- # 设置内存跟踪器,尽早启动
- self._memory_tracker.start()
-
- # 获取测试数据集的数据加载器
- test_dataloader = self.get_test_dataloader(test_dataset)
-
- # 记录开始时间
- start_time = time.time()
-
- # 选择预测循环或评估循环,这取决于args中的use_legacy_prediction_loop参数
- eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop
-
- # 运行选定的循环,并获得预测或评估输出
- output = eval_loop(
- test_dataloader, description="Prediction", ignore_keys=ignore_keys, metric_key_prefix=metric_key_prefix
- )
-
- # 计算总批次大小,包括所有的并行处理单元
- total_batch_size = self.args.eval_batch_size * self.args.world_size
-
- # 如果度量指标中包含jit编译时间,那么将这段时间加到开始时间中
- if f"{metric_key_prefix}_jit_compilation_time" in output.metrics:
- start_time += output.metrics[f"{metric_key_prefix}_jit_compilation_time"]
-
- # 更新度量指标,包括预测速度相关的指标
- output.metrics.update(
- speed_metrics(
- metric_key_prefix,
- start_time,
- num_samples=output.num_samples,
- num_steps=math.ceil(output.num_samples / total_batch_size),
- )
- )
-
- # 使用回调处理器进行预测后的操作,并更新控制状态
- self.control = self.callback_handler.on_predict(self.args, self.state, self.control, output.metrics)
-
- # 停止内存跟踪器,并更新相关度量指标
- self._memory_tracker.stop_and_update_metrics(output.metrics)
-
- # 返回预测结果,包括预测值,标签(如果存在)和度量指标
- return PredictionOutput(predictions=output.predictions, label_ids=output.label_ids, metrics=output.metrics)
// 待更
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。