赞
踩
在上一篇文章深入理解PyTorch中LSTM的输入和输出(从input输入到Linear输出)中,我详细地解释了如何利用PyTorch来搭建一个LSTM模型,本篇文章的主要目的是搭建一个LSTM模型用于时间序列预测。
系列文章:
数据集为某个地区某段时间内的电力负荷数据,除了负荷以外,还包括温度、湿度等信息。
本篇文章暂时不考虑其它变量,只考虑用历史负荷来预测未来负荷。本文中,我们根据前24个时刻的负荷来预测下一时刻的负荷。有关多变量预测请参考:PyTorch搭建LSTM实现多变量时间序列预测(负荷预测)。
def load_data(file_name):
    """Read a CSV from ``data/new_data/`` and fill missing numeric values.

    Args:
        file_name: Name of the GBK-encoded CSV file inside ``data/new_data/``.

    Returns:
        A ``pandas.DataFrame`` in which every NaN in a numeric column has been
        replaced by that column's mean. Non-numeric columns are left as read.
    """
    df = pd.read_csv('data/new_data/' + file_name, encoding='gbk')
    # numeric_only=True: with a non-numeric column present (e.g. a timestamp),
    # a plain df.mean() raises TypeError on pandas >= 2.0.
    return df.fillna(df.mean(numeric_only=True))
class MyDataset(Dataset):
    """Thin ``Dataset`` wrapper around an indexable collection of samples."""

    def __init__(self, data):
        # Only a reference is stored; the samples are not copied.
        self.data = data

    def __len__(self):
        """Return the number of stored samples."""
        return len(self.data)

    def __getitem__(self, item):
        """Return the sample stored at position ``item``."""
        sample = self.data[item]
        return sample
def nn_seq_us(B, file_name='data.csv', seq_len=24):
    """Build train/validation/test DataLoaders for univariate forecasting.

    The data is split 60/20/20 in row order. The second column of the frame is
    min-max scaled with the minimum and maximum of the *training* split, then
    cut into sliding windows: ``seq_len`` consecutive values form the input
    and the value that follows is the label.

    Args:
        B: Batch size used for all three loaders.
        file_name: CSV file forwarded to ``load_data``. The original code
            called ``load_data()`` without its required argument, which always
            raised TypeError. NOTE(review): the default name is a placeholder
            — confirm the real file name.
        seq_len: Number of past time steps per input window (default 24,
            the value that was previously hard-coded).

    Returns:
        ``(Dtr, Val, Dte, m, n)``: the train, validation and test
        ``DataLoader`` objects, followed by the training-split maximum ``m``
        and minimum ``n`` used for scaling.
    """
    print('data processing...')
    dataset = load_data(file_name)
    # split
    n_rows = len(dataset)
    train_end, val_end = int(n_rows * 0.6), int(n_rows * 0.8)
    train = dataset[:train_end]
    val = dataset[train_end:val_end]
    test = dataset[val_end:]
    load_col = dataset.columns[1]
    # Scaling statistics come from the training split only.
    m, n = np.max(train[load_col]), np.min(train[load_col])

    def process(data, batch_size, shuffle):
        """Turn one split into a DataLoader of (window, next value) pairs."""
        load = (data[load_col].to_numpy(dtype=float) - n) / (m - n)
        samples = []
        for i in range(len(load) - seq_len):
            # Window shaped (seq_len, 1): one feature per time step.
            window = torch.tensor(load[i:i + seq_len], dtype=torch.float32).view(-1, 1)
            label = torch.tensor([load[i + seq_len]], dtype=torch.float32)
            samples.append((window, label))
        # drop_last=True keeps every batch at exactly batch_size samples.
        return DataLoader(dataset=MyDataset(samples), batch_size=batch_size,
                          shuffle=shuffle, num_workers=0, drop_last=True)

    Dtr = process(train, B, True)
    Val = process(val, B, True)
    Dte = process(test, B, False)
    return Dtr, Val, Dte, m, n
