赞
踩
整理了基于 SARIMA 的时间序列预测模型的 Python 代码,免费分享给大家,记得点赞哦!
- #!/usr/bin/env python
- # coding: utf-8
-
- # # 导入环境中的相关包
- import itertools
- import numpy as np #
- import pandas as pd #
- import matplotlib.pyplot as plt
- from matplotlib.ticker import MultipleLocator
- import warnings
- from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
- from statsmodels.stats.diagnostic import acorr_ljungbox
- from statsmodels.tsa.statespace.sarimax import SARIMAX
- from sklearn.metrics import r2_score,mean_absolute_error,mean_squared_error
- from statsmodels.tsa.stattools import adfuller
- import math
- import seaborn as sns
- import statsmodels.api as sm
- import tensorflow as tf
- from pmdarima import auto_arima
# Suppress library warnings for the rest of the script.
warnings.filterwarnings('ignore')

# Matplotlib defaults: SimHei so Chinese labels render correctly, a 10x8
# default figure size, and the image interpolation / colormap settings.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'figure.figsize': (10.0, 8.0),
    'image.interpolation': 'nearest',
    'image.cmap': 'gray',
})

# Enable memory growth on every GPU that TensorFlow can see.
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
-
-
-
-
# Load the series; only the CSV column at position 1 is read.
df = pd.read_csv("shao - 单.csv", usecols=[1])
# A bare `df.head()` is silently discarded when this runs as a script,
# so print the preview explicitly.
print(df.head())

# Plot the raw series on a wide, short canvas.
plt.figure(figsize=(15, 3))
plt.title('风速')
plt.xlabel('时间')
plt.ylabel('最大风速')
plt.plot(df, 'b', label='AQI')
plt.legend()
plt.show()
-
-
-
def adf_val(ts, ts_title):
    """Run the augmented Dickey-Fuller stationarity test on a series.

    All six fields returned by ``adfuller`` are printed as a list of
    ``(name, value)`` pairs.

    Args:
        ts: The series handed directly to ``adfuller``.
        ts_title: A label for the series; this function does not use it.

    Returns:
        The tuple ``(adf, pvalue, critical_values)``: the test statistic,
        its p-value and the critical values reported by ``adfuller``.
    """
    field_names = ('adf', 'pvalue', 'usedlag',
                   'nobs', 'critical_values', 'icbest')
    result = adfuller(ts)
    # Unpack all six fields so an unexpected result length still fails loudly.
    adf, pvalue, _usedlag, _nobs, critical_values, _icbest = result
    print(list(zip(field_names, result)))
    return adf, pvalue, critical_values
-
-
# White-noise (pure randomness) test: if the data are purely random, further
# modelling is pointless, so it is worth checking right after loading.
def acorr_val(ts):
    """Run the Ljung-Box white-noise test at lag 1.

    Args:
        ts: The time series passed to ``acorr_ljungbox``.

    Returns:
        The tuple ``(lbvalue, pvalue)``: the Ljung-Box statistic(s) and the
        matching p-value(s) as array-likes.
    """
    result = acorr_ljungbox(ts, lags=1)
    # Recent statsmodels releases return a DataFrame with the columns
    # 'lb_stat' and 'lb_pvalue'. Tuple-unpacking a DataFrame yields its
    # column *labels*, not the statistics, so read the columns by name.
    if hasattr(result, 'columns'):
        return result['lb_stat'].to_numpy(), result['lb_pvalue'].to_numpy()
    # Older releases return a plain (statistic, p-value) tuple.
    lbvalue, pvalue = result
    return lbvalue, pvalue
-
-
-
def tsplot(y, lags=None, figsize=(14, 8)):
    """Draw a 2x2 diagnostic panel for a series.

    The panel holds the series itself (top-left), a 25-bin histogram
    (top-right), the autocorrelation plot (bottom-left) and the partial
    autocorrelation plot (bottom-right).

    Args:
        y: Object with a pandas-style ``.plot`` method; also passed to
            ``plot_acf`` and ``plot_pacf``.
        lags: Forwarded as ``lags`` to both correlation plots.
        figsize: Size of the created figure.

    Returns:
        The tuple ``(ts_ax, acf_ax, pacf_ax)`` of axes.
    """
    fig = plt.figure(figsize=figsize)
    grid_shape = (2, 2)
    cells = ((0, 0), (0, 1), (1, 0), (1, 1))
    ts_ax, hist_ax, acf_ax, pacf_ax = (
        plt.subplot2grid(grid_shape, cell) for cell in cells
    )

    y.plot(ax=ts_ax)
    ts_ax.set_title('A Given Training Series')

    y.plot(ax=hist_ax, kind='hist', bins=25)
    hist_ax.set_title('Histogram')

    # Autocorrelation: correlation of the series with its own past values.
    plot_acf(y, lags=lags, ax=acf_ax)
    # Partial autocorrelation: the correlation with a past value after
    # accounting for the shorter lags in between.
    plot_pacf(y, lags=lags, ax=pacf_ax)

    # Start both correlation plots at x = 0.
    for corr_ax in (acf_ax, pacf_ax):
        corr_ax.set_xlim(0)
    sns.despine()

    fig.tight_layout()
    fig.show()

    return ts_ax, acf_ax, pacf_ax
