赞
踩
参考知乎文章:时间序列数据分析101,作者:厉建扬
除此之外还添加了分类、聚类的评估方法汇总+python实现。
时间值是在哪个过程产生的,以及何时产生的
如果我们在处理历史遗留数据,并没有清洗记录的文档说明,也无法找到处理数据流的人来确认时间戳产生的方式。这时需要我们做一些经验上的调查和推断,例如:
1)通过比较不同类别特征(如不同用户)的数据来理解某些时间模式(pattern)是否是共同的
2)使用聚合数据分析来理解时间特征,如该时间戳是本地时间时区还是标准时间时区,该时间反应的是用户行为还是一些外部限制(网络通信)
最常见的处理方法包括填补和删除两种,绝大多数情况都选择填补。
import pandas as pd import numpy as np import matplotlib.pyplot as plt %matplotlib inline pd.set_option('max_row',1000) # 导入美国年度失业率数据 unemploy = pd.read_csv('data\\unemployment.csv') unemploy.head() # 构建一列随机缺失值列 unemploy['missing'] = unemploy['rate'] # 随机选择10%行手动填充缺失值 mis_index = unemploy.sample(frac=0.1,random_state=999).index unemploy.loc[mis_index,'missing']=None # forward fill unemploy['f_fill'] = unemploy['missing'] unemploy['f_fill'].ffill(inplace=True)
unemploy['moveavg']=np.where(unemploy['missing'].isnull(),
unemploy['missing'].shift(1).rolling(3,min_periods=1).mean(),
unemploy['missing'])
# 线性插值,多项式插值和三次样条插值
unemploy['inter_lin']=unemploy['missing'].interpolate(method='linear')
unemploy['inter_poly']=unemploy['missing'].interpolate(method='polynomial', order=3)
unemploy['inter_poly']=unemploy['missing'].interpolate(method='spline', order=3)
来自不同数据源的时间轴常常无法一一对应,此时就要用到改变时间频率的方法进行数据清洗。由于无法改变实际测量数据的频率,我们能做的是改变数据收集的频率,即下采样和上采样。
下采样
下采样指的是减少数据收集的频率,也就是从原始数据中抽取子集的方式。
使用场景:
数据原始的分辨率不合理:例如有一个记录室外温度的数据,时间频率是每秒钟一次。我们都知道,气温不会在秒钟这个级别有明显的变化,而且秒级的气温数据的测量误差甚至会比数据本身的波动还要大,因此这个数据集有着大量的冗余。在这个案例中,每隔n个元素取一次数据可能更合理一些。
关注某个特定季节的信息:如果担心某些数据存在季节性的波动,我们可以只选择某一个季节(或月份)进行分析,例如只选择每年一月份的数据进行分析。
进行数据匹配:例如你有两个时间序列数据集,一个更低频(年度数据),一个更高频(月度数据),为了将两个数据集匹配进行下一步的分析,可以对高频数据进行合并操作,如计算年度均值或中位数,从而获得相同时间轴的数据集。
上采样
上采样在某种程度上是凭空获得更高频率数据的方式,使用上采样,只是让我们获得了更多的数据标签,而没有增加额外的信息。
使用场景:
不规律的时间序列:用于处理多表关联中存在不规则时间轴的问题。
例如现在有两个数据,一个记录了捐赠的时间和数量,另一个数据记录了公共活动的时间和代号,这时我们需要合并这两个表的数据,为每次捐赠打上标签,记录每次捐赠之前最近发生的一次公共活动,这种操作叫做rolling join。
进行数据匹配:类似下采样的场景,例如我们有一个月度的失业率数据,为了和其他数据匹配需要转换成日度的数据,如果我们假定新工作一般都是从每个月第一天开始的,那么可以推演认为这个月每天的失业率都等于该月的失业率。
通过以上的案例我们发现,即使是在十分干净的数据集中,由于需要比较来自不同维度的具有不同尺度的数据,也经常需要使用到上采样和下采样的方法。
为了能讲述一个更能被理解的故事,在数据分析前常常会进行平滑处理。数据平滑通常是为了消除一些极端值或测量误差。即使有些极端值本身是真实的,但是并没有反应出潜在的数据模式,我们也会把它平滑掉。
(加权)移动平均法
最简单的平滑技术,即可以给予数据点相同的权重,也可以给越邻近的数据点更高的权重。
简单指数平滑法
给越邻近的数据点更高的权重,与前者区别在于衰减的方式不同,指数平滑法顾名思义,从最邻近到最早的数据点的权重呈现指数型下降的规律
t时刻的指数平滑后的值:
S t = α ∗ x t + ( 1 − α ) ∗ S t − 1 S_t=\alpha*x_t+(1-\alpha)*S_{t-1} St=α∗xt+(1−α)∗St−1,平滑系数α越大则越近邻的数据影响越大
Holt指数平滑法
通过引入一个额外的系数,解决了指数平滑无法应用于具有趋势特点数据的不足,但但是依然无法解决具有季节性变化数据的平滑问题。
Holt-Winters指数平滑法(三参数:α水平平滑系数,β趋势平滑系数,γ季节性平滑系数)
再次引入一个新系数的方式同时解决了Holt Exponential Smoothing无法解决具有季节性变化数据的不足。简单来说,它是在指数平滑只有一个平滑系数的基础上,额外引入了趋势系数和季节系数来实现的。这种技术在时间序列的预测上(例如未来销售数据预测)有着很广泛的应用。
python实现:https://blog.csdn.net/u010665216/article/details/78051192?locationNum=11&fps=1
# 简单指数平滑
air['smooth_0.5']= air.Passengers.ewm(alpha =0.5).mean()
air['smooth_0.9']= air.Passengers.ewm(alpha =0.9).mean()
常规方法: 直方图观察频率分布情况 (series.hist())、 散点图观察不同列的相关关系 、 折线图观察趋势变化等
对于完成数据清洗的时间序列数据, 我们要问的第一个问题是它是否反映一个平稳的系统。平稳性的评估很重要,因为这让我们知道在多大程度上历史数据可以用来预测未来数据。确认了平稳性后,我们要接着探索时间序列是否存在一些内部动态关系(例如季节性变化),也叫自相关性,这能解释未来数据和历史数据之间有多紧密的关联性。最后,我们需要确保我们找到的关联是真正的因果关系,而不是虚假的相关性。
许多时间序列的统计学模型都是依赖于时间序列是平稳的这一前提条件,通常来说,一个平稳的时间序列指的是这个时间序列在一段时间内具有稳定的统计值,如均值,方差。由于我们对于一个数据是否平稳是有自己的直觉的,所以在实践的过程中要谨防过于依赖直觉而被直觉所欺骗。
为此我们引入了一些统计上的假设检验来测试一个时间序列数据的平稳性。
ADF Test
ADF Test是单位根检验(unit root test)的一种。单位根是一个使得时间序列非平稳的一个特征,从技术上说,在下面的公式中如果alpha=1,那么我们说存在单位根。
Y t = α Y t − 1 + β X e + ϵ Y_t=\alpha Y_{t-1}+\beta X_e+\epsilon Yt=αYt−1+βXe+ϵ,其中 Y t , Y t − 1 Y_t,Y_{t-1} Yt,Yt−1表示t和t-1时刻的时间序列值, X e X_e Xe表示外生变量, ϵ \epsilon ϵ表示误差项。
从直觉上我们可以理解为,只有当alpha<1时,整个时间序列的趋势才是有可能出现逆转的。而ADF test就是对alpha值的假设检验,它的原假设是alpha =1,即原假设成立,则时间序列非平稳。
from statsmodels.tsa.stattools import adfuller
# 使用ADF Test
result = adfuller(series, autolag='AIC')
print(f'ADF Statistic: {result[0]}')
print(f'n_lags: {result[1]}')
print(f'p-value: {result[1]}')
# ADF Statistic: 3.1451856893067363
# n_lags: 1.0
# p-value: 1.0
p值为1表示不拒绝原假设,也就是存在单位根,时间序列非平稳。
不足之处:
KPSS Test
和ADF的区别是KPSS的原假设是关于平稳过程,而ADF的原假设是关于单位根。
result1 = adfuller(series2, autolag='AIC')
print(f'ADF Statistic: {result1[0]}')
print(f'p-value: {result1[1]}')
# ADF Statistic: -9.588680806555054
# p-value: 2.0639843020333296e-16
p值远远小于0.05,表示拒绝原假设,即时间序列平稳。
时间序列平稳性的重要性在于:
自相关是时间序列的一个重要概念,指的是时间序列中某一个时刻的值和另一个时刻的值具有一定的相关性。举例来说,有一个每天更新的气温数据,你可能会发现每年5月15号和8月15号的气温存在某种关联,当年5月15号更热,则对应的8月15号也更热。当然还有一种可能性是这种关联性趋近于0, 知道5月15号的气温并不会给你任何关于8月15号气温的信息。
The autocorrelation function
自相关(acf)函数描述的是一组时间序列和它前面间隔n个时刻的一组时间序列之前的相关性。
重要性质:
The partial autocorrelation function
偏自相关(pacf)函数描述的是一组时间序列和它前面间隔n个时刻的一组时间序列之前的偏相关性。
偏相关性可以从本质上理解为去除了样本之间的干涉,也就是更早时刻的相关性影响。
重要性质:
举例来说,计算时间间隔3的pacf是去除了时间间隔小于3的样本的影响,而只计算由于时间间隔为3时的时间序列之间的相关性,因为时间间隔为1和2的样本的影响已经在前面的pacf函数中计算过了。
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
# 绘制acf函数
plot_acf(series)
plt.show()
# 绘制pacf函数
plot_pacf(series)
plt.show()
自回归模型基于的基础是可以用历史预测未来,也就是时刻t的数据能用之前时刻的函数来表示。
基本公式: y t = b 0 + b 1 ∗ y t − 1 + e t y_t=b_0+b_1*y_{t-1}+e_t yt=b0+b1∗yt−1+et
这个最简单的模型称为AR(1),其中括号中的1表示时间间隔为1,也就是当前时刻的数据值的计算只考虑到前一个时刻的值。从公式可以看出,它和常规的线性回归十分类似。 b 0 , b 1 b_0,b_1 b0,b1分别代表截距项和回归系数, e t e_t et表示t时刻的错误项,其中错误项均值为0,具有固定方差。
将其拓展到p哥近邻时间值,称为AR§,其公式为:
y t = ϕ 0 + ϕ 1 ∗ y t − 1 + ϕ 2 ∗ y t − 2 + . . . + ϕ p ∗ y t − p + e t y_t=\phi_0+\phi_1*y_{t-1}+\phi_2*y_{t-2}+...+\phi_p*y_{t-p}+e_t yt=ϕ0+ϕ1∗yt−1+ϕ2∗yt−2+...+ϕp∗yt−p+et, ϕ \phi ϕ表示一系列回归系数。
**注意:**在使用AR模型之前,需要检验两个前提假设:相关性和平稳性。
pd.plotting.autocorrelation_plot(sales_data['sales'])
# 从自相关性图中可以看到在lag<=12时,acf值在临界点以外,表现出极强的相关性。
# 将上图中相关性最强的lag=12单独画散点图,观察相关性
pd.plotting.lag_plot(sales_data['sales'],lag=12)
检验平稳性: 使用statsmodels中的seasonal_decompose方法进行趋势项和季节项的分解。 ADF Test和KPSS Test
# 数据集存在明显趋势项(图二单调递增)和季节项(图三周期性变化)
decomposed = seasonal_decompose(sales_data['sales'], model='additive')
x = decomposed.plot()
移动平均模型(MA)依赖的基础是每个时刻点的值是历史数据点错误项的函数,其中这些错误项是互相独立的。
MA模型和AR模型的公式很类似,只是将公式中的历史数值替换成了历史数据的错误项e,由于错误项e是互相独立的,所以在MA模型中,t时刻的数值仅仅和最近的q个数值有关,而和更早的数据之间没有自相关性,如果对MA序列绘制ACF图,它的自相关关系是突然截断的。而AR序列的ACF图常常是缓慢下降的。
公式: y t = e t + θ 1 ∗ e t − 1 + θ 2 ∗ e t − 2 + . . . + θ p ∗ e t − p + μ y_t=e_t+\theta_1*e_{t-1}+\theta_2*e_{t-2}+...+\theta_p*e_{t-p}+\mu yt=et+θ1∗et−1+θ2∗et−2+...+θp∗et−p+μ
ARMA模型公式: y t = e t + θ 1 ∗ e t − 1 + θ 2 ∗ e t − 2 + . . . + θ p ∗ e t − p + μ y_t=e_t+\theta_1*e_{t-1}+\theta_2*e_{t-2}+...+\theta_p*e_{t-p}+\mu yt=et+θ1∗et−1+θ2∗et−2+...+θp∗et−p+μ
ARMA模型就是AR和MA的简单结合,同时包含了历史数值项和错误项。由于AR模型对时间序列有平稳性要求,ARMA模型也存在这个限制,因此我们将其拓展到ARIMA模型,引入的差分概念是一种获得时间序列的方法。最常使用的一种差分方法是计算当前项和前项的差值,获得一组新的时间序列。由于ARIMA强大的功能,这使得在统计领域有着非常广泛的应用,尤其适用于机器学习和深度学习难以应用的小样本数据集上,同时也应当警惕过拟合的情况发生。
函数 | AR§ | MA(q) | ARMA or ARIMA |
---|---|---|---|
ACF曲线 | 缓缓下降 | 在间隔=q后突然下降 | 没有明显截止点 |
PACF曲线 | 在间隔=p后突然下降 | 缓缓下降 | 没有明显截止点 |
在实际分析过程中,我们也可以一律使用ARIMA模型,因为AR,MA,ARMA都是它的一种特殊情况。
ARIMA有三个参数p、d、q,写作ARIMA(p,d,q),其中p代表AR§,自回归阶数,d代表Integrated (d),差分阶数,q代表MA(q),移动平均阶数。我们应当确保这三个参数尽可能的小避免过拟合,一个可供参数的准则是,不要让d超过2,p和q超过5,并且p和q尽量保证一个是模型主导项,另一个相对较小。
特殊例子:
多步迭代的过程,分为以下几步:
在此不过多延展,应用时使用自动拟合法即可。
# step2,检查平稳性 adf_test = ADFTest() adf_test.should_diff(sales_data) #结果表明不平稳,提示我们需要引入差分项 # (0.01, False) # step3,划分训练集和测试集 train = sales_data[:60] test = sales_data[60:] # step4,拟合模型 arima_model = auto_arima(train, start_p=0, d=1,start_q=0, max_p=5,max_d=5,max_q=5, start_P=0, D=1, start_Q=0, max_P=5, max_D=5, max_Q=5, m=12, seasonal=True,trace=True,n_fits=50) Performing stepwise search to minimize aic ARIMA(0,1,0)(0,1,0)[12] : AIC=981.377, Time=0.02 sec ARIMA(1,1,0)(1,1,0)[12] : AIC=982.734, Time=0.11 sec ARIMA(0,1,1)(0,1,1)[12] : AIC=982.307, Time=0.14 sec ARIMA(0,1,0)(1,1,0)[12] : AIC=983.595, Time=0.13 sec ARIMA(0,1,0)(0,1,1)[12] : AIC=1005.088, Time=0.06 sec ARIMA(0,1,0)(1,1,1)[12] : AIC=1007.898, Time=0.31 sec ARIMA(1,1,0)(0,1,0)[12] : AIC=981.328, Time=0.02 sec ARIMA(1,1,0)(0,1,1)[12] : AIC=982.703, Time=0.13 sec ARIMA(1,1,0)(1,1,1)[12] : AIC=984.307, Time=0.68 sec ARIMA(2,1,0)(0,1,0)[12] : AIC=987.645, Time=0.04 sec ARIMA(1,1,1)(0,1,0)[12] : AIC=982.083, Time=0.19 sec ARIMA(0,1,1)(0,1,0)[12] : AIC=980.880, Time=0.06 sec ARIMA(0,1,1)(1,1,0)[12] : AIC=982.336, Time=0.10 sec ARIMA(0,1,1)(1,1,1)[12] : AIC=983.925, Time=0.46 sec ARIMA(0,1,2)(0,1,0)[12] : AIC=982.207, Time=0.08 sec ARIMA(1,1,2)(0,1,0)[12] : AIC=983.139, Time=0.17 sec ARIMA(0,1,1)(0,1,0)[12] intercept : AIC=982.868, Time=0.08 sec Best model: ARIMA(0,1,1)(0,1,0)[12] Total fit time: 2.802 seconds # 查看模型结果 arima_model.summary() # step5,预测时间序列,并和真实值比较 pred = pd.DataFrame(arima_model.predict(n_periods=12),columns=['predicted'],index=test.index) plt.plot(train) plt.plot(test,label='true') plt.plot(pred,label='predict') plt.legend()
以上我们讨论的都是单变量问题,现实世界往往更为复杂,这时我们将把AR模型从单变量拓展到多变量,多变量模型的特点是存在几组并行的时间序列,并且这些时间之间互相影响。
考虑一个三组并行时间序列数据的场景,在二阶的条件下,公式如下:
y = ϕ 0 + ϕ 1 ∗ y t − 1 + ϕ 2 ∗ y t − 2 y=\phi_0+\phi_1*y_{t-1}+\phi_2*y_{t-2} y=ϕ0+ϕ1∗yt−1+ϕ2∗yt−2,y和 ϕ 0 \phi_0 ϕ0是3 * 1的向量, ϕ 1 , ϕ 2 \phi_1,\phi_2 ϕ1,ϕ2是 3 * 3的矩阵
从公式中也能看到随着阶数上升,VAR模型的变量增加很快,因此在使用时只有当期待存在不同时间序列互相影响的关系时才尝试这种方法。VAR在某些场景下十分有用:
import matplotlib.pyplot as plt import pandas as pd import numpy as np from statsmodels.tsa.stattools import adfuller from statsmodels.tsa.api import VAR from statsmodels.tsa.stattools import grangercausalitytests from statsmodels.tsa.vector_ar.vecm import coint_johansen from statsmodels.stats.stattools import durbin_watson %matplotlib inline # step1:导入数据,这是一个有八个指标的经济数据 df = pd.read_csv('data/Raotbl6.csv', parse_dates=['date'], index_col='date') df.head() # 画出八个时间序列的图像 fig, axes = plt.subplots(nrows=4, ncols=2,figsize=(10,6)) for i, ax in enumerate(axes.flatten()): data = df[df.columns[i]] ax.plot(data, color='red', linewidth=1) ax.set_title(df.columns[i]) plt.tight_layout() # step2:Granger’s Causality Test , 检验不同序列之间存在互相影响 maxlag=12 test='ssr_chi2test' variables=df.columns def grangers_causation_matrix(data, variables, test='ssr_chi2test', verbose=False): df = pd.DataFrame(np.zeros((len(variables), len(variables))), columns=variables, index=variables) for c in df.columns: for r in df.index: test_result = grangercausalitytests(data[[r, c]], maxlag=maxlag, verbose=False) p_values = [round(test_result[i+1][0][test][1],4) for i in range(maxlag)] min_p_value = np.min(p_values) df.loc[r, c] = min_p_value df.columns = [var + '_x' for var in variables] df.index = [var + '_y' for var in variables] return df grangers_causation_matrix(df, variables = df.columns) # step3:ADF测试,检验单个变量是否平稳 def adfuller_test(series, signif=0.05, name='', verbose=False): r = adfuller(series, autolag='AIC') output = {'test_statistic':round(r[0], 4), 'pvalue':round(r[1], 4), 'n_lags':round(r[2], 4), 'n_obs':r[3]} p_value = output['pvalue'] def adjust(val, length= 6): return str(val).ljust(length) # Print Summary print(f' Augmented Dickey-Fuller Test on "{name}"', "\n ", '-'*47) print(f' Null Hypothesis: Data has unit root. Non-Stationary.') print(f' Significance Level = {signif}') print(f' Test Statistic = {output["test_statistic"]}') print(f' No. Lags Chosen = {output["n_lags"]}') for key,val in r[4].items(): print(f' Critical value {adjust(key)} = {round(val, 3)}') if p_value <= signif: print(f" => P-Value = {p_value}. Rejecting Null Hypothesis.") print(f" => Series is Stationary.") else: print(f" => P-Value = {p_value}. Weak evidence to reject the Null Hypothesis.") print(f" => Series is Non-Stationary.") for name, column in df.iteritems(): adfuller_test(column, name=column.name) print('\n') # step4: 协整检验,检验多变量平稳性 def cointegration_test(df, alpha=0.05): out = coint_johansen(df,-1,5) d = {'0.90':0, '0.95':1, '0.99':2} traces = out.lr1 cvts = out.cvt[:, d[str(1-alpha)]] def adjust(val, length= 6): return str(val).ljust(length) # Summary print('Name :: Test Stat > C(95%) => Signif \n', '--'*20) for col, trace, cvt in zip(df.columns, traces, cvts): print(adjust(col), ':: ', adjust(round(trace,2), 9), ">", adjust(cvt, 8), ' => ' , trace > cvt) cointegration_test(df) # step7:选择模型阶数并训练,根据AIC值,lag=4时达到局部最优 model = VAR(df_differenced) for i in [1,2,3,4,5,6,7,8,9]: result = model.fit(i) print('Lag Order =', i) print('AIC : ', result.aic, '\n') # 选择lag=4拟合模型 model_fitted = model.fit(4) model_fitted.summary() # step8:durbin watson test,检验残差项中是否还存在相关性,这一步的目的是确保模型已经解释了数据中所有的方差和模式 out = durbin_watson(model_fitted.resid) for col, val in zip(df.columns, out): print(adjust(col), ':', round(val, 2)) # 检验值越接近2,说明模型越好 # step9:模型已经足够使用了,下一步进行预测 lag_order = model_fitted.k_ar forecast_input = df_differenced.values[-lag_order:] fc = model_fitted.forecast(y=forecast_input, steps=nobs) df_forecast = pd.DataFrame(fc, index=df.index[-nobs:], columns=df.columns + '_2d') df_forecast # step10:将差分后的值还原为原数据 def invert_transformation(df_train, df_forecast): df_fc = df_forecast.copy() columns = df_train.columns for col in columns: # 写一个长度为6的数列,用纸笔写出差分的计算过程,可以帮助理解下面这两行还原过程 df_fc[str(col)+'_1d'] = (df_train[col].iloc[-1]-df_train[col].iloc[-2]) + df_fc[str(col)+'_2d'].cumsum() df_fc[str(col)+'_forecast'] = df_train[col].iloc[-1] + df_fc[str(col)+'_1d'].cumsum() return df_fc df_results = invert_transformation(df_train, df_forecast) df_results.loc[:, ['rgnp_forecast', 'pgnp_forecast', 'ulc_forecast', 'gdfco_forecast', 'gdf_forecast', 'gdfim_forecast', 'gdfcf_forecast', 'gdfce_forecast']] fig, axes = plt.subplots(nrows=int(len(df.columns)/2), ncols=2, dpi=150, figsize=(10,10)) for i, (col,ax) in enumerate(zip(df.columns, axes.flatten())): df_results[col+'_forecast'].plot(legend=True, ax=ax).autoscale(axis='x',tight=True) df_test[col][-nobs:].plot(legend=True, ax=ax); ax.set_title(col + ": Forecast vs Actuals") ax.xaxis.set_ticks_position('none') ax.yaxis.set_ticks_position('none') ax.spines["top"].set_alpha(0) ax.tick_params(labelsize=6) plt.tight_layout()
在之前的章节中,用到的分析方法都用到了时间序列中所有的数据点,而在接下来要介绍的机器学习部分,我们并不会用到全部的数据点。特征生成是一个找到一种量化的方式,从时间序列中提取出最重要的信息,生成一些数值和类别标签。本质上特征生成做的是压缩原数据,生成一组具有足够代表性的更小的数据。例如用均值和时间点的数量表示原时间序列数据就是一个最简单的例子。
一些常用的特征:
4.1.1 自动特征生成与选择
from tsfresh.examples.robot_execution_failures import download_robot_execution_failures,load_robot_execution_failures from tsfresh import extract_features,select_features import pandas as pd timeseries, y = load_robot_execution_failures() # 加载数据 timeseries.columns #该数据集包含8列,其中id表明类别id,time为时间轴,其他6列为不同维度的时间序列值 # Index(['id', 'time', 'F_x', 'F_y', 'F_z', 'T_x', 'T_y', 'T_z'], dtype='object') # 自动抽取全部特征 X_extracted = extract_features(timeseries,column_id = "id",column_sort = "time") X_extracted # 选择性生成特征 fc_parameters = { "length": None, "large_standard_deviation": [{"r": 0.05}, {"r": 0.1}] } extract_features(timeseries, column_id = "id",column_sort = "time",default_fc_parameters=fc_parameters) # 自动特征选择 X_extracted_cols = X_extracted.isnull().sum().where(lambda x : x==0).dropna().index # 由于不是所有生成的变量都是有意义的,删除掉包含NA的特征,这也是特征选择函数的要求 X_selected = select_features(X_extracted[X_extracted_cols], y) # fresh算法自动从2203个特征中选择出了665个 print('count of raw feature: {}'.format(len(X_extracted_cols))) print('count of auto-selected feature: {}'.format(len(X_selected.columns))) # count of raw feature: 2203 # count of auto-selected feature: 665
官方文档的这个流程展示了用fresh算法进行特征选择的思路,简单来说,它是通过比较不同时间序列类别下特征的显著性差异来确定是否要挑选出这个特征。
除此之外,还有其他用于特征选择的方法,如recursive feature elimination (RFE),不过tsfresh包并没有内置这种方法,可以结合sklearn中的RFE方法自行组合使用。
以原始的脑电图时间序列数据为例,演示如何使用机器学习模型进行时间序列分类。
该脑电图数据集共有5个类别 :
import matplotlib.pyplot as plt from tsfresh import extract_features import pandas as pd import numpy as np from sklearn.model_selection import train_test_split from sklearn.ensemble import RandomForestClassifier import xgboost as xgb eeg = pd.read_csv('data\eeg.csv') eeg.head() # 1、观察不同类别的时间序列特征,为后面构造特征做准备 plt.subplot(3, 1, 1) plt.plot(eeg[eeg.id==0]['times'], eeg[eeg.id==0]['measurements']) plt.legend(eeg.loc[0,'classes']) plt.subplot(3, 1, 2) plt.plot(eeg[eeg.id==300]['times'],eeg[eeg.id==300]['measurements']) plt.legend(eeg.loc[300*4097,'classes']) plt.subplot(3, 1, 3) plt.plot(eeg[eeg.id==450]['times'],eeg[eeg.id==450]['measurements']) plt.legend(eeg.loc[450*4097,'classes']) plt.tight_layout() # 从图表中发现,Z 类和 G 类的数据比 S 类曲折更少。此外,每个类都有相当不同的值域范围,数据点的分布似乎也有所不同。 # 2、抽取特征,根据观察可视化人工选择了6个特征 fc_parameters = { "skewness": None, "ratio_beyond_r_sigma": [{"r": 1}], "maximum":None, "minimum":None, "mean_abs_change":None, "longest_strike_above_mean":None } data = extract_features(eeg, column_id = "id",column_sort = "times",column_value="measurements",chunksize=20, default_fc_parameters=fc_parameters) # chunksize设置一个较小的值是为了防止电脑内存不足 data.index.names=['id'] data.reset_index() data = data.merge(eeg[['id','classes']].drop_duplicates(), on='id',how='inner') # 3、使用两种经典集成学习方法,随机森林和xgboost # 训练机器学习模型 X_train, X_test, y_train, y_test = train_test_split(data.drop(['id', 'classes'], axis=1), data["classes"]) rf_clf = RandomForestClassifier() rf_clf.fit(X_train, y_train) rf_clf.score(X_test, y_test) # 使用XGBOOST模型 xgb_clf = xgb.XGBClassifier() xgb_clf.fit(X_train, y_train) xgb_clf.score(X_test, y_test)
评价指标:
# 混淆矩阵
y_true = [0, 1, 2, 2, 2]
y_pred = [0, 0, 2, 2, 1]
target_names = ['class 0', 'class 1', 'class 2']
print(metrics.classification_report(y_true, y_pred))
# ROC/AUC,计算AUC
y_true = np.array([0, 0, 1, 1])
y_scores = np.array([0.1, 0.4, 0.35, 0.8])
metrics.roc_auc_score(y_true, y_scores)
混淆矩阵可视化:http://scikit-learn.org/dev/auto_examples/model_selection/plot_confusion_matrix.html#sphx-glr-auto-examples-model-selection-plot-confusion-matrix-py
ROC曲线可视化:https://blog.csdn.net/lz_peter/article/details/78054914
聚类问题的重要步骤是确定距离,即如何度量相似度。
两大思路:
基于特征计算距离
和传统机器学习的聚类方法没有多大差别, 利用原始的时间序列构造特征,再以特征进行聚类。
基于原始时间序列计算距离
一个最常用的算法叫做动态时间规整dynamic time warping (DTW) ,非常适合用于形状特征很重要的时间序列数据。DTW解决的是一对多的问题,欧氏距离的计算要求两个向量是等长的,也就是要求两组时间序列的长度是一模一样才能计算,而很多情况我们得到的时间序列都不是等长的,此时DTW就能发挥作用了。
通过一幅图可以帮助我们理解DTW的算法本质,DTW的核心是捕获时间序列的模式或形状特征,不要求时间轴一一对应。
DTW方法的基本规则:
from math import sqrt import numpy as np def distDTW(ts1,ts2) DTW = {} for i in range(len(ts1)): DTW[(i, -1)] = np.inf for i in range(len(ts2)): DTW[(-1, i)] = np.inf DTW[(-1, -1)] = 0 for i in range(len(ts1)): for j in range(len(ts2)): dist = (ts1[i] - ts2[j]) ** 2 DTW[(i, j)] = dist + min(DTW[(i - 1, j)], DTW[(i, j - 1)], DTW[(i - 1, j - 1)]) return sqrt(DTW[len(ts1) - 1, len(ts2) - 1])
除此之外,还有其他几种应用于时间序列的距离算法:
弗雷歇距离(Fréchet distance )
弗雷歇距离可以理解为狗绳距离,也就是人和狗各自走过一段路所需要的最短的狗绳距离 。
最长公共子序列(Longest common subsequence )
最长公共子序列主要用于非数值的时间序列,比如两个英文单词,最长的公共序列的长度也可以用于判断距离 。
聚类
import matplotlib.pyplot as plt import pandas as pd import numpy as np from sklearn import preprocessing from sklearn.cluster import AgglomerativeClustering from sklearn.metrics.cluster import homogeneity_score from scipy.spatial.distance import euclidean from fastdtw import fastdtw from tqdm import tqdm ### 1、基于特征计算距离 # 归一化数据 data_cluster = data.drop(['id', 'classes'], axis=1) data_values = preprocessing.scale(data_cluster.values) # 使用第一类聚类思路进行层次聚类 clustering = AgglomerativeClustering(n_clusters = 5,linkage = 'ward') clustering.fit(data_values) # 输出模型评估指标 homogeneity_score(clustering.labels_, data.classes) # Output: 0.43 # 使用构造的特征进行层次聚类,同质性评估为0.43 ### 2、基于原始时间序列计算距离 rowlist= np.array(range(0,50)) for a in [np.array(range(100,150)),np.array(range(200,250)),np.array(range(300,350)),np.array(range(400,450))]: rowlist=np.append(rowlist,a) # 使用第二类聚类思路首先计算DTW距离,这一步计算量比较大,先把结果存下来,并且作为简化,只选择了部分数据送入模型 pairwise_dis = pd.DataFrame(np.zeros((250, 250))) for index_r, r in tqdm(enumerate(rowlist)): for index_c, c in enumerate(rowlist): arr1 = eeg[eeg.id==r]['measurements'].values[:500] arr2 = eeg[eeg.id==c]['measurements'].values[:500] distance, _ = fastdtw(arr1, arr2, dist=euclidean) pairwise_dis.iloc[index_r,index_c]=distance pairwise_dis.to_csv('data/pairwise_dis.csv',index=False) pairwise_distance = pd.read_csv('data/pairwise_dis.csv') # 使用第二类聚类思路进行层次聚类 dtw_clustering = AgglomerativeClustering(linkage = 'average',n_clusters = 5, affinity = 'precomputed') _=dtw_clustering.fit_predict(pairwise_distance) # 输出模型评估指标 homogeneity_score(dtw_clustering.labels_, data[data.index.isin(rowlist)]['classes'])
评估指标:
样本距离最近的聚类中心的总和
调整后的兰德指数
取值范围为[-1,1],负数代表结果不好,越接近于1越好意味着聚类结果与真实情况越吻合。
互信息和调整后的互信息
当两个聚类集相同(即完全匹配)时,AMI返回值为1;随机分区(独立标签)平均预期AMI约为0,也可能为负数。
同质化得分
取值范围[0,1]值越大意味着聚类结果与真实情况越吻合。
完整性得分
取值范围[0,1],值越大意味着聚类结果与真实情况越吻合。
V-measure得分
取值范围[0,1],值越大意味着聚类结果与真实情况越吻合。
轮廓系数
使用平均群内距离和每个样本的平均最近簇距离来计算 , 其最高值为1,最差值为-1,0附近的值表示重叠的聚类,负值通常表示样本已被分配到错误的集群。
群内离散与簇间离散的比值
raw_data = np.loadtxt( './cluster.txt') # 导入数据文件 X = raw_data[:, : -1] # 分割要聚类的数据 y_true = raw_data[:, -1] # 训练聚类模型 n_clusters = 3# 设置聚类数量 model_kmeans = KMeans(n_clusters=n_clusters, random_state= 0) # 建立聚类模型对象 model_kmeans.fit(X) # 训练聚类模型 y_pre = model_kmeans.predict(X) # 预测聚类模型 # 模型效果指标评估 n_samples, n_features = X.shape # 总样本量,总特征数 inertias = model_kmeans.inertia_ # 样本距离最近的聚类中心的总和 adjusted_rand_s = metrics.adjusted_rand_score(y_true, y_pre) # 调整后的兰德指数 mutual_info_s = metrics.mutual_info_score(y_true, y_pre) # 互信息 adjusted_mutual_info_s = metrics.adjusted_mutual_info_score(y_true, y_pre) # 调整后的互信息 homogeneity_s = metrics.homogeneity_score(y_true, y_pre) # 同质化得分 completeness_s = metrics.completeness_score(y_true, y_pre) # 完整性得分 v_measure_s = metrics.v_measure_score(y_true, y_pre) # V-measure得分 silhouette_s = metrics.silhouette_score(X, y_pre, metric= 'euclidean') # 平均轮廓系数 calinski_harabaz_s = metrics.calinski_harabaz_score(X, y_pre) # Calinski和Harabaz得分 print( '总样本量: %d t 总特征数: %d'% (n_samples, n_features)) # 打印输出样本量和特征数量 print( 70* '-') # 打印分隔线 print( 'inetARItMItAMIthomotcomptv_mtsilhtc&h') # 打印输出指标标题 print( '%dt%.2ft%.2ft%.2ft%.2ft%.2ft%.2ft%.2ft%d'% ( inertias, adjusted_rand_s, mutual_info_s, adjusted_mutual_info_s, homogeneity_s, completeness_s, v_measure_s, silhouette_s, calinski_harabaz_s)) # 打印输出指标值 print( 70* '-') # 打印分隔线 print( '简写 t 全称') # 打印输出缩写和全名标题 print( 'ine t 样本距离最近的聚类中心的总和') print( 'ARI t 调整后的兰德指数') print( 'MI t 互信息') print( 'AMI t 调整后的互信息') print( 'homo t 同质化得分') print( 'comp t 完整性得分') print( 'v_m t V-measure得分') print( 'silh t 平均轮廓系数') print( 'c&h t Calinski和Harabaz得分')
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。