一、介绍
1.1 背景
2017年,Google的一篇 Attention Is All You Need 为我们带来了Transformer,其在NLP领域的重大成功展示了它对时序数据的强大建模能力,自然有人想要把Transformer应用到时序数据预测上。在Transformer的基础上构建时序预测能力可以突破以往的诸多限制,最明显的一个增益点是,Transformer for TS可以基于Multi-head Attention结构具备同时建模长期和短期时序特征的能力。
本文将要介绍的一个充分利用了Transformer的优势,并在Transformer的基础上改进了Attention的计算方式以适应时序数据,同时提出了一种解决Transformer拓展性差问题的算法: ConvTrans。论文来源为:Shiyang Li, et al. NIPS 2019. Enhancing the Locality and Breaking the Memory Bottleneck of Transformer on Time Series Forecasting。并且给出基于PyTorch的具体实现。
1.2 发展历史
一般来说,谈及DL领域时序预测,首先大家会想到RNN类的模型,但RNN在网络加深时存在梯度消失和梯度爆炸问题。即使是后续的LSTM,在捕捉长期依赖上依然力不从心。再后面有了Amazon提出的DeepAR,是一种针对大量相关时间序列统一建模的预测算法,该算法使用递归神经网络 (RNN) 结合自回归(AR) 来预测标量时间序列,在大量时间序列上训练自回归递归网络模型,并通过预测目标在序列每个时间步上取值的概率分布来完成预测任务。
本文对上述三种历史算法不做深入分析,概念有不清楚的童鞋自己搜哈。
1.3 ConvTrans
直接说ConvTrans, 其实它与DeepAR有很多相似的地方,比如它也是一个自回归的概率预测模型,对于下一步预测采用分位数 �10 (分位数就是以概率将一批数据进行分割,比如 �10=� 代表一批数据中小于a的数占总数的10%)、 �50 等;再比如ConvTrans也支持协变量预测,可以接受输入比如气温、事件、个体标识等等其他相关变量来辅助预测。
不同的是ConvTrans具备Transformer架构独有的优势,大致为以下四点:
- 支持并行,训练得更快。基于RNN的模型中每一个隐状态都依赖于它前一步的隐状态,因此必须从前向后必须逐个计算,每一次都只能前进一步。而Transformer没有这样的约束,输入的序列被并行处理,由此带来更快的训练速度。
- 更强的长期依赖建模能力,在长序列上效果更好。在前面提到过,基于RNN的方法面对长序列时无法完全消除梯度消失和梯度爆炸的问题,而Transformer架构则可以解决这个问题
- Transformer可以同时建模长期依赖和短期依赖。Multi-head Attention中不同的head可以关注不同的模式。
- Transformer的AttentionScore可以提供一定的可解释性。通过可视化AttentionScore可以看到当前预测对历史值注意力的分布。
当然Transformer for TS架构也有相应的缺点:
- 是基于序列的编解码结构(seq2seq),编码器和解码器均采用基于自注意力机制的网络,所以计算空间复杂度大,需要处理序列的编解码。
- 原始Transformer的自注意力计算方法对局部信息不敏感,使得模型易受异常点影响,带来了潜在的优化问题。
而2019NIPS的论文[1]针对这些缺点做了相应的2点改进:
- Convolutional Self-Attention :针对时序数据预测任务的特点,增强对局部上下文信息的关注,使预测更精准 。
- LogSparse :解决了Attention计算空间复杂度太高的问题,使模型能处理更长的时间序列数据。
后面会详细展开说明。
二、模型结构
2.1 Convolutional Self-Attention
原始Transformer中的Self-Attention结构如下,细节也可以参考之前的文章
而论文中设计的Convolutional Self-Attention更适合时序数据, 因为它能够增强模型对时间序列中局部上下文信息的建模能力,从而降低异常点对预测结果的影响程度,提高预测准确性。这也是ConvTrans(Convolution Transformer)的名称由来。
Convolutional Self Attention
Self-Attention中的计算 Q、K、V 的过程可能导致数据的关注点出现异常,如上图中(a)所示,由于之前的注意力得分仅仅是单时间点之间关联的体现,(a)中中间的红点只关注到与它值相近的另一单时间红点,而没有考虑到自身上下文(即变化趋势)。即希望增强局部上下文的建模能力,得到图(c)中的效果。
作者们提出的改进方法是在计算 Query 和 Key 时采用大小大于1(等于1就是原始Transformer)的卷积核来进行卷积操作,如图中(d)所示,从而实现使注意力关注局部上下文,使得更相关的特征能够得到匹配。
观察下图,对比应用不同大小卷积核的模型实现,Convolutional Self-Attention能够更快地拟合,且取得更低的训练损失。
图中k代表卷积核大小,下角标1d代表预测未来一天
并且文中还给出了基于真实世界数据的具体结果对比,来说明k对模型预测准确率的影响:
注:这里以及之后使用的评价指标都是分位数损失 �0.5/�0.9 ,值越低预测越准确。详细定义请参考论文,代码实现参考章节3。
数据集来源:
太阳能: 137条太阳能产出记录,每1h记录一次。 https://www.nrel.gov/grid/solar-power-data.html
电力:来自370个用户的电力消耗数据,原数据间隔15min,简写e-f表示。e-c代表在原数据上每4个点聚合一次,即数据变为每1h的耗电量。 https://archive.ics.uci.edu/ml/machine-learning-databases/00321/LD2011_2014.txt.zip
由于电力数据集相对简单且协变量提供了丰富的信息,改变k值对模型的提升并不明显。但在更加复杂的交通数据集中,更大的k较明显地提升了模型的预测准确度,进一步验证了增强局部信息的必要性。目前k值的设置需要在实践中权衡。
2.2 LogSparse Transformer
针对Transformer的存储瓶颈问题,文中引入了LogSparse机制,那么具体是个啥呢?
我们先来看一下原始Transformer在交通数据集上训练学习得到的注意力得分分布情况:
Transformer在交通数据集上训练学习得到的注意力得分分布
可以看到该模型共10层,图中蓝色、青色、红色的线分别是第2,6,10层的注意力得分,灰色的线为原始数据。
我们注意到:不同层对不同频率信息的关注度不同
- 第2层(蓝色)倾向于学习每一天的模式
- 第6层(青色)则更关注周末的影响
- 而第10层(红色)对最近的时刻(邻近预测点)关注较高。
论文作者们认为引入某种程度的稀疏性,不会显著影响性能,反而为模型带来了处理具备细粒度和强长期依赖的长时间序列的能力。为了使得最终每个点都能接触到它的所有历史值的信息,所以便提出了LogSparse的设计,通过堆叠多个自注意力层来实现这个目的,如下图所示:
设 ��� 为单元 � 在第k 至 k+1 层计算时要访问的单元的索引的集合。在标准的Transformer中, ���= { �;�≤� }, 这表示每一个单元都要访问所有的历史单元以及它自己(如图a所示),那么这样空间复杂度为 �(�2) ,L是序列长度。
如果采用堆叠多层的方式, ���= { �−2���2�,�−2���2�−1,...,�−20,� }
如图b所示,每一层的空间复杂度就降低到了 �(����2�) , 整体空间复杂度就是 �(�(���2�)2) ,解决了Transformer的可扩展性瓶颈。另外作者还提出了一些其他的稀疏性策略, 具体可以参考原论文。
堆叠theorem证明以及其他稀疏策略:Local Attention & Restart Attention
对比在不同限制条件下的预测效果,可以看出LogSparse在更复杂的交通数据集上对模型提升效果更明显,也说明了长期依赖的重要性。
三、具体实现
3.1 Version(1)
一个比较容易上手的简单实现版本(没有用到稀疏策略)是:
大致涉及几个类:
- DataProcessor
-
class DataProcessor(Dataset):
-
""
"synthetic time series dataset from section 5.1"
""
-
-
def __init__(
self,t
0
=
96,N
=
4500,transform
=None):
-
""
"
-
Args:
-
t0: previous t0 data points to predict from
-
N: number of data points
-
transform: any transformations to be applied to time series
-
"
""
-
self.t
0
= t
0
-
self.N
= N
-
self.transform
= None
-
-
#
time points
-
self.x
= torch.cat(N
*[torch.arange(
0,t
0
+
24).
type(torch.float).unsqueeze(
0)])
-
-
# sinuisoidal signal
-
# 如果用到自己的数据的话,把下面这块改掉就好
-
# 注意数据输入格式为(N,Nb
of timepoints)
-
# 其中N为你有多少行ts,以电力系统数据为例,一个客户
10天的数据就构成一行ts
-
# 而nb
of timepoints为一行ts中有几个时间点,比如十天小时粒度的,就是
10
*
24
=
240
-
A
1,A
2,A
3
=
60
* torch.rand(
3,N)
-
A
4
= torch.max(A
1,A
2)
-
self.fx
= torch.cat([A
1.unsqueeze(
1)
*torch.sin(np.pi
*
self.x[
0,0:
12]
/
6)
+
72 ,
-
A
2.unsqueeze(
1)
*torch.sin(np.pi
*
self.x[
0,12:
24]
/
6)
+
72 ,
-
A
3.unsqueeze(
1)
*torch.sin(np.pi
*
self.x[
0,24:t
0]
/
6)
+
72,
-
A
4.unsqueeze(
1)
*torch.sin(np.pi
*
self.x[
0,t
0:t
0
+
24]
/
12)
+
72],
1)
-
-
#
add noise
-
self.fx
=
self.fx
+ torch.randn(
self.fx.shape)
-
-
self.masks
=
self._
generate_square_subsequent_mask(t
0)
-
-
-
# print out shapes
to confirm desired
output
-
print(
"x: {}*{}".
format(
*list(
self.x.shape)),
-
"fx: {}*{}".
format(
*list(
self.fx.shape)))
-
-
def __len__(
self):
-
return len(
self.fx)
-
-
def __getitem__(
self,idx):
-
if torch.
is_tensor(idx):
-
idx
= idx.tolist()
-
-
-
sample
= (
self.x[idx,:],
-
self.fx[idx,:],
-
self.masks)
-
-
if
self.transform:
-
sample
=
self.transform(sample)
-
-
return sample
-
-
def _
generate_square_subsequent_mask(
self,t
0):
-
mask
= torch.
zeros(t
0
+
24,t
0
+
24)
-
for i
in range(
0,t
0):
-
mask[i,t
0:]
=
1
-
for i
in range(t
0,t
0
+
24):
-
mask[i,i
+
1:]
=
1
-
mask
= mask.float().masked_fill(mask
=
=
1, float(
'-inf'))#.masked_fill(mask
=
=
1, float(
0.0))
-
return mask
- 1
- TransformerTimeSeries
-
class CausalConv
1d(torch.nn.Conv
1d):
-
def __init__(
self,
-
in_channels,
-
out_channels,
-
kernel_
size,
-
stride
=
1,
-
dilation
=
1,
-
groups
=
1,
-
bias
=
True):
-
-
super(CausalConv
1d,
self).__init__(
-
in_channels,
-
out_channels,
-
kernel_
size
=kernel_
size,
-
stride
=stride,
-
padding
=
0,
-
dilation
=dilation,
-
groups
=groups,
-
bias
=bias)
-
-
self.__padding
= (kernel_
size
-
1)
* dilation
-
-
def forward(
self,
input):
-
return
super(CausalConv
1d,
self).forward(F.pad(
input, (
self.__padding,
0)))
-
-
-
class context_embedding(torch.nn.Module):
-
def __init__(
self,
in_channels
=
1,embedding_
size
=
256,k
=
5):
-
super(context_embedding,
self).__init__()
-
self.causal_convolution
= CausalConv
1d(
in_channels,embedding_
size,kernel_
size
=k)
-
-
def forward(
self,x):
-
x
=
self.causal_convolution(x)
-
return F.tanh(x)
-
-
# model
class
-
class TransformerTimeSeries(torch.nn.Module):
-
""
"
-
Time Series application of transformers based on paper
-
-
causal_convolution_layer parameters:
-
in_channels: the number of features per time point
-
out_channels: the number of features outputted per time point
-
kernel_size: k is the width of the 1-D sliding kernel
-
-
nn.Transformer parameters:
-
d_model: the size of the embedding vector (input)
-
-
PositionalEncoding parameters:
-
d_model: the size of the embedding vector (positional vector)
-
dropout: the dropout to be used on the sum of positional+embedding vector
-
-
"
""
-
def __init__(
self):
-
super(TransformerTimeSeries,
self).__init__()
-
self.
input_embedding
= causal_convolution_layer.context_embedding(
2,256,9)
-
self.positional_embedding
= torch.nn.Embedding(
512,256)
-
-
-
self.decode_layer
= torch.nn.TransformerEncoderLayer(d_model
=
256,nhead
=
8)
-
self.transformer_decoder
= torch.nn.TransformerEncoder(
self.decode_layer, num_layers
=
3)
-
-
self.fc
1
= torch.nn.Linear(
256,1)
-
-
def forward(
self,x,y,attention_masks):
-
-
# concatenate observed points
and
time covariate
-
# (B
*feature_
size
*n_
time_points)
-
z
= torch.cat((y.unsqueeze(
1),x.unsqueeze(
1)),
1)
-
-
#
input_embedding returns shape (Batch
size,embedding
size,
sequence len) -
> need (
sequence len,Batch
size,embedding_
size)
-
z_embedding
=
self.
input_embedding(z).permute(
2,0,1)
-
-
#
get my positional embeddings (Batch
size,
sequence_len, embedding_
size) -
> need (
sequence len,Batch
size,embedding_
size)
-
positional_embeddings
=
self.positional_embedding(x.
type(torch.long)).permute(
1,0,2)
-
-
input_embedding
= z_embedding
+positional_embeddings
-
-
transformer_embedding
=
self.transformer_decoder(
input_embedding,attention_masks)
-
-
output
=
self.fc
1(transformer_embedding.permute(
1,0,2))
-
-
return
output
- 1
和具体train函数
-
def train(train_dataset,
test_dataset, t
0, future):
-
train_dl
= DataLoader(train_dataset, batch_
size
=
32, shuffle
=
True)
-
test_dl
= DataLoader(
test_dataset, batch_
size
=
128)
-
-
model
= TransformerTimeSeries()
-
-
lr
= .
0005 # learning rate
-
opt
= torch.optim.Adam(model.parameters(), lr
=lr)
-
epochs
=
50
-
criterion
= torch.nn.MSELoss()
-
-
train_epoch_loss
= []
-
eval_epoch_loss
= []
-
Rp_best
=
1e
5
-
model_save_path
=
'ConvTransformer_nologsparse.pth'
-
for e, epoch
in enumerate(range(epochs)):
-
train_loss
= []
-
eval_loss
= []
-
-
l_t
= train_epoch(model, train_dl, opt, criterion, t
0)
-
train_loss.append(l_t)
-
-
Rp
=
test_epoch(model,
test_dl, t
0, future)
-
-
if Rp_best
> Rp:
-
Rp_best
= Rp
-
torch.save({
-
'epoch': epoch,
-
'model_state_dict': model.state_dict(),
-
'optimizer_state_dict': opt.state_dict(),
-
'loss': Rp,
-
}, model_save_path)
-
-
train_epoch_loss.append(np.mean(train_loss))
-
eval_epoch_loss.append(np.mean(eval_loss))
-
-
print(
"Epoch {}: Train loss: {} \t Validation loss: {} \t R_p={}".
format(e,
-
np.mean(train_loss),
-
np.mean(eval_loss), Rp))
-
-
print(
"Rp best={}".
format(Rp_best))
- 1
没啥需要特别说明的地方,就是这个作者只是写了一个论文构造数据复现的代码,所以工程性很差,很多地方都写死或者没有抽象出来,拿来即用的同学们需要注意改一下。另外他也没有prediction相关函数,我这边写了一个,仅供参考:
- prediction
-
def prediction(model, dl, t
0, future):
-
# 预测前先load model, dl就是待预测数据,t
0就是前n和时间点,future就是要预测的n个时间点
-
# 比如你要用一周内前五天的数据训练模型,来预测后两天的值 t
0
=
5
*
24
=
120, future
=
48
-
with torch.
no_grad():
-
predictions
= []
-
observations
= []
-
for step, (x, y, attention_masks)
in enumerate(dl):
-
# x: (batch_
size, total_ts_
length)
-
# y: (batch_
size, total_ts_
length)
-
# ouput:(batch_
size, total_ts_
length,
1)
-
output
= model(x, y, attention_masks[
0])
-
history
= y[:, :t
0].cpu().numpy().tolist()
-
for p, o
in zip(
output.squeeze()[:, (t
0
-
1):(t
0
+ future
-
1)].cpu().numpy().tolist(),
-
y[:, t
0:].cpu().numpy().tolist()): #
not missing
data
-
-
predictions.append(p) # (batch_
size, future)
-
observations.append(o) # (batch_
size, future)
-
num
=
0
-
den
=
0
-
for hist, y_preds, y_trues
in zip(history, predictions, observations):
-
plot_result(hist, y_preds, y_trues, t
0)
-
num_i, den_i
= Rp_num_den(y_preds, y_trues, .
5)
-
num
+
= num_i
-
den
+
= den_i
-
Rp
= (
2
* num)
/ den
-
return Rp
- 1
- plot_result
-
def plot_result(history, yhat, ytruth, t
0):
-
# 带上历史值
-
yhat
= history
+ yhat
-
ytruth
= history
+ ytruth
-
# 画图
-
x
= range(len(ytruth))
-
yhat
= np.round(yhat,
2)
-
ytruth
= np.round(ytruth,
2)
-
plt.figure(facecolor
=
'w')
-
plt.plot(range(len(x)), ytruth,
'green', linewidth
=
1.5, label
=
'ground truth')
-
plt.plot(range(len(x)), yhat,
'blue', alpha
=
0.8, linewidth
=
1.2, label
=
'predict value')
-
# 画条预测起始线
-
plt.vlines(t
0, yhat.min()
*
0.99, yhat.m
ax()
*
1.01,
-
alpha
=
0.7, colors
=
"r", linestyles
=
"dashed")
-
# plt.text(
0.15,
0.01,
error_message,
size
=
10, alpha
=
0.9, transform
=plt.gc
a().transAxes) # 相对位置,经验设置值
-
plt.legend(loc
=
'best') # 设置标签的位置
-
plt.grid(
True)
-
plt.show()
- 1
画图效果如下
3.2 Version(2)
Log Sparse策略的实现参考了:ghsama/ConvTransformerTimeSeries
3.2.1 涉及GPU训练的朋友们看这里
- GPU + torch环境配置:Pytorch实战总结篇之使用GPU训练模型_Miracle8070-CSDN博客
- 代码层面,train.py头部加几行代码做个device判断:
-
# GPU | CPU
-
device
=
'cpu'
-
if torch.cuda.
is_available():
-
torch.
set_
default_tensor_
type(torch.cuda.FloatTensor)
-
device
=
'cuda'
- 1
''' Log Sparse Version '''
ForcastConvTransformer
-
#
Self Attention
Class
-
class SelfAttentionConv(nn.Module):
-
def __init__(
self, k, headers
=
8, kernel_
size
=
5, mask_
next
=
True, mask_diag
=
False):
-
super().__init__()
-
-
self.k,
self.headers,
self.kernel_
size
= k, headers, kernel_
size
-
self.mask_
next
= mask_
next
-
self.mask_diag
= mask_diag
-
-
h
= headers
-
-
# Query,
Key
and
Value Transformations
-
padding
= (kernel_
size
-
1)
-
self.padding_opertor
= nn.ConstantPad
1d((padding,
0),
0)
-
-
self.toqueries
= nn.Conv
1d(k, k
* h, kernel_
size, padding
=
0, bias
=
True)
-
self.tokeys
= nn.Conv
1d(k, k
* h, kernel_
size, padding
=
0, bias
=
True)
-
self.tovalues
= nn.Conv
1d(k, k
* h, kernel_
size
=
1, padding
=
0, bias
=
False) #
No convolution operated
-
-
# Heads unifier
-
self.unifyheads
= nn.Linear(k
* h, k)
-
-
def forward(
self, x):
-
# Extraction dimensions
-
b, t, k
= x.
size() # batch_
size,
number_
of_timesteps,
number_
of_
time_series
-
-
# Checking Embedding dimension
-
assert
self.k
=
= k,
'Number of time series '
+ str(k)
+
' didn t much the number of k '
+ str(
-
self.k)
+
' in the initiaalization of the attention layer.'
-
h
=
self.headers
-
-
# Transpose
to see the different
time series
as different channels
-
x
= x.transpose(
1,
2)
-
x_padded
=
self.padding_opertor(x)
-
-
# Query,
Key
and
Value Transformations
-
queries
=
self.toqueries(x_padded).view(b, k, h, t)
-
keys
=
self.tokeys(x_padded).view(b, k, h, t)
-
values
=
self.tovalues(x).view(b, k, h, t)
-
-
# Transposition
to
return the canonical
format
-
queries
= queries.transpose(
1,
2) # batch, header,
time serie,
time step (b, h, k, t)
-
queries
= queries.transpose(
2,
3) # batch, header,
time step,
time serie (b, h, t, k)
-
-
values
=
values.transpose(
1,
2) # batch, header,
time serie,
time step (b, h, k, t)
-
values
=
values.transpose(
2,
3) # batch, header,
time step,
time serie (b, h, t, k)
-
-
keys
= keys.transpose(
1,
2) # batch, header,
time serie,
time step (b, h, k, t)
-
keys
= keys.transpose(
2,
3) # batch, header,
time step,
time serie (b, h, t, k)
-
-
# Weights
-
queries
= queries
/ (k
** (.
25))
-
keys
= keys
/ (k
** (.
25))
-
-
queries
= queries.transpose(
1,
2).contiguous().view(b
* h, t, k)
-
keys
= keys.transpose(
1,
2).contiguous().view(b
* h, t, k)
-
values
=
values.transpose(
1,
2).contiguous().view(b
* h, t, k)
-
-
weights
= torch.bmm(queries, keys.transpose(
1,
2))
-
-
## Mask the upper
& diag
of the attention matrix
-
if
self.mask_
next:
-
if
self.mask_diag:
-
indices
= torch.triu_indices(t, t, offset
=
0)
-
weights[:, indices[
0], indices[
1]]
= float(
'-inf')
-
else:
-
indices
= torch.triu_indices(t, t, offset
=
1)
-
weights[:, indices[
0], indices[
1]]
= float(
'-inf')
-
-
# Softmax
-
weights
= F.softmax(weights, dim
=
2)
-
-
#
Output
-
output
= torch.bmm(weights,
values)
-
output
=
output.view(b, h, t, k)
-
output
=
output.transpose(
1,
2).contiguous().view(b, t, k
* h)
-
-
return
self.unifyheads(
output) # shape (b,t,k)
-
-
-
# Conv Transforme
Block
-
class ConvTransformerBLock(nn.Module):
-
def __init__(
self, k, headers, kernel_
size
=
5, mask_
next
=
True, mask_diag
=
False, dropout_proba
=
0.2):
-
super().__init__()
-
-
#
Self attention
-
self.attention
= SelfAttentionConv(k, headers, kernel_
size, mask_
next, mask_diag)
-
-
#
First
& Second Norm
-
self.norm
1
= nn.LayerNorm(k)
-
self.norm
2
= nn.LayerNorm(k)
-
-
# Feed Forward Network
-
self.feedforward
= nn.
Sequential(
-
nn.Linear(k,
4
* k),
-
nn.ReLU(),
-
nn.Linear(
4
* k, k)
-
)
-
# Dropout funtcion
& Relu:
-
self.dropout
= nn.Dropout(p
=dropout_proba)
-
self.activation
= nn.ReLU()
-
-
def forward(
self, x, train
=
False):
-
#
Self attention
+ Residual
-
x
=
self.attention(x)
+ x
-
-
# Dropout attention
-
if train:
-
x
=
self.dropout(x)
-
-
#
First Normalization
-
x
=
self.norm
1(x)
-
-
# Feed Froward network
+ residual
-
x
=
self.feedforward(x)
+ x
-
-
# Second Normalization
-
x
=
self.norm
2(x)
-
-
return x
-
-
-
# Forcasting Conv Transformer :
-
class ForcastConvTransformer(nn.Module):
-
def __init__(
self, k, headers, depth, seq_
length, kernel_
size
=
5, mask_
next
=
True, mask_diag
=
False, dropout_proba
=
0.2,
-
num_tokens
=None):
-
super().__init__()
-
# Embedding
-
self.tokens_
in_
count
=
False
-
if num_tokens:
-
self.tokens_
in_
count
=
True
-
self.token_embedding
= nn.Embedding(num_tokens, k) # (
369,
1)
= (nb_ts, k)
-
-
# Embedding the position
-
self.position_embedding
= nn.Embedding(seq_
length, k) # (
500,
1)
= (windows_
size, k)
-
-
#
Number
of kind
of
time series
-
self.k
= k # 没有协变量的情况下,k
=
1
-
self.seq_
length
= seq_
length # seq_
length即窗口大小, 数据准备的时候切割好了
-
-
# Transformer blocks
-
tblocks
= []
-
# log sparse 稀疏策略: 采用多层ConvTrans层堆叠的方式
-
for t
in range(depth):
-
tblocks.append(ConvTransformerBLock(k, headers, kernel_
size, mask_
next, mask_diag, dropout_proba))
-
self.TransformerBlocks
= nn.
Sequential(
*tblocks)
-
-
# Transformation
from k dimension
to numClasses
-
self.topreSigma
= nn.Linear(k,
1)
-
self.tomu
= nn.Linear(k,
1)
-
self.
plus
= nn.Softplus()
-
-
def forward(
self, x, tokens
=None):
-
b, t, k
= x.
size()
-
-
# checking that the given batch had
same
number
of
time series
as the
BLock had
-
assert k
=
=
self.k,
'The k :'
+ str(
-
self.k)
+
' number of timeseries given in the initialization is different than what given in the x :'
+ str(
-
k)
-
assert t
=
=
self.seq_
length,
'The lenght of the timeseries given t '
+ str(
-
t)
+
' miss much with the lenght sequence given in the Tranformers initialisation self.seq_length: '
+ str(
-
self.seq_
length)
-
-
# Position embedding
-
pos
= torch.arange(t)
-
self.pos_emb
=
self.position_embedding(pos).expand(b, t, k)
-
-
# Checking token embedding
-
assert
self.tokens_
in_
count
=
= (
not (tokens
is None)),
'self.tokens_in_count = '
+ str(
-
self.tokens_
in_
count)
+
' should be equal to (not (tokens is None)) = '
+ str((
not (tokens
is None)))
-
if
not (tokens
is None):
-
## checking that the
number
of tockens corresponde
to the
number
of batch elements
-
assert tokens.
size(
0)
=
= b
-
self.tok_emb
=
self.token_embedding(tokens)
-
self.tok_emb
=
self.tok_emb.expand(t, b, k).transpose(
0,
1)
-
-
# Adding Pos Embedding
and token Embedding
to the variable
-
if
not (tokens
is None):
-
x
=
self.pos_emb
+
self.tok_emb
+ x
-
else:
-
x
=
self.pos_emb
+ x
-
-
# Transformer :
-
x
=
self.TransformerBlocks(x)
-
mu
=
self.tomu(x)
-
presigma
=
self.topreSigma(x)
-
sigma
=
self.
plus(presigma)
-
-
return mu, sigma
- 1
四、评估指标
时间序列算法预测效果好与否,依赖于一个好的评估指标。
常见的有基于预测误差的各类指标如RMSE、MSE、MAE、MAPE、SMAPE,这边就不赘述了,公式不清楚的同学见:
也可以基于预测x的概率分布来做,如Quantile Loss、CRPS (Continuous Ranked Probability Score)、这篇paper中提及的 ��(�,�^) 等。
郭奇:需求预测准确率(Forecast Accuracy)中分母是实际值好还是预测值好?13 赞同 · 2 评论文章正在上传…重新上传取消
五、效果对比
原论文在真实数据集上进行了训练评估,并与ARIMA,TRMF以及DeepAR等模型进行了对比实验。
对比的基线模型:
-
- ARIMA:将自回归(AR)的算子加上移动平均(MA),就是 ARIMA 算法。回归能够反映数据的周期性规律,和移动平均形成互补,从统计学的角度可以很好的预测一元与时间强相关场景下的时间序列。
- TRMF:矩阵分解方法。
- DeepAR:基于LSTM的自回归概率预测方法。
- DeepState: 基于RNN的状态空间方法。
实验结果
实验表明其在多个公开数据集上取得state-of-the-art。
六、总结
老生常谈,算法再SOTA,还是要结合你具体的业务场景去选择。
在forecast常见的业务场景,传统方法基于统计、自回归的预测方法,针对单条时间线,虽然需要根据具体数据特征实时计算,但是也轻便快速好上手;
相比之下,深度学习方法能同时考虑多条时间序列之间的相关性,并且可以引入协变量辅助模型判断(例如预测未来销售量时,如果只接受时间和历史销售量数据,则是自回归预测,如果可以接受天气、经济指数、政策事件分类等其他协变量,则称为使用协变量进行预测。)适合业务数据量大、全的场景。
声明:
所有文章都为本人的学习笔记,非商用,
目的只求在工作学习过程中通过记录,梳理清楚自己的知识体系。
引用
- ghsama/ConvTransformerTimeSeries
- 时间序列预测的评估指标补遗
- https://zhuanlan.zhihu.com/p/391337035