-
-
-
# Work on a float32 copy of the loaded data.
ts_data = df.astype('float32')

# Stationarity check. Original author's note: on their data the ADF statistic
# was -10.4, below all three critical values, with a p-value close to 0, so
# the series was judged stationary.
adf, pvalue1, critical_values = adf_val(ts_data, 'raw time series')
for label, value in (('adf', adf),
                     ('pvalue1', pvalue1),
                     ('critical_values', critical_values)):
    print(label, value)

# Original author's note: a p-value far below 0.01 is taken to mean the
# series is stationary.
aco = acorr_val(ts_data)
print('aco', aco)

# Autocorrelation and partial autocorrelation plots for the first 20 lags.
tsplot(ts_data, lags=20)
-
-
# Chronological 80/20 split: the first 80% of rows train, the rest test.
split_at = int(len(df) * 0.8)
train_data, test_data = df[:split_at], df[split_at:]

# Plot the training and testing segments of the raw data.
plt.figure(dpi=100, figsize=(20, 5))
plt.title('Air Quality Index of Nanning City', size=40)
plt.xlabel('time/day', size=30)
plt.ylabel('AQI', size=30)
for segment, colour, caption in ((train_data, 'b', 'Training Data'),
                                 (test_data, 'g', 'Testing Data')):
    plt.plot(segment, colour, label=caption, linewidth=3)
# The font settings are applied before the legend is created, as in the
# original statement order.
font = {'serif': 'Times New Roman', 'size': 30}
plt.rc('font', **font)
plt.legend()
plt.show()

# Underlying NumPy arrays of the two splits.
train_ar, test_ar = train_data.values, test_data.values
-
-
-
# Automatic seasonal order search (seasonal period m=12) with upper bounds on
# every order. The summary is printed explicitly because a bare expression is
# discarded when the file runs as a script.
auto_model = auto_arima(train_data, seasonal=True, m=12,
                        max_p=7, max_d=2, max_q=7,
                        max_P=4, max_D=4, max_Q=4)
print(auto_model.summary())
-
-
def best_sarima_model(train_data, p, q, P, Q, d=1, D=1, s=12):
    """Grid-search SARIMA orders and keep the fit preferred by AIC/BIC/HQIC.

    Every combination of ``p`` x ``q`` x ``P`` x ``Q`` is fitted with
    ``SARIMAX`` using order ``(p, d, q)`` and seasonal order ``(P, D, Q, s)``.
    A fit becomes the new best when at least two of its AIC, BIC and HQIC
    values are less than or equal to the stored best values. The stored
    values are rounded to whole numbers, as in the original implementation.
    Each new best is reported on stdout, and the summary of the final best
    model is printed before returning.

    Args:
        train_data: Endogenous series handed to ``SARIMAX``.
        p: Iterable of non-seasonal AR orders to try.
        q: Iterable of non-seasonal MA orders to try.
        P: Iterable of seasonal AR orders to try.
        Q: Iterable of seasonal MA orders to try.
        d: Non-seasonal differencing order.
        D: Seasonal differencing order.
        s: Seasonal period.

    Returns:
        The tuple ``(best_model, models)``: the selected fitted result and a
        list holding every successfully fitted result exactly once.

    Raises:
        ValueError: If no candidate could be fitted, for example because the
            grids are empty or every fit failed.
    """
    # np.inf rather than np.Inf: the capitalised alias was removed in NumPy 2.0.
    best_model_aic = np.inf
    best_model_bic = np.inf
    best_model_hqic = np.inf
    best_model_order = (0, 0, 0)
    current_best_model = None
    models = []

    for p_, q_, P_, Q_ in itertools.product(p, q, P, Q):
        candidate_order = (p_, d, q_, P_, D, Q_, s)
        try:
            model = SARIMAX(endog=train_data, order=(p_, d, q_),
                            seasonal_order=(P_, D, Q_, s),
                            enforce_invertibility=False).fit()
        except Exception as exc:
            # Best-effort search: report the failed candidate and move on,
            # without also swallowing KeyboardInterrupt/SystemExit.
            print("Skipping SARIMA{}: {}".format(candidate_order, exc))
            continue

        models.append(model)
        no_of_lower_metrics = sum((
            model.aic <= best_model_aic,
            model.bic <= best_model_bic,
            model.hqic <= best_model_hqic,
        ))
        if no_of_lower_metrics >= 2:
            best_model_aic = np.round(model.aic, 0)
            best_model_bic = np.round(model.bic, 0)
            best_model_hqic = np.round(model.hqic, 0)
            best_model_order = candidate_order
            current_best_model = model
            print("Best model: SARIMA" + str(best_model_order) +
                  " AIC:{} BIC:{} HQIC:{}".format(best_model_aic, best_model_bic, best_model_hqic) +
                  " resid:{}".format(np.round(np.exp(current_best_model.resid).mean(), 3)))

    if current_best_model is None:
        raise ValueError("no SARIMA model could be fitted for the given order grids")

    print('\n')
    print(current_best_model.summary())
    return current_best_model, models
