When we first set out to do time-series forecasting, most of us realize that we are predicting future values of an indicator from its history. But if you ask how exactly that is done, many people will scratch their heads: how do you use a stretch of consecutive time-stamped data to predict its future values? Compared with classification, time-series forecasting comes with no ready-made labels. If we want to do supervised learning, what is the prediction label for a time series, and how do we obtain it indirectly?
The answer is a window splitter that slides along the time axis. Given a stretch of consecutive time-series data, we slide a window along it to produce input sequences together with the corresponding prediction labels. For example, suppose we have 14 data points and use 5 of them to predict the next 3; those 3 future points are the label. Running the window splitter over the 14 raw points with window sizes of 5 and 3 gives 14 - (5 + 3 - 1) = 7 training samples, so the sliding-window split turns the raw series into 7 examples we can train on.
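A minimal sketch of that arithmetic, using a toy series of 14 integers instead of real data:
import numpy as np

# 14 consecutive observations, a window of 5 past values,
# and a horizon of 3 future values to predict
series = np.arange(14)
window_size, step_size = 5, 3

samples = []
for i in range(len(series) - window_size - step_size + 1):
    x = series[i:i + window_size]                               # 5 inputs
    y = series[i + window_size:i + window_size + step_size]     # 3 labels
    samples.append((x, y))

print(len(samples))  # 14 - (5 + 3 - 1) = 7 supervised samples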
The example here uses an energy dataset from Kaggle.
import numpy as np
import pandas as pd
import os
# load the data
root = "/kaggle/input/energy-consumption-generation-prices-and-weather"
df_energy = pd.read_csv(os.path.join(root, "energy_dataset.csv"))
df_weather = pd.read_csv(os.path.join(root, "weather_features.csv"))
Here is what the dataset looks like:
time generation biomass generation fossil brown coal/lignite generation fossil coal-derived gas generation fossil gas generation fossil hard coal generation fossil oil generation fossil oil shale generation fossil peat generation geothermal ... generation waste generation wind offshore generation wind onshore forecast solar day ahead forecast wind offshore eday ahead forecast wind onshore day ahead total load forecast total load actual price day ahead price actual
0 2015-01-01 00:00:00+01:00 447.0 329.0 0.0 4844.0 4821.0 162.0 0.0 0.0 0.0 ... 196.0 0.0 6378.0 17.0 NaN 6436.0 26118.0 25385.0 50.10 65.41
1 2015-01-01 01:00:00+01:00 449.0 328.0 0.0 5196.0 4755.0 158.0 0.0 0.0 0.0 ... 195.0 0.0 5890.0 16.0 NaN 5856.0 24934.0 24382.0 48.10 64.92
2 2015-01-01 02:00:00+01:00 448.0 323.0 0.0 4857.0 4581.0 157.0 0.0 0.0 0.0 ... 196.0 0.0 5461.0 8.0 NaN 5454.0 23515.0 22734.0 47.33 64.48
3 2015-01-01 03:00:00+01:00 438.0 254.0 0.0 4314.0 4131.0 160.0 0.0 0.0 0.0 ... 191.0 0.0 5238.0 2.0 NaN 5151.0 22642.0 21286.0 42.27 59.32
4 2015-01-01 04:00:00+01:00 428.0 187.0 0.0 4130.0 3840.0 156.0 0.0 0.0 0.0 ... 189.0 0.0 4935.0 9.0 NaN 4861.0 21785.0 20264.0 38.41 56.04
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
35059 2018-12-31 19:00:00+01:00 297.0 0.0 0.0 7634.0 2628.0 178.0 0.0 0.0 0.0 ... 277.0 0.0 3113.0 96.0 NaN 3253.0 30619.0 30653.0 68.85 77.02
35060 2018-12-31 20:00:00+01:00 296.0 0.0 0.0 7241.0 2566.0 174.0 0.0 0.0 0.0 ... 280.0 0.0 3288.0 51.0 NaN 3353.0 29932.0 29735.0 68.40 76.16
35061 2018-12-31 21:00:00+01:00 292.0 0.0 0.0 7025.0 2422.0 168.0 0.0 0.0 0.0 ... 286.0 0.0 3503.0 36.0 NaN 3404.0 27903.0 28071.0 66.88 74.30
35062 2018-12-31 22:00:00+01:00 293.0 0.0 0.0 6562.0 2293.0 163.0 0.0 0.0 0.0 ... 287.0 0.0 3586.0 29.0 NaN 3273.0 25450.0 25801.0 63.93 69.89
35063 2018-12-31 23:00:00+01:00 290.0 0.0 0.0 6926.0 2166.0 163.0 0.0 0.0 0.0 ... 287.0 0.0 3651.0 26.0 NaN 3117.0 24424.0 24455.0 64.27 69.88
First, we use the approach introduced earlier to check whether the dataset has missing values:
def missing_values_table(df):
    # Total missing values
    mis_val = df.isnull().sum()
    # Percentage of missing values
    mis_val_percent = 100 * df.isnull().sum() / len(df)
    formatted_percentage = mis_val_percent.apply(lambda x: "{:.3}".format(x))
    # Make a table with the results
    mis_val_table = pd.concat([mis_val, formatted_percentage], axis=1)
    # Rename the columns
    mis_val_table_ren_columns = mis_val_table.rename(
        columns={0: 'Missing Values', 1: '% of Total Values'})
    # Keep only the columns that actually have missing values
    mis_val_table_ren_columns = mis_val_table_ren_columns[
        mis_val_table_ren_columns.iloc[:, 0] != 0]
    # Sort the table by percentage of missing, descending
    mis_values = mis_val_table_ren_columns.sort_values(
        '% of Total Values', ascending=False).round(1)
    # Print some summary information
    print("Your selected dataframe has " + str(df.shape[1]) + " columns.\n"
          "There are " + str(mis_values.shape[0]) + " columns that have missing values.")
    # Return the dataframe with missing information
    return mis_values, mis_val_table_ren_columns

mis_val, energy_missing = missing_values_table(df_energy)
energy_missing
The result:
Your selected dataframe has 29 columns.
There are 23 columns that have missing values.

                                             Missing Values % of Total Values
generation biomass                                       19            0.0542
generation fossil brown coal/lignite                     18            0.0513
generation fossil coal-derived gas                       18            0.0513
generation fossil gas                                    18            0.0513
generation fossil hard coal                              18            0.0513
generation fossil oil                                    19            0.0542
generation fossil oil shale                              18            0.0513
generation fossil peat                                   18            0.0513
generation geothermal                                    18            0.0513
generation hydro pumped storage aggregated            35064             1e+02
generation hydro pumped storage consumption              19            0.0542
generation hydro run-of-river and poundage               19            0.0542
generation hydro water reservoir                         18            0.0513
generation marine                                        19            0.0542
generation nuclear                                       17            0.0485
generation other                                         18            0.0513
generation other renewable                               18            0.0513
generation solar                                         18            0.0513
generation waste                                         19            0.0542
generation wind offshore                                 18            0.0513
generation wind onshore                                  18            0.0513
forecast wind offshore eday ahead                     35064             1e+02
total load actual                                        36             0.103
So we need to deal with these missing values in two ways: drop the two columns that are missing entirely, and drop the rows that have missing values in the other affected columns.
# handling missing values
df = df_energy.drop(['generation hydro pumped storage aggregated', 'forecast wind offshore eday ahead'], axis = 1)
df = df.dropna(subset = ['total load actual',
'generation waste', 'generation marine',
'generation hydro pumped storage consumption', 'generation biomass',
'generation fossil oil', 'generation hydro run-of-river and poundage',
'generation fossil peat', 'generation geothermal',
'generation fossil oil shale', 'generation fossil brown coal/lignite',
'generation hydro water reservoir', 'generation fossil hard coal',
'generation other', 'generation other renewable', 'generation solar',
'generation fossil gas', 'generation wind offshore',
'generation wind onshore', 'generation fossil coal-derived gas',
'generation nuclear'])
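As a quick sanity check (a minimal sketch, not part of the original notebook), we can confirm that nothing is left missing after these drops:
# After dropping the two all-NaN columns and the rows with missing values
# in the remaining generation/load columns, no missing values should remain
print(df.shape)                 # roughly 35,000 rows should survive
print(df.isnull().sum().sum())  # expected to be 0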
Split the dataset with the sliding-window approach:
def create_sliding_window_dataset(df, window_size, step_size):
    targets = df.values
    features = []
    labels = []
    train_set = []
    for i in range(0, len(targets) - window_size - step_size + 1):
        # The window holds the past window_size values, the label the next step_size values
        window = targets[i:i + window_size]
        label = targets[i + window_size:i + window_size + step_size]
        features.append(window)
        labels.append(label)
        train_set.append((window, label))
    return features, labels, train_set

# Example usage
window_size = 5
step_size = 3
features, labels, dataset = create_sliding_window_dataset(df['price actual'], window_size, step_size)
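For a long series, an equivalent but much faster way to build the same windows is numpy's sliding_window_view; this is just an alternative sketch, not part of the original pipeline:
from numpy.lib.stride_tricks import sliding_window_view  # requires numpy >= 1.20

def create_sliding_window_dataset_fast(series, window_size, step_size):
    values = np.asarray(series)
    # Each row holds window_size inputs followed by step_size labels
    full = sliding_window_view(values, window_size + step_size)
    return full[:, :window_size], full[:, window_size:]

X_fast, y_fast = create_sliding_window_dataset_fast(df['price actual'], window_size, step_size)
assert np.allclose(X_fast, np.array(features))  # matches the loop-based version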
Then convert the windowed samples into a DataFrame:
train_data = pd.DataFrame(dataset, columns=['features', 'target'])
The result looks like this:
train_data.head()
features target
0 [65.41, 64.92, 64.48, 59.32, 56.04] [53.63, 51.73, 51.43]
1 [64.92, 64.48, 59.32, 56.04, 53.63] [51.73, 51.43, 48.98]
2 [64.48, 59.32, 56.04, 53.63, 51.73] [51.43, 48.98, 54.2]
3 [59.32, 56.04, 53.63, 51.73, 51.43] [48.98, 54.2, 58.94]
4 [56.04, 53.63, 51.73, 51.43, 48.98] [54.2, 58.94, 59.86]
Now let's build a model with XGBoost.
X = np.array(train_data['features'].tolist())
y = np.array(train_data['target'].tolist())
import xgboost as xgb
from sklearn.ensemble import BaggingRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Convert the data to DMatrix format
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

# Set up the parameters for XGBoost
params = {
    'objective': 'reg:squarederror',
    'eval_metric': 'rmse',        # Use the appropriate metric for your problem
    'eta': 0.1,                   # Learning rate (controls the step size during training)
    'max_depth': 3,               # Maximum depth of a tree
    'min_child_weight': 1,        # Minimum sum of instance weight (hessian) needed in a child
    'subsample': 0.8,             # Fraction of observations to be randomly sampled for each tree
    'colsample_bytree': 0.8,      # Fraction of features to be randomly sampled for each tree
    'seed': 42                    # Random seed for reproducibility
}

# Specify the evaluation sets
evals = [(dtrain, 'train'), (dtest, 'test')]
eval_results = {}

# Train the XGBoost model and capture evaluation results
model = xgb.train(params, dtrain, num_boost_round=100, evals=evals,
                  evals_result=eval_results, early_stopping_rounds=10)
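Once training has finished, we can take a rough look at the round picked by early stopping and the error on the held-out windows (a minimal sketch, assuming the multi-target setup above predicts one column per forecast step):
# Best boosting round found by early stopping and its test RMSE
print(model.best_iteration, model.best_score)

# Predictions for the held-out windows; assumed shape (n_samples, 3)
preds = model.predict(dtest)
print("Overall test RMSE:", np.sqrt(np.mean((preds - y_test) ** 2)))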
Finally, we look at the evaluation metrics recorded on the training and test sets during training:
import matplotlib.pyplot as plt

# Access the training and validation RMSE recorded during training
train_rmse = eval_results['train']['rmse']
val_rmse = eval_results['test']['rmse']
x_values = range(len(train_rmse))
fig, ax = plt.subplots(figsize=(8,4))
ax.plot(x_values, train_rmse, label="Train RMSE")
ax.plot(x_values, val_rmse, label="Validation RMSE")
ax.legend()
plt.ylabel("RMSE Loss")
plt.title("XGBoost RMSE Loss")
plt.show()
This is, more or less, the complete pipeline for turning a raw time series into a training dataset via a sliding window and fitting a model on it.
However, the modeling process shown here is quite rough: the model fits, but its performance is mediocre. For this particular exercise, the reasons include (among others) that we use only a single feature, price actual, and ignore every other column, and that we never question whether the model and the evaluation scheme actually suit the problem. To take a simple example of the latter: there are plenty of published examples of XGBoost working well on time series, yet when we build our own models we often cannot reproduce those results. The usual advice online is to tune the hyperparameters, but whether the choice of model itself is the problem, or whether characteristics of the business problem have been ignored, is rarely discussed. One quick way to see what the model has actually learned is to look at the trained booster's feature importance, which tells us how often each of the five lag features (f0 through f4) was used for a split:
# Inspect the trained booster's feature importance: get_fscore() returns,
# for each lag feature (f0..f4), how many times it was used in a split
importance = model.get_fscore()
for feature, score in importance.items():
    print(f"{feature} importance (split count): {score}")
This prints:
f0 importance (split count): 369.0
f1 importance (split count): 117.0
f2 importance (split count): 351.0
f3 importance (split count): 389.0
f4 importance (split count): 871.0
Unsurprisingly, f4, the most recent lag, carries by far the most weight. A more fundamental issue, though, is how the model is evaluated. If we run standard cross-validation with XGBoost, the training data is split internally into several folds (say five) and the model is trained iteratively, with a different fold serving as the validation set each round. For a time-series forecasting task this is inappropriate in practice, because conventional cross-validation (e.g. k-fold) can introduce data leakage: information from the future is exposed to the model at the very moment it is supposed to be predicting that future. In a real application we can only train on past time points and predict future ones, so standard cross-validation is generally not recommended for time-series forecasting.
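To see the leakage concretely: with shuffled k-fold splitting, consecutive overlapping windows routinely land on opposite sides of the split, so a "future" test window shares most of its values with windows the model was trained on. A minimal sketch of that overlap, using sklearn's KFold purely for illustration:
from sklearn.model_selection import KFold

kf = KFold(n_splits=5, shuffle=True, random_state=42)
train_index, test_index = next(iter(kf.split(X)))

# Pick one test window and count training windows that overlap it in time;
# adjacent windows share window_size - 1 of their 5 values
i = test_index[0]
overlapping = [j for j in train_index if abs(j - i) < window_size]
print(f"test window {i} overlaps {len(overlapping)} training windows in time")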
Instead, common evaluation schemes for time-series forecasting include:
Walk-Forward Validation (rolling validation): split the data into training and test sets and move the split forward along the time axis step by step; the data in each training window is used to fit the model, which is then evaluated on the time points that immediately follow the window (a sketch appears after the TimeSeriesSplit example below).
Expanding Window Validation: similar to walk-forward validation, except that the training window keeps growing so that it contains all past observations.
TimeSeriesSplit: a cross-validation strategy designed specifically for time-series data; unlike ordinary k-fold cross-validation, it guarantees that the training set always consists of past time points and the test set of future ones.
For example:
from sklearn.model_selection import TimeSeriesSplit

tscv = TimeSeriesSplit(n_splits=5)  # 5-fold time-series cross-validation
for train_index, test_index in tscv.split(X):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]
    # train and evaluate the model here
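And the walk-forward scheme from the list above, as a minimal sketch with hypothetical fold sizes (a fixed-size training window that slides forward in time, always testing on the block that immediately follows it):
n_train, n_test = 5000, 1000  # hypothetical sizes of the training and test blocks
for lo in range(0, len(X) - n_train - n_test + 1, n_test):
    train_block = slice(lo, lo + n_train)
    test_block = slice(lo + n_train, lo + n_train + n_test)
    print(f"train [{train_block.start}:{train_block.stop}) -> test [{test_block.start}:{test_block.stop})")
    # fit on X[train_block], y[train_block]; evaluate on X[test_block], y[test_block]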
We will keep building on this time-series modeling in later posts. To close, a short aside on Dask, which helps once the data no longer fits comfortably in memory.
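A minimal sketch of the kind of snippet described below, using two small hypothetical frames that share a 'target' column (the names df_dask, dataset_dask and d_slice_dask follow the description):
import pandas as pd
import dask.dataframe as dd

# Two hypothetical Pandas frames that share a 'target' column
left = pd.DataFrame({'target': range(6), 'a': range(6)})
right = pd.DataFrame({'target': range(0, 6, 2), 'b': ['x', 'y', 'z']})

# Split each frame into partitions so Dask can process them in parallel
df_dask = dd.from_pandas(left, npartitions=10)
dataset_dask = dd.from_pandas(right, npartitions=10)

# Lazily left-join the two Dask DataFrames on the shared 'target' column
d_slice_dask = dd.merge(df_dask, dataset_dask, on='target', how='left')

# compute() runs the task graph and returns an in-memory Pandas DataFrame
print(d_slice_dask.compute())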
The line import dask.dataframe as dd imports the DataFrame module of the Dask library. Dask is a library for working with large datasets and for parallel computation; it offers an API very similar to Pandas, but it can handle datasets that are larger than memory. In the snippet, dd.from_pandas(df, npartitions=10) converts a Pandas DataFrame into a Dask DataFrame. A Dask DataFrame is made up of many small Pandas DataFrames, each of which is called a partition; the npartitions argument specifies how many partitions the original data is split into so that it can be processed in parallel.
dd.merge(df_dask, dataset_dask, on='target', how='left') performs the merge with Dask's merge method: on='target' sets the 'target' column as the join key, and how='left' makes it a left join.
Finally, d_slice_dask.compute() converts the Dask DataFrame back into a Pandas DataFrame and materializes the result in memory; this step is what actually produces the final merged result.
Overall, Dask lets you work with large datasets in a more flexible way by exploiting distributed and parallel computation. If your dataset is large and you want efficient parallel processing, Dask is a strong choice.