The run.py file is set up so that the code runs without modifying any parameters. On Windows, you need to delete the required=True option from the --is_training, --model_id, --model, and --data arguments, otherwise an error is raised. The --num_workers argument must be set to 0. The data folder holds the training data; you can use the ETTh1 dataset (a download link is provided here). If run.py finishes training without errors, everything works. Training on a GPU is strongly recommended; on a CPU it is barely feasible.

```python
parser = argparse.ArgumentParser(description='TimesNet')

# basic config
# task type
parser.add_argument('--task_name', type=str, default='long_term_forecast',
                    help='task name, options:[long_term_forecast, short_term_forecast, imputation, classification, anomaly_detection]')
# training mode switch
parser.add_argument('--is_training', type=int, default=1, help='status')
# model id
parser.add_argument('--model_id', type=str, default='test', help='model id')
# model selection
parser.add_argument('--model', type=str, default='TimesNet',
                    help='model name, options: [Autoformer, Transformer, TimesNet]')

# data loader
# dataset name
parser.add_argument('--data', type=str, default='ETTh1', help='dataset type')
# folder containing the data file
parser.add_argument('--root_path', type=str, default='./data/', help='root path of the data file')
# full name of the data file
parser.add_argument('--data_path', type=str, default='ETTh1.csv', help='data file')
# forecasting mode (multivariate / univariate)
parser.add_argument('--features', type=str, default='M',
                    help='forecasting task, options:[M, S, MS]; M:multivariate predict multivariate, S:univariate predict univariate, MS:multivariate predict univariate')
# name of the target column
parser.add_argument('--target', type=str, default='OT', help='target feature in S or MS task')
# sampling granularity of the timestamps
parser.add_argument('--freq', type=str, default='h',
                    help='freq for time features encoding, options:[s:secondly, t:minutely, h:hourly, d:daily, b:business days, w:weekly, m:monthly], you can also use more detailed freq like 15min or 3h')
# folder for model checkpoints
parser.add_argument('--checkpoints', type=str, default='./checkpoints/', help='location of model checkpoints')

# forecasting task
# look-back window
parser.add_argument('--seq_len', type=int, default=96, help='input sequence length')
# length of the known (start token) segment
parser.add_argument('--label_len', type=int, default=48, help='start token length')
# prediction horizon
parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
# seasonal pattern (for the M4 dataset)
parser.add_argument('--seasonal_patterns', type=str, default='Monthly', help='subset for M4')

# imputation task
# missing-data ratio for imputation
parser.add_argument('--mask_rate', type=float, default=0.25, help='mask ratio')

# anomaly detection task
# prior proportion of anomalies
parser.add_argument('--anomaly_ratio', type=float, default=0.25, help='prior anomaly ratio (%)')

# model define
# top-k periods from the FFT inside TimesBlock
parser.add_argument('--top_k', type=int, default=5, help='for TimesBlock')
# number of kernels in the Inception block
parser.add_argument('--num_kernels', type=int, default=6, help='for Inception')
# number of encoder input features
parser.add_argument('--enc_in', type=int, default=7, help='encoder input size')
# number of decoder input features
parser.add_argument('--dec_in', type=int, default=7, help='decoder input size')
# number of output channels
parser.add_argument('--c_out', type=int, default=7, help='output size')
# hidden dimension of the model
parser.add_argument('--d_model', type=int, default=512, help='dimension of model')
# number of attention heads
parser.add_argument('--n_heads', type=int, default=8, help='num of heads')
# number of encoder layers
parser.add_argument('--e_layers', type=int, default=2, help='num of encoder layers')
# number of decoder layers
parser.add_argument('--d_layers', type=int, default=1, help='num of decoder layers')
# hidden dimension of the FFN
parser.add_argument('--d_ff', type=int, default=2048, help='dimension of fcn')
# moving-average window size
parser.add_argument('--moving_avg', type=int, default=25, help='window size of moving average')
# sampling factor for the attention queries Q
parser.add_argument('--factor', type=int, default=1, help='attn factor')
# whether to use distilling (down-sampling / pooling) in the encoder
parser.add_argument('--distil', action='store_false',
                    help='whether to use distilling in encoder, using this argument means not using distilling',
                    default=True)
# dropout rate
parser.add_argument('--dropout', type=float, default=0.1, help='dropout')
# time-feature embedding type
parser.add_argument('--embed', type=str, default='timeF',
                    help='time features encoding, options:[timeF, fixed, learned]')
# activation function
parser.add_argument('--activation', type=str, default='gelu', help='activation')
# whether to output attention maps
parser.add_argument('--output_attention', action='store_true', help='whether to output attention in encoder')

# optimization
# number of data-loader workers
parser.add_argument('--num_workers', type=int, default=0, help='data loader num workers')
# number of experiment repetitions
parser.add_argument('--itr', type=int, default=1, help='experiments times')
# number of training epochs
parser.add_argument('--train_epochs', type=int, default=10, help='train epochs')
# batch size
parser.add_argument('--batch_size', type=int, default=32, help='batch size of train input data')
# early-stopping patience
parser.add_argument('--patience', type=int, default=3, help='early stopping patience')
# learning rate
parser.add_argument('--learning_rate', type=float, default=0.0001, help='optimizer learning rate')
# experiment description
parser.add_argument('--des', type=str, default='test', help='exp description')
# loss function
parser.add_argument('--loss', type=str, default='MSE', help='loss function')
# learning-rate schedule
parser.add_argument('--lradj', type=str, default='type1', help='adjust learning rate')
# automatic mixed precision training
parser.add_argument('--use_amp', action='store_true', help='use automatic mixed precision training', default=False)

# GPU
parser.add_argument('--use_gpu', type=bool, default=False, help='use gpu')
parser.add_argument('--gpu', type=int, default=0, help='gpu')
parser.add_argument('--use_multi_gpu', action='store_true', help='use multiple gpus', default=False)
parser.add_argument('--devices', type=str, default='0,1,2,3', help='device ids of multiple gpus')

# de-stationary projector params
parser.add_argument('--p_hidden_dims', type=int, nargs='+', default=[128, 128],
                    help='hidden layer dimensions of projector (List)')
parser.add_argument('--p_hidden_layers', type=int, default=2, help='number of hidden layers in projector')
```
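As a quick sanity check of the configuration, once the required=True options are removed you can parse the defaults directly. This is a minimal sketch of my own (it assumes torch is already imported, as at the top of run.py), not part of the repository:

```python
# Hypothetical sanity check: with required=True removed, the defaults parse as-is
args = parser.parse_args([])               # empty list avoids reading sys.argv
args.use_gpu = torch.cuda.is_available()   # enable the GPU only when one exists
print(args.model, args.data, args.seq_len, args.pred_len)
# TimesNet ETTh1 96 96
```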
Set a breakpoint on the exp.train(setting) line and step into the main training routine in exp_long_term_forecasting.py. Inside _get_data, the data handling is delegated to data_factory.py; step into it and you can see the handler class for each standard dataset:
```python
data_dict = {
    'ETTh1': Dataset_ETT_hour,
    'ETTh2': Dataset_ETT_hour,
    'ETTm1': Dataset_ETT_minute,
    'ETTm2': Dataset_ETT_minute,
    'custom': Dataset_Custom,
    'm4': Dataset_M4,
    'PSM': PSMSegLoader,
    'MSL': MSLSegLoader,
    'SMAP': SMAPSegLoader,
    'SMD': SMDSegLoader,
    'SWAT': SWATSegLoader,
    'UEA': UEAloader
}
```
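Roughly, data_provider in the same file resolves the mapped class and instantiates it with the parsed arguments. The following is a simplified sketch of my own (the parameter list is abridged, not a verbatim copy of data_factory.py):

```python
# Simplified sketch of how data_provider resolves and builds a dataset
Data = data_dict[args.data]              # e.g. 'ETTh1' -> Dataset_ETT_hour
data_set = Data(
    root_path=args.root_path,            # './data/'
    data_path=args.data_path,            # 'ETTh1.csv'
    flag='train',                        # 'train' / 'val' / 'test'
    size=[args.seq_len, args.label_len, args.pred_len],
    features=args.features,
    target=args.target,
    freq=args.freq,
)
data_loader = DataLoader(data_set, batch_size=args.batch_size,
                         shuffle=True, num_workers=args.num_workers)
```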
Since our dataset is ETTh1, the handler class is Dataset_ETT_hour. Open data_loader.py and find the Dataset_ETT_hour class. Its __init__ mostly just stores the various parameters, so we won't dwell on it; the part worth explaining is __read_data__:

```python
def __read_data__(self):
    # standard-scaler instance
    self.scaler = StandardScaler()
    # read the raw data
    df_raw = pd.read_csv(os.path.join(self.root_path, self.data_path))
    # split boundaries: 12 months train, 4 months val, 4 months test (hourly data)
    border1s = [0, 12 * 30 * 24 - self.seq_len, 12 * 30 * 24 + 4 * 30 * 24 - self.seq_len]
    border2s = [12 * 30 * 24, 12 * 30 * 24 + 4 * 30 * 24, 12 * 30 * 24 + 8 * 30 * 24]
    border1 = border1s[self.set_type]
    border2 = border2s[self.set_type]
    # multivariate -> multivariate, or multivariate -> univariate
    if self.features == 'M' or self.features == 'MS':
        # take every column except the date column
        cols_data = df_raw.columns[1:]
        df_data = df_raw[cols_data]
    # univariate -> univariate
    elif self.features == 'S':
        # take only the target column
        df_data = df_raw[[self.target]]
    # standardize the data
    if self.scale:
        train_data = df_data[border1s[0]:border2s[0]]
        self.scaler.fit(train_data.values)   # fit on the training split only
        data = self.scaler.transform(df_data.values)
    else:
        data = df_data.values
    # take the date column
    df_stamp = df_raw[['date']][border1:border2]
    # parse the strings into datetimes with pandas
    df_stamp['date'] = pd.to_datetime(df_stamp.date)
    # build the time features
    if self.timeenc == 0:
        df_stamp['month'] = df_stamp.date.apply(lambda row: row.month)
        df_stamp['day'] = df_stamp.date.apply(lambda row: row.day)
        df_stamp['weekday'] = df_stamp.date.apply(lambda row: row.weekday())
        df_stamp['hour'] = df_stamp.date.apply(lambda row: row.hour)
        data_stamp = df_stamp.drop(columns=['date']).values
    elif self.timeenc == 1:
        # time-feature construction helper
        data_stamp = time_features(pd.to_datetime(df_stamp['date'].values), freq=self.freq)
        # transpose to [length, n_features]
        data_stamp = data_stamp.transpose(1, 0)
    # keep the feature columns for this split
    self.data_x = data[border1:border2]
    self.data_y = data[border1:border2]
    self.data_stamp = data_stamp
```
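The border arithmetic encodes a fixed 12/4/4-month split of the hourly ETT data (the code assumes 30-day months). Worked out with the default seq_len=96:

```python
# train: data[0         : 8640 ]    12 * 30 * 24 = 8640
# val:   data[8640 - 96 : 11520]    8640 + 4 * 30 * 24 = 11520
# test:  data[11520 - 96: 14400]    11520 + 4 * 30 * 24 = 14400
# val/test start seq_len rows early so the first sample has a full look-back window
```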
The time_features function extracts calendar features; for example 't': ['month', 'day', 'weekday', 'hour', 'minute'] means month, day, weekday, hour, and minute are extracted. You can open timefeatures.py to look it up, and you can also add further date encodings later. Next, __getitem__:

```python
def __getitem__(self, index):
    # start of the input window
    s_begin = index
    # end of the input window
    s_end = s_begin + self.seq_len
    # decoder window: label_len known steps + pred_len steps to predict
    r_begin = s_end - self.label_len
    r_end = r_begin + self.label_len + self.pred_len
    # input sequence
    seq_x = self.data_x[s_begin:s_end]
    seq_y = self.data_y[r_begin:r_end]
    # time features for the input window
    seq_x_mark = self.data_stamp[s_begin:s_end]
    # time features for the decoder window
    seq_y_mark = self.data_stamp[r_begin:r_end]
    return seq_x, seq_y, seq_x_mark, seq_y_mark
```
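With the default lengths (seq_len=96, label_len=48, pred_len=96), the windows for index=0 work out as follows:

```python
# index = 0, seq_len = 96, label_len = 48, pred_len = 96
s_begin, s_end = 0, 0 + 96      # seq_x = rows 0..95   (encoder input)
r_begin = 96 - 48               # = 48
r_end = 48 + 48 + 96            # = 192, so seq_y = rows 48..191
# rows 48..95 overlap seq_x (the known start tokens); rows 96..191 are the horizon to predict
```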
Now open TimesNet.py under the models folder, which contains the TimesBlock and Model classes. A line-by-line walkthrough of the FFT_for_Period function is given in the comments:

```python
def FFT_for_Period(x, k=2):
    # x: [B, T, C]
    # real FFT along the time dimension: [B, T, C] -> [B, T//2+1, C]
    xf = torch.fft.rfft(x, dim=1)
    # find periods by amplitude:
    # average over the batch, then over the channels: [B, T//2+1, C] -> [T//2+1]
    frequency_list = abs(xf).mean(0).mean(-1)
    # zero out the DC component
    frequency_list[0] = 0
    # indices of the k largest amplitudes
    _, top_list = torch.topk(frequency_list, k)
    # convert top_list to a numpy array
    top_list = top_list.detach().cpu().numpy()
    # period = sequence length divided by each top frequency
    period = x.shape[1] // top_list
    # also return the mean amplitude at the selected frequencies: [B, T//2+1, C] -> [B, k]
    return period, abs(xf).mean(-1)[:, top_list]
```
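A quick toy check of my own (not from the repository): a pure sine of period 24 over 96 steps concentrates its amplitude at frequency index 4, so the recovered period is 96 // 4 = 24.

```python
import math
import torch

# one sine cycle every 24 steps, shaped [B, T, C] = [1, 96, 1]
x = torch.sin(2 * math.pi * torch.arange(96).float() / 24).reshape(1, 96, 1)
period, weight = FFT_for_Period(x, k=2)
print(period)   # the first entry should be 24 (the dominant period)
```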
On to the TimesBlock class. The parameter-efficient block mentioned in the paper is an architecture in the style of the Inception vision model. Inception uses a multi-branch structure that runs 1×1 convolutions, 3×3 convolutions, and max pooling in parallel; this both widens the network and makes it adaptable to patterns at different scales. The implementation lives in Conv_Blocks.py under the layers folder, which defines two classes, Inception_Block_V1 and Inception_Block_V2; the author only uses the V1 version in TimesBlock. Note the padding: each kernel of size 2*i+1 is paired with padding i, so every branch preserves the spatial size.

```python
class Inception_Block_V1(nn.Module):
    def __init__(self, in_channels, out_channels, num_kernels=6, init_weight=True):
        super(Inception_Block_V1, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.num_kernels = num_kernels
        kernels = []
        for i in range(self.num_kernels):
            # 2-D convolutions with kernel sizes 1, 3, 5, 7, ...
            kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=2 * i + 1, padding=i))
        self.kernels = nn.ModuleList(kernels)
        if init_weight:
            self._initialize_weights()

    # weight initialization
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x):
        res_list = []
        # run every parallel convolution branch
        for i in range(self.num_kernels):
            res_list.append(self.kernels[i](x))
        # stack along a new last dimension, then average over it
        res = torch.stack(res_list, dim=-1).mean(-1)
        return res


class Inception_Block_V2(nn.Module):
    def __init__(self, in_channels, out_channels, num_kernels=6, init_weight=True):
        super(Inception_Block_V2, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.num_kernels = num_kernels
        kernels = []
        for i in range(self.num_kernels // 2):
            # 2-D convolutions with kernels (1,3), (1,5), (1,7), ...
            kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=[1, 2 * i + 3], padding=[0, i + 1]))
            # 2-D convolutions with kernels (3,1), (5,1), (7,1), ...
            kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=[2 * i + 3, 1], padding=[i + 1, 0]))
        # plus one 2-D convolution with a (1,1) kernel
        kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=1))
        self.kernels = nn.ModuleList(kernels)
        if init_weight:
            self._initialize_weights()

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x):
        res_list = []
        for i in range(self.num_kernels + 1):
            # output of each branch
            res_list.append(self.kernels[i](x))
        # stack along a new last dimension, then average over it
        res = torch.stack(res_list, dim=-1).mean(-1)
        return res
```
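A small shape check of that size-preserving property, with toy numbers of my own choosing:

```python
# every branch keeps H x W, so averaging the branches is well-defined
block = Inception_Block_V1(in_channels=32, out_channels=64, num_kernels=6)
y = block(torch.randn(8, 32, 4, 24))   # input [B, C_in, length // period, period]
print(y.shape)                          # torch.Size([8, 64, 4, 24])
```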
Back to the TimesBlock class. After the 1-D discrete Fourier transform, the top-k periods by amplitude are obtained; the sequence is padded so its length is divisible by each period and reshaped to fit the 2-D convolution. The per-period results are stacked, adaptively aggregated with the frequency weights, and finally a residual connection produces the output. See the annotated code:

```python
class TimesBlock(nn.Module):
    def __init__(self, configs):
        super(TimesBlock, self).__init__()
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len
        self.k = configs.top_k
        # parameter-efficient block
        self.conv = nn.Sequential(
            Inception_Block_V1(configs.d_model, configs.d_ff, num_kernels=configs.num_kernels),
            nn.GELU(),
            Inception_Block_V1(configs.d_ff, configs.d_model, num_kernels=configs.num_kernels)
        )

    def forward(self, x):
        # get B, T, N
        B, T, N = x.size()
        # periods and their weights
        period_list, period_weight = FFT_for_Period(x, self.k)
        res = []
        for i in range(self.k):
            # current period
            period = period_list[i]
            # pad so the length is divisible by the period
            if (self.seq_len + self.pred_len) % period != 0:
                length = (((self.seq_len + self.pred_len) // period) + 1) * period
                # padding: [batch, length - seq_len - pred_len, feature]
                padding = torch.zeros([x.shape[0], (length - (self.seq_len + self.pred_len)), x.shape[2]]).to(x.device)
                # concatenate along the time dimension
                out = torch.cat([x, padding], dim=1)
            # no padding needed
            else:
                length = (self.seq_len + self.pred_len)
                out = x
            # reshape [batch, length // period, period, feature] -> [batch, feature, length // period, period] (contiguous copy)
            out = out.reshape(B, length // period, period, N).permute(0, 3, 1, 2).contiguous()
            # run the convolutional network
            out = self.conv(out)
            # reshape back to (batch, -1, feature)
            out = out.permute(0, 2, 3, 1).reshape(B, -1, N)
            res.append(out[:, :(self.seq_len + self.pred_len), :])
        # stack the k period views along a new last dimension
        res = torch.stack(res, dim=-1)
        # adaptive aggregation
        period_weight = F.softmax(period_weight, dim=1)
        # add two dimensions and repeat so the weights match res
        period_weight = period_weight.unsqueeze(1).unsqueeze(1).repeat(1, T, N, 1)
        # weighted sum over the k periods
        res = torch.sum(res * period_weight, -1)
        # residual connection
        res = res + x
        return res
```
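The essential 1-D to 2-D trick, separated from the rest of the block (a standalone illustration with made-up sizes): fold a length-192 series with period 24 into an 8 x 24 grid, where rows index cycles and columns index the phase within a cycle.

```python
x = torch.randn(8, 192, 7)                   # [B, T, N] with T = seq_len + pred_len
out = x.reshape(8, 192 // 24, 24, 7)         # [B, 8, 24, N]: row = which cycle, column = position in cycle
out = out.permute(0, 3, 1, 2).contiguous()   # [B, N, 8, 24], channels-first for Conv2d
```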
The Model class defines one method per task type: forecast (forecasting), imputation, anomaly_detection (anomaly detection), and classification.

```python
# forecasting
def forecast(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
    # Normalization from Non-stationary Transformer
    # compute the mean
    means = x_enc.mean(1, keepdim=True).detach()
    # subtract the mean
    x_enc = x_enc - means
    # compute the standard deviation
    stdev = torch.sqrt(
        torch.var(x_enc, dim=1, keepdim=True, unbiased=False) + 1e-5)
    # divide by the standard deviation
    x_enc /= stdev

    # embedding
    enc_out = self.enc_embedding(x_enc, x_mark_enc)  # [B, T, C]
    # [B, T, C] -> [B, C, T] -> [B, T, C]
    enc_out = self.predict_linear(enc_out.permute(0, 2, 1)).permute(
        0, 2, 1)  # align temporal dimension
    # TimesNet
    for i in range(self.layer):
        # layer norm
        enc_out = self.layer_norm(self.model[i](enc_out))
    # projection layer
    dec_out = self.projection(enc_out)

    # De-normalization from Non-stationary Transformer
    dec_out = dec_out * \
        (stdev[:, 0, :].unsqueeze(1).repeat(
            1, self.pred_len + self.seq_len, 1))
    dec_out = dec_out + \
        (means[:, 0, :].unsqueeze(1).repeat(
            1, self.pred_len + self.seq_len, 1))
    return dec_out

def imputation(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask):
    # Normalization from Non-stationary Transformer
    # mean over the observed positions only
    means = torch.sum(x_enc, dim=1) / torch.sum(mask == 1, dim=1)
    means = means.unsqueeze(1).detach()
    # subtract the mean
    x_enc = x_enc - means
    x_enc = x_enc.masked_fill(mask == 0, 0)
    # standard deviation over the observed positions only
    stdev = torch.sqrt(torch.sum(x_enc * x_enc, dim=1) /
                       torch.sum(mask == 1, dim=1) + 1e-5)
    stdev = stdev.unsqueeze(1).detach()
    # divide by the standard deviation
    x_enc /= stdev

    # embedding
    enc_out = self.enc_embedding(x_enc, x_mark_enc)  # [B, T, C]
    # TimesNet
    for i in range(self.layer):
        enc_out = self.layer_norm(self.model[i](enc_out))
    # project back
    dec_out = self.projection(enc_out)

    # De-normalization from Non-stationary Transformer
    dec_out = dec_out * \
        (stdev[:, 0, :].unsqueeze(1).repeat(
            1, self.pred_len + self.seq_len, 1))
    dec_out = dec_out + \
        (means[:, 0, :].unsqueeze(1).repeat(
            1, self.pred_len + self.seq_len, 1))
    return dec_out

def anomaly_detection(self, x_enc):
    # Normalization from Non-stationary Transformer
    means = x_enc.mean(1, keepdim=True).detach()
    x_enc = x_enc - means
    stdev = torch.sqrt(
        torch.var(x_enc, dim=1, keepdim=True, unbiased=False) + 1e-5)
    x_enc /= stdev

    # embedding
    enc_out = self.enc_embedding(x_enc, None)  # [B, T, C]
    # TimesNet
    for i in range(self.layer):
        enc_out = self.layer_norm(self.model[i](enc_out))
    # project back
    dec_out = self.projection(enc_out)

    # De-normalization from Non-stationary Transformer
    dec_out = dec_out * \
        (stdev[:, 0, :].unsqueeze(1).repeat(
            1, self.pred_len + self.seq_len, 1))
    dec_out = dec_out + \
        (means[:, 0, :].unsqueeze(1).repeat(
            1, self.pred_len + self.seq_len, 1))
    return dec_out

def classification(self, x_enc, x_mark_enc):
    # embedding
    enc_out = self.enc_embedding(x_enc, None)  # [B, T, C]
    # TimesNet
    for i in range(self.layer):
        enc_out = self.layer_norm(self.model[i](enc_out))

    # Output
    # the output transformer encoder/decoder embeddings don't include non-linearity
    output = self.act(enc_out)
    output = self.dropout(output)
    # zero-out padding embeddings
    output = output * x_mark_enc.unsqueeze(-1)
    # (batch_size, seq_length * d_model)
    output = output.reshape(output.shape[0], -1)
    output = self.projection(output)  # (batch_size, num_classes)
    return output
```
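The normalization at the top of forecast() is just a per-series standardization that is undone after the network; in isolation, with a toy tensor of my own:

```python
x = torch.randn(2, 96, 7)
means = x.mean(1, keepdim=True)
stdev = torch.sqrt(torch.var(x, dim=1, keepdim=True, unbiased=False) + 1e-5)
x_norm = (x - means) / stdev
x_back = x_norm * stdev + means
print(torch.allclose(x, x_back, atol=1e-4))   # True: de-normalization recovers the input
```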
Lastly for this file, the Model class's __init__ method and forward function:

```python
class Model(nn.Module):
    def __init__(self, configs):
        super(Model, self).__init__()
        self.configs = configs
        self.task_name = configs.task_name
        self.seq_len = configs.seq_len
        self.label_len = configs.label_len
        self.pred_len = configs.pred_len
        # stack of TimesBlocks
        self.model = nn.ModuleList([TimesBlock(configs)
                                    for _ in range(configs.e_layers)])
        self.enc_embedding = DataEmbedding(configs.enc_in, configs.d_model, configs.embed,
                                           configs.freq, configs.dropout)
        self.layer = configs.e_layers
        self.layer_norm = nn.LayerNorm(configs.d_model)
        # long- or short-term forecasting
        if self.task_name == 'long_term_forecast' or self.task_name == 'short_term_forecast':
            # linear layer mapping seq_len to seq_len + pred_len
            self.predict_linear = nn.Linear(
                self.seq_len, self.pred_len + self.seq_len)
            # projection: linear layer producing c_out channels
            self.projection = nn.Linear(
                configs.d_model, configs.c_out, bias=True)
        # imputation or anomaly detection
        if self.task_name == 'imputation' or self.task_name == 'anomaly_detection':
            # projection: linear layer producing c_out channels
            self.projection = nn.Linear(
                configs.d_model, configs.c_out, bias=True)
        # classification
        if self.task_name == 'classification':
            # GELU activation
            self.act = F.gelu
            # dropout layer
            self.dropout = nn.Dropout(configs.dropout)
            # projection: linear layer producing num_class logits
            self.projection = nn.Linear(
                configs.d_model * configs.seq_len, configs.num_class)

    def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec, mask=None):
        if self.task_name == 'long_term_forecast' or self.task_name == 'short_term_forecast':
            dec_out = self.forecast(x_enc, x_mark_enc, x_dec, x_mark_dec)
            return dec_out[:, -self.pred_len:, :]  # [B, L, D]
        if self.task_name == 'imputation':
            dec_out = self.imputation(
                x_enc, x_mark_enc, x_dec, x_mark_dec, mask)
            return dec_out  # [B, L, D]
        if self.task_name == 'anomaly_detection':
            dec_out = self.anomaly_detection(x_enc)
            return dec_out  # [B, L, D]
        if self.task_name == 'classification':
            dec_out = self.classification(x_enc, x_mark_enc)
            return dec_out  # [B, N]
        return None
```
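A hypothetical smoke test (the config values are my own, chosen small; it assumes the imports used by TimesNet.py are in scope):

```python
from types import SimpleNamespace

cfg = SimpleNamespace(task_name='long_term_forecast', seq_len=96, label_len=48, pred_len=96,
                      e_layers=2, enc_in=7, c_out=7, d_model=64, d_ff=64,
                      embed='timeF', freq='h', dropout=0.1, top_k=5, num_kernels=6)
model = Model(cfg)
x_enc = torch.randn(2, 96, 7)
x_mark_enc = torch.randn(2, 96, 4)      # freq 'h' -> 4 time features
x_dec = torch.randn(2, 144, 7)          # label_len + pred_len
x_mark_dec = torch.randn(2, 144, 4)
print(model(x_enc, x_mark_enc, x_dec, x_mark_dec).shape)   # torch.Size([2, 96, 7])
```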
The embedding is DataEmbedding, defined in Embed.py under the layers folder. The embedding operations of all the other Informer-family models also live in this file, so this is a good place for an overall explanation.

```python
class PositionalEmbedding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEmbedding, self).__init__()
        # compute the positional encodings once, in log space
        pe = torch.zeros(max_len, d_model).float()
        pe.requires_grad = False

        position = torch.arange(0, max_len).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float()
                    * -(math.log(10000.0) / d_model)).exp()

        # sine on even indices, cosine on odd indices
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return self.pe[:, :x.size(1)]


class TokenEmbedding(nn.Module):
    def __init__(self, c_in, d_model):
        super(TokenEmbedding, self).__init__()
        padding = 1 if torch.__version__ >= '1.5.0' else 2
        # token embedding via a 1-D convolution
        self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=d_model,
                                   kernel_size=3, padding=padding, padding_mode='circular', bias=False)
        for m in self.modules():
            if isinstance(m, nn.Conv1d):
                nn.init.kaiming_normal_(
                    m.weight, mode='fan_in', nonlinearity='leaky_relu')

    def forward(self, x):
        x = self.tokenConv(x.permute(0, 2, 1)).transpose(1, 2)
        return x


class FixedEmbedding(nn.Module):
    def __init__(self, c_in, d_model):
        super(FixedEmbedding, self).__init__()
        w = torch.zeros(c_in, d_model).float()
        w.requires_grad = False

        position = torch.arange(0, c_in).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float()
                    * -(math.log(10000.0) / d_model)).exp()

        # sinusoidal weights, as in the positional encoding
        w[:, 0::2] = torch.sin(position * div_term)
        w[:, 1::2] = torch.cos(position * div_term)

        # embedding layer with fixed (non-trainable) weights
        self.emb = nn.Embedding(c_in, d_model)
        self.emb.weight = nn.Parameter(w, requires_grad=False)

    def forward(self, x):
        return self.emb(x).detach()


class TemporalEmbedding(nn.Module):
    def __init__(self, d_model, embed_type='fixed', freq='h'):
        super(TemporalEmbedding, self).__init__()

        minute_size = 4
        hour_size = 24
        weekday_size = 7
        day_size = 32
        month_size = 13

        Embed = FixedEmbedding if embed_type == 'fixed' else nn.Embedding
        # minute embedding only for minutely data
        if freq == 't':
            self.minute_embed = Embed(minute_size, d_model)
        # hour embedding
        self.hour_embed = Embed(hour_size, d_model)
        # weekday embedding
        self.weekday_embed = Embed(weekday_size, d_model)
        # day-of-month embedding
        self.day_embed = Embed(day_size, d_model)
        # month embedding
        self.month_embed = Embed(month_size, d_model)

    def forward(self, x):
        x = x.long()
        # look up each component, skipping minutes when not available
        minute_x = self.minute_embed(x[:, :, 4]) if hasattr(
            self, 'minute_embed') else 0.
        hour_x = self.hour_embed(x[:, :, 3])
        weekday_x = self.weekday_embed(x[:, :, 2])
        day_x = self.day_embed(x[:, :, 1])
        month_x = self.month_embed(x[:, :, 0])

        # sum the component embeddings to get the final temporal embedding
        return hour_x + weekday_x + day_x + month_x + minute_x


class TimeFeatureEmbedding(nn.Module):
    def __init__(self, d_model, embed_type='timeF', freq='h'):
        super(TimeFeatureEmbedding, self).__init__()

        freq_map = {'h': 4, 't': 5, 's': 6,
                    'm': 1, 'a': 1, 'w': 2, 'd': 3, 'b': 3}
        d_inp = freq_map[freq]
        # time-feature embedding via a single linear layer
        self.embed = nn.Linear(d_inp, d_model, bias=False)

    def forward(self, x):
        return self.embed(x)


class DataEmbedding(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding, self).__init__()

        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        if x_mark is None:
            # no time marks: value embedding + positional embedding only
            x = self.value_embedding(x) + self.position_embedding(x)
        else:
            # with time marks: value + temporal + positional embeddings
            x = self.value_embedding(
                x) + self.temporal_embedding(x_mark) + self.position_embedding(x)
        return self.dropout(x)


class DataEmbedding_wo_pos(nn.Module):
    def __init__(self, c_in, d_model, embed_type='fixed', freq='h', dropout=0.1):
        super(DataEmbedding_wo_pos, self).__init__()

        # value embedding: maps the input features to a d_model-dimensional space
        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model)
        # positional embedding (defined but unused in forward, hence "wo_pos")
        self.position_embedding = PositionalEmbedding(d_model=d_model)
        # temporal embedding: TemporalEmbedding unless embed_type is 'timeF',
        # in which case TimeFeatureEmbedding is used
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, embed_type=embed_type,
                                                    freq=freq) if embed_type != 'timeF' else TimeFeatureEmbedding(
            d_model=d_model, embed_type=embed_type, freq=freq)
        # dropout for regularization during training
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark):
        if x_mark is None:
            # no time marks: value embedding only
            x = self.value_embedding(x)
        else:
            # otherwise add the temporal embedding to the value embedding
            x = self.value_embedding(x) + self.temporal_embedding(x_mark)
        return self.dropout(x)


class PatchEmbedding(nn.Module):
    def __init__(self, d_model, patch_len, stride, padding, dropout):
        super(PatchEmbedding, self).__init__()
        # Patching
        # patch_len is the length of each patch
        self.patch_len = patch_len
        # stride of the sliding window
        self.stride = stride
        # replication padding at the end of the series
        self.padding_patch_layer = nn.ReplicationPad1d((0, padding))

        # Backbone, input encoding: project each patch onto a d_model-dimensional vector space
        self.value_embedding = nn.Linear(patch_len, d_model, bias=False)

        # positional embedding
        self.position_embedding = PositionalEmbedding(d_model)

        # residual dropout
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # number of variables
        n_vars = x.shape[1]
        # pad the end of the series
        x = self.padding_patch_layer(x)
        # slice the series into patches with a sliding window
        x = x.unfold(dimension=-1, size=self.patch_len, step=self.stride)
        # merge the batch and variable dimensions
        x = torch.reshape(x, (x.shape[0] * x.shape[1], x.shape[2], x.shape[3]))
        # input encoding
        x = self.value_embedding(x) + self.position_embedding(x)
        return self.dropout(x), n_vars
```
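A shape check for DataEmbedding under the timeF encoding, with toy sizes of my own (freq='h' maps to 4 input time features):

```python
emb = DataEmbedding(c_in=7, d_model=64, embed_type='timeF', freq='h', dropout=0.1)
out = emb(torch.randn(2, 96, 7), torch.randn(2, 96, 4))
print(out.shape)   # torch.Size([2, 96, 64])
```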
Finally, the train function. The vali and test functions are much the same, just with the operations that are not needed there removed.

```python
def train(self, setting):
    # get the train / val / test datasets and loaders
    train_data, train_loader = self._get_data(flag='train')
    vali_data, vali_loader = self._get_data(flag='val')
    test_data, test_loader = self._get_data(flag='test')

    path = os.path.join(self.args.checkpoints, setting)
    # create the checkpoint directory
    if not os.path.exists(path):
        os.makedirs(path)

    # current time
    time_now = time.time()
    # steps per epoch
    train_steps = len(train_loader)
    # early stopping
    early_stopping = EarlyStopping(patience=self.args.patience, verbose=True)
    # optimizer
    model_optim = self._select_optimizer()
    # loss function
    criterion = self._select_criterion()

    # automatic mixed precision
    if self.args.use_amp:
        scaler = torch.cuda.amp.GradScaler()

    # epochs
    for epoch in range(self.args.train_epochs):
        iter_count = 0
        train_loss = []

        self.model.train()
        epoch_time = time.time()
        for i, (batch_x, batch_y, batch_x_mark, batch_y_mark) in enumerate(train_loader):
            iter_count += 1
            # zero the gradients
            model_optim.zero_grad()
            # move the batch to the device
            batch_x = batch_x.float().to(self.device)
            batch_y = batch_y.float().to(self.device)
            batch_x_mark = batch_x_mark.float().to(self.device)
            batch_y_mark = batch_y_mark.float().to(self.device)

            # decoder input: known label_len steps + zeros for the horizon
            dec_inp = torch.zeros_like(batch_y[:, -self.args.pred_len:, :]).float()
            dec_inp = torch.cat([batch_y[:, :self.args.label_len, :], dec_inp], dim=1).float().to(self.device)

            # encoder - decoder
            if self.args.use_amp:
                with torch.cuda.amp.autocast():
                    if self.args.output_attention:
                        outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
                    else:
                        outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)

                    f_dim = -1 if self.args.features == 'MS' else 0
                    outputs = outputs[:, -self.args.pred_len:, f_dim:]
                    batch_y = batch_y[:, -self.args.pred_len:, f_dim:].to(self.device)
                    loss = criterion(outputs, batch_y)
                    train_loss.append(loss.item())
            else:
                if self.args.output_attention:
                    outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)[0]
                else:
                    outputs = self.model(batch_x, batch_x_mark, dec_inp, batch_y_mark)

                # MS mode keeps only the last column, otherwise keep all columns
                f_dim = -1 if self.args.features == 'MS' else 0
                outputs = outputs[:, -self.args.pred_len:, f_dim:]
                batch_y = batch_y[:, -self.args.pred_len:, f_dim:].to(self.device)
                # compute the loss
                loss = criterion(outputs, batch_y)
                # record it
                train_loss.append(loss.item())

            # log progress
            if (i + 1) % 100 == 0:
                print("\titers: {0}, epoch: {1} | loss: {2:.7f}".format(i + 1, epoch + 1, loss.item()))
                speed = (time.time() - time_now) / iter_count
                left_time = speed * ((self.args.train_epochs - epoch) * train_steps - i)
                print('\tspeed: {:.4f}s/iter; left time: {:.4f}s'.format(speed, left_time))
                iter_count = 0
                time_now = time.time()

            if self.args.use_amp:
                scaler.scale(loss).backward()
                scaler.step(model_optim)
                scaler.update()
            else:
                # backpropagation
                loss.backward()
                # parameter update
                model_optim.step()

        print("Epoch: {} cost time: {}".format(epoch + 1, time.time() - epoch_time))
        train_loss = np.average(train_loss)
        vali_loss = self.vali(vali_data, vali_loader, criterion)
        test_loss = self.vali(test_data, test_loader, criterion)

        print("Epoch: {0}, Steps: {1} | Train Loss: {2:.7f} Vali Loss: {3:.7f} Test Loss: {4:.7f}".format(
            epoch + 1, train_steps, train_loss, vali_loss, test_loss))
        early_stopping(vali_loss, self.model, path)
        if early_stopping.early_stop:
            print("Early stopping")
            break
        # adjust the learning rate
        adjust_learning_rate(model_optim, epoch + 1, self.args)

    # load the best checkpoint
    best_model_path = path + '/' + 'checkpoint.pth'
    self.model.load_state_dict(torch.load(best_model_path))

    return self.model
```
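The one non-obvious tensor in the loop is dec_inp: the known label_len steps of the target followed by zeros for the horizon. In isolation, with the default lengths and a toy tensor of my own:

```python
batch_y = torch.randn(32, 48 + 96, 7)                       # [B, label_len + pred_len, N]
dec_inp = torch.zeros_like(batch_y[:, -96:, :])             # zero placeholder for the pred_len horizon
dec_inp = torch.cat([batch_y[:, :48, :], dec_inp], dim=1)   # [B, 144, N]
```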