时间序列单变量预测 (Univariate time-series forecasting)
import numpy as np
import tensorflow as tf

# GPU setup: pin to the first visible GPU and grow memory on demand.
gpus = tf.config.list_physical_devices("GPU")

if gpus:
    tf.config.experimental.set_memory_growth(gpus[0], True)  # allocate GPU memory on demand
    tf.config.set_visible_devices([gpus[0]], "GPU")

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from math import sqrt
import matplotlib.pyplot as plt
import pandas as pd

# Matplotlib: render Chinese labels and the minus sign correctly.
plt.rcParams['font.sans-serif'] = ['SimHei']  # font that contains Chinese glyphs
plt.rcParams['axes.unicode_minus'] = False  # show '-' instead of a missing glyph

# Seed NumPy and TensorFlow so runs are as reproducible as possible.
from numpy.random import seed
seed(1)
tf.random.set_seed(1)

# Load the oil-price series from a local CSV.
# NOTE(review): hard-coded absolute Windows path — adjust for your machine.
oil = pd.read_csv('D:/April/postgraduate/研1/课程/时间序列分析/期末论文/china_oil.csv')
data = oil
oil  # bare expression — notebook display residue, a no-op in a plain script
-
from tensorflow.keras.regularizers import l2

# Scale the series into [-1, 1], then flatten back to 1-D.
# NOTE(review): assumes `data` holds a single value column — TODO confirm.
scaler = MinMaxScaler(feature_range=(-1, 1))
data_normalized = scaler.fit_transform(data.values.reshape(-1, 1)).reshape(-1)
-
- # 创建时间窗口
def create_sequence_data(data, time_steps):
    """Slice a 1-D series into overlapping input windows and one-step targets.

    Each sample holds ``time_steps`` consecutive values; its label is the
    single value immediately after the window, kept as a length-1 slice so
    the returned labels have shape (n_samples, 1).
    """
    n_windows = len(data) - time_steps
    windows = [data[start:start + time_steps] for start in range(n_windows)]
    targets = [data[start + time_steps:start + time_steps + 1]
               for start in range(n_windows)]
    return np.array(windows), np.array(targets)
-
time_steps = 3  # window length; tune as needed
X, y = create_sequence_data(data_normalized, time_steps)

# Chronological hold-out: the last 100 samples form the validation set.
val_size = 100
X_train, X_val = X[:-val_size], X[-val_size:]
y_train, y_val = y[:-val_size], y[-val_size:]
-
-
- # 构建Bi-LSTM模型
# Bi-LSTM model: one bidirectional LSTM layer (L2-regularised) feeding a
# linear output unit for one-step-ahead regression.
# NOTE(review): 'relu' inside an LSTM is unconventional (tanh is the Keras
# default) and disables the cuDNN fast path — confirm this is intentional.
model = tf.keras.Sequential([
    tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(50, activation='relu', kernel_regularizer=l2(0.01)),
        input_shape=(X_train.shape[1], 1)),

    tf.keras.layers.Dense(1)
])

model.compile(optimizer='adam', loss='mse')

# Train for 20 epochs, validating against the held-out tail of the series.
history = model.fit(X_train, y_train,
                    epochs=20,
                    batch_size=32,
                    validation_data=(X_val, y_val),
                    verbose=2)

# Predict on the validation windows.
predicted = model.predict(X_val)

# Map predictions and targets back to the original price scale.
# (y_val has shape (n, 1) here, so inverse_transform accepts it directly.)
predicted = scaler.inverse_transform(predicted)
y_val_original = scaler.inverse_transform(y_val)

# Plot predicted vs. true prices on the validation set.
plt.plot(predicted, label='Predicted')
plt.plot(y_val_original, label='True')
plt.title('price - Predicted vs True')
plt.legend()
plt.show()

# Training-history loss values.
train_loss = history.history['loss']
val_loss = history.history['val_loss']

