当前位置:   article > 正文

时间序列之拐点检测(changepoints detection)算法

时序分析找到折点的方法

一、前言

对于时间拐点问题,其实就是找changepoint的问题,业务场景比如机器的缩扩容,业务的升级回滚等,都会让一些指标发生这样的现象, 如下图。(这种场景比较理想,现实情况要复杂得多)

52110c62d69365919c9f36ceef6cd79f.jpeg

为了检测这个区域,需要使用changepoint detector:ruptures

二、ruptures

ruptures 是专门用于检测时间序列数据中的结构性变化,这种变化也常被称为断点(breakpoints)或变点(changepoints)。

库提供了多种算法来检测时间序列中的突变或者异常模式变化。这些算法能够识别出时间序列数据中的突变,这些突变可能表明了一些潜在的事件或状态变化。

项目地址:https://github.com/deepcharles/ruptures 官方文档在这里 http://ctruong.perso.math.cnrs.fr/ruptures

2.1 安装

pip install ruptures

2.2 使用

其中有很多算法供选择,这里选择Dynp(dynamic programming.)

其他算法使用可以参考:https://forecastegy.com/posts/change-point-detection-time-series-python/#detecting-change-points-with-binary-segmentation

  1. import matplotlib.pyplot as plt
  2. import ruptures as rpt
  3. import numpy as np
  4. mean = 0
  5. std_dev = 1
  6. length_of_series = 100
  7. values = np.random.normal(mean, std_dev, length_of_series)
  8. values[-35:] = values[-35:] + 10
  9. # 找拐点
  10. algo = rpt.Dynp(model="l2", min_size=3, jump=3).fit(values)
  11. # 检测断点,指定最大断点数
  12. result = algo.predict(n_bkps=1)
  13. result = result[:-1]
  14. # 显示结果
  15. plt.plot(values)
  16. for bkp in result:
  17. plt.axvline(x=bkp, color='r', linestyle='--')
  18. plt.show()
0b00ffcded848bfcce7631fd88951c52.jpeg

参数:

  • model:用于计算cost的function,根据这个来找changepoint

  • custom_cost: 自定义cost function

  • min_size:最小的分段长度。

  • jump:在进行断点搜索时的采样步长。

2.3 源码简单解析

predict源码:

33d6c7f0281127cafa70addfc883409d.jpeg

实则还是self.seg是核心函数,结果通过长度的index进行排序 输出的结果格式为长度的index,看上面给出的results就明白了。

  1. @lru_cache(maxsize=None)
  2. def seg(self, start, end, n_bkps):
  3. """Recurrence to find the optimal partition of signal[start:end].
  4. This method is to be memoized and then used.
  5. Args:
  6. start (int): start of the segment (inclusive)
  7. end (int): end of the segment (exclusive)
  8. n_bkps (int): number of breakpoints
  9. Returns:
  10. dict: {(start, end): cost value, ...}
  11. """
  12. jump, min_size = self.jump, self.min_size
  13. if n_bkps == 0:
  14. cost = self.cost.error(start, end)
  15. return {(start, end): cost}
  16. elif n_bkps > 0:
  17. # Let's fill the list of admissible last breakpoints
  18. multiple_of_jump = (k for k in range(start, end) if k % jump == 0)
  19. admissible_bkps = list()
  20. for bkp in multiple_of_jump:
  21. n_samples = bkp - start
  22. # first check if left subproblem is possible
  23. if sanity_check(
  24. n_samples=n_samples,
  25. n_bkps=n_bkps - 1,
  26. jump=jump,
  27. min_size=min_size,
  28. ):
  29. # second check if the right subproblem has enough points
  30. if end - bkp >= min_size:
  31. admissible_bkps.append(bkp)
  32. assert (
  33. len(admissible_bkps) > 0
  34. ), "No admissible last breakpoints found.\
  35. start, end: ({},{}), n_bkps: {}.".format(
  36. start, end, n_bkps
  37. )
  38. # Compute the subproblems
  39. sub_problems = list()
  40. for bkp in admissible_bkps:
  41. left_partition = self.seg(start, bkp, n_bkps - 1)
  42. right_partition = self.seg(bkp, end, 0)
  43. tmp_partition = dict(left_partition)
  44. tmp_partition[(bkp, end)] = right_partition[(bkp, end)]
  45. sub_problems.append(tmp_partition)
  46. # Find the optimal partition
  47. return min(sub_problems, key=lambda d: sum(d.values()))

稍微看下 这个源码,大致就懂这个算法的含义了。

  1. 通过jump大小得到可能的候选边界

  2. 通过min_size的大小比较,得到每段的前后边界组合

  3. 进行递归

  4. 如果没有分块,则计算此时的时间序列段的cost value

  5. 得到所有分块的cost值,找到此时cost的最小值

这个其实比较暴力了,相当于目标函数cost fuction最小化。不暴力的有贝叶斯的方法:Bayesian Online Changepoint Detection

2.4 一些问题

  1. 这个必须要预先设置要检测多少个拐点

  2. 比如要大于min_size

这两个问题对于实际应用,基本不会知道检测的长度预先有多少个拐点,其次大于min_size这个要求如果是流式场景肯定会有一定的延迟。

所以对于流式场景这个只能检测过去几个点的拐点状态,无法实时检测出来。

并且针对第一个问题,不知道有多少个,那我们也可以通过卡阈值的方式来自动化的选出排名前部的边界

  1. # 找拐点
  2. algo = rpt.Dynp(model="l2", min_size=3, jump=3).fit(values)
  3. # 这块n可以很多,但是要满足分割的条件, 直接采用 seg
  4. partition = algo.seg(0, algo.n_samples, n_bkps=10)
  5. # 输出的肯定有最后一个边界要去掉
  6. filter_partition = {}
  7. for key, value in partition.items():
  8. if key[1] == algo.n_samples:
  9. continue
  10. filter_partition[key] = value
  11. partition = filter_partition
  12. sort_partition = sorted(partition.items(), key=lambda x: x[1], reverse=True)

后续怎么操作就看需求了,可以按照cost大小进行排序,也可以直接根据数量取前面的等等

推荐阅读:

我的2022届互联网校招分享

我的2021总结

浅谈算法岗和开发岗的区别

互联网校招研发薪资汇总

公众号:AI蜗牛车

保持谦逊、保持自律、保持进步

64e9c9bb1b7e750a8c06ddc35a6eaaef.jpeg

发送【蜗牛】获取一份《手把手AI项目》(AI蜗牛车著)

发送【1222】获取一份不错的leetcode刷题笔记

发送【AI四大名著】获取四本经典AI电子书

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/523029
推荐阅读
相关标签
  

闽ICP备14008679号