上面代码用了DataLoader来对原始数据进行处理,最终得到了batch_size=B的数据集Dtr、Val以及Dte,Dtr为训练集,Val为验证集,Dte为测试集。
这里采用了深入理解PyTorch中LSTM的输入和输出(从input输入到Linear输出)中的模型:
class LSTM(nn.Module):
    """Unidirectional LSTM followed by a linear layer on the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values produced by the linear layer.
        batch_size: Stored as ``self.batch_size`` for compatibility with
            existing callers; ``forward`` sizes its state from the input.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, batch_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        self.num_directions = 1  # unidirectional LSTM
        self.batch_size = batch_size
        self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
        self.linear = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input_seq):
        """Return a prediction for the last time step of each sequence.

        Args:
            input_seq: Tensor indexed as (batch, seq_len, input_size), since
                the LSTM is built with ``batch_first=True``.

        Returns:
            Tensor of shape (batch, output_size).
        """
        # Size the initial state from the actual input, not self.batch_size,
        # so a batch of any size (e.g. a short final batch) works.
        batch_size = input_seq.shape[0]
        state_shape = (self.num_directions * self.num_layers, batch_size, self.hidden_size)
        # Create the state on the input's device rather than relying on a
        # module-level `device` global.
        # NOTE(review): a random initial state makes outputs differ between
        # calls even in eval mode; kept as in the original — confirm intent.
        h_0 = torch.randn(state_shape, device=input_seq.device)
        c_0 = torch.randn(state_shape, device=input_seq.device)
        # output: (batch, seq_len, num_directions * hidden_size)
        output, _ = self.lstm(input_seq, (h_0, c_0))
        # Only the last step is needed, so the linear layer runs on it alone.
        pred = self.linear(output[:, -1, :])
        return pred
def train(args, Dtr, Val, path):
    """Train an LSTM/BiLSTM and save the best-on-validation weights.

    Args:
        args: Namespace providing ``input_size``, ``hidden_size``,
            ``num_layers``, ``output_size``, ``batch_size``,
            ``bidirectional``, ``optimizer``, ``lr``, ``weight_decay``,
            ``step_size``, ``gamma`` and ``epochs``.
        Dtr: Iterable of ``(seq, label)`` training batches.
        Val: Validation data passed to ``get_val_loss``.
        path: Destination for the checkpoint, saved as
            ``{'models': state_dict}``.
    """
    input_size, hidden_size, num_layers = args.input_size, args.hidden_size, args.num_layers
    output_size = args.output_size
    if args.bidirectional:
        model = BiLSTM(input_size, hidden_size, num_layers, output_size, batch_size=args.batch_size).to(device)
    else:
        model = LSTM(input_size, hidden_size, num_layers, output_size, batch_size=args.batch_size).to(device)

    loss_function = nn.MSELoss().to(device)
    if args.optimizer == 'adam':
        optimizer = torch.optim.Adam(model.parameters(), lr=args.lr,
                                     weight_decay=args.weight_decay)
    else:
        optimizer = torch.optim.SGD(model.parameters(), lr=args.lr,
                                    momentum=0.9, weight_decay=args.weight_decay)
    scheduler = StepLR(optimizer, step_size=args.step_size, gamma=args.gamma)
    # training
    min_epochs = 10
    best_model = None
    # Start from +inf instead of a magic threshold so the first eligible
    # epoch always becomes the initial best.
    min_val_loss = float('inf')
    for epoch in tqdm(range(args.epochs)):
        # Re-enter training mode each epoch, in case validation changed it.
        model.train()
        train_loss = []
        for (seq, label) in Dtr:
            seq = seq.to(device)
            label = label.to(device)
            y_pred = model(seq)
            loss = loss_function(y_pred, label)
            train_loss.append(loss.item())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        scheduler.step()
        # validation
        val_loss = get_val_loss(args, model, Val)
        # Snapshots are only taken after the first min_epochs epochs.
        if epoch > min_epochs and val_loss < min_val_loss:
            min_val_loss = val_loss
            best_model = copy.deepcopy(model)

        print('epoch {:03d} train_loss {:.8f} val_loss {:.8f}'.format(epoch, np.mean(train_loss), val_loss))

    # With very short runs no snapshot is ever taken; fall back to the final
    # model instead of failing on None.state_dict().
    if best_model is None:
        best_model = model
    state = {'models': best_model.state_dict()}
    torch.save(state, path)
保存训练过程中在验证集上表现最好的模型。
def test(args, Dte, path, m, n):
    """Evaluate a saved model on the test loader, print MAPE and plot a window.

    Args:
        args: Namespace providing ``input_size``, ``hidden_size``,
            ``num_layers``, ``output_size``, ``batch_size`` and
            ``bidirectional``.
        Dte: Iterable of ``(seq, target)`` test batches.
        path: Checkpoint file holding ``{'models': state_dict}``.
        m: Maximum used for min-max scaling; used here to invert the scaling.
        n: Minimum used for min-max scaling.
    """
    pred = []
    y = []
    print('loading models...')
    input_size, hidden_size, num_layers = args.input_size, args.hidden_size, args.num_layers
    output_size = args.output_size
    if args.bidirectional:
        model = BiLSTM(input_size, hidden_size, num_layers, output_size, batch_size=args.batch_size).to(device)
    else:
        model = LSTM(input_size, hidden_size, num_layers, output_size, batch_size=args.batch_size).to(device)
    # map_location lets a checkpoint saved on GPU load on a CPU-only host.
    model.load_state_dict(torch.load(path, map_location=device)['models'])
    model.eval()
    print('predicting...')
    for (seq, target) in tqdm(Dte):
        y.extend(chain.from_iterable(target.tolist()))
        seq = seq.to(device)
        with torch.no_grad():
            y_pred = model(seq)
        pred.extend(chain.from_iterable(y_pred.tolist()))

    y, pred = np.array(y), np.array(pred)
    # Invert the min-max scaling to get back to the original units.
    y = (m - n) * y + n
    pred = (m - n) * pred + n
    print('mape:', get_mape(y, pred))
    # plot
    # Show samples 150..299 when available; otherwise fall back to the first
    # (up to) 150 samples so short test sets do not break the plot.
    start = 150 if len(y) >= 300 else 0
    y_win, pred_win = y[start:start + 150], pred[start:start + 150]
    x = np.arange(1, len(y_win) + 1)
    if len(x) >= 4:
        # The default cubic spline needs at least 4 points.
        x_plot = np.linspace(np.min(x), np.max(x), 900)
        y_plot = make_interp_spline(x, y_win)(x_plot)
        pred_plot = make_interp_spline(x, pred_win)(x_plot)
    else:
        x_plot, y_plot, pred_plot = x, y_win, pred_win
    plt.plot(x_plot, y_plot, c='green', marker='*', ms=1, alpha=0.75, label='true')
    plt.plot(x_plot, pred_plot, c='red', marker='o', ms=1, alpha=0.75, label='pred')
    plt.grid(axis='y')
    plt.legend()
    plt.show()
简单训练30轮,MAPE为5.77%:
暂无。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。