当前位置:   article > 正文

电脑常识基于SpringBoot使用K stream实现数据埋点\统计\实时更新_spring boot埋点统计

spring boot埋点统计

电脑常识基于SpringBoot使用K stream实现数据埋点\统计\实时更新

 今年刚上手一个项目,需求点是文章页面的点赞,收藏,评论和阅读的实时点击统计,并根据这个统计集成给后台,作为后台实现文章日更,时更和用户推送的一个数据埋点.

       首先用到的技术是

Kafka流式计算

        Kafka除了作为高性能的MQ以外,其实还有一个流式计算的功能,这个功能可以把代码运行中的数据状态以消息的形式发送给统计模块进行分流,聚合和统计输出,自动化完成数据的捕抓和收集;

        同时,这种功能需要在项目初期技术选型的时候就确定下来,避免对原代码造成侵入破坏.

       MQ采用的是消息队列的方式,所以完全不用担心高并发量的问题,或许唯一使您烦恼的可能是消息生产者和消息消费者之间各种自定的主题名字.

         实现思路:

1.数据埋点;

2.Springboot项目整合Kafka

3.Kafka及K Stream原始配置与简化;

4.Kafka Input

5.K Stream流式计算

6.Kafka Output

7.数据收集,处理,更新并返回

        具体步骤:

一.基础依赖环境(版本号根据自己项目需求确定)

  1. org.apache.kafka
  2. kafka-streams

二.yml中需要配置消息组和服务访问地址(我是用的是Linux系统)

  1. kafka:
  2. group: ${spring.application.name}
  3. hosts: 192.168.66.133:9092

 三.Kafka及K Stream启动配置类----------->