# Plot the loss curves.
plt.plot(train_loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.title('price Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.utils import check_array
from sklearn.metrics import r2_score

# Evaluate with RMSE (root of the mean squared error) on the original scale.
rmse = sqrt(mean_squared_error(y_val_original, predicted))
# BUG FIX: the computed value is the RMSE (sqrt applied, variable named
# `rmse`, and the duplicate print below labels it RMSE) — the original
# label said 'MSE'.
print(f'RMSE: {round(rmse,4)}')
-
- # 计算MAPE
def mean_absolute_percentage_error(y_true, y_pred):
    """Return the MAPE (in percent) between two equal-shaped arrays.

    Inherits the usual MAPE caveat: a zero in ``y_true`` produces inf/nan,
    exactly as the original implementation did.
    """
    actual = np.asarray(y_true, dtype=float)
    forecast = np.asarray(y_pred, dtype=float)
    relative_errors = np.abs((actual - forecast) / actual)
    return relative_errors.mean() * 100
-
# MAPE on the original scale (already in percent via the helper above).
mape = mean_absolute_percentage_error(y_val_original, predicted)
print(f'MAPE: {round(mape,4)}')

# Coefficient of determination.
r2 = r2_score(y_val_original, predicted)
print(f'R2: {round(r2,4)}')

# RMSE again, labelled for the validation set (duplicates the value above).
rmse = sqrt(mean_squared_error(y_val_original, predicted))
print(f'验证集RMSE: {round(rmse,4)}')

from sklearn.metrics import mean_absolute_error

# Mean absolute error on the original scale.
mae = mean_absolute_error(y_val_original, predicted)
print("Mean Absolute Error: ", mae)
-
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Bidirectional, LSTM, Dense, Flatten

# Second experiment: CNN-BiLSTM.
# Assumes a DataFrame `data` holding the (power-load) time series — this
# reuses the `data` variable defined earlier in the file.

# Re-fit the scaler on the series, flattened back to 1-D.
scaler = MinMaxScaler(feature_range=(-1, 1))
data_normalized = scaler.fit_transform(data.values.reshape(-1, 1)).reshape(-1)
-
- # 创建时间窗口
def create_sequence_data(data, time_steps):
    """Build sliding input windows and scalar one-step-ahead targets.

    Unlike the earlier variant, each label is a scalar, so the returned
    labels array is 1-D with shape (n_samples,).
    """
    n_samples = len(data) - time_steps
    windows = np.array([data[i:i + time_steps] for i in range(n_samples)])
    targets = np.array([data[i + time_steps] for i in range(n_samples)])
    return windows, targets
-
time_steps = 10  # window length for the CNN-BiLSTM model
X, y = create_sequence_data(data_normalized, time_steps)

# Chronological split: the last samples form the validation set.
# NOTE(review): the original comment said "last 200" but the code uses 100.
val_size = 100
X_train, X_val = X[:-val_size], X[-val_size:]
y_train, y_val = y[:-val_size], y[-val_size:]

# Add a trailing channel axis: (samples, time_steps) -> (samples, time_steps, 1)
# to match the Conv1D input shape.
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_val = X_val.reshape(X_val.shape[0], X_val.shape[1], 1)
-
- # 构建CNN-BiLSTM模型
# CNN-BiLSTM: pointwise Conv1D feature extractor, max pooling, then a
# bidirectional LSTM feeding a linear output unit.
model = Sequential([
    Conv1D(filters=64, kernel_size=1, activation='relu', input_shape=(X_train.shape[1], 1)),
    MaxPooling1D(pool_size=2),
    Bidirectional(LSTM(128, activation='tanh')),
    Dense(1)
])

model.compile(optimizer='adam', loss='mse')

# Train for 50 epochs against the held-out tail.
history = model.fit(X_train, y_train,
                    epochs=50,
                    batch_size=32,
                    validation_data=(X_val, y_val),
                    verbose=2)

# Predict on the validation windows.
predicted = model.predict(X_val)

# Map predictions and targets back to the original scale.
predicted = scaler.inverse_transform(predicted)
# BUG FIX: in this script y_val is 1-D (create_sequence_data returns scalar
# labels here), and MinMaxScaler.inverse_transform requires a 2-D array, so
# the original call raised ValueError. Reshape to a column vector first.
y_val_original = scaler.inverse_transform(y_val.reshape(-1, 1))
# Loss curves over training.
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Predicted vs. true values on the validation set.
plt.plot(y_val_original, label='True')
plt.plot(predicted, label='Predicted')
plt.title('Validation Set - True vs Predicted')
plt.xlabel('Time')
plt.ylabel('Power Load')
plt.legend()
plt.show()

# Evaluation metrics on the original scale.
rmse = np.sqrt(mean_squared_error(y_val_original, predicted))
# BUG FIX: MAPE is conventionally a percentage; the original omitted the
# * 100 factor, so the value printed under the "MAPE" label was a fraction
# (inconsistent with the mean_absolute_percentage_error helper used earlier
# in this file).
mape = np.mean(np.abs((y_val_original - predicted) / y_val_original)) * 100
r2 = r2_score(y_val_original, predicted)

# Print the evaluation metrics.
print(f'Root Mean Squared Error (RMSE): {rmse:.4f}')
print(f'Mean Absolute Percentage Error (MAPE): {mape:.4f}')
print(f'R-squared (R2): {r2:.2f}')

from sklearn.metrics import mean_absolute_error

# Mean absolute error.
mae = mean_absolute_error(y_val_original, predicted)
print("Mean Absolute Error: ", mae)
import numpy as np
from scipy.stats import kendalltau
from sklearn.neighbors import KernelDensity

## Helper function
def calculate_kendall_tau(X, Y, bandwidth=1.0, kernel='gaussian', sample_size=None, random_seed=45):
    """Estimate the Kendall Tau correlation between two variables via
    KDE-based (copula-style) resampling.

    Parameters:
    - X: data of the first variable
    - Y: data of the second variable (same length as X)
    - bandwidth: bandwidth of the kernel density estimate
    - kernel: kernel type passed to sklearn's KernelDensity
    - sample_size: number of samples drawn from the fitted density;
      defaults to the length of the input data.
      BUG FIX: the original default was `len(data)`, evaluated at *def*
      time against a global `data` — brittle (NameError without the global,
      and frozen at the dataset size present at definition). `None` defers
      the decision to call time, which the body already handles.
    - random_seed: seed for the sampling step (None leaves the RNG untouched)

    Returns:
    - kendall_tau: Kendall Tau correlation computed on the generated sample
    """

    # Stack the two variables into an (n, 2) array.
    data = np.column_stack((X, Y))

    # Fit the kernel density model on the joint data.
    kde = KernelDensity(bandwidth=bandwidth, kernel=kernel)
    kde.fit(data)

    # Default the sample size to the input size.
    if sample_size is None:
        sample_size = len(data)

    if random_seed is not None:
        np.random.seed(random_seed)

    # Draw samples from the estimated joint distribution.
    copula_samples = kde.sample(sample_size)

    # Kendall Tau on the resampled joint distribution.
    kendall_tau, _ = kendalltau(copula_samples[:, 0], copula_samples[:, 1])

    return kendall_tau
Example usage
# Pull the pollutant columns from `data`.
# NOTE(review): this assumes `data` is an air-quality DataFrame with these
# columns — inconsistent with the oil-price CSV loaded earlier; confirm the
# intended data source.
no2 = data['no2']
pm2_5 = data['pm2_5']
pm10 = data['pm10']
co = data['co']
o3 = data['o3']
so2 = data['so2']

# Example usage.
# BUG FIX: the original passed an undefined name `Y` (NameError at runtime);
# use one of the columns defined above as the second variable.
result = calculate_kendall_tau(no2, pm2_5)
print(f'no2-pm2_5 Kendall Tau 相关系数: {result:.2f}')
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。