赞
踩
最近做了一个时序数据的异常检测和预测项目,总结了几种常见的方法,针对的数据类型主要有两种,有周期特征和无周期特征的两种数据。对于没有周期特征的数据,可以直接使用基于统计学的3sigma方法,针对有周期特征的数据可以使用SARIMA、Prophet、XGBoost方法。下面针对每种方法进行逐个讲解,讲述一下如何运用以及效果展示。
3 Sigma方法,又称为3倍标准差法,是一种基于统计学的质量控制方法。它是在正态分布假设下,通过统计数据的变异程度来判断数据是否合格的一种方法。在3 Sigma方法中,以平均值为中心,向两侧范围扩展3个标准差的距离,形成了一个带状区间。如果样本数据点落在这个带状区间内,就认为数据正常;如果落在带状区间外部,就认为数据异常。
下面是一个案例,用来检测时序数据的异常值:
import pandas as pd import matplotlib.pyplot as plt # 读取数据 df = pd.read_csv('data.csv', encoding='gb2312') # 这里我的字段名是汉字,所以用了encoding='gb2312' # 将日期转换为Pandas日期格式 df['时间'] = pd.to_datetime(df['时间']) # 设置日期为索引 df.set_index('时间', inplace=True) plt.figure(figsize=(10, 4), dpi=150) # 设置画板的大小和分辨率 columns = ['字段1', '字段2', '字段3', '字段4', '字段5'] for column in columns: # 对每个字段逐个进行异常值检测 # 计算标准差和平均值 std = df[column].std() mean = df[column].mean() # 定义异常数据的条件 condition = (df[column] > mean + 3 * std) | (df[column] < mean - 3 * std) # 筛选出异常数据的行 anomalies = df.loc[condition][column] # 绘制时序图 plt.plot(df.index, df[column], label='Normal value') # 标记异常点 plt.scatter(anomalies.index, anomalies, color='red', label='Outliers') # 添加标题和标签 plt.title('Sequence Diagram of ' + column) plt.xlabel('Date') plt.ylabel('Value') # 图像保存 plt.legend(loc='best') plt.savefig('results/' + column + '.png', bbox_inches='tight', pad_inches=0.1) plt.clf() # 清空画板
异常检测效果如下:
可以看出,3sigma对于异常值检测效果还是不错的,但是如果有周期,比如上面这张图,那个大大的黑色的点按理来说也是异常值,但是3sigma是对整体数据设置一个阈值上下限,这就没办法检测出这个异常值了,所以下面看一下能处理具有周期特征的方法。
SARIMA是一种时间序列分析模型,也称为季节性自回归滑动平均模型(Seasonal Autoregressive Integrated Moving Average model)。它是ARIMA模型的扩展,用于处理具有明显季节性周期的时间序列数据,比如月度、季度、年度数据。
SARIMA模型分为三部分,包括:季节性差分、非季节性差分和ARMA模型。其中季节性差分和非季节性差分用于使时间序列数据平稳化,ARMA模型则用于描述时间序列数据的自相关和移动平均性质。
SARIMA模型的参数比较复杂,包括p、d、q和P、D、Q、s。其中,p、d、q表示非季节性差分的阶数、自回归项的阶数和移动平均项的阶数;P、D、Q表示季节性差分的阶数、自回归项的阶数和移动平均项的阶数;s表示季节周期的长度。
SARIMA模型可以用于时间序列数据的预测和异常检测,对于需要考虑季节性周期因素的数据分析具有很好的效果。
下面讲述一下利用SARIMA进行建模的过程,先给出代码,然后进行分析:
import pandas as pd from statsmodels.tsa.seasonal import seasonal_decompose from statsmodels.tsa.stattools import adfuller import matplotlib.pyplot as plt from statsmodels.stats.diagnostic import acorr_ljungbox from statsmodels.tsa.statespace.sarimax import SARIMAX import tqdm import itertools import statsmodels.tsa.stattools as ts import numpy as np import warnings warnings.filterwarnings('ignore') # 加载数据 df = pd.read_csv('data.csv', encoding='gb2312', index_col=['时间'], parse_dates=['时间']) # 参数设置 plt.rcParams['font.sans-serif'] = ['Microsoft YaHei'] # 定义画图的时候使其正常显示中文字体黑体 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示表示负号 columns =['字段1', '字段2', '字段3', '字段4', '字段5'] for column in columns: data = df[column] # 读取列数据并去掉空值 # 画出时序图 plt.figure(figsize=[15, 7], dpi=300) data.plot() plt.show() p_pingwen = adfuller(data)[1] # 检验平稳性,p值小于0.05说明是平稳的 p_baizaosheng = acorr_ljungbox(data, lags=1)['lb_pvalue'].values[0] # # 检验是否为白噪声序列,p值小于0.05说明是非白噪声的 # 对数据进行差分后得到 自相关图和 偏相关图 diff = False # 如果数据平稳的话就不需要进行差分了 if diff: D_data = data.diff().dropna() D_data.columns = [column] D_data.plot() # 画出差分后的时序图 plt.show() p_pingwen = adfuller(D_data)[1] p_baizaosheng = acorr_ljungbox(D_data, lags=1)['lb_pvalue'].values[0] # 季节性分解 decomposition = seasonal_decompose(data, period=24) # 以一天为周期,以小时为粒度,所以周期是24 trend = decomposition.trend seasonal = decomposition.seasonal residual = decomposition.resid plt.figure(figsize=[15, 7]) decomposition.plot() print("test: p={}".format(ts.adfuller(seasonal)[1])) plt.show() # 对模型进行定阶 def optimize_ARIMA(order_list, exog): results = [] for order in tqdm.tqdm(order_list): try: model = SARIMAX(exog, order=order).fit(disp=-1) except: continue aic = model.aic results.append([order, aic]) result_df = pd.DataFrame(results) result_df.columns = ['(p, d, q)', 'AIC'] # Sort in ascending order, lower AIC is better result_df = result_df.sort_values(by='AIC', ascending=True).reset_index(drop=True) return result_df ps = range(0, 8, 1) # 我们将尝试所有阶数 (p,q) 从 0 到 8 的组合,但保持差分阶数等于 1。 qs = range(0, 8, 1) parameters = itertools.product(ps, qs) # Create a list with all possible combination of parameters parameters_list = list(parameters) order_list = [] for each in parameters_list: each = list(each) each.insert(1, 1) # 元组中第二个1是指差分的阶数,一般都是1,差分太多了就会导致过差分 each = tuple(each) order_list.append(each) result_df = optimize_ARIMA(order_list, exog=data) # 得到的AIC最小的参数就是模型的最佳参数 # 通过网格搜索对seasonal_order进行定阶 def get_ARIMA_params(data, pdq, m=24): p = d = q = range(0, 2) seasonal_pdq = [(x[0], x[1], x[2], m) for x in list(itertools.product(p, d, q))] score_aic = 1000000.0 warnings.filterwarnings("ignore") # specify to ignore warning messages for param_seasonal in tqdm.tqdm(seasonal_pdq): mod = SARIMAX(data, order=pdq, seasonal_order=param_seasonal, enforce_stationarity=False, enforce_invertibility=False) results = mod.fit() print('x{}12 - AIC:{}'.format(param_seasonal, results.aic)) if results.aic < score_aic: score_aic = results.aic params = param_seasonal, score_aic param_seasonal, aic = params print('x{}12 - AIC:{}'.format(param_seasonal, aic)) return param_seasonal, aic order = result_df.iloc[0, :].values[0] # 这个order是ARIMA模型的参数,也就是前面result_df确定的最佳参数 result_seasonal, aic = get_ARIMA_params(data, order, m=24) # 模型拟合 pred_num = 12 best_model = SARIMAX(data[:-pred_num], order=result_df.iloc[0, :].values[0], seasonal_order=result_seasonal).fit() # 留出最后pred_num条数据不进行拟合,这是因为后面要对这pred_num条数据进行预测 # 模型拟合效果检查 print(best_model.summary().tables[1]) best_model.plot_diagnostics(figsize=(15, 12)) plt.show() # 对拟合值的阈值上下限和真实值作图,并计算RMSE值作为评估参数 predict_ts = best_model.predict(tpy='levels', start=len(data) - 500, end=len(data)-pred_num)[1:] # 由于数据太长,这里只拟合最后500条数据的时序图;留出pred_num条数据,这是因为我们后面要进行预测, myts = data[-500:-pred_num] predict_ts.index = myts.index predict_lower = best_model.get_prediction(tpy='levels', start=len(data)-500, end=len(data)-pred_num).conf_int()[1:]['lower ' + column] # 这里得到拟合值的阈值下限 predict_upper = best_model.get_prediction(tpy='levels', start=len(data)-500, end=len(data)-pred_num).conf_int()[1:]['upper ' + column] predict_lower.index = myts.index predict_upper.index = myts.index outliers = myts[(myts < predict_lower) | (myts > predict_upper)] # 找出异常值 outliers.plot(color='red', label='异常值', figsize=(12, 8), marker='o', linestyle='', linewidth=2) # predict_ts.plot(color='blue', label='拟合值', figsize=(12, 8), linestyle='-', linewidth=2) myts.plot(color='blue', label='真实值', figsize=(12, 8), linestyle='-', linewidth=2) predict_lower.plot(color='green', label='阈值下限', figsize=(12, 8), linestyle='--', linewidth=2) predict_upper.plot(color='green', label='阈值上限', figsize=(12, 8), linestyle='--', linewidth=2) plt.legend(loc='best') plt.title(column + '拟合原始数据平均误差为: %.4f' % np.sqrt(sum((predict_ts - myts) ** 2) / myts.size)) plt.savefig('model_fitting_effect/' + column + '.png', bbox_inches='tight', pad_inches=0.1) # 放在model_fitting_effect文件下 plt.clf() # 向后做pred_num个步长的预测 forecast_ts = best_model.forecast(pred_num) myts_fore = data[-pred_num:] forecast_lower = best_model.get_forecast(pred_num, alpha=0.05).conf_int()['lower '+column] forecast_upper = best_model.get_forecast(pred_num, alpha=0.05).conf_int()['upper '+column] forecast_ts.index, forecast_lower.index, forecast_upper.index = myts_fore.index, myts_fore.index, myts_fore.index # 索引对齐 outliers = myts_fore[(myts_fore < forecast_lower) | (myts_fore > forecast_upper)] outliers.plot(color='red', label='异常值', figsize=(12, 8), marker='o', linestyle='', linewidth=2) forecast_ts.plot(color='blue', label='预测值', figsize=(12, 8), linestyle='-', linewidth=2) myts_fore.plot(color='green', label='真实值', figsize=(12, 8), linestyle='-', linewidth=2) forecast_lower.plot(color='black', label='阈值下限', figsize=(12, 8), linestyle='--', linewidth=2) forecast_upper.plot(color='black', label='阈值上限', figsize=(12, 8), linestyle='--', linewidth=2) plt.legend(loc='best') plt.title(column + '预测' + str(pred_num) + '期数据平均误差为: %.4f' % np.sqrt(sum((forecast_ts - myts_fore) ** 2) / myts_fore.size)) plt.savefig('model_forecast_effect/' + column + '.png', bbox_inches='tight', pad_inches=0.1) plt.clf()
Prophet是Facebook开发的一种用于时间序列预测的开源库。它基于加法模型(additive model)和分解(decomposition)方法,可以对具有季节性、趋势性和节假日效应的时间序列数据进行准确的预测。Prophet库提供了简单而强大的API,使得时间序列的建模和预测变得非常方便。以下是安装Prophet库的一般步骤:
# 安装pystan
conda install pystan
# 安装prophet
sudo pip install fbprophet
老样子,先把代码贴出来,下面再详细讲解:
from fbprophet import Prophet import numpy as np import pandas as pd import matplotlib.pyplot as plt # 参数设置 plt.rcParams['font.sans-serif'] = ['Microsoft YaHei'] # 定义使其正常显示中文字体黑体 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示表示负号 datas = pd.read_csv('data.csv', encoding='gb2312') columns = ['时间', '字段1', '字段2'] for idx, column in enumerate(columns): # 读取数据 datadf = pd.DataFrame(columns=['ds', 'y']) # 初始化一个列名为ds和y的dataframe变量,这是因为模型默认数据的列名都是ds和y data, datatime = datas[column], datas['时间'] datadf['ds'], datadf['y'] = pd.to_datetime(datatime), data # 读取数据 # 初始化一个模型 np.random.seed(1234) ## 设置随机数种子 pred_num = 48 confidence_interval = 0.95 model = Prophet(growth="linear", daily_seasonality=True, weekly_seasonality=False, seasonality_mode='multiplicative', interval_width=confidence_interval, # 获取95%的置信区间 ) # 训练模型 model = model.fit(datadf) # 使用数据拟合模型 [:-pred_num] # 预测未来数据和预测历史数据,prophet一步完成对历史数据和未来数据的预测 future = model.make_future_dataframe(periods=pred_num, freq='H') # periods 表示需要预测的步数,freq 表示时间序列的频率。 forecast = model.predict(future) # 使用模型对数据进行预测,interval_width默认为0.8,即80%的置信区间 forecast["y"] = datadf["y"].reset_index(drop=True) # 根据模型预测值的置信区间"yhat_lower"和"yhat_upper"判断样本是否为异常值 def outlier_detection(forecast): index = np.where((forecast["y"] <= forecast["yhat_lower"]) | (forecast["y"] >= forecast["yhat_upper"]), True, False) return index outlier_index = outlier_detection(forecast) outlier_df = datadf[outlier_index] # 可视化异常值的结果 fig, ax = plt.subplots() forecast[-pred_num:].plot(x="ds", y="yhat", style="b--", figsize=(18, 8), label="预测值", ax=ax, linewidth=2) forecast[:-pred_num].plot(x="ds", y="yhat", style="b-", figsize=(18, 8), label="拟合值", ax=ax, linewidth=2) ax.fill_between(forecast["ds"].values, forecast["yhat_lower"], forecast["yhat_upper"], color='green', alpha=.2, label=str(int(confidence_interval*100))+"%置信区间") ## 可视化出置信区间,alpha=.2表示透明度为0.2 forecast.plot(kind="scatter", x="ds", y="y", c="k", s=20, label="原始数据", ax=ax) # 原始数据散点图表示 outlier_df.plot(x="ds", y="y", style="ro", ax=ax, label="异常值") ## 可视化出异常值的点,"ro"表示红色圆点,‘rs’表示红色方块 plt.legend(loc='best') plt.grid() rmse = np.sqrt(sum(((forecast["y"][-pred_num:] - forecast["yhat"][-pred_num:]).dropna()) ** 2) / pred_num) plt.title(column + '_模型拟合效果图_异常值检测效果图_预测未来' + str(int(pred_num)) + '小时数据效果图_预测均方根误差为: %.4f' % rmse) plt.savefig('Prophet/Hour/model_effect/' + column + '.png', bbox_inches='tight', pad_inches=0.1, dpi=300) plt.clf()
XGBoost(eXtreme Gradient Boosting)是一种基于梯度提升树的机器学习算法,它通过迭代训练多个决策树模型来逐步提升整体模型的性能。具有梯度提升的优势,XGBoost在处理分类和回归问题时表现出色,并且具备高效性、灵活性和可解释性等特点,被广泛应用于数据挖掘、预测建模和特征选择等领域。
下面贴出代码:
import xgboost as xgb import pandas as pd import numpy as np import matplotlib.pyplot as plt # 导入数据 df = pd.read_csv('data.csv', encoding='gb2312', index_col=['时间'], parse_dates=['时间']) plt.rcParams['font.sans-serif'] = ['Microsoft YaHei'] # 定义使其正常显示中文字体黑体 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示表示负号 # 按时间顺序排序 df.sort_index(inplace=True) # 将时间序列按时间窗口切割成多个样本 def create_dataset(dataset, look_back=1): dataX, dataY = [], [] for i in range(len(dataset)-look_back-1): a = dataset[i:(i+look_back)] dataX.append(a) dataY.append(dataset[i + look_back]) return np.array(dataX), np.array(dataY) columns = ['时间', '字段1', '字段2'] for column in columns: # 数据预处理 dataset = df[column] # 如果有空值需要先进行填充或者删除 train_size = int(len(dataset) * 0.7) # 划分训练集和测试集 test_size = len(dataset) - train_size train, test = dataset[0:train_size], dataset[train_size:len(dataset)] # 创建数据集 look_back = 3 trainX, trainY = create_dataset(train, look_back) testX, testY = create_dataset(test, look_back) # 定义模型 model = xgb.XGBRegressor() # 拟合模型并预测 model.fit(trainX, trainY) trainPredict = model.predict(trainX) # 拟合值 testPredict = model.predict(testX) # 预测值 # 将得到的数据存入dataframe中 true_value = pd.DataFrame(columns=['x', 'y']) true_value_x = np.concatenate((train[look_back:-1].index, test[look_back:-1].index), axis=-1) true_value_y = np.concatenate((trainY, testY), axis=-1) true_value['x'] = true_value_x true_value['y'] = true_value_y fitting_value = pd.DataFrame(columns=['x', 'y']) fitting_value['x'] = train[look_back:-1].index fitting_value['y'] = trainPredict pred_value = pd.DataFrame(columns=['x', 'y']) pred_value['x'] = test[look_back:-1].index pred_value['y'] = testPredict a = pred_value['y'] b = true_value['y'][-len(testX):] b.index = a.index c = np.sqrt(sum(((a - b).dropna()) ** 2) / len(testX)) print(column + ' 小时数据效果图_预测均方根误差为: %.4f' % c)
上面就是利用XGBoost的建模过程了,只是输出了模型的评估效果,具体的效果展示与前面介绍的方法一模一样,把前面随便哪一个方法的代码搞懂,XGBoost效果展示也很容易实现了。
从最后输出的评估结果来看,XGBoost取得了最好的预测效果,不得不说,XGBoost是真的牛。
以上就是全部内容了,如有问题,欢迎评论区一起交流。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。