Informer:用于长序列时间序列预测的高效Transformer模型_informer 时间序列

        最近在研究时间序列分析的的过程看,看到一篇精彩的文章,名为:《Informer: Beyond Efficient Transformer for Long Sequence Time-Series Forecasting》,特此撰写一篇博客。


  1. ProbSparse自注意力机制:这种机制通过实现O(L log L)的时间复杂度和内存使用,有效地解决了传统Transformer模型在处理长序列数据时时间复杂度过高的问题。

  2. 自注意力蒸馏操作:该操作通过减少每一层的输入量来突出主导的注意力,有效地处理极长的输入序列。

  3. 生成式风格的解码器:它在预测长时间序列时只需要一步前向操作,而非逐步方式,显著提高了长序列预测的推理速度。



图: Informer模型概述。左图: 编码器接收大量长序列输入(绿色系列)。我们用提出的ProbSparse自注意代替规范自注意。蓝色梯形是自注意力蒸馏操作,提取主导注意力,大幅减小网络规模。层堆叠副本增加了鲁棒性。右图: 解码器接收长序列输入,将目标元素填充为零,测量特征图的加权注意力组成,并以生成方式立即预测输出元素(橙色系列)。



图: Informer编码器中的单个堆栈。(1)水平堆栈代表图(2)中的单个编码器副本之一。(2)本文给出的是接收整个输入序列的主堆栈。然后,第二个堆栈获取输入的一半切片,随后的堆栈重复。(3)红色层为点积矩阵,通过对每一层进行自注意蒸馏得到级联递减。(4)连接所有堆栈的特征映射作为编码器的输出。

        聚焦于注意力机制方面。ProbSparse自注意力机制,即ProbAttention,是论文《Informer: Beyond Efficient Transformer for Long Sequence Time-Series Forecasting》中提出的一种自注意力机制。这种机制的主要特点和细节如下:

  1. 主要机制:ProbSparse自注意力允许每个键(key)仅关注最主导的u个查询(query)。这是通过一个稀疏矩阵Q实现的,该矩阵与传统的查询矩阵q大小相同,但只包含根据稀疏性度量M(q, K)选出的Top-u查询。这种自注意力的计算公式是:A(Q, K, V) = Softmax(QK√/d)V,其中A代表注意力函数,Q、K、V分别代表查询、键和值​​。

  2. 采样因子:采样因子c控制了ProbSparse自注意力的信息带宽。在实践中,采样因子通常设置为5。这个因子决定了在每个查询-键查找中需要计算的点积对数量,进而影响模型性能​​。

  3. 时间复杂度和空间复杂度:ProbSparse自注意力机制实现了O(L log L)的时间复杂度和内存使用,这是一个显著改进,因为它比传统Transformer模型中的自注意力机制更加高效。这种效率的提高是通过在长尾分布下随机采样U = LK ln LQ点积对来计算M(qi, K),从而选择稀疏的Top-u作为Q来实现的​​。

  4. 改进的效率:ProbSparse自注意力机制通过减少必须计算的点积对数量,有效地降低了处理长序列时的计算负担。这种机制能够有效地替代传统的自注意力机制,并在长序列依赖对齐上实现了更高的效率和准确性​​。





  1. import torch
  2. import torch.nn as nn
  3. import torch.nn.functional as F
  4. import numpy as np
  5. from math import sqrt
  6. from utils.masking import TriangularCausalMask, ProbMask
  7. class FullAttention(nn.Module):
  8. def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
  9. super(FullAttention, self).__init__()
  10. self.scale = scale
  11. self.mask_flag = mask_flag
  12. self.output_attention = output_attention
  13. self.dropout = nn.Dropout(attention_dropout)
  14. def forward(self, queries, keys, values, attn_mask):
  15. B, L, H, E = queries.shape
  16. _, S, _, D = values.shape
  17. scale = self.scale or 1./sqrt(E)
  18. scores = torch.einsum("blhe,bshe->bhls", queries, keys)
  19. if self.mask_flag:
  20. if attn_mask is None:
  21. attn_mask = TriangularCausalMask(B, L, device=queries.device)
  22. scores.masked_fill_(attn_mask.mask, -np.inf)
  23. A = self.dropout(torch.softmax(scale * scores, dim=-1))
  24. V = torch.einsum("bhls,bshd->blhd", A, values)
  25. if self.output_attention:
  26. return (V.contiguous(), A)
  27. else:
  28. return (V.contiguous(), None)
  29. class ProbAttention(nn.Module):
  30. def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
  31. super(ProbAttention, self).__init__()
  32. self.factor = factor
  33. self.scale = scale
  34. self.mask_flag = mask_flag
  35. self.output_attention = output_attention
  36. self.dropout = nn.Dropout(attention_dropout)
  37. def _prob_QK(self, Q, K, sample_k, n_top): # n_top: c*ln(L_q)
  38. # Q [B, H, L, D]
  39. B, H, L_K, E = K.shape
  40. _, _, L_Q, _ = Q.shape
  41. # calculate the sampled Q_K
  42. K_expand = K.unsqueeze(-3).expand(B, H, L_Q, L_K, E)
  43. index_sample = torch.randint(L_K, (L_Q, sample_k)) # real U = U_part(factor*ln(L_k))*L_q
  44. K_sample = K_expand[:, :, torch.arange(L_Q).unsqueeze(1), index_sample, :]
  45. Q_K_sample = torch.matmul(Q.unsqueeze(-2), K_sample.transpose(-2, -1)).squeeze(-2)
  46. # find the Top_k query with sparisty measurement
  47. M = Q_K_sample.max(-1)[0] - torch.div(Q_K_sample.sum(-1), L_K) # 96个Q中每一个选跟其他K关系最大的值,再计算与均匀分布的差异
  48. M_top = M.topk(n_top, sorted=False)[1] # 对96个Q的评分中选出25个,返回值1表示要得到索引
  49. # use the reduced Q to calculate Q_K
  50. Q_reduce = Q[torch.arange(B)[:, None, None],
  51. torch.arange(H)[None, :, None],
  52. M_top, :] # factor*ln(L_q) 取出Q的特征
  53. Q_K = torch.matmul(Q_reduce, K.transpose(-2, -1)) # factor*ln(L_q)*L_k 25个Q和全部K之间的关系
  54. return Q_K, M_top
  55. def _get_initial_context(self, V, L_Q):
  56. B, H, L_V, D = V.shape
  57. if not self.mask_flag:
  58. # V_sum = V.sum(dim=-2)
  59. V_sum = V.mean(dim=-2)
  60. contex = V_sum.unsqueeze(-2).expand(B, H, L_Q, V_sum.shape[-1]).clone() # 先把96个V都用均值来替换
  61. else: # use mask
  62. assert(L_Q == L_V) # requires that L_Q == L_V, i.e. for self-attention only
  63. contex = V.cumsum(dim=-2)
  64. return contex
  65. def _update_context(self, context_in, V, scores, index, L_Q, attn_mask):
  66. B, H, L_V, D = V.shape
  67. if self.mask_flag:
  68. attn_mask = ProbMask(B, H, L_Q, index, scores, device=V.device)
  69. scores.masked_fill_(attn_mask.mask, -np.inf)
  70. attn = torch.softmax(scores, dim=-1) # nn.Softmax(dim=-1)(scores)
  71. context_in[torch.arange(B)[:, None, None],
  72. torch.arange(H)[None, :, None],
  73. index, :] = torch.matmul(attn, V).type_as(context_in) # 对25个有Q的更新V,其余的没变还是均值
  74. if self.output_attention:
  75. attns = (torch.ones([B, H, L_V, L_V])/L_V).type_as(attn).to(attn.device)
  76. attns[torch.arange(B)[:, None, None], torch.arange(H)[None, :, None], index, :] = attn
  77. return (context_in, attns)
  78. else:
  79. return (context_in, None)
  80. def forward(self, queries, keys, values, attn_mask):
  81. B, L_Q, H, D = queries.shape
  82. _, L_K, _, _ = keys.shape
  83. queries = queries.transpose(2,1)
  84. keys = keys.transpose(2,1)
  85. values = values.transpose(2,1)
  86. U_part = self.factor * np.ceil(np.log(L_K)).astype('int').item() # c*ln(L_k)
  87. u = self.factor * np.ceil(np.log(L_Q)).astype('int').item() # c*ln(L_q)
  88. U_part = U_part if U_part<L_K else L_K
  89. u = u if u<L_Q else L_Q
  90. scores_top, index = self._prob_QK(queries, keys, sample_k=U_part, n_top=u)
  91. # add scale factor
  92. scale = self.scale or 1./sqrt(D)
  93. if scale is not None:
  94. scores_top = scores_top * scale
  95. # get the context
  96. context = self._get_initial_context(values, L_Q)
  97. # update the context with selected top_k queries
  98. context, attn = self._update_context(context, values, scores_top, index, L_Q, attn_mask)
  99. return context.transpose(2,1).contiguous(), attn
  100. class AttentionLayer(nn.Module):
  101. def __init__(self, attention, d_model, n_heads,
  102. d_keys=None, d_values=None, mix=False):
  103. super(AttentionLayer, self).__init__()
  104. d_keys = d_keys or (d_model//n_heads)
  105. d_values = d_values or (d_model//n_heads)
  106. self.inner_attention = attention
  107. self.query_projection = nn.Linear(d_model, d_keys * n_heads)
  108. self.key_projection = nn.Linear(d_model, d_keys * n_heads)
  109. self.value_projection = nn.Linear(d_model, d_values * n_heads)
  110. self.out_projection = nn.Linear(d_values * n_heads, d_model)
  111. self.n_heads = n_heads
  112. self.mix = mix
  113. def forward(self, queries, keys, values, attn_mask):
  114. B, L, _ = queries.shape
  115. _, S, _ = keys.shape
  116. H = self.n_heads
  117. queries = self.query_projection(queries).view(B, L, H, -1)
  118. keys = self.key_projection(keys).view(B, S, H, -1)
  119. values = self.value_projection(values).view(B, S, H, -1)
  120. out, attn = self.inner_attention(
  121. queries,
  122. keys,
  123. values,
  124. attn_mask
  125. )
  126. if self.mix:
  127. out = out.transpose(2,1).contiguous()
  128. out = out.view(B, L, -1)
  129. return self.out_projection(out), attn


ProbAttention 类

        ProbAttention 类继承自 nn.Module,是实现ProbSparse自注意力机制的核心。它包括以下几个重要的方法:

  1. __init__ 方法:初始化模块,设置参数,如factor(用于控制采样的密度)、scale(用于缩放注意力分数)和attention_dropout(注意力层的dropout比率)。

  2. _prob_QK 方法:这个方法是ProbSparse自注意力的核心。它首先对键(K)进行采样,然后计算查询(Q)和采样后的键(K)之间的点积,以获取注意力分数。这个过程通过选择关键的点积对而非所有可能的组合来减少计算量。

  3. _get_initial_context 方法:计算初始的上下文表示。这个方法根据是否使用掩码(mask_flag)来处理值(V)的累积和或平均值。

  4. _update_context 方法:使用选择的Top-k注意力分数更新上下文表示。这个方法考虑了概率掩码,并根据注意力分数更新上下文。

  5. forward 方法:定义了ProbAttention的前向传播逻辑。它包括将查询、键和值转换为适合的形状,计算并应用ProbSparse自注意力,以及更新上下文表示。


  • ProbSparse自注意力的关键在于它如何有效减少计算量。通过只计算和选择关键的点积对,这种机制降低了在处理长序列时的计算复杂度。

  • 这种方法特别适合长序列预测任务,因为它减轻了传统注意力机制在处理长序列时的计算负担。

  • 在移植这部分代码到其他框架时,要确保对应的数据结构和操作符在目标框架中是可用的。




ProbAttention 类的定义

  1. class ProbAttention(nn.Module):
  2. def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
  3. super(ProbAttention, self).__init__()
  4. self.factor = factor
  5. self.scale = scale
  6. self.mask_flag = mask_flag
  7. self.output_attention = output_attention
  8. self.dropout = nn.Dropout(attention_dropout)
  9. def _prob_QK(self, Q, K, sample_k, n_top):
  10. # Implementation details...
  11. def _get_initial_context(self, V, L_Q):
  12. # Implementation details...
  13. def _update_context(self, context_in, V, scores, index, L_Q, attn_mask):
  14. # Implementation details...
  15. def forward(self, queries, keys, values, attn_mask):
  16. # Implementation details...






  • 确保您理解了每个方法的工作原理及其在ProbSparse自注意力机制中的作用。
  • 检查目标框架是否支持所有必要的操作,例如张量操作和矩阵乘法。
  • 考虑到不同框架可能有不同的API和张量操作习惯,因此在移植时可能需要对代码进行适当的调整。



  1. import torch
  2. import torch.nn as nn
  3. import torch.nn.functional as F
  4. class ConvLayer(nn.Module):
  5. def __init__(self, c_in):
  6. super(ConvLayer, self).__init__()
  7. padding = 1 if torch.__version__>='1.5.0' else 2
  8. self.downConv = nn.Conv1d(in_channels=c_in,
  9. out_channels=c_in,
  10. kernel_size=3,
  11. padding=padding,
  12. padding_mode='circular')
  13. self.norm = nn.BatchNorm1d(c_in)
  14. self.activation = nn.ELU()
  15. self.maxPool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
  16. def forward(self, x):
  17. x = self.downConv(x.permute(0, 2, 1))
  18. x = self.norm(x)
  19. x = self.activation(x)
  20. x = self.maxPool(x)
  21. x = x.transpose(1,2)
  22. return x
  23. class EncoderLayer(nn.Module):
  24. def __init__(self, attention, d_model, d_ff=None, dropout=0.1, activation="relu"):
  25. super(EncoderLayer, self).__init__()
  26. d_ff = d_ff or 4*d_model
  27. self.attention = attention
  28. self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
  29. self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
  30. self.norm1 = nn.LayerNorm(d_model)
  31. self.norm2 = nn.LayerNorm(d_model)
  32. self.dropout = nn.Dropout(dropout)
  33. self.activation = F.relu if activation == "relu" else F.gelu
  34. def forward(self, x, attn_mask=None):
  35. # x [B, L, D]
  36. # x = x + self.dropout(self.attention(
  37. # x, x, x,
  38. # attn_mask = attn_mask
  39. # ))
  40. new_x, attn = self.attention(
  41. x, x, x,
  42. attn_mask = attn_mask
  43. )
  44. x = x + self.dropout(new_x) # 残差连接
  45. y = x = self.norm1(x)
  46. y = self.dropout(self.activation(self.conv1(y.transpose(-1,1))))
  47. y = self.dropout(self.conv2(y).transpose(-1,1))
  48. return self.norm2(x+y), attn
  49. class Encoder(nn.Module):
  50. def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
  51. super(Encoder, self).__init__()
  52. self.attn_layers = nn.ModuleList(attn_layers)
  53. self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
  54. self.norm = norm_layer
  55. def forward(self, x, attn_mask=None):
  56. # x [B, L, D]
  57. attns = []
  58. if self.conv_layers is not None:
  59. for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):
  60. x, attn = attn_layer(x, attn_mask=attn_mask)
  61. x = conv_layer(x) # pooling后再减半,还是为了速度考虑
  62. attns.append(attn)
  63. x, attn = self.attn_layers[-1](x, attn_mask=attn_mask)
  64. attns.append(attn)
  65. else:
  66. for attn_layer in self.attn_layers:
  67. x, attn = attn_layer(x, attn_mask=attn_mask)
  68. attns.append(attn)
  69. if self.norm is not None:
  70. x = self.norm(x)
  71. return x, attns
  72. class EncoderStack(nn.Module):
  73. def __init__(self, encoders, inp_lens):
  74. super(EncoderStack, self).__init__()
  75. self.encoders = nn.ModuleList(encoders)
  76. self.inp_lens = inp_lens
  77. def forward(self, x, attn_mask=None):
  78. # x [B, L, D]
  79. x_stack = []; attns = []
  80. for i_len, encoder in zip(self.inp_lens, self.encoders):
  81. inp_len = x.shape[1]//(2**i_len)
  82. x_s, attn = encoder(x[:, -inp_len:, :])
  83. x_stack.append(x_s); attns.append(attn)
  84. x_stack = torch.cat(x_stack, -2)
  85. return x_stack, attns



  1. EncoderLayer 类

    • EncoderLayer 类实现了编码器层的核心功能,其中包括一个自注意力机制(通过attention参数传入),两个卷积层(conv1conv2),以及层归一化(norm1norm2)。
    • forward方法中,首先通过自注意力机制处理输入x,然后进行残差连接、归一化和卷积操作。
  2. ProbAttention 的使用

    • 在这个代码段中,ProbAttention被作为EncoderLayer的一个参数(attention),这意味着ProbAttention的具体实现应该在其他地方定义,并在创建EncoderLayer实例时传入。

提取 ProbAttention 用于移植



  1. ProbAttention 类的完整实现:包括之前提供的ProbAttention类的所有方法和内部逻辑。

  2. 与 EncoderLayer 的集成:确保在目标框架中创建的EncoderLayer或等效类能够接收并正确使用ProbAttention实例。




  1. import torch
  2. import torch.nn as nn
  3. import torch.nn.functional as F
  4. class DecoderLayer(nn.Module):
  5. def __init__(self, self_attention, cross_attention, d_model, d_ff=None,
  6. dropout=0.1, activation="relu"):
  7. super(DecoderLayer, self).__init__()
  8. d_ff = d_ff or 4*d_model
  9. self.self_attention = self_attention
  10. self.cross_attention = cross_attention
  11. self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
  12. self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
  13. self.norm1 = nn.LayerNorm(d_model)
  14. self.norm2 = nn.LayerNorm(d_model)
  15. self.norm3 = nn.LayerNorm(d_model)
  16. self.dropout = nn.Dropout(dropout)
  17. self.activation = F.relu if activation == "relu" else F.gelu
  18. def forward(self, x, cross, x_mask=None, cross_mask=None):
  19. x = x + self.dropout(self.self_attention(
  20. x, x, x,
  21. attn_mask=x_mask
  22. )[0])
  23. x = self.norm1(x)
  24. x = x + self.dropout(self.cross_attention(
  25. x, cross, cross,
  26. attn_mask=cross_mask
  27. )[0])
  28. y = x = self.norm2(x)
  29. y = self.dropout(self.activation(self.conv1(y.transpose(-1,1))))
  30. y = self.dropout(self.conv2(y).transpose(-1,1))
  31. return self.norm3(x+y)
  32. class Decoder(nn.Module):
  33. def __init__(self, layers, norm_layer=None):
  34. super(Decoder, self).__init__()
  35. self.layers = nn.ModuleList(layers)
  36. self.norm = norm_layer
  37. def forward(self, x, cross, x_mask=None, cross_mask=None):
  38. for layer in self.layers:
  39. x = layer(x, cross, x_mask=x_mask, cross_mask=cross_mask)
  40. if self.norm is not None:
  41. x = self.norm(x)
  42. return x



  1. DecoderLayer 类

    • DecoderLayer 类包含两种注意力机制:self_attentioncross_attentionself_attention是在解码器内部使用的,而cross_attention用于在解码器和编码器之间交换信息。
    • 类还包括两个卷积层(conv1conv2)和三个层归一化(norm1norm2norm3)。
    • forward方法中,先是应用自注意力机制,然后是交叉注意力机制,最后是卷积层和归一化操作。
  2. ProbAttention 的使用

    • 在提供的代码中,self_attentioncross_attention可以是任何注意力机制的实例,包括ProbSparse自注意力机制(ProbAttention)。但是,这取决于在创建DecoderLayer实例时传入的具体注意力实例。

提取 ProbAttention 用于移植


  1. 确保 ProbAttention 类的可用性:从之前的实现中获取ProbAttention类的代码,确保它在您的环境中是可用的。

  2. 在 DecoderLayer 中使用 ProbAttention:在创建DecoderLayer实例时,将ProbAttention实例作为self_attention和/或cross_attention的参数传入。


  • 在移植过程中,确保目标框架支持所有必要的操作,如张量操作、矩阵乘法、卷积操作等。
  • 考虑到不同框架可能有不同的API和张量操作习惯,您可能需要对代码进行一些调整以适应新框架。
  • 确保在新框架中ProbAttention的功能和原始实现保持一致。



  1. import torch
  2. import torch.nn as nn
  3. import torch.nn.functional as F
  4. from utils.masking import TriangularCausalMask, ProbMask
  5. from models.encoder import Encoder, EncoderLayer, ConvLayer, EncoderStack
  6. from models.decoder import Decoder, DecoderLayer
  7. from models.attn import FullAttention, ProbAttention, AttentionLayer
  8. from models.embed import DataEmbedding
  9. class Informer(nn.Module):
  10. def __init__(self, enc_in, dec_in, c_out, seq_len, label_len, out_len,
  11. factor=5, d_model=512, n_heads=8, e_layers=3, d_layers=2, d_ff=512,
  12. dropout=0.0, attn='prob', embed='fixed', freq='h', activation='gelu',
  13. output_attention = False, distil=True, mix=True,
  14. device=torch.device('cuda:0')):
  15. super(Informer, self).__init__()
  16. self.pred_len = out_len
  17. self.attn = attn
  18. self.output_attention = output_attention
  19. # Encoding
  20. self.enc_embedding = DataEmbedding(enc_in, d_model, embed, freq, dropout)
  21. self.dec_embedding = DataEmbedding(dec_in, d_model, embed, freq, dropout)
  22. # Attention
  23. Attn = ProbAttention if attn=='prob' else FullAttention
  24. # Encoder
  25. self.encoder = Encoder(
  26. [
  27. EncoderLayer(
  28. AttentionLayer(Attn(False, factor, attention_dropout=dropout, output_attention=output_attention),
  29. d_model, n_heads, mix=False),
  30. d_model,
  31. d_ff,
  32. dropout=dropout,
  33. activation=activation
  34. ) for l in range(e_layers)
  35. ],
  36. [
  37. ConvLayer(
  38. d_model
  39. ) for l in range(e_layers-1)
  40. ] if distil else None,
  41. norm_layer=torch.nn.LayerNorm(d_model)
  42. )
  43. # Decoder
  44. self.decoder = Decoder(
  45. [
  46. DecoderLayer(
  47. AttentionLayer(Attn(True, factor, attention_dropout=dropout, output_attention=False),
  48. d_model, n_heads, mix=mix),
  49. AttentionLayer(FullAttention(False, factor, attention_dropout=dropout, output_attention=False),
  50. d_model, n_heads, mix=False),
  51. d_model,
  52. d_ff,
  53. dropout=dropout,
  54. activation=activation,
  55. )
  56. for l in range(d_layers)
  57. ],
  58. norm_layer=torch.nn.LayerNorm(d_model)
  59. )
  60. # self.end_conv1 = nn.Conv1d(in_channels=label_len+out_len, out_channels=out_len, kernel_size=1, bias=True)
  61. # self.end_conv2 = nn.Conv1d(in_channels=d_model, out_channels=c_out, kernel_size=1, bias=True)
  62. self.projection = nn.Linear(d_model, c_out, bias=True)
  63. def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,
  64. enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):
  65. enc_out = self.enc_embedding(x_enc, x_mark_enc)
  66. enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)
  67. dec_out = self.dec_embedding(x_dec, x_mark_dec)
  68. dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)
  69. dec_out = self.projection(dec_out)
  70. # dec_out = self.end_conv1(dec_out)
  71. # dec_out = self.end_conv2(dec_out.transpose(2,1)).transpose(1,2)
  72. if self.output_attention:
  73. return dec_out[:,-self.pred_len:,:], attns
  74. else:
  75. return dec_out[:,-self.pred_len:,:] # [B, L, D]
  76. class InformerStack(nn.Module):
  77. def __init__(self, enc_in, dec_in, c_out, seq_len, label_len, out_len,
  78. factor=5, d_model=512, n_heads=8, e_layers=[3,2,1], d_layers=2, d_ff=512,
  79. dropout=0.0, attn='prob', embed='fixed', freq='h', activation='gelu',
  80. output_attention = False, distil=True, mix=True,
  81. device=torch.device('cuda:0')):
  82. super(InformerStack, self).__init__()
  83. self.pred_len = out_len
  84. self.attn = attn
  85. self.output_attention = output_attention
  86. # Encoding
  87. self.enc_embedding = DataEmbedding(enc_in, d_model, embed, freq, dropout)
  88. self.dec_embedding = DataEmbedding(dec_in, d_model, embed, freq, dropout)
  89. # Attention
  90. Attn = ProbAttention if attn=='prob' else FullAttention
  91. # Encoder
  92. inp_lens = list(range(len(e_layers))) # [0,1,2,...] you can customize here
  93. encoders = [
  94. Encoder(
  95. [
  96. EncoderLayer(
  97. AttentionLayer(Attn(False, factor, attention_dropout=dropout, output_attention=output_attention),
  98. d_model, n_heads, mix=False),
  99. d_model,
  100. d_ff,
  101. dropout=dropout,
  102. activation=activation
  103. ) for l in range(el)
  104. ],
  105. [
  106. ConvLayer(
  107. d_model
  108. ) for l in range(el-1)
  109. ] if distil else None,
  110. norm_layer=torch.nn.LayerNorm(d_model)
  111. ) for el in e_layers]
  112. self.encoder = EncoderStack(encoders, inp_lens)
  113. # Decoder
  114. self.decoder = Decoder(
  115. [
  116. DecoderLayer(
  117. AttentionLayer(Attn(True, factor, attention_dropout=dropout, output_attention=False),
  118. d_model, n_heads, mix=mix),
  119. AttentionLayer(FullAttention(False, factor, attention_dropout=dropout, output_attention=False),
  120. d_model, n_heads, mix=False),
  121. d_model,
  122. d_ff,
  123. dropout=dropout,
  124. activation=activation,
  125. )
  126. for l in range(d_layers)
  127. ],
  128. norm_layer=torch.nn.LayerNorm(d_model)
  129. )
  130. # self.end_conv1 = nn.Conv1d(in_channels=label_len+out_len, out_channels=out_len, kernel_size=1, bias=True)
  131. # self.end_conv2 = nn.Conv1d(in_channels=d_model, out_channels=c_out, kernel_size=1, bias=True)
  132. self.projection = nn.Linear(d_model, c_out, bias=True)
  133. def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,
  134. enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):
  135. enc_out = self.enc_embedding(x_enc, x_mark_enc)
  136. enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)
  137. dec_out = self.dec_embedding(x_dec, x_mark_dec)
  138. dec_out = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask)
  139. dec_out = self.projection(dec_out)
  140. # dec_out = self.end_conv1(dec_out)
  141. # dec_out = self.end_conv2(dec_out.transpose(2,1)).transpose(1,2)
  142. if self.output_attention:
  143. return dec_out[:,-self.pred_len:,:], attns
  144. else:
  145. return dec_out[:,-self.pred_len:,:] # [B, L, D]