-
# Search orders 0..2 for each of p, q, P and Q on the training array, with
# the function's default d, D and seasonal period.
order_grid = range(3)
best_model, models = best_sarima_model(
    train_data=train_ar,
    p=order_grid,
    q=order_grid,
    P=order_grid,
    Q=order_grid,
)
-
-
-
# Candidate orders: p and q in 0..2, d fixed at 0.
p = range(0, 3)
d = range(0, 1)
q = range(0, 3)
pdq = list(itertools.product(p, d, q))
# The seasonal candidates reuse the same (p, d, q) grid with period 6.
seasonal_pdq = [(x[0], x[1], x[2], 6) for x in pdq]

# Keep the fit with the lowest AIC. Infinity is a safe starting point
# whatever the scale of the AIC values.
min_aic = float('inf')
min_aic_model = None
for param, param_seasonal in itertools.product(pdq, seasonal_pdq):
    try:
        mod = sm.tsa.statespace.SARIMAX(train_ar,
                                        order=param,
                                        seasonal_order=param_seasonal,
                                        enforce_stationarity=False,
                                        enforce_invertibility=False)
        results = mod.fit()
    except Exception as exc:
        # Best-effort search: report the failed candidate and keep going.
        print('Skipping SARIMA{}x{}: {}'.format(param, param_seasonal, exc))
        continue

    # The seasonal period is already part of param_seasonal, so no
    # hard-coded "12" suffix is printed (the period used here is 6).
    print('SARIMA{}x{} - AIC:{}'.format(param, param_seasonal, results.aic))

    if results.aic < min_aic:
        min_aic = results.aic
        min_aic_model = results

if min_aic_model is None:
    raise RuntimeError('no SARIMA candidate could be fitted during the AIC grid search')

# Print explicitly: a bare `.summary()` expression is discarded in a script.
print(min_aic_model.summary())
-
-
-
# Walk-forward one-step-ahead forecasting: refit on everything seen so far,
# forecast one step, then append the true observation to the history.
# Both arrays are flattened to scalars so that `%f` formatting and the
# history list do not depend on implicit 1-element-array -> scalar
# conversion, which NumPy has deprecated.
history = np.ravel(train_ar).tolist()
print(type(history))
predictions = []

# Refit the SARIMA model before every forecast step.
for obs in np.ravel(test_ar):
    model = sm.tsa.SARIMAX(history, order=(2, 1, 1),
                           seasonal_order=(0, 0, 1, 12),
                           enforce_invertibility=False)
    model_fit = model.fit()
    output = model_fit.forecast()  # one-step-ahead forecast
    yhat = output[0]
    predictions.append(yhat)
    history.append(obs)
    print('predicted=%f, expected=%f' % (yhat, obs))
-
-
-
# Score the predictions against the test array and print RMSE, R2 and MAE in
# that order. After the loop `testScore` holds the MAE, as in the original.
for report_format, metric in (
        ('RMSE %.3f ', lambda actual, predicted: math.sqrt(mean_squared_error(actual, predicted))),
        ('R2 %.3f', r2_score),
        ('MAE %.3f ', mean_absolute_error),
):
    testScore = metric(test_ar, predictions)
    print(report_format % (testScore))
-
-
# Show only the forecast window (test period), not the training data.
plt.figure(figsize=(12, 7))
test_index = test_data.index
plt.plot(test_index, predictions,
         label='Predicted', linestyle='dashed', marker='o', color='b')
plt.plot(test_index, test_data, label='Actual', color='red')
for set_text, text in ((plt.title, 'SARIMA'),
                       (plt.xlabel, 'time'),
                       (plt.ylabel, 'AQI')):
    set_text(text)
plt.legend()
plt.show()
-
-
-
更多时间序列预测代码:时间序列预测算法全集合--深度学习
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。