
Heima Toutiao -- Day 11: Real-Time Hot Article Calculation with Kafka Stream

Real-Time and Scheduled Calculation of Article Scores

Contents

I. Scheduled vs. Real-Time Calculation

II. Real-Time Stream Processing

1. Concept

2. Application Scenarios

3. Technology Selection

III. Kafka Stream

1. Overview

2. Key Concepts of Kafka Streams

3. KStream

4. Kafka Stream Quick-Start Example

5. Integrating Kafka Stream with Spring Boot

IV. Hot Article Calculation on the App Side

Implementation

Publish user-behavior messages (views, comments, likes, collections), with views and likes as examples

3. Receive messages in real time with Kafka Stream and aggregate them

4. Recompute article scores and update the database and cache


I. Scheduled vs. Real-Time Calculation

Kafka Stream

  • What stream processing is

  • Kafka Stream overview

  • Kafka Stream quick-start example

  • Integrating Kafka Stream with Spring Boot

Real-time calculation

  • Publish user-behavior messages

  • Aggregate the messages with Kafka Stream

  • Update the article behavior counts

  • Replace the hot article data

II. Real-Time Stream Processing

1. Concept

Stream processing is usually contrasted with batch processing. In the streaming model, input is continuous and can be considered unbounded in time, which means the complete data set is never available for a computation. Results are emitted continuously as well, so the output is also unbounded in time. Stream processing usually has strict latency requirements; the target computation is defined first, and its logic is applied to the data as it arrives. To improve efficiency, incremental computation replaces full recomputation wherever possible.

Stream processing works like an escalator: it runs continuously, producing and receiving data without end and without boundaries.
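To make "incremental instead of full computation" concrete, here is a minimal plain-Java sketch (the IncrementalCounter class and its onRecord handler are hypothetical names, not part of the project): the computation keeps running state and applies each arriving record as a delta, never rescanning history.

```java
import java.util.HashMap;
import java.util.Map;

public class IncrementalCounter {
    // Running state survives between records, so no full rescan is ever needed.
    private final Map<String, Long> counts = new HashMap<>();

    // Called once per arriving record: an O(1) incremental update.
    public void onRecord(String word) {
        counts.merge(word, 1L, Long::sum);
    }

    public long countOf(String word) {
        return counts.getOrDefault(word, 0L);
    }
}
```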

2. Application Scenarios

  • Log analysis

    Analyze a website's access logs in real time to compute traffic, build user profiles, measure retention, and so on, giving the business live data to act on.

  • Dashboard statistics

    Watch registrations, orders, purchases, and revenue in real time.

  • Real-time transit data

    Continuously update bus positions and estimate how long until a bus reaches each stop.

  • Real-time article scoring

    For news-feed articles, user behaviors update an article's score in real time; the higher the score, the more the article is recommended.

3. Technology Selection

  • Hadoop (primarily a batch-processing ecosystem)

  • Apache Storm

    Storm is a distributed real-time big-data processing system that handles massive data volumes conveniently, with high reliability, fault tolerance, and scalability. It is a streaming framework with very high throughput.

  • Kafka Stream

    Embeds easily into any Java application and integrates with whatever packaging, deployment, and operations tooling the team already uses for its stream applications.

III. Kafka Stream

1 Overview

Kafka Stream is a feature introduced in Apache Kafka 0.10. It provides stream processing and analysis of data stored in Kafka.

Kafka Stream has the following characteristics:

  • It is a very simple, lightweight library that embeds easily into any Java application and can be packaged and deployed in any way

  • No external dependencies other than Kafka itself

  • It leverages Kafka's partitioning mechanism for horizontal scaling and ordering guarantees

  • Efficient stateful operations (such as windowed joins and aggregations) via fault-tolerant state stores

  • Exactly-once processing semantics

  • Record-at-a-time processing, enabling millisecond-level latency

  • Event-time windowing, including handling of late-arriving records

  • Both a low-level Processor primitive (similar to Storm's spouts and bolts) and a high-level DSL (similar to Spark's map/group/reduce)
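As a small illustration of the exactly-once bullet above, the guarantee is opt-in through a single configuration entry (a sketch assuming the Kafka Streams 2.x used in this chapter; prop is the Properties object from the quick-start in section 4):

```java
// Upgrade the default at-least-once guarantee to exactly-once processing.
prop.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
```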

2. Key Concepts of Kafka Streams

  • Source processor: a special type of stream processor with no upstream processors. It produces the input stream for the topology from one or more Kafka topics, consuming records from those topics and forwarding them to downstream processors.

  • Sink processor: a special type of stream processor with no downstream processors. It sends the records it receives from upstream processors to a designated Kafka topic.
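A minimal sketch of both concepts using the low-level Processor API (assuming the Kafka Streams 2.x used in this chapter; the topic names reuse those of the quick-start below, and the "Upper" processor is hypothetical): a source processor reads itcast-topic-input, an ordinary processor transforms each record, and a sink processor writes to itcast-topic-out.

```java
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class ProcessorApiSketch {
    public static Topology build() {
        Topology topology = new Topology();
        // Source processor: no upstream; feeds records from a topic into the topology.
        topology.addSource("Source", "itcast-topic-input");
        // Ordinary stream processor sitting between source and sink.
        topology.addProcessor("Upper", () -> new Processor<String, String>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public void process(String key, String value) {
                // Forward the transformed record to downstream processors.
                context.forward(key, value.toUpperCase());
            }

            @Override
            public void close() {
            }
        }, "Source");
        // Sink processor: no downstream; writes records back out to a topic.
        topology.addSink("Sink", "itcast-topic-out", "Upper");
        return topology;
    }
}
```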

 

3. KStream

(1) Each record is a key-value pair, so the data structure resembles a map entry.

A KStream is a data stream: an ordered, potentially unbounded, continuously updated data set. The records in a stream usually represent events, such as a mouse click, a transaction, or a position reported by a sensor.

What KStream abstracts is exactly this data stream. Like the data in a Kafka topic itself, it behaves like a log: every operation inserts (appends) new records.

To illustrate, imagine the following two records being sent to the stream:

("alice", 1) --> ("alice", 3)

If your stream processing application sums the values per user, it returns 4 for alice. Why? Because the second record is not treated as an update of the first: every record is a new insert.
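To see this insert semantics in DSL code, here is a hedged sketch (same Kafka Streams version as the examples below; the "clicks" topics and the sumPerUser method are hypothetical): summing per key turns ("alice", 1) followed by ("alice", 3) into 4.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ClickSum {
    // Mirrors the shape of streamProcessor(...) in the quick-start below.
    static void sumPerUser(StreamsBuilder builder) {
        KStream<String, Integer> clicks = builder.stream("clicks",
                Consumed.with(Serdes.String(), Serdes.Integer()));
        clicks.groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
              // Every record is an insert, so ("alice",1) then ("alice",3) sums to 4.
              .reduce(Integer::sum)
              .toStream()
              .to("clicks-total", Produced.with(Serdes.String(), Serdes.Integer()));
    }
}
```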

4. Kafka Stream Quick-Start Example

(1) Requirement analysis: count words (word count).

(2) Add the dependency

Add the following to the pom file of the earlier kafka-demo project:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

(3) Create the plain (non-Spring) Kafka Streams quick-start class:

```java
package com.heima.kafka.sample;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Stream processing quick start.
 */
public class KafkaStreamQuickStart {

    public static void main(String[] args) {
        // Kafka configuration
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");

        // Stream builder
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Define the stream processing topology
        streamProcessor(streamsBuilder);

        // Create the KafkaStreams object
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);
        // Start stream processing
        kafkaStreams.start();
    }

    /**
     * Stream processing.
     * Sample message content: hello kafka hello itcast
     * @param streamsBuilder
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        // Create the KStream and specify which topic to receive messages from
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        // Split each message value into individual words
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
        // Group by value (the word itself)
        .groupBy((key, value) -> value)
        // 10-second time window
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        // Count the occurrences of each word
        .count()
        // Convert back to a KStream
        .toStream()
        .map((key, value) -> {
            System.out.println("key:" + key + ",value:" + value);
            // key is a Windowed<String>; key.key() extracts the original word
            return new KeyValue<>(key.key().toString(), value.toString());
        })
        // Send the result to the output topic
        .to("itcast-topic-out");
    }
}
```

(4) Test preparation

  • Use a producer to send several messages to the topic itcast-topic-input

  • Use a consumer to receive messages from the topic itcast-topic-out

Result:

  • The stream job aggregates the producer's messages within each window and sends the summary to the topic the consumer reads
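For example (hypothetical values): sending "hello kafka" twice within one 10-second window produces two output records, key hello with value 2 and key kafka with value 2. As the test output in section 5 shows, each output key also carries its window bounds.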

5. Integrating Kafka Stream with Spring Boot

(1) Register a custom KafkaStreamsConfiguration bean:

```java
package com.heima.kafka.config;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * Re-registers the KafkaStreamsConfiguration bean to apply custom configuration parameters.
 */
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix = "kafka")
public class KafkaStreamConfig {

    private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;

    private String hosts;
    private String group;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup() + "_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup() + "_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}
```

Modify the application.yml file, adding the custom configuration at the bottom:

```yaml
kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}
```
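Because the configuration class above is annotated with @ConfigurationProperties(prefix = "kafka"), Spring Boot binds kafka.hosts and kafka.group from this file to the hosts and group fields, which in turn feed the stream's bootstrap servers, application id, and client id.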

(2) Add a configuration class that creates the KStream object and performs the aggregation:

```java
package com.heima.kafka.stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Configuration
@Slf4j
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        // Create the KStream and specify which topic to receive messages from
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
        // Group by value (the word itself)
        .groupBy((key, value) -> value)
        // Aggregation time window
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        // Count the occurrences of each word
        .count()
        .toStream()
        // Convert the result to strings
        .map((key, value) -> {
            System.out.println("key:" + key + ",value:" + value);
            return new KeyValue<>(key.key().toString(), value.toString());
        })
        // Send the result to the output topic
        .to("itcast-topic-out");
        return stream;
    }
}
```

Start the application, then run the test class below to send ten messages to itcast-topic-input:

```java
@SpringBootTest
public class KafKaStreamTest {

    @Test
    public void testSendMsg() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.81.128:9092");
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>("itcast-topic-input", "hello kafka stream");
        for (int i = 0; i < 10; i++) {
            // Send the message
            kafkaProducer.send(record);
        }
    }
}
```

Configure the listener:

```java
@Component
public class KafkaListener {

    /**
     * Consumer listening on the output topic.
     * @param msg
     */
    @org.springframework.kafka.annotation.KafkaListener(topics = "itcast-topic-out")
    public void listen(String msg) {
        System.out.println(msg);
    }
}
```

Test output:

```
2023-12-27 15:37:09.537 INFO 30248 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=kafka-demo-test_stream_cid-StreamThread-1-consumer, groupId=kafka-demo-test_stream_aid] Revoke previously assigned partitions kafka-demo-test_stream_aid-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-0
2023-12-27 15:37:09.537 INFO 30248 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [kafka-demo-test_stream_cid-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
2023-12-27 15:37:09.537 INFO 30248 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [kafka-demo-test_stream_cid] State transition from RUNNING to REBALANCING
key:[hello@1703662620000/1703662630000],value:10
key:[kafka@1703662620000/1703662630000],value:10
key:[stream@1703662620000/1703662630000],value:10
10
10
10
```

The word counts are computed successfully. Note the key format [word@startMs/endMs]: because the count is windowed, each output key carries the word together with the bounds of its 10-second window in epoch milliseconds.

IV. Hot Article Calculation on the App Side

 

Implementation

Publish messages for user behaviors (views, comments, likes, collections), with views and likes as the examples

① Integrate the Kafka producer configuration into the heima-leadnews-behavior microservice

Modify the Nacos configuration, adding:

```yaml
spring:
  application:
    name: leadnews-behavior
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```
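With spring-kafka on the classpath, these standard Spring Boot properties are enough to auto-configure the KafkaTemplate<String, String> that the service classes below inject; no additional producer wiring is needed.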

② Modify ApLikesBehaviorServiceImpl to send the message

Define the message payload class UpdateArticleMess:

 

```java
package com.heima.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess {

    /**
     * The article field being modified
     */
    private UpdateArticleType type;

    /**
     * Article ID
     */
    private Long articleId;

    /**
     * Delta to apply; may be positive or negative
     */
    private Integer add;

    public enum UpdateArticleType {
        COLLECTION, COMMENT, LIKES, VIEWS;
    }
}
```
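A hedged sketch of what actually travels over Kafka (the article id below is made up; with fastjson's default alphabetical field ordering the payload looks roughly as commented):

```java
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(1302862387124125698L); // hypothetical article id
mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);
mess.setAdd(1);

String json = JSON.toJSONString(mess);
// roughly: {"add":1,"articleId":1302862387124125698,"type":"LIKES"}

// The stream side in step 3 parses it back the same way:
UpdateArticleMess parsed = JSON.parseObject(json, UpdateArticleMess.class);
```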

Topic constants class:

```java
package com.heima.common.constants;

public class HotArticleConstants {
    public static final String HOT_ARTICLE_SCORE_TOPIC = "hot.article.score.topic";
    public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC = "hot.article.incr.handle.topic";
}
```
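These two topics define the whole pipeline: the behavior microservice publishes raw UpdateArticleMess deltas to hot.article.score.topic; the Kafka Stream in step 3 aggregates them per article and time window and republishes the merged result to hot.article.incr.handle.topic, which the article microservice consumes in step 4.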

Modify the like behavior service:

```java
package com.heima.behavior.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {

    @Autowired
    private CacheService cacheService;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public ResponseResult like(LikesBehaviorDto dto) {
        // 1. Validate parameters
        if (dto == null || dto.getArticleId() == null || checkParam(dto)) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }
        // 2. Check login
        ApUser user = AppThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }

        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);

        // 3. Like: save the record
        if (dto.getOperation() == 0) {
            Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
            if (obj != null) {
                return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "already liked");
            }
            // Save the current key
            log.info("saving like key: {}, {}, {}", dto.getArticleId(), user.getId(), dto);
            cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
            mess.setAdd(1);
        } else {
            // Remove the current key (unlike)
            log.info("removing like key: {}, {}", dto.getArticleId(), user.getId());
            cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
            mess.setAdd(-1);
        }

        // Send the message for aggregation
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));

        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }

    /**
     * Validate parameters.
     * @return true if the parameters are invalid
     */
    private boolean checkParam(LikesBehaviorDto dto) {
        if (dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {
            return true;
        }
        return false;
    }
}
```

③ Modify the read behavior class ApReadBehaviorServiceImpl to send the message:

```java
package com.heima.behavior.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApReadBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.ReadBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
@Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {

    @Autowired
    private CacheService cacheService;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public ResponseResult readBehavior(ReadBehaviorDto dto) {
        // 1. Validate parameters
        if (dto == null || dto.getArticleId() == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }
        // 2. Check login
        ApUser user = AppThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }
        // Update the read count
        String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
        if (StringUtils.isNotBlank(readBehaviorJson)) {
            ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);
            dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));
        }
        // Save the current key
        log.info("saving read key: {} {} {}", dto.getArticleId(), user.getId(), dto);
        cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));

        // Send the message for aggregation
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        mess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }
}
```

3. Receive messages in real time with Kafka Stream and aggregate them

① Integrate Kafka Stream into the leadnews-article microservice (see kafka-demo for reference)

② Define the entity class that encapsulates the aggregated score data:

```java
package com.heima.model.mess;

import lombok.Data;

@Data
public class ArticleVisitStreamMess {
    /**
     * Article id
     */
    private Long articleId;
    /**
     * Views
     */
    private int view;
    /**
     * Collections
     */
    private int collect;
    /**
     * Comments
     */
    private int comment;
    /**
     * Likes
     */
    private int like;
}
```

③ Define the stream that receives and aggregates the messages:

```java
package com.heima.article.stream;

import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        // Receive the messages
        KStream<String, String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        // Aggregate with stream processing
        stream.map((key, value) -> {
            UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
            // Re-key the message: key = article id (e.g. 1234343434), value = e.g. LIKES:1
            return new KeyValue<>(mess.getArticleId().toString(), mess.getType().name() + ":" + mess.getAdd());
        })
        // Group by article id
        .groupBy((key, value) -> key)
        // 10-second time window
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        // Custom aggregation
        .aggregate(new Initializer<String>() {
            /**
             * Initializer: returns the initial aggregate value
             * @return
             */
            @Override
            public String apply() {
                return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
            }
            /**
             * The actual aggregation; the return value becomes the new aggregate
             */
        }, new Aggregator<String, String, String>() {
            @Override
            public String apply(String key, String value, String aggValue) {
                if (StringUtils.isBlank(value)) {
                    return aggValue;
                }
                String[] aggAry = aggValue.split(",");
                int col = 0, com = 0, lik = 0, vie = 0;
                for (String agg : aggAry) {
                    String[] split = agg.split(":");
                    // Read the totals accumulated so far within this time window
                    switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])) {
                        case COLLECTION:
                            col = Integer.parseInt(split[1]);
                            break;
                        case COMMENT:
                            com = Integer.parseInt(split[1]);
                            break;
                        case LIKES:
                            lik = Integer.parseInt(split[1]);
                            break;
                        case VIEWS:
                            vie = Integer.parseInt(split[1]);
                            break;
                    }
                }
                // Apply the incoming delta
                String[] valAry = value.split(":");
                switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])) {
                    case COLLECTION:
                        col += Integer.parseInt(valAry[1]);
                        break;
                    case COMMENT:
                        com += Integer.parseInt(valAry[1]);
                        break;
                    case LIKES:
                        lik += Integer.parseInt(valAry[1]);
                        break;
                    case VIEWS:
                        vie += Integer.parseInt(valAry[1]);
                        break;
                }
                String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                System.out.println("article id:" + key);
                System.out.println("aggregate within the current window:" + formatStr);
                return formatStr;
            }
        }, Materialized.as("hot-article-stream-count-001"))
        .toStream()
        .map((key, value) -> {
            // key is a Windowed<String>; key.key() extracts the article id
            return new KeyValue<>(key.key().toString(), formatObj(key.key().toString(), value));
        })
        // Send the aggregated message downstream
        .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
        return stream;
    }

    /**
     * Format the message value.
     * @param articleId
     * @param value
     * @return
     */
    public String formatObj(String articleId, String value) {
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        // COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])) {
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("aggregated message result: {}", JSON.toJSONString(mess));
        return JSON.toJSONString(mess);
    }
}
```

4. Recompute article scores and update the database and cache

① Add a method to ApArticleService for updating the article score in the database:

```java
/**
 * Update the article score and refresh the hot article data in the cache.
 * @param mess
 */
public void updateScore(ArticleVisitStreamMess mess);
```

Implementation:

```java
/**
 * Update the article score and refresh the hot article data in the cache.
 * @param mess
 */
@Override
public void updateScore(ArticleVisitStreamMess mess) {
    // 1. Update the article's view, like, collection and comment counts
    ApArticle apArticle = updateArticle(mess);
    // 2. Compute the article score
    Integer score = computeScore(apArticle);
    score = score * 3;
    // 3. Replace the hot article data for the article's own channel
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());
    // 4. Replace the hot article data for the recommendation channel
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
}

/**
 * Replace the data and store it in Redis.
 * @param apArticle
 * @param score
 * @param s the Redis key
 */
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
    String articleListStr = cacheService.get(s);
    if (StringUtils.isNotBlank(articleListStr)) {
        List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);
        boolean flag = true;
        // If the article is already cached, only update its score
        for (HotArticleVo hotArticleVo : hotArticleVoList) {
            if (hotArticleVo.getId().equals(apArticle.getId())) {
                hotArticleVo.setScore(score);
                flag = false;
                break;
            }
        }
        // If not cached, compare with the lowest-scoring cached entry and replace it if the new score is higher
        if (flag) {
            if (hotArticleVoList.size() >= 30) {
                hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                if (lastHot.getScore() < score) {
                    hotArticleVoList.remove(lastHot);
                    HotArticleVo hot = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hot);
                    hot.setScore(score);
                    hotArticleVoList.add(hot);
                }
            } else {
                HotArticleVo hot = new HotArticleVo();
                BeanUtils.copyProperties(apArticle, hot);
                hot.setScore(score);
                hotArticleVoList.add(hot);
            }
        }
        // Sort and write back to Redis
        hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
        cacheService.set(s, JSON.toJSONString(hotArticleVoList));
    }
}

/**
 * Update the article behavior counts.
 * @param mess
 */
private ApArticle updateArticle(ArticleVisitStreamMess mess) {
    ApArticle apArticle = getById(mess.getArticleId());
    apArticle.setCollection(apArticle.getCollection() == null ? 0 : apArticle.getCollection() + mess.getCollect());
    apArticle.setComment(apArticle.getComment() == null ? 0 : apArticle.getComment() + mess.getComment());
    apArticle.setLikes(apArticle.getLikes() == null ? 0 : apArticle.getLikes() + mess.getLike());
    apArticle.setViews(apArticle.getViews() == null ? 0 : apArticle.getViews() + mess.getView());
    updateById(apArticle);
    return apArticle;
}

/**
 * Compute the article's score.
 * @param apArticle
 * @return
 */
private Integer computeScore(ApArticle apArticle) {
    Integer score = 0;
    if (apArticle.getLikes() != null) {
        score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
    }
    if (apArticle.getViews() != null) {
        score += apArticle.getViews();
    }
    if (apArticle.getComment() != null) {
        score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
    }
    if (apArticle.getCollection() != null) {
        score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
    }
    return score;
}
```
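To make the scoring concrete, take some made-up weights (ArticleConstants is not shown in this chapter): suppose HOT_ARTICLE_LIKE_WEIGHT = 3, HOT_ARTICLE_COMMENT_WEIGHT = 5, and HOT_ARTICLE_COLLECTION_WEIGHT = 8. An article with 100 views, 10 likes, 5 comments, and 2 collections then scores 100 + 10×3 + 5×5 + 2×8 = 171 in computeScore, and the ×3 boost applied during real-time updates raises it to 513, which is the value replaceDataToRedis compares against the cached hot list.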

② Define a listener that receives the aggregated data and recalculates the article score:

```java
package com.heima.article.listener;

import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleService;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess) {
        if (StringUtils.isNotBlank(mess)) {
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);
        }
    }
}
```

 