ProbAttention 在 Informer 架构中的使用

  1. 初始化函数(__init__:在InformerInformerStack类中,根据attn参数的值('prob'表示使用ProbAttention,否则使用FullAttention)来初始化注意力机制。

  2. 编码器和解码器的构建

    • 编码器(Encoder)和解码器(Decoder)使用EncoderLayerDecoderLayer,其中包含AttentionLayer
    • AttentionLayer使用Attn作为注意力机制,这里Attn根据上述初始化函数中的选择是ProbAttentionFullAttention

ProbAttention 的具体实现


提取 ProbAttention 用于移植

  1. 获取 ProbAttention 类的实现:需要从models.attn中提取ProbAttention类的实现代码。

  2. 理解 ProbAttention 的工作机制:了解其如何在输入序列上进行稀疏采样,以及如何计算稀疏自注意力权重。

  3. 确保与编码器和解码器的兼容性:在将ProbAttention移植到其他框架时,确保它能与编码器和解码器中的其他组件(如EncoderLayerDecoderLayerAttentionLayer)正确集成。




        最后给出 embed.py 文件中的源码:

  1. import torch
  2. import torch.nn as nn
  3. import torch.nn.functional as F
  4. import math
  5. class PositionalEmbedding(nn.Module):
  6. def __init__(self, d_model, max_len=5000):
  7. super(PositionalEmbedding, self).__init__()
  8. # Compute the positional encodings once in log space.
  9. pe = torch.zeros(max_len, d_model).float()
  10. pe.require_grad = False
  11. position = torch.arange(0, max_len).float().unsqueeze(1)
  12. div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()
  13. pe[:, 0::2] = torch.sin(position * div_term)
  14. pe[:, 1::2] = torch.cos(position * div_term)
  15. pe = pe.unsqueeze(0)
  16. self.register_buffer('pe', pe)
  17. def forward(self, x):
  18. return self.pe[:, :x.size(1)]
  19. class TokenEmbedding(nn.Module):
  20. def __init__(self, c_in, d_model):
  21. super(TokenEmbedding, self).__init__()
  22. padding = 1 if torch.__version__>='1.5.0' else 2
  23. self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model,
  24. kernel_size=3, padding=padding, padding_mode='circular')
  25. for m in self.modules():
  26. if isinstance(m, nn.Conv1d):
  27. nn.init.kaiming_normal_(m.weight,mode='fan_in',nonlinearity='leaky_relu')
  28. def forward(self, x):
  29. x = self.tokenConv(x.permute(0, 2, 1)).transpose(1,2)
  30. return x
  31. class FixedEmbedding(nn.Module):
  32. def __init__(self, c_in, d_model):
  33. super(FixedEmbedding, self).__init__()
  34. w = torch.zeros(c_in, d_model).float()
  35. w.require_grad = False
  36. position = torch.arange(0, c_in).float().unsqueeze(1)
  37. div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()
  38. w[:, 0::2] = torch.sin(position * div_term)
  39. w[:, 1::2] = torch.cos(position * div_term)
  40. self.emb = nn.Embedding(c_in, d_model)
  41. self.emb.weight = nn.Parameter(w, requires_grad=False)
  42. def forward(self, x):
  43. return self.emb(x).detach()
  44. class TemporalEmbedding(nn.Module):
  45. def __init__(self, d_model, embed_type='fixed', freq='h'):
  46. super(TemporalEmbedding, self).__init__()
  47. minute_size = 4; hour_size = 24
  48. weekday_size = 7; day_size = 32; month_size = 13
  49. Embed = FixedEmbedding if embed_type=='fixed' else nn.Embedding
  50. if freq=='t':
  51. self.minute_embed = Embed(minute_size, d_model)
  52. self.hour_embed = Embed(hour_size, d_model)
  53. self.weekday_embed = Embed(weekday_size, d_model)
  54. self.day_embed = Embed(day_size, d_model)
  55. self.month_embed = Embed(month_size, d_model)
  56. def forward(self, x):
  57. x = x.long()
  58. minute_x = self.minute_embed(x[:,:,4]) if hasattr(self, 'minute_embed') else 0.
  59. hour_x = self.hour_embed(x[:,:,3])
  60. weekday_x = self.weekday_embed(x[:,:,2])
  61. day_x = self.day_embed(x[:,:,1])
  62. month_x = self.month_embed(x[:,:,0])
  63. return hour_x + weekday_x + day_x + month_x + minute_x
  64. class TimeFeatureEmbedding(nn.Module):
  65. def __init__(self, d_model, embed_type='timeF', freq='h'):
  66. super(TimeFeatureEmbedding, self).__init__()
  67. freq_map = {'h':4, 't':5, 's':6, 'm':1, 'a':1, 'w':2, 'd':3, 'b':3}
  68. d_inp = freq_map[freq]
  69. self.embed = nn.Linear(d_inp, d_model)
  70. def forward(self, x):
  71. return self.embed(x)
  72. class DataEmbedding(nn.Module):
  73. def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
  74. super(DataEmbedding, self).__init__()
  75. self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
  76. self.position_embedding = PositionalEmbedding(d_model=d_model)
  77. self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type, freq=freq) if embed_type!='timeF' else TimeFeatureEmbedding(d_model=d_model, embed_type=embed_type, freq=freq)
  78. self.dropout = nn.Dropout(p=dropout)
  79. def forward(self, x, x_mark):
  80. x = self.value_embedding(x) + self.position_embedding(x) + self.temporal_embedding(x_mark)
  81. return self.dropout(x)



  1. PositionalEmbedding

    • 用于生成位置嵌入。利用正弦和余弦函数的变化生成每个位置的唯一表示。
  2. TokenEmbedding

    • 将输入数据的每个特征转换为高维表示。使用一维卷积网络(Conv1d)进行特征提取。
  3. FixedEmbedding 和 TemporalEmbedding

    • 生成时间相关的嵌入。FixedEmbedding 用于生成固定的时间嵌入,TemporalEmbedding 根据输入的时间特征生成动态的时间嵌入。
  4. TimeFeatureEmbedding

    • 用于处理不同时间频率(如小时、分钟等)的时间特征。
  5. DataEmbedding

    • 综合TokenEmbeddingPositionalEmbeddingTemporalEmbedding/TimeFeatureEmbedding,将它们的输出相加以生成最终的嵌入表示。

提取 ProbSparse 自注意力机制相关代码

在这部分代码中,没有直接涉及到ProbSparse自注意力机制(ProbAttention)。这些代码主要用于数据的嵌入处理,而不是注意力机制的实现。ProbAttention 通常在模型的编码器(Encoder)和解码器(Decoder)部分使用,特别是在构建注意力层(AttentionLayer)时。



