当前位置:   article > 正文

利用storm1.0的窗口机制做pv统计(一)_统计storm窗口内数据条数

统计storm窗口内数据条数

       近期需要用storm(1.0版本)做pv统计,现在通过博客把topology的设计,coding,和上线调优的过程记录下来,留着给做相关事情的小伙伴阅读一下。

       一. 首先叙述一下业务场景,网站的访问信息被网关层nginx记录下来,nginx日志以json的数据结构发到我们的kafka消息队列中,需要利用storm实时处理平台去fetch数据,将数据指标按照不同的访问域,不同的访问方法,和一定的时间窗口频度进行统计,结果保存在mysql中,供前端监控页面展示。

        二,废话不多说,先上topology设计的DAG

            

        commonSpout从kafka中拿到数据之后,进行简单的转码解析(尽量把只负责处理数据类型和简单的筛选逻辑),emit到第一层executeBolt(extends BaseBasicBolt),并行度为3(暂定,后续会根据运行情况调节),分组为Shuffle Grouping(后面会解释为什么用这个分组),然后在executeBolt里分析数据,把域名,请求方法名,请求分段时间标志作为Map的key,利用stom的心跳机制,在Bolt中的getComponentConfiguration()方法中设置executeBolt的心跳频率,以统计频率为一分钟为例,定时的让tickTuple按设置的时间发送给Bolt中的execute(),代码片段如下:

  1. public Map<String, Object> getComponentConfiguration() {
  2. //设置发送ticktuple的时间间隔
  3. Config conf = new Config();
  4. conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
  5. return conf;
  6. }

        这样,在execute()中可以进行按照频率进行数据积攒,得到1min中的窗口Map。将Map.toString()发向windowBolt。

        在介绍第二层Bolt前,先介绍一下storm1.0的窗口的机制,这个机制是在storm1.0版本之后引入的,可以解决实时统计场景中典型的窗口计算问题:例如每隔m时间,就计算n时间内窗口的数据;每隔k条数据,就计算j条数据内窗口的数据;每隔m时间,就计算j条数据内窗口的数据;每隔k条数据,就计算n时间内窗口的数据。通过构建topology时候简单的参数配置,实现了窗口数据的捕获,开发者只需关注窗口内的处理逻辑即可,而不需要为积攒窗口数据复制而低效的逻辑而操心。例如此topology中的windowBolt被我设计为,每隔3条数据,就计算3条数据中的数据,进行汇总。那么为什么这么设计呢?其实这是有一个过程的,设计之初,这种pv统计的问题有一个最关键的地方,就是如何设置好你的并行计算和全局统计,简单来说,如果全程下来是单个线程的spout,单个线程的bolt,那么,统计出来的数据是准确的,没有问题,但是随着数据量级的增加,计算压力的增大,在executeBolt中的计算压力增大,需要设置并行计算,这样,多个线程一起利用心跳做累计,每个线程计算的只是部分的数据,而并不是全局的,如果在统计场景,在mysql里出现的是多条同样的key的数据,本质上来说,只是增大了数据的粒度,而并没有进行准确的统计。后来为了解决这个问题,引入了第二层bolt,即窗口bolt,让第一层的bolt统一把计算结果发给窗口统计Bolt,这样相当于窗口Bolt做了一个全局的汇总,从而实现了全局数据的统计,第三层Bolt的并行度一定要只为1,否则将不能实现窗口统计全局唯一的功能,那么这个时候我们不仅要问,这样一来窗口Bolt的压力不会大吗??其实这样的设计结构需要你把所有的计算压力全都放在第一层可并行的bolt中,让统计Bolt只做简单的数据统计叠加就可以了,把解析和逻辑处理这种耗费cpu的压力全部都给第一层Bolt去做。这样设计的topology就可以保证统计的准确性,并将计算和统计解耦。

        那么我来解释一下分组选用的原因,我第窗口Bolt的设计是每收到3条,就统计3条的数据的窗口,进行计算,为什么这样呢?因为executeBolt的并行度是3,也就是说数据进来之后,我必须保证每一个Bolt都不阻塞,按照顺畅的工作模式去处理数据,也就是数据要均匀的通过我的第一层Bolt,假设通过6条,那么一定是随机分组,我的三个并行里都能拿到2条,这样第二层Bolt才能按照3-3的标准去统计,在全局中计算这6条数据,如果不是随机分组,那么executeBolt中并行的数据不均匀,第二层的按照条数计算的窗口就会出现消费缓慢,或者极端情况下,一直不计算的问题。

这里面后续优化性能的过程中有两个参数注意:

1.第一层Bolt的并行度一定要和第二层窗口层的统计维度条数一样

2.第二层窗口Bolt的并行度一定只能为1

下面写一下窗口bolt的API。

  1. withWindow(Count windowLength, Count slidingInterval)
  2.   每收到slidingInterval条数据统计最近的windowLength条数据。
  3. withWindow(Count windowLength)
  4.   每收到1条数据统计最近的windowLength条数据。
  5. withWindow(Count windowLength, Duration slidingInterval)
  6.   每过slidingInterval秒统计最近的windowLength条数据。
  7. withWindow(Duration windowLength, Count slidingInterval)
  8.   每收到slidingInterval条数据统计最近的windowLength秒的数据。
  9. withWindow(Duration windowLength, Duration slidingInterval)
  10.   每过slidingInterval秒统计最近的windowLength秒的数据。
  11. withWindow(Duration windowLength)
  12.   每收到1条数据统计最近的windowLength秒的数据。

        在构建Topology时通过此代码片段即可:

builder.setBolt("windowBolt",new WindowBolt().withWindow(Count windowLength, Count slidingInterval),1).shuffleGrouping("上一个Bolt");
于是,通过这样的一个Topology设计,便实现了Pv窗口统计功能,为监控方实时提供了数据。


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/544078
推荐阅读
相关标签
  

闽ICP备14008679号