Using an LSTM for time-series forecasting: the input is the monthly airline passenger count, and we use the past N months to predict the next M months.
Approach: use months 1-12 to predict month 13, then feed month 13 back in as input and use months 2-13 to predict month 14, and so on. Implemented in PyTorch; if needed, you can reach out here: https://docs.qq.com/doc/DWEtRempVZ1NSZHdQ
import pandas as pd
import matplotlib.pyplot as plt
import torch.nn as nn
import torch
import time
import numpy as np
import random
Load the data, keeping only column index 1 (the passenger counts):
path = "AirPassengers.csv"
data = pd.read_csv(path)
data = data.iloc[:, 1]
data.plot()
plt.show()
Take the first 120 time steps as the training set and the rest as the test set:
train_data = data[:120].values
test_data = data[120:].values
Normalize the training data to [0, 1], saving the min and max so the predictions can be de-normalized later:
min_data = np.min(train_data)
max_data = np.max(train_data)
train_data_scaler = (train_data - min_data) / (max_data - min_data)
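The inverse transform used later is just this scaling solved for the raw value. A minimal sketch of the round trip, with scale/inverse_scale as illustrative helper names that do not appear in the original code:

def scale(x, lo, hi):
    # map raw values into [0, 1] using the training min/max
    return (x - lo) / (hi - lo)

def inverse_scale(x, lo, hi):
    # undo the min-max scaling
    return x * (hi - lo) + lo

# the round trip recovers the original values
assert np.allclose(inverse_scale(scale(train_data, min_data, max_data), min_data, max_data), train_data)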
Given the series [1,2,3,4,5,6,7], suppose we use the previous 3 values to predict the 4th; the generated (x, y) pairs are:

| x     | y |
|-------|---|
| 1,2,3 | 4 |
| 2,3,4 | 5 |
| 3,4,5 | 6 |
| 4,5,6 | 7 |
def get_x_y(data, step=12):
    # slide a window of length `step` over the series;
    # each window is an input x, and the value right after it is the label y
    x_y = []
    for i in range(len(data) - step):
        x_y.append([list(data[i: i + step]), [data[i + step]]])
    return x_y
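Calling it on the toy series above with step=3 reproduces the table:

print(get_x_y([1, 2, 3, 4, 5, 6, 7], step=3))
# [[[1, 2, 3], [4]], [[2, 3, 4], [5]], [[3, 4, 5], [6]], [[4, 5, 6], [7]]]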
The generator below yields x and y separately; x has shape [batch_size, time steps, 1] (one variable per time step):
def get_mini_batch(data, batch_size):
    # yield full batches only, so the model's fixed-size hidden state always matches;
    # any trailing partial batch is dropped
    for i in range(0, len(data) - batch_size, batch_size):
        samples = data[i:i + batch_size]
        x, y = [], []
        for sample in samples:
            x.append(sample[0])
            y.append(sample[1])
        # add a trailing axis so shapes are [batch, step, 1] and [batch, 1, 1]
        yield np.expand_dims(np.asarray(x), axis=2), np.expand_dims(np.asarray(y), axis=2)
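A quick sanity check of the shapes (the batch size of 4 here is only for illustration):

x_b, y_b = next(get_mini_batch(get_x_y(train_data_scaler, step=12), batch_size=4))
print(x_b.shape, y_b.shape)  # (4, 12, 1) (4, 1, 1)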
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class LSTM(nn.Module):  # note: "Module" is capitalized
    def __init__(self, hidden_size, num_layers, output_size, batch_size):
        super().__init__()
        self.hidden_size = hidden_size  # number of hidden units, e.g. 100
        self.num_layers = num_layers  # number of stacked LSTM layers, typically 2
        self.output_size = output_size  # values predicted per sequence; 1 here (one step ahead)
        self.num_directions = 1  # unidirectional LSTM
        self.input_size = 1  # one variable per time step
        self.batch_size = batch_size
        # initialize the hidden and cell states
        self.hidden_cell = (
            torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device),
            torch.randn(self.num_directions * self.num_layers, self.batch_size, self.hidden_size).to(device))
        self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True).to(device)
        self.fc = nn.Linear(self.hidden_size, self.output_size).to(device)
        self.relu = nn.ReLU().to(device)

    def forward(self, input):
        output, _ = self.lstm(torch.FloatTensor(input).to(device), self.hidden_cell)
        pred = self.fc(output)
        pred1 = pred[:, -1, :]  # keep only the prediction at the last time step
        # pred1 = self.relu(pred)[:, -1, :]
        return pred1
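The post never shows where the hyperparameters and training pairs are defined, but the loop below uses them. A plausible reconstruction, following the inline comments above (100 hidden units, 2 layers) and the one-step-ahead setup; the batch size of 16 is an assumption:

time_step = 12     # window length: predict a month from the previous 12 (assumed from the approach above)
hidden_size = 100  # per the comment in the class
num_layers = 2
output_size = 1    # one month predicted at a time
batch_size = 16    # assumed; not given in the post
train_x_y = get_x_y(train_data_scaler, step=time_step)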
model = LSTM(hidden_size, num_layers, output_size, batch_size).to(device)
loss_function = nn.MSELoss(reduction='mean').to(device)  # mean squared error over the batch
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # create the optimizer
print(model)
epochs = 200
for i in range(epochs):
    start = time.time()
    for seq_batch, label_batch in get_mini_batch(train_x_y, batch_size):
        optimizer.zero_grad()
        y_pred = model(seq_batch)
        loss = loss_function(y_pred, torch.FloatTensor(label_batch[:, :, 0]).to(device))
        loss.backward()  # backpropagate to compute the gradients
        optimizer.step()  # apply the gradients to update the weights
    # report progress once per epoch
    print(f'epoch:{i:3} loss:{loss.item():10.8f} time:{time.time() - start:6.3f}')
model.eval()
with torch.no_grad():
    # reset the hidden and cell states for batch size 1
    model.hidden_cell = (torch.zeros(1 * num_layers, 1, hidden_size).to(device),
                         torch.zeros(1 * num_layers, 1, hidden_size).to(device))
    # test set: recursive multi-step forecasting
    total_test_loss = 0
    test_pred = []
    for i in range(len(test_data)):
        x = train_data_scaler[-time_step:]  # last time_step values, including earlier predictions
        print(x)
        x1 = np.expand_dims(np.expand_dims(x, 1), 0)  # shape [1, time_step, 1]
        test_y_pred_scalar = model(x1).cpu().squeeze().item()  # predicted value in [0, 1]
        # feed the prediction back so it becomes part of the next window
        train_data_scaler = np.append(train_data_scaler, test_y_pred_scalar)
        y = test_y_pred_scalar * (max_data - min_data) + min_data  # de-normalize
        test_pred.append(y)
        total_test_loss += (y - test_data[i]) ** 2  # squared error on the original scale
print(total_test_loss / len(test_data))  # mean squared error on the test set
print(test_data)
print(test_pred)
plt.plot(list(range(len(test_pred))), test_data, 'ro-')
plt.plot(list(range(len(test_pred))), test_pred, 'bo-')
plt.legend(["true", "pred"])
plt.show()