赞
踩
★★★ 本文源自AlStudio社区精品项目,【点击此处】查看更多精品内容 >>>
@inproceedings{haoyietal-informer-2021,
author = {Haoyi Zhou and
Shanghang Zhang and
Jieqi Peng and
Shuai Zhang and
Jianxin Li and
Hui Xiong and
Wancai Zhang},
title = {Informer: Beyond Efficient Transformer for Long Sequence Time-Series Forecasting},
booktitle = {The Thirty-Fifth {AAAI} Conference on Artificial Intelligence, {AAAI} 2021, Virtual Conference},
volume = {35},
number = {12},
pages = {11106–11115},
publisher = {{AAAI} Press},
year = {2021},
}
一个基于Transformer的效率优化的长时间序列预测模型
ProbSparse自注意机制,在时间复杂度和内存使用方面达到O(Llog L),并且在序列依赖项对齐方面具有相当的性能。Informer核心创新点,解决二次计算复杂度,高耗内存,预测长输出受限问题。
自注意蒸馏通过将级联层输入减半来突出支配注意,并有效地处理极长输入序列。通过训练一个较小的模型来指导一个较大的模型的学习,从而提高模型的效率和准确性。
生成式解码器虽然概念简单,但在一次向前操作中预测长时间序列,而不是一步一步地预测,这大大提高了长序列预测的推理速度。在4个大规模数据集上的大量实验表明,Informer算法显著优于现有方法,为LSTF问题提供了一种新的解决方案
将输入表示为统一的矩阵,可以更好地处理各种输入类型。
自注意蒸馏机制是一种高效的方法,通过训练一个较小的模型来指导一个较大的模型的学习,以提高模型的效率和准确性。自注意蒸馏机制的核心思想是,在训练小模型时,利用大模型的注意力分布来指导小模型的训练,以便小模型可以更好地模仿大模型。
自注意蒸馏机制的特点是,它能够捕捉输入数据的全局特征,并将这些特征传递给小模型,从而使小模型能够更好地模拟大模型。此外,自注意蒸馏机制还可以通过最小化目标函数来优化模型的参数,从而提高模型的准确性。
自注意蒸馏机制的原理是,它通过对输入数据的注意力分布进行计算,从而捕捉输入数据的全局特征。然后,将这些特征传递给小模型,以便小模型可以更好地模仿大模型。同时,自注意蒸馏机制还通过最小化目标函数来优化模型的参数,从而提高模型的准确性。
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
import numpy as np
import math
下面这段代码定义了两个类:TriangularCausalMask 和 ProbMask。这两个类都是用来实现概率掩码(Probability Mask)的。概率掩码是一种在生成文本时常用的技术,它有助于减少生成的文本中重复或不相关的单词。
TriangularCausalMask 类实现了一个对角线因果掩码(Diagonal Causal Mask),其中对角线因果掩码用于引导生成过程。在这个例子中,类接收三个参数:batch size (B),序列长度(L),以及设备(device)。它使用 PaddlePaddle 的 extract_loc 函数生成一个对角线因果掩码,并将其放在设备上。然后,将此掩码暴露给外界的属性为 mask。
另一方面,ProbMask 类实现了一个概率掩码,它基于一个指示器(indicator)张量。这个指示器张量用于确定哪个单词应该被保留,哪个单词应该被掩码掉。在这个例子中,类接收五个参数:batch size (B),每个时间步长的嵌入维度(H),序列长度(L),掩码的索引(index),以及嵌入的得分(scores)。这个类通过使用 PaddlePaddle 的 triu 函数创建一个上三角形张量,然后将其扩展并转换为概率掩码。这个概率掩码被暴露给外界的属性为 mask。
class TriangularCausalMask(): def __init__(self, B, L, device="cpu"): mask_shape = [B, 1, L, L] with paddle.disable_grad(): self._mask = paddle.nn.functional.extract_loc(paddle.ones(mask_shape, dtype=paddle.bool), diagonal=1).to(device) @property def mask(self): return self._mask class ProbMask(): def __init__(self, B, H, L, index, scores, device="cpu"): _mask = paddle.ones(L, scores.shape[-1], dtype=paddle.bool).to(device).triu(1) _mask_ex = _mask[None, None, :].expand(B, H, L, scores.shape[-1]) indicator = _mask_ex[paddle.arange(B)[:, None, None], paddle.arange(H)[None, :, None], index, :].to(device) self._mask = indicator.view(scores.shape).to(device) @property def mask(self): return self._mask
#mask_flag:是否对输入的 queries 进行掩码处理,如果为 True,则使用 TriangularCausalMask 对 queries 进行掩码处理。 # factor:计算注意力权重的缩放因子。 # scale:缩放因子,可以替代 factor。 # attention_dropout:注意力机制中的dropout概率。 # output_attention:是否输出注意力权重。 # 在 forward 函数中,输入的 queries、keys 和 values 分别具有形状 (B, L, H, E)、(B, S, H, E) 和 (B, L, D)。 # 其中,B 表示批次大小,L 表示 queries 的长度,H 表示 queries 的通道数,E 表示 queries 的维度。paddle.einsum函数用于执行多维张量的线性操作,这里用于计算注意力分数。 # 在计算分数之后,根据 self.mask_flag 是否为 True,以及是否传递了 attn_mask 参数来判断是否进行掩码处理。然后通过 self.dropout 函数对注意力权重进行 dropout 处理,并使用 paddle.softmax 函数计算注意力权重。 # 最后,将注意力权重和 values 相乘得到输出结果。 # 如果 self.output_attention 为 True,则返回输出结果和注意力权重;否则只返回输出结果。 class FullAttention(nn.Layer): def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False): super(FullAttention, self).__init__() self.scale = scale self.mask_flag = mask_flag self.output_attention = output_attention self.dropout = nn.Dropout(attention_dropout) def forward(self, queries, keys, values, attn_mask): B, L, H, E = queries.shape _, S, _, D = values.shape scale = self.scale or 1./sqrt(E) scores = paddle.einsum("blhe,bshe->bhls", queries, keys) if self.mask_flag: if attn_mask is None: attn_mask = TriangularCausalMask(B, L, device=queries.device) scores.masked_fill_(attn_mask.mask, -np.inf) A = self.dropout(paddle.softmax(scale * scores, dim=-1)) V = paddle.einsum("bhls,bshd->blhd", A, values) if self.output_attention: return (V.contiguous(), A) else: return (V.contiguous(), None) class ProbAttention(nn.Layer): def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False): super(ProbAttention, self).__init__() self.factor = factor self.scale = scale self.mask_flag = mask_flag self.output_attention = output_attention self.dropout = nn.Dropout(attention_dropout) def _prob_QK(self, Q, K, sample_k, n_top): # n_top: c*ln(L_q) # Q [B, H, L, D] B, H, L_K, E = K.shape _, _, L_Q, _ = Q.shape # 计算采样后的Q_K K_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E) index_sample = paddle.randint(L_K, (L_Q, sample_k)) # real U = U_part(factor*ln(L_k))*L_q K_sample = K_expand[:, :, paddle.arange(L_Q).unsqueeze(1), index_sample, :] Q_K_sample = paddle.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze(-2) # 使用稀疏度量查找Top_k M = Q_K_sample.max(-1)[0] - paddle.sum(Q_K_sample, -1) / L_K M_top = M.topk(n_top, sorted=False)[1] # 使用简化后的Q来计算Q_K Q_reduce = Q[paddle.arange(B)[:, None, None], paddle.arange(H)[None, :, None], M_top, :] # factor*ln(L_q) Q_K = paddle.matmul(Q_reduce, K.transpose(-2, -1)) # factor*ln(L_q)*L_k return Q_K, M_top def _get_initial_context(self, V, L_Q): B, H, L_V, D = V.shape if not self.mask_flag: # V_sum = V.sum(dim=-2) V_sum = V.mean(dim=-2) contex = V_sum.unsqueeze(-2).expand(B, H, L_Q, V_sum.shape[-1]).clone() else: # use mask assert(L_Q == L_V) # 要求L_Q == L_V,即仅用于自注意力机制 contex = V.cumsum(dim=-2) return contex def _update_context(self, context_in, V, scores, index, L_Q, attn_mask): B, H, L_V, D = V.shape if self.mask_flag: attn_mask = ProbMask(B, H, L_Q, index, scores, device=V.device) scores.masked_fill_(attn_mask.mask, -np.inf) attn = paddle.softmax(scores, dim=-1) # nn.Softmax(dim=-1)(scores) context_in[paddle.arange(B)[:, None, None], paddle.arange(H)[None, :, None], index, :] = paddle.matmul(attn, V).type_as(context_in) if self.output_attention: attns = (paddle.ones([B, H, L_V, L_V])/L_V).type_as(attn).to(attn.device) attns[paddle.arange(B)[:, None, None], paddle.arange(H)[None, :, None], index, :] = attn return (context_in, attns) else: return (context_in, None) def forward(self, queries, keys, values, attn_mask): B, L_Q, H, D = queries.shape _, L_K, _, _ = keys.shape queries = queries.transpose(2,1) keys = keys.transpose(2,1) values = values.transpose(2,1) U_part = self.factor * np.ceil(np.log(L_K)).astype('int').item() # c*ln(L_k) u = self.factor * np.ceil(np.log(L_Q)).astype('int').item() # c*ln(L_q) U_part = U_part if U_part<L_K else L_K u = u if u<L_Q else L_Q scores_top, index = self._prob_QK(queries, keys, sample_k=U_part, n_top=u) # 加比例因子 scale = self.scale or 1./sqrt(D) if scale is not None: scores_top = scores_top * scale # 获取上下文 context = self._get_initial_context(values, L_Q) # 用选定的top_k个查询更新上下文 context, attn = self._update_context(context, values, scores_top, index, L_Q, attn_mask) return context.transpose(2,1).contiguous(), attn class AttentionLayer(nn.Layer): def __init__(self, attention, d_model, n_heads, d_keys=None, d_values=None, mix=False): super(AttentionLayer, self).__init__() d_keys = d_keys or (d_model//n_heads) d_values = d_values or (d_model//n_heads) self.inner_attention = attention self.query_projection = nn.Linear(d_model, d_keys * n_heads) self.key_projection = nn.Linear(d_model, d_keys * n_heads) self.value_projection = nn.Linear(d_model, d_values * n_heads) self.out_projection = nn.Linear(d_values * n_heads, d_model) self.n_heads = n_heads self.mix = mix def forward(self, queries, keys, values, attn_mask): B, L, _ = queries.shape _, S, _ = keys.shape H = self.n_heads queries = self.query_projection(queries).view(B, L, H, -1) keys = self.key_projection(keys).view(B, S, H, -1) values = self.value_projection(values).view(B, S, H, -1) out, attn = self.inner_attention( queries, keys, values, attn_mask ) if self.mix: out = out.transpose(2,1).contiguous() out = out.view(B, L, -1) return self.out_projection(out), attn
# 创建一个长度为 max_len 的零张量 pe,其维度为 d_model。 # 将 pe.require_grad 设置为 False,以确保反向传播时不会计算 pe 的梯度。 # 创建一个与 pe 相同维度的张量 position,其中包含从 0 到 max_len-1 的数字。 # 创建一个与 pe 相同维度的张量 div_term,其中包含 -(math.log(10000.0) / d_model) 的幂次。 # 将 pe 中每两个相邻的元素设置为 position * div_term 的正弦和余弦值。 # 将 pe 张量展开为一个大小为 (1, max_len, d_model) 的张量,并将其传递到前向传递中。在前向传递中,该类将输入张量 x 的长度与位置编码矩阵的长度进行比较,并返回相应长度的位置编码张量。 # 这个 PositionalEmbedding 类可以用于为输入序列中的每个位置提供位置编码,这些位置编码可用于各种任务。 class PositionalEmbedding(nn.Layer): def __init__(self, d_model, max_len=5000): super(PositionalEmbedding, self).__init__() # Compute the positional encodings once in log space. pe = paddle.zeros(max_len, d_model).float() pe.require_grad = False position = paddle.arange(0, max_len).float().unsqueeze(1) div_term = (paddle.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp() pe[:, 0::2] = paddle.sin(position * div_term) pe[:, 1::2] = paddle.cos(position * div_term) pe = pe.unsqueeze(0) self.register_buffer('pe', pe) def forward(self, x): return self.pe[:, :x.size(1)] # 这是一个PaddlePaddle中的TokenEmbedding类,继承自nn.Layer。它包括一个卷积层,用于从输入张量x中提取嵌入特征。 # 在初始化方法中,它定义了一个名为tokenConv的卷积层,输入通道数为c_in,输出通道数为d_model,卷积核大小为3,填充大小为1(如果使用的是PaddlePaddle 1.5.0及以上的版本)或2,填充方式为循环模式。 #然后,它使用nn.init.kaiming_normal_函数初始化所有模块的权重,包括卷积层,使用fan_in模式和leaky_relu非线性激活函数。 # 在前向传递方法中,它首先将输入张量x的维度转置为(0, 2, 1),然后通过卷积层tokenConv进行处理。处理后,它将输出张量的维度转置回(0, 1, 2),并返回结果。 # 这个TokenEmbedding类可以用于为输入序列中的每个token提供嵌入特征,这些嵌入特征可以用于各种NLP任务,例如文本分类、命名实体识别等。 class TokenEmbedding(nn.Layer): def __init__(self, c_in, d_model): super(TokenEmbedding, self).__init__() padding = 1 if paddle.__version__>='1.5.0' else 2 self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model, kernel_size=3, padding=padding, padding_mode='circular') for m in self.modules(): if isinstance(m, nn.Conv1d): nn.init.kaiming_normal_(m.weight,mode='fan_in',nonlinearity='leaky_relu') def forward(self, x): x = self.tokenConv(x.permute(0, 2, 1)).transpose(1,2) return x # 这是一个名为FixedEmbedding的PaddlePaddle层类,它继承自nn.Layer。 # 在初始化方法中,它定义了一个名为w的张量,其大小为c_in x d_model,并将其梯度设置为False。 # 接下来,它创建了一个位置张量position,其大小为c_in x 1,并将其梯度设置为False。然后,它计算了div_term,即(d_model / 2) * math.log(10000)的幂次方。 # 最后,它将w张量设置为正弦和余弦函数的组合,这些函数是位置张量乘以div_term的结果。 # 在forward方法中,它创建了一个名为emb的nn.Embedding对象,并将权重设置为初始化方法中计算出的w张量。 # 这个FixedEmbedding类可以在固定的位置编码上使用标准的nn.Embedding。因此,它的实现不需要在运行时进行昂贵的计算。 # 通常,它适用于需要高效计算的任务,例如大规模文本分类。 class FixedEmbedding(nn.Layer): def __init__(self, c_in, d_model): super(FixedEmbedding, self).__init__() w = paddle.zeros(c_in, d_model).float() w.require_grad = False position = paddle.arange(0, c_in).float().unsqueeze(1) div_term = (paddle.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp() w[:, 0::2] = paddle.sin(position * div_term) w[:, 1::2] = paddle.cos(position * div_term) self.emb = nn.Embedding(c_in, d_model) self.emb.weight = nn.Parameter(w, requires_grad=False) def forward(self, x): return self.emb(x).detach() # 这是一个名为TemporalEmbedding的PaddlePaddle层类,它继承自nn.Layer。 # 在初始化方法中,它定义了一些大小,具体取决于要嵌入的时间特征的类型和频率。 # 然后,根据embed_type参数,它创建了一个FixedEmbedding对象(如果embed_type=='fixed')或nn.Embedding对象。 # 最后,它根据freq参数创建了一个时间特征的嵌入对象。 # 在forward方法中,如果freq='t',则根据分钟大小创建一个嵌入对象minute_embed,并根据小时大小创建一个嵌入对象hour_embed。 # 然后,根据星期大小创建一个嵌入对象weekday_embed,根据天大小创建一个嵌入对象day_embed,根据月大小创建一个嵌入对象month_embed。 # 最后,它将所有嵌入对象的输出连接在一起,并返回结果。 # 这个TemporalEmbedding类可以用于为时间序列数据中的不同时间特征(例如分钟、小时、星期、天和月)添加位置编码。 class TemporalEmbedding(nn.Layer): def __init__(self, d_model, embed_type='fixed', freq='h'): super(TemporalEmbedding, self).__init__() minute_size = 4; hour_size = 24 weekday_size = 7; day_size = 32; month_size = 13 Embed = FixedEmbedding if embed_type=='fixed' else nn.Embedding if freq=='t': self.minute_embed = Embed(minute_size, d_model) self.hour_embed = Embed(hour_size, d_model) self.weekday_embed = Embed(weekday_size, d_model) self.day_embed = Embed(day_size, d_model) self.month_embed = Embed(month_size, d_model) def forward(self, x): x = x.long() minute_x = self.minute_embed(x[:,:,4]) if hasattr(self, 'minute_embed') else 0. hour_x = self.hour_embed(x[:,:,3]) weekday_x = self.weekday_embed(x[:,:,2]) day_x = self.day_embed(x[:,:,1]) month_x = self.month_embed(x[:,:,0]) return hour_x + weekday_x + day_x + month_x + minute_x # 这是一个名为TimeFeatureEmbedding的PaddlePaddle层类,它继承自nn.Layer。 # 在初始化方法中,它定义了一个名为embed的nn.Linear对象,输入维度为freq_map[freq],输出维度为d_model。 # 其中,freq_map是一个字典,用于将频率('h'、't'、's'、'm'、'a'、'w'、'd'和'b'分别代表小时、分钟、秒、分钟内的刻钟、天、周、月和周内的小时)映射到相应的输入维度。 # 在前向传递方法中,它通过调用nn.Linear对象的forward方法来计算嵌入特征。 # 这个TimeFeatureEmbedding类可以用于将时间序列数据中的不同时间特征(例如小时、分钟、秒、分钟内的刻钟、天、周、月和周内的小时)映射到一个低维向量空间中。 class TimeFeatureEmbedding(nn.Layer): def __init__(self, d_model, embed_type='timeF', freq='h'): super(TimeFeatureEmbedding, self).__init__() freq_map = {'h':4, 't':5, 's':6, 'm':1, 'a':1, 'w':2, 'd':3, 'b':3} d_inp = freq_map[freq] self.embed = nn.Linear(d_inp, d_model) def forward(self, x): return self.embed(x) # 这是一个名为TimeFeatureEmbedding的PaddlePaddle层类,它继承自nn.Layer。 # 在初始化方法中,它定义了三个嵌入对象:value_embedding(用于嵌入输入序列中的值)、position_embedding(用于嵌入输入序列中的位置)和temporal_embedding(用于嵌入时间序列数据中的时间特征)。 # 其中,如果embed_type不为'timeF',则使用TemporalEmbedding对象;否则,使用TimeFeatureEmbedding对象。 # 在前向传递方法中,它首先将输入序列x传递给value_embedding对象,并将位置编码添加到结果中。 # 然后,如果embed_type不为'timeF',则将时间序列数据中的时间特征x_mark传递给temporal_embedding对象,并将结果添加到嵌入特征中。 # 最后,它通过调用nn.Dropout对象的forward方法来应用dropout正则化,并返回结果。 class DataEmbedding(nn.Layer): def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1): super(DataEmbedding, self).__init__() self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model) self.position_embedding = PositionalEmbedding(d_model=d_model) self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type, freq=freq) if embed_type!='timeF' else TimeFeatureEmbedding(d_model=d_model, embed_type=embed_type, freq=freq) self.dropout = nn.Dropout(p=dropout) def forward(self, x, x_mark): x = self.value_embedding(x) + self.position_embedding(x) + self.temporal_embedding(x_mark) return self.dropout(x)
# 这个ConvLayer类的初始化方法中, # 创建了一个nn.Conv1d对象downConv,一个nn.BatchNorm1d对象norm,一个nn.ELU对象activation和一个nn.MaxPool1d对象maxPool。 # 在forward方法中,先将输入x进行转置,然后经过卷积层downConv、批归一化层norm、激活函数层activation和最大池化层maxPool的处理,最后将结果转置回去并返回。 class ConvLayer(nn.Layer): def __init__(self, c_in): super(ConvLayer, self).__init__() padding = 1 if paddle.__version__>='1.5.0' else 2 self.downConv = nn.Conv1d(in_channels=c_in, out_channels=c_in, kernel_size=3, padding=padding, padding_mode='circular') self.norm = nn.BatchNorm1d(c_in) self.activation = nn.ELU() self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1) def forward(self, x): x = self.downConv(x.permute(0, 2, 1)) x = self.norm(x) x = self.activation(x) x = self.maxPool(x) x = x.transpose(1,2) return x # 这是一个PaddlePaddle的EncoderLayer类,继承自nn.Layer。 # 在初始化方法中,它定义了几个成员变量,包括self.attention(一个自注意力机制的实例),self.conv1(一个1D卷积层实例),self.conv2(另一个1D卷积层实例),self.norm1(一个规范化层实例),self.norm2(另一个规范化层实例)和self.dropout(一个dropout层实例)。 # 其中,d_ff是一个可选参数,默认为4*d_model。 # 在forward方法中,它接收一个输入张量x和一个可选参数attn_mask(一个注意力掩码张量)。 # 首先,它使用self.attention实例处理x、自身和自身,将结果加到x上,得到新的x。 # 这个处理包括使用自注意力机制计算x、自身和自身之间的相似度,然后应用softmax函数得到注意力权重,最后将结果加到x上。 # 接着,它将新的x通过规范化层self.norm1、激活函数层self.activation、卷积层self.conv1、转置张量、规范化层self.norm2、dropout层self.dropout和激活函数层self.activation处理,得到最终的输出张量。 # 其中,self.attention的返回值包括新的张量new_x和注意力权重attn。 # 最后,它通过将x作为输入并将注意力权重设置为None,在不需要的情况下禁用自注意力机制来创建具有attention_mask参数的forward方法。 class EncoderLayer(nn.Layer): def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"): super(EncoderLayer, self).__init__() d_ff = d_ff or 4*d_model self.attention = attention self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1) self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.dropout = nn.Dropout(dropout) self.activation = F.relu if activation == "relu" else F.gelu def forward(self, x, attn_mask=None): # x [B, L, D] # x = x + self.dropout(self.attention( # x, x, x, # attn_mask = attn_mask # )) new_x, attn = self.attention( x, x, x, attn_mask = attn_mask ) x = x + self.dropout(new_x) y = x = self.norm1(x) y = self.dropout(self.activation(self.conv1(y.transpose(-1,1)))) y = self.dropout(self.conv2(y).transpose(-1,1)) return self.norm2(x+y), attn # 这是一个PaddlePaddle的Encoder类,继承自nn.Layer。在初始化方法中,它定义了一个nn.ModuleList类型的attn_layers成员变量,一个可选的nn.ModuleList类型的conv_layers成员变量和一个可选的norm_layer成员变量。 # 在forward方法中,它接收一个输入张量x和一个可选参数attn_mask(一个注意力掩码张量)。 # 首先,它遍历self.attn_layers中的每个自注意力机制实例和可选的卷积层实例,并使用zip函数将它们一一配对。对于每个配对,它使用attn_layer处理x,并将结果与attn_mask一起传递给attn_layer。 # 然后,它使用conv_layer处理x,并将结果与attn一起添加到attns列表中。 # 接着,它将x和最后一个自注意力机制的注意力张量添加到attns列表中。 # 如果conv_layers为None,则跳过卷积层处理。否则,在遍历完所有自注意力机制后,它将x和最后一个自注意力机制的注意力张量添加到attns列表中。 # 最后,如果self.norm不为None,则使用self.norm处理x。最终输出是处理后的x和每个自注意力机制的注意力张量列表attns。 class Encoder(nn.Layer): def __init__(self, attn_layers, conv_layers=None, norm_layer=None): super(Encoder, self).__init__() self.attn_layers = nn.ModuleList(attn_layers) self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None self.norm = norm_layer def forward(self, x, attn_mask=None): # x [B, L, D] attns = [] if self.conv_layers is not None: for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers): x, attn = attn_layer(x, attn_mask=attn_mask) x = conv_layer(x) attns.append(attn) x, attn = self.attn_layers[-1](x, attn_mask=attn_mask) attns.append(attn) else: for attn_layer in self.attn_layers: x, attn = attn_layer(x, attn_mask=attn_mask) attns.append(attn) if self.norm is not None: x = self.norm(x) return x, attns # 这是一个PaddlePaddle的EncoderStack类,继承自nn.Layer。 # 在初始化方法中,它接受一个encoders列表和一个inp_lens列表作为参数。encoders列表包含了多个Encoder实例,inp_lens列表包含了每个Encoder实例对应的输入序列长度。 # 在forward方法中,它接收一个输入张量x和一个可选参数attn_mask(一个注意力掩码张量)。 # 首先,它遍历self.encoders中的每个Encoder实例,并使用zip函数将它们与self.inp_lens中的对应输入序列长度配对。 # 对于每个配对,它使用Encoder实例处理x[:, -inp_len:, :],并将结果添加到x_stack列表中。同时,它也将每个Encoder实例的注意力张量添加到attns列表中。 # 最后,它将x_stack列表中的所有张量沿着第2维度(即时间维度)连接起来,并返回处理后的x_stack和每个Encoder实例的注意力张量列表attns。 class EncoderStack(nn.Layer): def __init__(self, encoders, inp_lens): super(EncoderStack, self).__init__() self.encoders = nn.ModuleList(encoders) self.inp_lens = inp_lens def forward(self, x, attn_mask=None): # x [B, L, D] x_stack = []; attns = [] for i_len, encoder in zip(self.inp_lens, self.encoders): inp_len = x.shape[1]//(2**i_len) x_s, attn = encoder(x[:, -inp_len:, :]) x_stack.append(x_s); attns.append(attn) x_stack = paddle.cat(x_stack, -2) return x_stack, attns
# 这是一个PaddlePaddle的DecoderLayer类,继承自nn.Layer。 # 在初始化方法中,它定义了self_attention、cross_attention、d_model、d_ff(默认为4*d_model)、dropout(默认为0.1)和activation(默认为"relu")等参数。 # self_attention和cross_attention分别是自注意力机制和交叉注意力机制的实例。 # d_ff是FFN(前馈神经网络)的输入和输出通道数,默认为4*d_model。conv1和conv2是1D卷积层实例,用于将输入张量转换为d_model通道数。 # norm1、norm2和norm3是规范化层实例,用于对输入张量进行归一化。dropout是dropout层实例,用于防止过拟合。activation是激活函数实例,默认为ReLU激活函数,但也可以设置为GELU激活函数。 # 在forward方法中,它接收一个输入张量x、一个可选的注意力掩码attn_mask和多个可选参数,并返回输出张量x、self_attn_output、cross_attn_output和Norm后处理结果。 # 注意,这里有两个self_attn_output和cross_attn_output输出,分别表示自注意力机制和交叉注意力机制的输出。 # 这些输出还需要通过Norm层进行归一化处理,并使用dropout层进行dropout处理,最后通过激活函数进行激活。 class DecoderLayer(nn.Layer): def __init__(self, self_attention, cross_attention, d_model, d_ff=None, dropout=0.1, activation="relu"): super(DecoderLayer, self).__init__() d_ff = d_ff or 4*d_model self.self_attention = self_attention self.cross_attention = cross_attention self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1) self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.norm3 = nn.LayerNorm(d_model) self.dropout = nn.Dropout(dropout) self.activation = F.relu if activation == "relu" else F.gelu # 这是一个PaddlePaddle的DecoderLayer类的forward方法,用于执行Decoder的每一层计算。 # 该方法接受两个输入张量x和cross,以及可选参数x_mask和cross_mask,表示输入张量的注意力掩码。 # 首先,它使用self.self_attention计算自注意力机制的输出,并使用dropout、规范化层self.norm1和dropout对输出进行处理。 # 然后,它使用self.cross_attention计算交叉注意力机制的输出,并再次使用dropout、规范化层self.norm2和dropout对输出进行处理。 # 接下来,它使用1D卷积层self.conv1和self.conv2对输出进行处理,并使用dropout进行dropout处理。 # 最后,它将所有输出张量相加,并使用规范化层self.norm3对结果进行归一化处理。 # 需要注意的是,该方法返回的是经过所有处理后的最终输出张量。 def forward(self, x, cross, x_mask=None, cross_mask=None): x = x + self.dropout(self.self_attention( x, x, x, attn_mask=x_mask )[0]) x = self.norm1(x) x = x + self.dropout(self.cross_attention( x, cross, cross, attn_mask=cross_mask )[0]) y = x = self.norm2(x) y = self.dropout(self.activation(self.conv1(y.transpose(-1,1)))) y = self.dropout(self.conv2(y).transpose(-1,1)) return self.norm3(x+y) # 这是一个PaddlePaddle的Decoder类,继承自nn.Layer。 # 在初始化方法中,它接受一个layers参数,这是一个包含多个DecoderLayer实例的列表,以及一个可选的norm_layer参数,表示使用的规范化层类型。 # 在forward方法中,它遍历self.layers中的每个DecoderLayer实例,并传递输入张量x、交叉注意力机制的输出cross、注意力掩码x_mask和cross_mask。 # 最后,它使用规范化层self.norm对输出进行处理,并返回处理后的输出张量x。 class Decoder(nn.Layer): def __init__(self, layers, norm_layer=None): super(Decoder, self).__init__() self.layers = nn.ModuleList(layers) self.norm = norm_layer def forward(self, x, cross, x_mask=None, cross_mask=None): for layer in self.layers: x = layer(x, cross, x_mask=x_mask, cross_mask=cross_mask) if self.norm is not None: x = self.norm(x) return x
class Informer(nn.Layer): def __init__(self, enc_in, dec_in, c_out, seq_len, label_len, out_len, factor=5, d_model=512, n_heads=8, e_layers=3, d_layers=2, d_ff=512, dropout=0.0, attn='prob', embed='fixed', freq='h', activation='gelu', output_attention = False, distil=True, mix=True, device=paddle.CUDAPlace(0)): super(Informer, self).__init__() self.pred_len = out_len self.attn = attn self.output_attention = output_attention # Encoding self.enc_embedding = DataEmbedding(enc_in, d_model, embed, freq, dropout) self.dec_embedding = DataEmbedding(dec_in, d_model, embed, freq, dropout) # Attention Attn = ProbAttention if attn=='prob' else FullAttention # Encoder self.encoder = Encoder( [ EncoderLayer( AttentionLayer(Attn(False, factor, attention_dropout=dropout, output_attention=output_attention), d_model, n_heads, mix=False), d_model, d_ff, dropout=dropout, activation=activation ) for l in range(e_layers) ], [ ConvLayer( d_model ) for l in range(e_layers-1) ] if distil else None, norm_layer=paddle.nn.LayerNorm(d_model) ) # Decoder self.decoder = Decoder( [ DecoderLayer( AttentionLayer(Attn(True, factor, attention_dropout=dropout, output_attention=False), d_model, n_heads, mix=mix), AttentionLayer(FullAttention(False, factor, attention_dropout=dropout, output_attention=False), d_model, n_heads, mix=False), d_model, d_ff, dropout=dropout, activation=activation, ) for l in range(d_layers) ], norm_layer=paddle.nn.LayerNorm(d_model) ) # self.end_conv1 = nn.Conv1d(in_channels=label_len+out_len, out_channels=out_len, kernel_size=1, bias=True) # self.end_conv2 = nn.Conv1d(in_channels=d_model, out_channels=c_out, kernel_size=1, bias=True) self.projection = nn.Linear(d_model, c_out, bias=True) def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None): enc_out = self.enc_embedding(x_enc, x_mark_enc) enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask) dec_out = self.dec_embedding(x_dec, x_mark_dec) dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask) dec_out = self.projection(dec_out) # dec_out = self.end_conv1(dec_out) # dec_out = self.end_conv2(dec_out.transpose(2,1)).transpose(1,2) if self.output_attention: return dec_out[:,-self.pred_len:,:], attns else: return dec_out[:,-self.pred_len:,:] # [B, L, D] class InformerStack(nn.Layer): def __init__(self, enc_in, dec_in, c_out, seq_len, label_len, out_len, factor=5, d_model=512, n_heads=8, e_layers=[3,2,1], d_layers=2, d_ff=512, dropout=0.0, attn='prob', embed='fixed', freq='h', activation='gelu', output_attention = False, distil=True, mix=True, device=paddle.CUDAPlace(0)): super(InformerStack, self).__init__() self.pred_len = out_len self.attn = attn self.output_attention = output_attention # Encoding self.enc_embedding = DataEmbedding(enc_in, d_model, embed, freq, dropout) self.dec_embedding = DataEmbedding(dec_in, d_model, embed, freq, dropout) # Attention Attn = ProbAttention if attn=='prob' else FullAttention # Encoder inp_lens = list(range(len(e_layers))) # [0,1,2,...] you can customize here encoders = [ Encoder( [ EncoderLayer( AttentionLayer(Attn(False, factor, attention_dropout=dropout, output_attention=output_attention), d_model, n_heads, mix=False), d_model, d_ff, dropout=dropout, activation=activation ) for l in range(el) ], [ ConvLayer( d_model ) for l in range(el-1) ] if distil else None, norm_layer=paddle.nn.LayerNorm(d_model) ) for el in e_layers] self.encoder = EncoderStack(encoders, inp_lens) # Decoder self.decoder = Decoder( [ DecoderLayer( AttentionLayer(Attn(True, factor, attention_dropout=dropout, output_attention=False), d_model, n_heads, mix=mix), AttentionLayer(FullAttention(False, factor, attention_dropout=dropout, output_attention=False), d_model, n_heads, mix=False), d_model, d_ff, dropout=dropout, activation=activation, ) for l in range(d_layers) ], norm_layer=paddle.nn.LayerNorm(d_model) ) # self.end_conv1 = nn.Conv1d(in_channels=label_len+out_len, out_channels=out_len, kernel_size=1, bias=True) # self.end_conv2 = nn.Conv1d(in_channels=d_model, out_channels=c_out, kernel_size=1, bias=True) self.projection = nn.Linear(d_model, c_out, bias=True) def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None): enc_out = self.enc_embedding(x_enc, x_mark_enc) enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask) dec_out = self.dec_embedding(x_dec, x_mark_dec) dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask) dec_out = self.projection(dec_out) # dec_out = self.end_conv1(dec_out) # dec_out = self.end_conv2(dec_out.transpose(2,1)).transpose(1,2) if self.output_attention: return dec_out[:,-self.pred_len:,:], attns else: return dec_out[:,-self.pred_len:,:] # [B, L, D] e, dec_enc_mask=None): enc_out = self.enc_embedding(x_enc, x_mark_enc) enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask) dec_out = self.dec_embedding(x_dec, x_mark_dec) dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask) dec_out = self.projection(dec_out) # dec_out = self.end_conv1(dec_out) # dec_out = self.end_conv2(dec_out.transpose(2,1)).transpose(1,2) if self.output_attention: return dec_out[:,-self.pred_len:,:], attns else: return dec_out[:,-self.pred_len:,:] # [B, L, D]
本篇文章在模型训练和预测部分就不作赘述了,后续我会推出相关的应用实战哦!
此文章为搬运
原项目链接
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。