1.KafkaStreamConfig

  1. @Configuration
  2. @EnableKafkaStreams //启用kafkastream
  3. @ConfigurationProperties(prefix="kafka")
  4. public class KafkaStreamConfig {
  5. //最大消息的大小
  6. private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
  7. //kafka所在服务器地址
  8. private String hosts;
  9. //kafka所在分组名称 给消费者使用 就是applicationName
  10. private String group;
  11. public String getHosts() {
  12. return hosts;
  13. }
  14. public void setHosts(String hosts) {
  15. this.hosts = hosts;
  16. }
  17. public String getGroup() {
  18. return group;
  19. }
  20. public void setGroup(String group) {
  21. this.group = group;
  22. }
  23. /**
  24. * 重新定义默认的KafkaStreams配置属性,包括:
  25. * 1、服务器地址
  26. * 2、应用ID
  27. * 3、流消息的副本数等配置
  28. * @return
  29. */
  30. @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  31. public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
  32. Map props = new HashMap<>();
  33. props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
  34. props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
  35. props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
  36. props.put(StreamsConfig.RETRIES_CONFIG, 10);
  37. props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  38. props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  39. // 消息副本数量
  40. props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
  41. props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 5_000);
  42. props.put(StreamsConfig.SEND_BUFFER_CONFIG, 3*MAX_MESSAGE_SIZE);
  43. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, Topology.AutoOffsetReset.EARLIEST.name().toLowerCase());
  44. return new KafkaStreamsConfiguration(props);
  45. }
  46. }

 2.工厂注册类

  1. /**
  2. * KafkaStreamListener扫描和实例化成KafkaStreamProcessor.doAction的返回类,完成监听器实际注册的过程
  3. */
  4. @Component
  5. public class KafkaStreamListenerFactory implements InitializingBean {
  6. Logger logger = LoggerFactory.getLogger(KafkaStreamListenerFactory.class);
  7. @Autowired
  8. DefaultListableBeanFactory defaultListableBeanFactory;//IOC容器本身
  9. /**
  10. * 初始化完成后自动调用
  11. */
  12. @Override
  13. public void afterPropertiesSet() {
  14. Map map = defaultListableBeanFactory.getBeansOfType(KafkaStreamListener.class);
  15. for (String key : map.keySet()) {
  16. KafkaStreamListener k = map.get(key);
  17. KafkaStreamProcessor processor = new KafkaStreamProcessor(defaultListableBeanFactory.getBean(StreamsBuilder.class),k);
  18. String beanName = k.getClass().getSimpleName()+"AutoProcessor" ;
  19. //将对象交给spring容器管理

3.K Stream监听器接口

  1. /**
  2. * 流数据的监听消费者实现的接口类,系统自动会通过
  3. * KafkaStreamListenerFactory类扫描项目中实现该接口的类
  4. * 并注册为流数据的消费端
  5. *

* 其中泛型可是KStream或KTable * * @param */ public interface KafkaStreamListener { // 流式处理的时候需要监听的主题是什么 INPUTTOPIC String listenerTopic(); //流式处理完成之后继续发送到的主题是什么 OUTTOPIC String sendTopic(); // 流式业务的对象处理逻辑 T getService(T stream); }

4.基本Bean

  1. /**
  2. * KafkaStream自动处理包装类
  3. */
  4. public class KafkaStreamProcessor {
  5. // 流构建器
  6. StreamsBuilder streamsBuilder;
  7. private String type;
  8. KafkaStreamListener listener;
  9. public KafkaStreamProcessor(StreamsBuilder streamsBuilder, KafkaStreamListener kafkaStreamListener) {
  10. this.streamsBuilder = streamsBuilder;
  11. this.listener = kafkaStreamListener;
  12. this.parseType();
  13. Assert.notNull(this.type, "Kafka Stream 监听器只支持kstream、ktable,当前类型是" + this.type);
  14. }
  15. /**
  16. * 通过泛型类型自动注册对应类型的流处理器对象
  17. * 支持KStream、KTable
  18. *
  19. * @return
  20. */
  21. public Object doAction() {
  22. if ("kstream".equals(this.type)) {
  23. KStream<?, ?> stream = streamsBuilder.stream(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
  24. stream = (KStream) listener.getService(stream);
  25. stream.to(listener.sendTopic());
  26. return stream;
  27. } else {
  28. KTable<?, ?> table = streamsBuilder.table(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
  29. table = (KTable) listener.getService(table);
  30. table.toStream().to(listener.sendTopic());
  31. return table;
  32. }
  33. }
  34. /**
  35. * 解析传入listener类的泛型类
  36. */
  37. private void parseType() {
  38. Type[] types = listener.getClass().getGenericInterfaces();
  39. if (types != null) {
  40. for (int i = 0; i < types.length; i++) {
  41. if (types[i] instanceof ParameterizedType) {
  42. ParameterizedType t = (ParameterizedType) types[i];
  43. String name = t.getActualTypeArguments()[0].getTypeName().toLowerCase();
  44. if (name.contains("org.apache.kafka.streams.kstream.kstream") || name.contains("org.apache.kafka.streams.kstream.ktable")) {
  45. this.type = name.substring(0, name.indexOf('<')).replace("org.apache.kafka.streams.kstream.", "").trim();
  46. break;
  47. }
  48. }
  49. }
  50. }
  51. }
  52. }

5.消息生产者(数据入口)

  1. /**
  2. * 消息生产者
  3. */
  4. @RestController
  5. @RequestMapping("/sendstream")
  6. public class StreamProducer {
  7. @Autowired
  8. private KafkaTemplate kafkaTemplate;
  9. private static final String INPUT_TOPIC = "input-stream-topic";
  10. @GetMapping
  11. public String sendstream(){
  12. for(int i=1;i<=20;i++){
  13. if(i%2==0){
  14. kafkaTemplate.send(INPUT_TOPIC,"0001"+i,"hello kafka");
  15. }else{
  16. kafkaTemplate.send(INPUT_TOPIC,"0002"+i,"hello stream");
  17. }
  18. }
  19. return "成功";
  20. }
  21. }

6.消息消费者(K Stream数据出口)

  1. /**
  2. * 消息消费者
  3. */
  4. @Component
  5. public class StreamConsumer {
  6. private static final String OUT_TOPIC = "out-stream-topic";
  7. @KafkaListener(topics =OUT_TOPIC )
  8. public void handleMsg(ConsumerRecord consumerRecord){
  9. if(consumerRecord!=null){
  10. String key = consumerRecord.key();
  11. String value = consumerRecord.value();
  12. System.out.println("--------"+key+":"+value);
  13. }
  14. }
  15. }

四.数据埋点(简单的数据封装,由生产者发送到消费者)

五.K Stream--->K Table--->K Stream

  1. @Component
  2. public class HotArticleStreamHandler implements KafkaStreamListener> {
  3. @Override
  4. public String listenerTopic() {//输入主题
  5. return MQConstants.HOT_ARTICLE_INPUT_TOPIC;
  6. }
  7. @Override
  8. public String sendTopic() { //输出主题
  9. return MQConstants.HOT_ARTICLE_OUTPUT_TOPIC;
  10. }
  11. @Override
  12. public KStream getService(KStream stream) {
  13. /**
  14. * 现在的消息格式:
  15. * key value
  16. * "topic" {"data"}
  17. * topic" {"data"}
  18. * .......
  19. */
  20. KTable, Long> ktable = stream.flatMapValues(new ValueMapper>() {
  21. @Override
  22. public Iterable<?> apply(String value) {
  23. UpdateArticleMsg articleMsg = JsonUtils.toBean(value, Msg.class);
  24. String tag = Msg;
  25. return Arrays.asList(tag);
  26. }
  27. // 搜集值
  28. }).map(new KeyValueMapper>() {
  29. @Override
  30. public KeyValue<?, ?> apply(String key, Object value) {
  31. return new KeyValue<>(value, value);
  32. }
  33. }).groupByKey()
  34. //设置聚合间隔时间
  35. .windowedBy(TimeWindows.of(Duration.ofSeconds(3)))
  36. .count(Materialized.as("count"));

 

百度网盘搜索
阿哇教育
作文哥
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/826359
推荐阅读
相关标签
  

闽ICP备14008679号