当前位置:   article > 正文

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka

黑马头条

目录

《黑马头条》SpringBoot+SpringCloud+ Nacos等企业级微服务架构项目_黑马头条项目_软工菜鸡的博客-CSDN博客

04自媒体文章-自动审核

1)自媒体文章自动审核流程

2)内容安全第三方接口

2.1)概述

2.2)准备工作

2.3)文本内容审核接口

2.4)图片审核接口

2.5)项目集成

3)app端文章保存接口

3.1)表结构说明

3.2)分布式id

分布式id-技术选型

3.3)思路分析

3.4)feign接口

4)自媒体文章自动审核功能实现

4.1)表结构说明

4.2)实现

4.3)单元测试

4.4)feign远程接口调用方式

4.5)服务降级处理

5)发布文章提交审核集成

5.1)同步调用与异步调用

5.2)Springboot集成异步线程调用

6)文章审核功能-综合测试

6.1)服务启动列表

6.2)测试情况列表

7)新需求-自管理敏感词

7.1)需求分析

7.2)敏感词-过滤

7.3)DFA实现原理

7.4)自管理敏感词集成到文章审核中

测试了一下 源码不能检测 标题的敏感词汇;加了个这: wmNews.getTitle()+

8)新需求-图片识别文字审核敏感词

8.1)需求分析

8.2)图片文字识别

8 .3)Tess4j案例

8.4)管理敏感词和图片文字识别集成到文章审核

9)文章详情-静态文件生成

9.1)思路分析

9.2)实现步骤

05延迟任务精准发布文章

1)文章定时发布

2)延迟任务概述

2.1)什么是延迟任务

2.2)技术对比

2.2.1)DelayQueue

2.2.2)RabbitMQ实现延迟任务

2.2.3)redis实现

3)redis实现延迟任务

锐评:完全为了学list zset而编出来的场景,实际工作中延迟队列要设计成这样只能说太蠢了

4)延迟任务服务实现

4.1)搭建heima-leadnews-schedule模块

4.2)数据库准备

乐观锁/悲观锁

4.3)安装redis

4.4)项目集成redis

4.5)添加任务

4.6)取消任务

4.7)消费任务

4.8)未来数据定时刷新

4.8.1)reids key值匹配

4.8.2)reids管道

4.8.3)未来数据定时刷新-功能完成

4.9)分布式锁解决集群下的方法抢占执行

4.9.1)问题描述

4.9.2)分布式锁

4.9.3)redis分布式锁

4.9.4)在工具类CacheService中添加方法

4.10)数据库同步到redis

5)延迟队列解决精准时间发布文章

5.1)延迟队列服务提供对外接口

5.2)发布文章集成添加延迟队列接口

序列化工具对比

5.3)消费任务进行审核文章

06kafka及异步通知文章上下架

1)自媒体文章上下架

2)kafka概述

消息中间件对比-选择建议

kafka介绍-名词解释

3)kafka安装配置

4)kafka入门

分区机制—topic剖析

5)kafka高可用设计

5.1)集群

5.2)备份机制(Replication)

6)kafka生产者详解

6.1)发送类型

6.2)参数详解

ack确认机制

retries 重试次数

消息压缩

7)kafka消费者详解

7.1)消费者组

7.2)消息有序性

7.3)提交和偏移量

1.提交当前偏移量(同步提交)

2.异步提交

3.同步和异步组合提交

8)springboot集成kafka

8.1)入门

8.2)传递消息为对象

9)自媒体文章上下架功能完成

9.1)需求分析

9.2)流程说明

9.3)接口定义

9.4)自媒体文章上下架-功能实现

9.5)消息通知article端文章上下架



04自媒体文章-自动审核

1)自媒体文章自动审核流程

1 自媒体端发布文章后,开始审核文章
2 审核的主要是审核文章的 内容(文本内容和图片)
3 借助 第三方提供的接口审核文本
4 借助第三方提供的接口审核图片,由于图片存储到minIO中,需要先下载才能审核
5 如果审核失败,则需要修改自媒体文章的状态,status:2 审核失败 status:3 转到人工审核
6 如果审核成功,则需要在文章微服务中创建app端需要的文章

2)内容安全第三方接口

2.1)概述

内容安全是识别服务,支持对图片、视频、文本、语音等对象多样化场景检测,有效降低内容违规风险

目前很多平台都支持内容检测,如阿里云、腾讯云、百度AI、网易云等国内大型互联网公司都对外提供了API。

按照性能和收费来看,黑马头条项目使用的就是阿里云的内容安全接口,使用到了图片和文本的审核

阿里云收费标准:https://www.aliyun.com/price/product/?spm=a2c4g.11186623.2.10.4146401eg5oeu8#/lvwang/detail

2.2)准备工作

您在使用内容检测API之前,需要先注册阿里云账号,添加Access Key并签约云盾内容安全

操作步骤

  1. 前往阿里云官网注册账号。如果已有注册账号,请跳过此步骤。

进入阿里云首页后,如果没有阿里云的账户需要先进行注册,才可以进行登录。由于注册较为简单,课程和讲义不在进行体现(注册可以使用多种方式,如淘宝账号、支付宝账号、微博账号等...)。

需要实名认证和活体认证。

  1. 打开云盾内容安全产品试用页面,单击立即开通,正式开通服务。

内容安全控制台

  1. AccessKey管理页面管理您的AccessKeyID和AccessKeySecret。

管理自己的AccessKey,可以新建和删除AccessKey

查看自己的AccessKey,

AccessKey默认是隐藏的,第一次申请的时候可以保存AccessKey,点击显示,通过验证手机号后也可以查看

2.3)文本内容审核接口

文本垃圾内容检测:如何调用文本检测接口进行文本内容审核_内容安全-阿里云帮助中心

文本垃圾内容Java SDK: 如何使用JavaSDK文本反垃圾接口_内容安全-阿里云帮助中心

2.4)图片审核接口

图片垃圾内容检测:调用图片同步检测接口/green/image/scan审核图片内容_内容安全-阿里云帮助中心

图片垃圾内容Java SDK: 如何使用JavaSDK接口检测图片是否包含风险内容_内容安全-阿里云帮助中心

2.5)项目集成

①:拷贝资料文件夹中的类到common模块下面,并添加到自动配置

包括了GreenImageScan和GreenTextScan及对应的工具类

添加到自动配置中

②: accessKeyId和secret(需自己申请)

在heima-leadnews-wemedia中的nacos配置中心添加以下配置:

  1. aliyun:
  2. accessKeyId: ...
  3. secret: ...
  4. #aliyun.scenes=porn,terrorism,ad,qrcode,live,logo
  5. scenes: terrorism

③:在自媒体微服务中测试类中注入审核文本和图片的bean进行测试

  1. package com.heima.wemedia;
  2. import java.util.Arrays;
  3. import java.util.Map;
  4. @SpringBootTest(classes = WemediaApplication.class)
  5. @RunWith(SpringRunner.class)
  6. public class AliyunTest {
  7. @Autowired
  8. private GreenTextScan greenTextScan;
  9. @Autowired
  10. private GreenImageScan greenImageScan;
  11. @Autowired
  12. private FileStorageService fileStorageService;
  13. @Test
  14. public void testScanText() throws Exception {
  15. Map map = greenTextScan.greeTextScan("我是一个好人,冰毒");
  16. System.out.println(map);
  17. }
  18. @Test
  19. public void testScanImage() throws Exception {
  20. byte[] bytes = fileStorageService.downLoadFile("http://192.168.200.130:9000/leadnews/2021/04/26/ef3cbe458db249f7bd6fb4339e593e55.jpg");
  21. Map map = greenImageScan.imageScan(Arrays.asList(bytes));
  22. System.out.println(map);
  23. }
  24. }

我用的是 阿里云 云安全 增强版1小时,没审核出效果为null;估计是阿里 改接口了;

图片审核页报错

  1. java.lang.RuntimeException: upload file fail.
  2. at com.heima.common.aliyun.util.ClientUploader.uploadBytes(ClientUploader.java:129)
  3. at com.heima.common.aliyun.GreenImageScan.imageScan(GreenImageScan.java:71)
  4. at com.heima.wemedia.test.AliyunTest.testScanImage(AliyunTest.java:51)
  5. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  6. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  7. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  8. at java.lang.reflect.Method.invoke(Method.java:498)
  9. at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
  10. at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  11. at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
  12. at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  13. at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
  14. at org.springframework.test.context.junit4.statements.RunAfterTestExecutionC

3)app端文章保存接口

3.1)表结构说明
3.2)分布式id

随着业务的增长,文章表可能要占用很大的物理存储空间,为了解决该问题,后期使用数据库分片技术。将一个数据库进行拆分,通过数据库中间件连接。如果数据库中该表选用ID自增策略,则可能产生重复的ID,此时应该使用分布式ID生成策略来生成ID。

分布式id-技术选型

snowflake是Twitter开源的分布式ID生成算法,结果是一个long型的ID。

其核心思想是:使用41bit作为毫秒数,10bit作为机器的ID(5个bit是数据中心,5个bit的机器ID)(最多32个机房*32台机器(也可以自己设)),12bit作为毫秒内的流水号(意味着每个节点在每毫秒可以产生 4096 个 ID),最后还有一个符号位,永远是0(1为负数)

文章端相关的表使用雪花算法生成id,包括ap_article、 ap_article_config、 ap_article_content

mybatis-plus已经集成了雪花算法,完成以下两步即可在项目中集成雪花算法

第一:在实体类中的id上加入如下配置,指定类型为id_worker

  1. @TableId(value = "id",type = IdType.ID_WORKER)
  2. private Long id;

第二:在application.yml文件中配置数据中心id和机器id

  1. mybatis-plus:
  2. mapper-locations: classpath*:mapper/*.xml
  3. # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
  4. type-aliases-package: com.heima.model.article.pojos
  5. global-config:
  6. datacenter-id: 1
  7. workerId: 1

datacenter-id:数据中心id(取值范围:0-31) ;workerId:机器id(取值范围:0-31)

3.3)思路分析

在文章审核成功以后需要在app的article库中新增文章数据

1.保存文章信息 ap_article

2.保存文章配置信息 ap_article_config

3.保存文章内容 ap_article_content

实现思路:

3.4)feign接口

ArticleDto

  1. package com.heima.model.article.dtos;
  2. import com.heima.model.article.pojos.ApArticle;
  3. import lombok.Data;
  4. @Data
  5. public class ArticleDto extends ApArticle {
  6. /**
  7. * 文章内容
  8. */
  9. private String content;
  10. }

功能实现:

①:在heima-leadnews-feign-api中新增接口

第一:线导入feign的依赖

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-openfeign</artifactId>
  4. </dependency>

第二:定义文章端的接口

  1. package com.heima.apis.article;
  2. import org.springframework.web.bind.annotation.RequestBody;
  3. @FeignClient(value = "leadnews-article")
  4. public interface IArticleClient {
  5. @PostMapping("/api/v1/article/save")
  6. public ResponseResult saveArticle(@RequestBody ArticleDto dto) ;
  7. }

②:在heima-leadnews-article中实现该方法

  1. package com.heima.article.feign;
  2. import java.io.IOException;
  3. @RestController
  4. public class ArticleClient implements IArticleClient {
  5. @Autowired
  6. private ApArticleService apArticleService;
  7. @Override
  8. @PostMapping("/api/v1/article/save")
  9. public ResponseResult saveArticle(@RequestBody ArticleDto dto) {
  10. return apArticleService.saveArticle(dto);
  11. }
  12. }

③:拷贝mapper

在资料文件夹中拷贝ApArticleConfigMapper类到mapper文件夹中

同时,修改ApArticleConfig类,添加如下构造函数

  1. package com.heima.model.article.pojos;
  2. import java.io.Serializable;
  3. /**
  4. * <p>
  5. * APP已发布文章配置表
  6. * </p>
  7. *
  8. * @author itheima
  9. */
  10. @Data
  11. @NoArgsConstructor
  12. @TableName("ap_article_config")
  13. public class ApArticleConfig implements Serializable {
  14. public ApArticleConfig(Long articleId){
  15. this.articleId = articleId;
  16. this.isComment = true;
  17. this.isForward = true;
  18. this.isDelete = false;
  19. this.isDown = false;
  20. }
  21. @TableId(value = "id",type = IdType.ID_WORKER)
  22. private Long id;
  23. /**
  24. * 文章id
  25. */
  26. @TableField("article_id")
  27. private Long articleId;
  28. /**
  29. * 是否可评论
  30. * true: 可以评论 1
  31. * false: 不可评论 0
  32. */
  33. @TableField("is_comment")
  34. private Boolean isComment;
  35. /**
  36. * 是否转发
  37. * true: 可以转发 1
  38. * false: 不可转发 0
  39. */
  40. @TableField("is_forward")
  41. private Boolean isForward;
  42. /**
  43. * 是否下架
  44. * true: 下架 1
  45. * false: 没有下架 0
  46. */
  47. @TableField("is_down")
  48. private Boolean isDown;
  49. /**
  50. * 是否已删除
  51. * true: 删除 1
  52. * false: 没有删除 0
  53. */
  54. @TableField("is_delete")
  55. private Boolean isDelete;
  56. }

④:在ApArticleService中新增方法

  1. /**
  2. * 保存app端相关文章
  3. * @param dto
  4. * @return
  5. */
  6. ResponseResult saveArticle(ArticleDto dto) ;

实现类:

  1. @Autowired
  2. private ApArticleConfigMapper apArticleConfigMapper;
  3. @Autowired
  4. private ApArticleContentMapper apArticleContentMapper;
  5. /**
  6. * 保存app端相关文章
  7. * @param dto
  8. * @return
  9. */
  10. @Override
  11. public ResponseResult saveArticle(ArticleDto dto) {
  12. //1.检查参数
  13. if(dto == null){
  14. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
  15. }
  16. ApArticle apArticle = new ApArticle();
  17. BeanUtils.copyProperties(dto,apArticle);
  18. //2.判断是否存在id
  19. if(dto.getId() == null){
  20. //2.1 不存在id 保存 文章 文章配置 文章内容
  21. //保存文章
  22. save(apArticle);
  23. //保存配置
  24. ApArticleConfig apArticleConfig = new ApArticleConfig(apArticle.getId());
  25. apArticleConfigMapper.insert(apArticleConfig);
  26. //保存 文章内容
  27. ApArticleContent apArticleContent = new ApArticleContent();
  28. apArticleContent.setArticleId(apArticle.getId());
  29. apArticleContent.setContent(dto.getContent());
  30. apArticleContentMapper.insert(apArticleContent);
  31. }else {
  32. //2.2 存在id 修改 文章 文章内容
  33. //修改 文章
  34. updateById(apArticle);
  35. //修改文章内容
  36. ApArticleContent apArticleContent = apArticleContentMapper.selectOne(Wrappers.<ApArticleContent>lambdaQuery().eq(ApArticleContent::getArticleId, dto.getId()));
  37. apArticleContent.setContent(dto.getContent());
  38. apArticleContentMapper.updateById(apArticleContent);
  39. }
  40. //3.结果返回 文章的id
  41. return ResponseResult.okResult(apArticle.getId());
  42. }

⑤:测试

编写junit单元测试,或使用postman进行测试

http://localhost:51802/api/v1/article/save

  1. {
  2. "id":这个id要去数据库自己找 ,
  3. "title":"黑马头条项目背景22222222222222",
  4. "authoId":1102,
  5. "layout":1,
  6. "labels":"黑马头条",
  7. "publishTime":"2028-03-14T11:35:49.000Z",
  8. "images": "http://192.168.200.130:9000/leadnews/2021/04/26/5ddbdb5c68094ce393b08a47860da275.jpg",
  9. "content":"22222222222222222黑马头条项目背景,黑马头条项目背景,黑马头条项目背景,黑马头条项目背景,黑马头条项目背景"
  10. }

4)自媒体文章自动审核功能实现

4.1)表结构说明

wm_news 自媒体文章表

status字段:0 草稿 1 待审核 2 审核失败 3 人工审核 4 人工审核通过 8 审核通过(待发布) 9 已发布

4.2)实现

在heima-leadnews-wemedia中的service新增接口

  1. package com.heima.wemedia.service;
  2. public interface WmNewsAutoScanService {
  3. /**
  4. * 自媒体文章审核
  5. * @param id 自媒体文章id
  6. */
  7. public void autoScanWmNews(Integer id);
  8. }

实现类:

  1. package com.heima.wemedia.service.impl;
  2. import java.util.*;
  3. import java.util.stream.Collectors;
  4. @Service
  5. @Slf4j
  6. @Transactional
  7. public class WmNewsAutoScanServiceImpl implements WmNewsAutoScanService {
  8. @Autowired
  9. private WmNewsMapper wmNewsMapper;
  10. /**
  11. * 自媒体文章审核
  12. *
  13. * @param id 自媒体文章id
  14. */
  15. @Override
  16. public void autoScanWmNews(Integer id) {
  17. //1.查询自媒体文章
  18. WmNews wmNews = wmNewsMapper.selectById(id);
  19. if(wmNews == null){
  20. throw new RuntimeException("WmNewsAutoScanServiceImpl-文章不存在");
  21. }
  22. if(wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())){
  23. //从内容中提取纯文本内容和图片
  24. Map<String,Object> textAndImages = handleTextAndImages(wmNews);
  25. //2.审核文本内容 阿里云接口
  26. boolean isTextScan = handleTextScan((String) textAndImages.get("content"),wmNews);
  27. if(!isTextScan)return;
  28. //3.审核图片 阿里云接口
  29. boolean isImageScan = handleImageScan((List<String>) textAndImages.get("images"),wmNews);
  30. if(!isImageScan)return;
  31. //4.审核成功,保存app端的相关的文章数据
  32. ResponseResult responseResult = saveAppArticle(wmNews);
  33. if(!responseResult.getCode().equals(200)){
  34. throw new RuntimeException("WmNewsAutoScanServiceImpl-文章审核,保存app端相关文章数据失败");
  35. }
  36. //回填article_id
  37. wmNews.setArticleId((Long) responseResult.getData());
  38. updateWmNews(wmNews,(short) 9,"审核成功");
  39. }
  40. }
  41. @Autowired
  42. private IArticleClient articleClient;
  43. @Autowired
  44. private WmChannelMapper wmChannelMapper;
  45. @Autowired
  46. private WmUserMapper wmUserMapper;
  47. /**
  48. * 保存app端相关的文章数据
  49. * @param wmNews
  50. */
  51. private ResponseResult saveAppArticle(WmNews wmNews) {
  52. ArticleDto dto = new ArticleDto();
  53. //属性的拷贝
  54. BeanUtils.copyProperties(wmNews,dto);
  55. //文章的布局
  56. dto.setLayout(wmNews.getType());
  57. //频道
  58. WmChannel wmChannel = wmChannelMapper.selectById(wmNews.getChannelId());
  59. if(wmChannel != null){
  60. dto.setChannelName(wmChannel.getName());
  61. }
  62. //作者
  63. dto.setAuthorId(wmNews.getUserId().longValue());
  64. WmUser wmUser = wmUserMapper.selectById(wmNews.getUserId());
  65. if(wmUser != null){
  66. dto.setAuthorName(wmUser.getName());
  67. }
  68. //设置文章id
  69. if(wmNews.getArticleId() != null){
  70. dto.setId(wmNews.getArticleId());
  71. }
  72. dto.setCreatedTime(new Date());
  73. ResponseResult responseResult = articleClient.saveArticle(dto);
  74. return responseResult;
  75. }
  76. @Autowired
  77. private FileStorageService fileStorageService;
  78. @Autowired
  79. private GreenImageScan greenImageScan;
  80. /**
  81. * 审核图片
  82. * @param images
  83. * @param wmNews
  84. * @return
  85. */
  86. private boolean handleImageScan(List<String> images, WmNews wmNews) {
  87. boolean flag = true;
  88. if(images == null || images.size() == 0){
  89. return flag;
  90. }
  91. //下载图片 minIO
  92. //图片去重
  93. images = images.stream().distinct().collect(Collectors.toList());
  94. List<byte[]> imageList = new ArrayList<>();
  95. for (String image : images) {
  96. byte[] bytes = fileStorageService.downLoadFile(image);
  97. imageList.add(bytes);
  98. }
  99. //审核图片
  100. try {
  101. Map map = greenImageScan.imageScan(imageList);
  102. if(map != null){
  103. //审核失败
  104. if(map.get("suggestion").equals("block")){
  105. flag = false;
  106. updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
  107. }
  108. //不确定信息 需要人工审核
  109. if(map.get("suggestion").equals("review")){
  110. flag = false;
  111. updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
  112. }
  113. }
  114. } catch (Exception e) {
  115. flag = false;
  116. e.printStackTrace();
  117. }
  118. return flag;
  119. }
  120. @Autowired
  121. private GreenTextScan greenTextScan;
  122. /**
  123. * 审核纯文本内容
  124. * @param content
  125. * @param wmNews
  126. * @return
  127. */
  128. private boolean handleTextScan(String content, WmNews wmNews) {
  129. boolean flag = true;
  130. if((wmNews.getTitle()+"-"+content).length() == 0){
  131. return flag;
  132. }
  133. try {
  134. Map map = greenTextScan.greeTextScan((wmNews.getTitle()+"-"+content));
  135. if(map != null){
  136. //审核失败
  137. if(map.get("suggestion").equals("block")){
  138. flag = false;
  139. updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
  140. }
  141. //不确定信息 需要人工审核
  142. if(map.get("suggestion").equals("review")){
  143. flag = false;
  144. updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
  145. }
  146. }
  147. } catch (Exception e) {
  148. flag = false;
  149. e.printStackTrace();
  150. }
  151. return flag;
  152. }
  153. /**
  154. * 修改文章内容
  155. * @param wmNews
  156. * @param status
  157. * @param reason
  158. */
  159. private void updateWmNews(WmNews wmNews, short status, String reason) {
  160. wmNews.setStatus(status);
  161. wmNews.setReason(reason);
  162. wmNewsMapper.updateById(wmNews);
  163. }
  164. /**
  165. * 1。从自媒体文章的内容中提取文本和图片
  166. * 2.提取文章的封面图片
  167. * @param wmNews
  168. * @return
  169. */
  170. private Map<String, Object> handleTextAndImages(WmNews wmNews) {
  171. //存储纯文本内容
  172. StringBuilder stringBuilder = new StringBuilder();
  173. List<String> images = new ArrayList<>();
  174. //1。从自媒体文章的内容中提取文本和图片
  175. if(StringUtils.isNotBlank(wmNews.getContent())){
  176. List<Map> maps = JSONArray.parseArray(wmNews.getContent(), Map.class);
  177. for (Map map : maps) {
  178. if (map.get("type").equals("text")){
  179. stringBuilder.append(map.get("value"));
  180. }
  181. if (map.get("type").equals("image")){
  182. images.add((String) map.get("value"));
  183. }
  184. }
  185. }
  186. //2.提取文章的封面图片
  187. if(StringUtils.isNotBlank(wmNews.getImages())){
  188. String[] split = wmNews.getImages().split(",");
  189. images.addAll(Arrays.asList(split));
  190. }
  191. Map<String, Object> resultMap = new HashMap<>();
  192. resultMap.put("content",stringBuilder.toString());
  193. resultMap.put("images",images);
  194. return resultMap;
  195. }
  196. }
4.3)单元测试
  1. package com.heima.wemedia.service;
  2. import com.heima.wemedia.WemediaApplication;
  3. import org.junit.Test;
  4. import org.junit.runner.RunWith;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. import org.springframework.test.context.junit4.SpringRunner;
  8. import static org.junit.Assert.*;
  9. @SpringBootTest(classes = WemediaApplication.class)
  10. @RunWith(SpringRunner.class)
  11. public class WmNewsAutoScanServiceTest {
  12. @Autowired
  13. private WmNewsAutoScanService wmNewsAutoScanService;
  14. @Test
  15. public void autoScanWmNews() {
  16. wmNewsAutoScanService.autoScanWmNews(6238);
  17. }
  18. }
4.4)feign远程接口调用方式

在heima-leadnews-wemedia服务中已经依赖了heima-leadnews-feign-apis工程,只需要在自媒体的引导类中开启feign的远程调用即可

注解为:@EnableFeignClients(basePackages = "com.heima.apis") 需要指向apis这个包

4.5)服务降级处理
  • 服务降级是服务自我保护的一种方式,或者保护下游服务的一种方式,用于确保服务不会受请求突增影响变得不可用,确保服务不会崩溃

  • 服务降级虽然会导致请求失败,但是不会导致阻塞。

实现步骤:

①:在heima-leadnews-feign-api编写降级逻辑

  1. package com.heima.apis.article.fallback;
  2. import org.springframework.stereotype.Component;
  3. /**
  4. * feign失败配置
  5. * @author itheima
  6. */
  7. @Component
  8. public class IArticleClientFallback implements IArticleClient {
  9. @Override
  10. public ResponseResult saveArticle(ArticleDto dto) {
  11. return ResponseResult.errorResult(AppHttpCodeEnum.SERVER_ERROR,"获取数据失败");
  12. }
  13. }

在自媒体微服务中添加类,扫描降级代码类的包

  1. package com.heima.wemedia.config;
  2. import org.springframework.context.annotation.ComponentScan;
  3. import org.springframework.context.annotation.Configuration;
  4. @Configuration
  5. @ComponentScan("com.heima.apis.article.fallback")
  6. public class InitConfig {
  7. }

②:远程接口中指向降级代码

  1. package com.heima.apis.article;
  2. import org.springframework.web.bind.annotation.RequestBody;
  3. @FeignClient(value = "leadnews-article",fallback = IArticleClientFallback.class)
  4. public interface IArticleClient {
  5. @PostMapping("/api/v1/article/save")
  6. public ResponseResult saveArticle(@RequestBody ArticleDto dto);
  7. }

③:客户端开启降级heima-leadnews-wemedia

在wemedia的nacos配置中心里添加如下内容,开启服务降级,也可以指定服务响应的超时的时间

  1. feign:
  2. # 开启feign对hystrix熔断降级的支持
  3. hystrix:
  4. enabled: true
  5. # 修改调用超时时间
  6. client:
  7. config:
  8. default:
  9. connectTimeout: 2000
  10. readTimeout: 2000

④:测试

在ApArticleServiceImpl类中saveArticle方法添加代码

  1. try {
  2. Thread.sleep(3000);
  3. } catch (InterruptedException e) {
  4. e.printStackTrace();
  5. }

在自媒体端进行审核测试,会出现服务降级的现象

5)发布文章提交审核集成

5.1)同步调用与异步调用

同步:就是在发出一个调用时,在没有得到结果之前, 该调用就不返回(实时处理)

异步:调用在发出之后,这个调用就直接返回了,没有返回结果(分时处理)

异步线程的方式审核文章

5.2)Springboot集成异步线程调用

①:在自动审核的方法上加上@Async注解(标明要异步调用)

  1. @Override
  2. @Async //标明当前方法是一个异步方法
  3. public void autoScanWmNews(Integer id) {
  4. //代码略
  5. }

②:在文章发布成功后调用审核的方法

  1. @Autowired
  2. private WmNewsAutoScanService wmNewsAutoScanService;
  3. /**
  4. * 发布修改文章或保存为草稿
  5. * @param dto
  6. * @return
  7. */
  8. @Override
  9. public ResponseResult submitNews(WmNewsDto dto) {
  10. //代码略
  11. //审核文章
  12. wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
  13. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  14. }

③:在自媒体引导类中使用@EnableAsync注解开启异步调用

  1. @SpringBootApplication
  2. @EnableDiscoveryClient
  3. @MapperScan("com.heima.wemedia.mapper")
  4. @EnableFeignClients(basePackages = "com.heima.apis")
  5. @EnableAsync //开启异步调用
  6. public class WemediaApplication {
  7. public static void main(String[] args) {
  8. SpringApplication.run(WemediaApplication.class,args);
  9. }
  10. @Bean
  11. public MybatisPlusInterceptor mybatisPlusInterceptor() {
  12. MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
  13. interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
  14. return interceptor;
  15. }
  16. }

6)文章审核功能-综合测试

6.1)服务启动列表

1,nacos服务端

2,article微服务

3,wemedia微服务

4,启动wemedia网关微服务

5,启动前端系统wemedia

6.2)测试情况列表

1,自媒体前端发布一篇正常的文章

审核成功后,app端的article相关数据是否可以正常保存,自媒体文章状态和app端文章id是否回显

2,自媒体前端发布一篇包含敏感词的文章

正常是审核失败, wm_news表中的状态是否改变,成功和失败原因正常保存

3,自媒体前端发布一篇包含敏感图片的文章

正常是审核失败, wm_news表中的状态是否改变,成功和失败原因正常保存

7)新需求-自管理敏感词

7.1)需求分析

文章审核功能已经交付了,文章也能正常发布审核。突然,产品经理过来说要开会。

会议的内容核心有以下内容:

  • 文章审核不能过滤一些敏感词

私人侦探、针孔摄象、信用卡提现、广告代理、代开发票、刻章办、出售答案、小额贷款…

需要完成的功能:

需要自己维护一套敏感词,在文章审核的时候,需要验证文章是否包含这些敏感词

7.2)敏感词-过滤

技术选型

方案

说明

数据库模糊查询

效率太低

String.indexOf("")查找

数据库量大的话也是比较慢

全文检索

分词再匹配

DFA算法

确定有穷自动机(一种数据结构)

7.3)DFA实现原理

DFA全称为:Deterministic Finite Automaton,即确定有穷自动机

存储:一次性的把所有的敏感词存储到了多个map中,就是下图表示这种结构

敏感词:冰毒、大麻、大坏蛋

检索的过程

7.4)自管理敏感词集成到文章审核中

①:创建敏感词表,导入资料中wm_sensitive到leadnews_wemedia库中

  1. package com.heima.model.wemedia.pojos;
  2. import java.io.Serializable;
  3. import java.util.Date;
  4. /**
  5. * <p>
  6. * 敏感词信息表
  7. * </p>
  8. *
  9. * @author itheima
  10. */
  11. @Data
  12. @TableName("wm_sensitive")
  13. public class WmSensitive implements Serializable {
  14. private static final long serialVersionUID = 1L;
  15. /**
  16. * 主键
  17. */
  18. @TableId(value = "id", type = IdType.AUTO)
  19. private Integer id;
  20. /**
  21. * 敏感词
  22. */
  23. @TableField("sensitives")
  24. private String sensitives;
  25. /**
  26. * 创建时间
  27. */
  28. @TableField("created_time")
  29. private Date createdTime;
  30. }

②:拷贝对应的wm_sensitive的mapper到项目中

  1. package com.heima.wemedia.mapper;
  2. import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  3. import com.heima.model.wemedia.pojos.WmSensitive;
  4. import org.apache.ibatis.annotations.Mapper;
  5. @Mapper
  6. public interface WmSensitiveMapper extends BaseMapper<WmSensitive> {
  7. }

③:在文章审核的代码中添加自管理敏感词审核

第一:在WmNewsAutoScanServiceImpl中的autoScanWmNews方法上添加如下代码

  1. //从内容中提取纯文本内容和图片
  2. //.....省略
  3. //自管理的敏感词过滤
  4. boolean isSensitive = handleSensitiveScan((String) textAndImages.get("content"), wmNews);
  5. if(!isSensitive) return;
  6. //2.审核文本内容 阿里云接口
  7. //.....省略
测试了一下 源码不能检测 标题的敏感词汇;加了个这: wmNews.getTitle()+

//自管理的敏感词过滤

boolean isSensitive = handleSensitiveScan(

wmNews.getTitle()+textAndImages.get("content"), wmNews);

新增自管理敏感词审核代码

  1. @Autowired
  2. private WmSensitiveMapper wmSensitiveMapper;
  3. /**
  4. * 自管理的敏感词审核
  5. * @param content
  6. * @param wmNews
  7. * @return
  8. */
  9. private boolean handleSensitiveScan(String content, WmNews wmNews) {
  10. boolean flag = true;
  11. //获取所有的敏感词
  12. List<WmSensitive> wmSensitives = wmSensitiveMapper.selectList(Wrappers.<WmSensitive>lambdaQuery().select(WmSensitive::getSensitives));
  13. List<String> sensitiveList = wmSensitives.stream().map(WmSensitive::getSensitives).collect(Collectors.toList());
  14. //初始化敏感词库
  15. SensitiveWordUtil.initMap(sensitiveList);
  16. //查看文章中是否包含敏感词
  17. Map<String, Integer> map = SensitiveWordUtil.matchWords(content);
  18. if(map.size() >0){
  19. updateWmNews(wmNews,(short) 2,"当前文章中存在违规内容"+map);
  20. flag = false;
  21. }
  22. return flag;
  23. }

8)新需求-图片识别文字审核敏感词

8.1)需求分析

产品经理召集开会,文章审核功能已经交付了,文章也能正常发布审核。对于上次提出的自管理敏感词也很满意,这次会议核心的内容如下:

  • 文章中包含的图片要识别文字,过滤掉图片文字的敏感词

8.2)图片文字识别

什么是OCR?

OCR (Optical Character Recognition,光学字符识别)是指电子设备(例如扫描仪或数码相机)检查纸上打印的字符,通过检测暗、亮的模式确定其形状,然后用字符识别方法将形状翻译成计算机文字的过程

方案

说明

百度OCR

收费

Tesseract-OCR

Google维护的开源OCR引擎,支持Java,Python等语言调用

Tess4J

封装了Tesseract-OCR ,支持Java调用

8 .3)Tess4j案例

①:创建项目导入tess4j对应的依赖

  1. <dependency>
  2. <groupId>net.sourceforge.tess4j</groupId>
  3. <artifactId>tess4j</artifactId>
  4. <version>4.1.1</version>
  5. </dependency>

②:导入中文字体库, 把资料中的tessdata文件夹拷贝到自己的工作空间下

③:编写测试类进行测试

  1. package com.heima.tess4j;
  2. import net.sourceforge.tess4j.ITesseract;
  3. import net.sourceforge.tess4j.Tesseract;
  4. import java.io.File;
  5. public class Application {
  6. public static void main(String[] args) {
  7. try {
  8. //获取本地图片
  9. File file = new File("D:\\26.png");
  10. //创建Tesseract对象
  11. ITesseract tesseract = new Tesseract();
  12. //设置字体库路径
  13. tesseract.setDatapath("D:\\workspace\\tessdata");
  14. //中文识别
  15. tesseract.setLanguage("chi_sim");
  16. //执行ocr识别
  17. String result = tesseract.doOCR(file);
  18. //替换回车和tal键 使结果为一行
  19. result = result.replaceAll("\\r|\\n","-").replaceAll(" ","");
  20. System.out.println("识别的结果为:"+result);
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }
8.4)管理敏感词和图片文字识别集成到文章审核

①:在heima-leadnews-common中创建工具类,简单封装一下tess4j

需要先导入pom

  1. <dependency>
  2. <groupId>net.sourceforge.tess4j</groupId>
  3. <artifactId>tess4j</artifactId>
  4. <version>4.1.1</version>
  5. </dependency>

工具类

  1. package com.heima.common.tess4j;
  2. import lombok.Getter;
  3. import lombok.Setter;
  4. import net.sourceforge.tess4j.ITesseract;
  5. import net.sourceforge.tess4j.Tesseract;
  6. import net.sourceforge.tess4j.TesseractException;
  7. import org.springframework.boot.context.properties.ConfigurationProperties;
  8. import org.springframework.stereotype.Component;
  9. import java.awt.image.BufferedImage;
  10. @Getter
  11. @Setter
  12. @Component
  13. @ConfigurationProperties(prefix = "tess4j")
  14. public class Tess4jClient {
  15. private String dataPath;
  16. private String language;
  17. public String doOCR(BufferedImage image) throws TesseractException {
  18. //创建Tesseract对象
  19. ITesseract tesseract = new Tesseract();
  20. //设置字体库路径
  21. tesseract.setDatapath(dataPath);
  22. //中文识别
  23. tesseract.setLanguage(language);
  24. //执行ocr识别
  25. String result = tesseract.doOCR(image);
  26. //替换回车和tal键 使结果为一行
  27. result = result.replaceAll("\\r|\\n", "-").replaceAll(" ", "");
  28. return result;
  29. }
  30. }

在spring.factories配置中添加该类,完整如下:

  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  2. com.heima.common.exception.ExceptionCatch,\
  3. com.heima.common.swagger.SwaggerConfiguration,\
  4. com.heima.common.swagger.Swagger2Configuration,\
  5. com.heima.common.aliyun.GreenTextScan,\
  6. com.heima.common.aliyun.GreenImageScan,\
  7. com.heima.common.tess4j.Tess4jClient

②:在heima-leadnews-wemedia中的配置中添加两个属性

  1. tess4j:
  2. data-path: D:\workspace\tessdata
  3. language: chi_sim

③:在WmNewsAutoScanServiceImpl中的handleImageScan方法上添加如下代码

  1. try {
  2. for (String image : images) {
  3. byte[] bytes = fileStorageService.downLoadFile(image);
  4. //图片识别文字审核---begin-----
  5. //从byte[]转换为butteredImage
  6. ByteArrayInputStream in = new ByteArrayInputStream(bytes);
  7. BufferedImage imageFile = ImageIO.read(in);
  8. //识别图片的文字
  9. String result = tess4jClient.doOCR(imageFile);
  10. //审核是否包含自管理的敏感词
  11. boolean isSensitive = handleSensitiveScan(result, wmNews);
  12. if(!isSensitive){
  13. return isSensitive;
  14. }
  15. //图片识别文字审核---end-----
  16. imageList.add(bytes);
  17. }
  18. }catch (Exception e){
  19. e.printStackTrace();
  20. }

最后附上文章审核的完整代码如下:

  1. package com.heima.wemedia.service.impl;
  2. import com.alibaba.fastjson.JSONArray;
  3. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  4. import com.heima.apis.article.IArticleClient;
  5. import com.heima.common.aliyun.GreenImageScan;
  6. import com.heima.common.aliyun.GreenTextScan;
  7. import com.heima.common.tess4j.Tess4jClient;
  8. import com.heima.file.service.FileStorageService;
  9. import com.heima.model.article.dtos.ArticleDto;
  10. import com.heima.model.common.dtos.ResponseResult;
  11. import com.heima.model.wemedia.pojos.WmChannel;
  12. import com.heima.model.wemedia.pojos.WmNews;
  13. import com.heima.model.wemedia.pojos.WmSensitive;
  14. import com.heima.model.wemedia.pojos.WmUser;
  15. import com.heima.utils.common.SensitiveWordUtil;
  16. import com.heima.wemedia.mapper.WmChannelMapper;
  17. import com.heima.wemedia.mapper.WmNewsMapper;
  18. import com.heima.wemedia.mapper.WmSensitiveMapper;
  19. import com.heima.wemedia.mapper.WmUserMapper;
  20. import com.heima.wemedia.service.WmNewsAutoScanService;
  21. import lombok.extern.slf4j.Slf4j;
  22. import org.apache.commons.lang3.StringUtils;
  23. import org.springframework.beans.BeanUtils;
  24. import org.springframework.beans.factory.annotation.Autowired;
  25. import org.springframework.scheduling.annotation.Async;
  26. import org.springframework.stereotype.Service;
  27. import org.springframework.transaction.annotation.Transactional;
  28. import javax.imageio.ImageIO;
  29. import java.awt.image.BufferedImage;
  30. import java.io.ByteArrayInputStream;
  31. import java.util.*;
  32. import java.util.stream.Collectors;
  33. @Service
  34. @Slf4j
  35. @Transactional
  36. public class WmNewsAutoScanServiceImpl implements WmNewsAutoScanService {
  37. @Autowired
  38. private WmNewsMapper wmNewsMapper;
  39. /**
  40. * 自媒体文章审核
  41. *
  42. * @param id 自媒体文章id
  43. */
  44. @Override
  45. @Async //标明当前方法是一个异步方法
  46. public void autoScanWmNews(Integer id) {
  47. // int a = 1/0;
  48. //1.查询自媒体文章
  49. WmNews wmNews = wmNewsMapper.selectById(id);
  50. if (wmNews == null) {
  51. throw new RuntimeException("WmNewsAutoScanServiceImpl-文章不存在");
  52. }
  53. if (wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())) {
  54. //从内容中提取纯文本内容和图片
  55. Map<String, Object> textAndImages = handleTextAndImages(wmNews);
  56. //自管理的敏感词过滤
  57. boolean isSensitive = handleSensitiveScan((String) textAndImages.get("content"), wmNews);
  58. if(!isSensitive) return;
  59. //2.审核文本内容 阿里云接口
  60. boolean isTextScan = handleTextScan((String) textAndImages.get("content"), wmNews);
  61. if (!isTextScan) return;
  62. //3.审核图片 阿里云接口
  63. boolean isImageScan = handleImageScan((List<String>) textAndImages.get("images"), wmNews);
  64. if (!isImageScan) return;
  65. //4.审核成功,保存app端的相关的文章数据
  66. ResponseResult responseResult = saveAppArticle(wmNews);
  67. if (!responseResult.getCode().equals(200)) {
  68. throw new RuntimeException("WmNewsAutoScanServiceImpl-文章审核,保存app端相关文章数据失败");
  69. }
  70. //回填article_id
  71. wmNews.setArticleId((Long) responseResult.getData());
  72. updateWmNews(wmNews, (short) 9, "审核成功");
  73. }
  74. }
  75. @Autowired
  76. private WmSensitiveMapper wmSensitiveMapper;
  77. /**
  78. * 自管理的敏感词审核
  79. * @param content
  80. * @param wmNews
  81. * @return
  82. */
  83. private boolean handleSensitiveScan(String content, WmNews wmNews) {
  84. boolean flag = true;
  85. //获取所有的敏感词
  86. List<WmSensitive> wmSensitives = wmSensitiveMapper.selectList(Wrappers.<WmSensitive>lambdaQuery().select(WmSensitive::getSensitives));
  87. List<String> sensitiveList = wmSensitives.stream().map(WmSensitive::getSensitives).collect(Collectors.toList());
  88. //初始化敏感词库
  89. SensitiveWordUtil.initMap(sensitiveList);
  90. //查看文章中是否包含敏感词
  91. Map<String, Integer> map = SensitiveWordUtil.matchWords(content);
  92. if(map.size() >0){
  93. updateWmNews(wmNews,(short) 2,"当前文章中存在违规内容"+map);
  94. flag = false;
  95. }
  96. return flag;
  97. }
  98. @Autowired
  99. private IArticleClient articleClient;
  100. @Autowired
  101. private WmChannelMapper wmChannelMapper;
  102. @Autowired
  103. private WmUserMapper wmUserMapper;
  104. /**
  105. * 保存app端相关的文章数据
  106. *
  107. * @param wmNews
  108. */
  109. private ResponseResult saveAppArticle(WmNews wmNews) {
  110. ArticleDto dto = new ArticleDto();
  111. //属性的拷贝
  112. BeanUtils.copyProperties(wmNews, dto);
  113. //文章的布局
  114. dto.setLayout(wmNews.getType());
  115. //频道
  116. WmChannel wmChannel = wmChannelMapper.selectById(wmNews.getChannelId());
  117. if (wmChannel != null) {
  118. dto.setChannelName(wmChannel.getName());
  119. }
  120. //作者
  121. dto.setAuthorId(wmNews.getUserId().longValue());
  122. WmUser wmUser = wmUserMapper.selectById(wmNews.getUserId());
  123. if (wmUser != null) {
  124. dto.setAuthorName(wmUser.getName());
  125. }
  126. //设置文章id
  127. if (wmNews.getArticleId() != null) {
  128. dto.setId(wmNews.getArticleId());
  129. }
  130. dto.setCreatedTime(new Date());
  131. ResponseResult responseResult = articleClient.saveArticle(dto);
  132. return responseResult;
  133. }
  134. @Autowired
  135. private FileStorageService fileStorageService;
  136. @Autowired
  137. private GreenImageScan greenImageScan;
  138. @Autowired
  139. private Tess4jClient tess4jClient;
  140. /**
  141. * 审核图片
  142. *
  143. * @param images
  144. * @param wmNews
  145. * @return
  146. */
  147. private boolean handleImageScan(List<String> images, WmNews wmNews) {
  148. boolean flag = true;
  149. if (images == null || images.size() == 0) {
  150. return flag;
  151. }
  152. //下载图片 minIO
  153. //图片去重
  154. images = images.stream().distinct().collect(Collectors.toList());
  155. List<byte[]> imageList = new ArrayList<>();
  156. try {
  157. for (String image : images) {
  158. byte[] bytes = fileStorageService.downLoadFile(image);
  159. //图片识别文字审核---begin-----
  160. //从byte[]转换为butteredImage
  161. ByteArrayInputStream in = new ByteArrayInputStream(bytes);
  162. BufferedImage imageFile = ImageIO.read(in);
  163. //识别图片的文字
  164. String result = tess4jClient.doOCR(imageFile);
  165. //审核是否包含自管理的敏感词
  166. boolean isSensitive = handleSensitiveScan(result, wmNews);
  167. if(!isSensitive){
  168. return isSensitive;
  169. }
  170. //图片识别文字审核---end-----
  171. imageList.add(bytes);
  172. }
  173. }catch (Exception e){
  174. e.printStackTrace();
  175. }
  176. //审核图片
  177. try {
  178. Map map = greenImageScan.imageScan(imageList);
  179. if (map != null) {
  180. //审核失败
  181. if (map.get("suggestion").equals("block")) {
  182. flag = false;
  183. updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
  184. }
  185. //不确定信息 需要人工审核
  186. if (map.get("suggestion").equals("review")) {
  187. flag = false;
  188. updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
  189. }
  190. }
  191. } catch (Exception e) {
  192. flag = false;
  193. e.printStackTrace();
  194. }
  195. return flag;
  196. }
  197. @Autowired
  198. private GreenTextScan greenTextScan;
  199. /**
  200. * 审核纯文本内容
  201. *
  202. * @param content
  203. * @param wmNews
  204. * @return
  205. */
  206. private boolean handleTextScan(String content, WmNews wmNews) {
  207. boolean flag = true;
  208. if ((wmNews.getTitle() + "-" + content).length() == 0) {
  209. return flag;
  210. }
  211. try {
  212. Map map = greenTextScan.greeTextScan((wmNews.getTitle() + "-" + content));
  213. if (map != null) {
  214. //审核失败
  215. if (map.get("suggestion").equals("block")) {
  216. flag = false;
  217. updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
  218. }
  219. //不确定信息 需要人工审核
  220. if (map.get("suggestion").equals("review")) {
  221. flag = false;
  222. updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
  223. }
  224. }
  225. } catch (Exception e) {
  226. flag = false;
  227. e.printStackTrace();
  228. }
  229. return flag;
  230. }
  231. /**
  232. * 修改文章内容
  233. *
  234. * @param wmNews
  235. * @param status
  236. * @param reason
  237. */
  238. private void updateWmNews(WmNews wmNews, short status, String reason) {
  239. wmNews.setStatus(status);
  240. wmNews.setReason(reason);
  241. wmNewsMapper.updateById(wmNews);
  242. }
  243. /**
  244. * 1。从自媒体文章的内容中提取文本和图片
  245. * 2.提取文章的封面图片
  246. *
  247. * @param wmNews
  248. * @return
  249. */
  250. private Map<String, Object> handleTextAndImages(WmNews wmNews) {
  251. //存储纯文本内容
  252. StringBuilder stringBuilder = new StringBuilder();
  253. List<String> images = new ArrayList<>();
  254. //1。从自媒体文章的内容中提取文本和图片
  255. if (StringUtils.isNotBlank(wmNews.getContent())) {
  256. List<Map> maps = JSONArray.parseArray(wmNews.getContent(), Map.class);
  257. for (Map map : maps) {
  258. if (map.get("type").equals("text")) {
  259. stringBuilder.append(map.get("value"));
  260. }
  261. if (map.get("type").equals("image")) {
  262. images.add((String) map.get("value"));
  263. }
  264. }
  265. }
  266. //2.提取文章的封面图片
  267. if (StringUtils.isNotBlank(wmNews.getImages())) {
  268. String[] split = wmNews.getImages().split(",");
  269. images.addAll(Arrays.asList(split));
  270. }
  271. Map<String, Object> resultMap = new HashMap<>();
  272. resultMap.put("content", stringBuilder.toString());
  273. resultMap.put("images", images);
  274. return resultMap;
  275. }
  276. }

9)文章详情-静态文件生成

9.1)思路分析

文章端创建app相关文章时,生成文章详情静态页上传到MinIO中

9.2)实现步骤

1.新建ArticleFreemarkerService创建静态文件并上传到minIO中

  1. package com.heima.article.service;
  2. import com.heima.model.article.pojos.ApArticle;
  3. public interface ArticleFreemarkerService {
  4. /**
  5. * 生成静态文件上传到minIO中
  6. * @param apArticle
  7. * @param content
  8. */
  9. public void buildArticleToMinIO(ApArticle apArticle,String content);
  10. }

实现

  1. package com.heima.article.service.impl;
  2. import java.util.Map;
  3. @Service
  4. @Slf4j
  5. @Transactional
  6. public class ArticleFreemarkerServiceImpl implements ArticleFreemarkerService {
  7. @Autowired
  8. private ApArticleContentMapper apArticleContentMapper;
  9. @Autowired
  10. private Configuration configuration;
  11. @Autowired
  12. private FileStorageService fileStorageService;
  13. @Autowired
  14. private ApArticleService apArticleService;
  15. /**
  16. * 生成静态文件上传到minIO中
  17. * @param apArticle
  18. * @param content
  19. */
  20. @Async
  21. @Override
  22. public void buildArticleToMinIO(ApArticle apArticle, String content) {
  23. //已知文章的id
  24. //4.1 获取文章内容
  25. if(StringUtils.isNotBlank(content)){
  26. //4.2 文章内容通过freemarker生成html文件
  27. Template template = null;
  28. StringWriter out = new StringWriter();
  29. try {
  30. template = configuration.getTemplate("article.ftl");
  31. //数据模型
  32. Map<String,Object> contentDataModel = new HashMap<>();
  33. contentDataModel.put("content", JSONArray.parseArray(content));
  34. //合成
  35. template.process(contentDataModel,out);
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. //4.3 把html文件上传到minio中
  40. InputStream in = new ByteArrayInputStream(out.toString().getBytes());
  41. String path = fileStorageService.uploadHtmlFile("", apArticle.getId() + ".html", in);
  42. //4.4 修改ap_article表,保存static_url字段
  43. apArticleService.update(Wrappers.<ApArticle>lambdaUpdate().eq(ApArticle::getId,apArticle.getId())
  44. .set(ApArticle::getStaticUrl,path));
  45. }
  46. }
  47. }

2.在ApArticleService的saveArticle实现方法中添加调用生成文件的方法

  1. /**
  2. * 保存app端相关文章
  3. * @param dto
  4. * @return
  5. */
  6. @Override
  7. public ResponseResult saveArticle(ArticleDto dto) {
  8. // try {
  9. // Thread.sleep(3000);
  10. // } catch (InterruptedException e) {
  11. // e.printStackTrace();
  12. // }
  13. //1.检查参数
  14. if(dto == null){
  15. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
  16. }
  17. ApArticle apArticle = new ApArticle();
  18. BeanUtils.copyProperties(dto,apArticle);
  19. //2.判断是否存在id
  20. if(dto.getId() == null){
  21. //2.1 不存在id 保存 文章 文章配置 文章内容
  22. //保存文章
  23. save(apArticle);
  24. //保存配置
  25. ApArticleConfig apArticleConfig = new ApArticleConfig(apArticle.getId());
  26. apArticleConfigMapper.insert(apArticleConfig);
  27. //保存 文章内容
  28. ApArticleContent apArticleContent = new ApArticleContent();
  29. apArticleContent.setArticleId(apArticle.getId());
  30. apArticleContent.setContent(dto.getContent());
  31. apArticleContentMapper.insert(apArticleContent);
  32. }else {
  33. //2.2 存在id 修改 文章 文章内容
  34. //修改 文章
  35. updateById(apArticle);
  36. //修改文章内容
  37. ApArticleContent apArticleContent = apArticleContentMapper.selectOne(Wrappers.<ApArticleContent>lambdaQuery().eq(ApArticleContent::getArticleId, dto.getId()));
  38. apArticleContent.setContent(dto.getContent());
  39. apArticleContentMapper.updateById(apArticleContent);
  40. }
  41. //异步调用 生成静态文件上传到minio中
  42. articleFreemarkerService.buildArticleToMinIO(apArticle,dto.getContent());
  43. //3.结果返回 文章的id
  44. return ResponseResult.okResult(apArticle.getId());
  45. }

3.文章微服务开启异步调用


05延迟任务精准发布文章

1)文章定时发布

2)延迟任务概述

2.1)什么是延迟任务
  • 定时任务:有固定周期的,有明确的触发时间

  • 延迟队列:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟

应用场景:

场景一:

订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消

场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止

2.2)技术对比
2.2.1)DelayQueue

JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素

DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法

getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列

compareTo方法:用于排序,确定元素出队列的顺序

实现:

1:在测试包jdk下创建延迟任务元素对象DelayedTask,实现compareTo和getDelay方法,

2:在main方法中创建DelayQueue并向延迟队列中添加三个延迟任务,

3:循环的从延迟队列中拉取任务

  1. public class DelayedTask implements Delayed{
  2. // 任务的执行时间
  3. private int executeTime = 0;
  4. public DelayedTask(int delay){
  5. Calendar calendar = Calendar.getInstance();
  6. calendar.add(Calendar.SECOND,delay);
  7. this.executeTime = (int)(calendar.getTimeInMillis() /1000 );
  8. }
  9. /**
  10. * 元素在队列中的剩余时间
  11. * @param unit
  12. * @return
  13. */
  14. @Override
  15. public long getDelay(TimeUnit unit) {
  16. Calendar calendar = Calendar.getInstance();
  17. return executeTime - (calendar.getTimeInMillis()/1000);
  18. }
  19. /**
  20. * 元素排序
  21. * @param o
  22. * @return
  23. */
  24. @Override
  25. public int compareTo(Delayed o) {
  26. long val = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
  27. return val == 0 ? 0 : ( val < 0 ? -1: 1 );
  28. }
  29. public static void main(String[] args) {
  30. DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
  31. queue.add(new DelayedTask(5));
  32. queue.add(new DelayedTask(10));
  33. queue.add(new DelayedTask(15));
  34. System.out.println(System.currentTimeMillis()/1000+" start consume ");
  35. while(queue.size() != 0){
  36. DelayedTask delayedTask = queue.poll();
  37. if(delayedTask !=null ){
  38. System.out.println(System.currentTimeMillis()/1000+" cosume task");
  39. }
  40. //每隔一秒消费一次
  41. try {
  42. Thread.sleep(1000);
  43. } catch (InterruptedException e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }
  48. }

DelayQueue实现完成之后思考一个问题:

使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,如何保证数据不丢失,需要持久化(磁盘)

2.2.2)RabbitMQ实现延迟任务
  • TTL:Time To Live (消息存活时间)

  • 死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)

2.2.3)redis实现

zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序

3)redis实现延迟任务

实现思路

问题思路

1.为什么任务需要存储在数据库中?

延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑

2.为什么redis中使用两种数据类型,list和zset?

效率问题,算法的时间复杂度; list是双向链表

3.在添加zset数据的时候,为什么不需要预加载?

任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止zset阻塞,只需要把未来几分钟要执行的数据存入缓存即可

锐评:完全为了学list zset而编出来的场景,实际工作中延迟队列要设计成这样只能说太蠢了

实际工作绝对用MQ

4)延迟任务服务实现

4.1)搭建heima-leadnews-schedule模块

leadnews-schedule是一个通用的服务,单独创建模块来管理任何类型的延迟任务

①:导入资料文件夹的heima-leadnews-schedule模块到heima-leadnews-service下,如下图所示:

②:添加bootstrap.yml

  1. server:
  2. port: 51701
  3. spring:
  4. application:
  5. name: leadnews-schedule
  6. cloud:
  7. nacos:
  8. discovery:
  9. server-addr: 192.168.200.130:8848
  10. config:
  11. server-addr: 192.168.200.130:8848
  12. file-extension: yml

③:在nacos中添加对应配置,并添加数据库及mybatis-plus的配置

  1. spring:
  2. datasource:
  3. driver-class-name: com.mysql.jdbc.Driver
  4. url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
  5. username: root
  6. password: root
  7. # 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
  8. mybatis-plus:
  9. mapper-locations: classpath*:mapper/*.xml
  10. # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
  11. type-aliases-package: com.heima.model.schedule.pojos
4.2)数据库准备

导入资料中leadnews_schedule数据库

taskinfo 任务表

实体类

  1. package com.heima.model.schedule.pojos;
  2. import java.io.Serializable;
  3. import java.util.Date;
  4. /**
  5. * <p>
  6. *
  7. * </p>
  8. *
  9. * @author itheima
  10. */
  11. @Data
  12. @TableName("taskinfo")
  13. public class Taskinfo implements Serializable {
  14. private static final long serialVersionUID = 1L;
  15. /**
  16. * 任务id
  17. */
  18. @TableId(type = IdType.ID_WORKER)
  19. private Long taskId;
  20. /**
  21. * 执行时间
  22. */
  23. @TableField("execute_time")
  24. private Date executeTime;
  25. /**
  26. * 参数
  27. */
  28. @TableField("parameters")
  29. private byte[] parameters;
  30. /**
  31. * 优先级
  32. */
  33. @TableField("priority")
  34. private Integer priority;
  35. /**
  36. * 任务类型
  37. */
  38. @TableField("task_type")
  39. private Integer taskType;
  40. }

taskinfo_logs 任务日志表

实体类

  1. package com.heima.model.schedule.pojos;
  2. import java.io.Serializable;
  3. import java.util.Date;
  4. /**
  5. * <p>
  6. *
  7. * </p>
  8. *
  9. * @author itheima
  10. */
  11. @Data
  12. @TableName("taskinfo_logs")
  13. public class TaskinfoLogs implements Serializable {
  14. private static final long serialVersionUID = 1L;
  15. /**
  16. * 任务id
  17. */
  18. @TableId(type = IdType.ID_WORKER)
  19. private Long taskId;
  20. /**
  21. * 执行时间
  22. */
  23. @TableField("execute_time")
  24. private Date executeTime;
  25. /**
  26. * 参数
  27. */
  28. @TableField("parameters")
  29. private byte[] parameters;
  30. /**
  31. * 优先级
  32. */
  33. @TableField("priority")
  34. private Integer priority;
  35. /**
  36. * 任务类型
  37. */
  38. @TableField("task_type")
  39. private Integer taskType;
  40. /**
  41. * 版本号,用乐观锁
  42. */
  43. @Version
  44. private Integer version;
  45. /**
  46. * 状态 0=int 1=EXECUTED 2=CANCELLED
  47. */
  48. @TableField("status")
  49. private Integer status;
  50. }
乐观锁/悲观锁

悲观锁效率低;

乐观锁支持:

  1. /**
  2. * mybatis-plus乐观锁支持
  3. * @return
  4. */
  5. @Bean
  6. public MybatisPlusInterceptor optimisticLockerInterceptor(){
  7. MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
  8. interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
  9. return interceptor;
  10. }
4.3)安装redis

①拉取镜像

docker pull redis

② 创建容器

docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"

③链接测试

打开资料中的Redis Desktop Manager,输入host、port、password链接测试

能链接成功,即可

4.4)项目集成redis

① 在项目导入redis相关依赖,已经完成

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-redis</artifactId>
  4. </dependency>
  5. <!-- redis依赖commons-pool 这个依赖一定要添加 -->
  6. <dependency>
  7. <groupId>org.apache.commons</groupId>
  8. <artifactId>commons-pool2</artifactId>
  9. </dependency>

② 在heima-leadnews-schedule中集成redis,添加以下nacos配置,链接上redis

  1. spring:
  2. redis:
  3. host: 192.168.200.130
  4. password: leadnews
  5. port: 6379

③ 拷贝资料文件夹下的类:CacheService到heima-leadnews-common模块下,并添加自动配置

④:测试

  1. package com.heima.schedule.test;
  2. import java.util.Set;
  3. @SpringBootTest(classes = ScheduleApplication.class)
  4. @RunWith(SpringRunner.class)
  5. public class RedisTest {
  6. @Autowired
  7. private CacheService cacheService;
  8. @Test
  9. public void testList(){
  10. //在list的左边添加元素
  11. // cacheService.lLeftPush("list_001","hello,redis");
  12. //在list的右边获取元素,并删除
  13. String list_001 = cacheService.lRightPop("list_001");
  14. System.out.println(list_001);
  15. }
  16. @Test
  17. public void testZset(){
  18. //添加数据到zset中 分值
  19. /*cacheService.zAdd("zset_key_001","hello zset 001",1000);
  20. cacheService.zAdd("zset_key_001","hello zset 002",8888);
  21. cacheService.zAdd("zset_key_001","hello zset 003",7777);
  22. cacheService.zAdd("zset_key_001","hello zset 004",999999);*/
  23. //按照分值获取数据
  24. Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);
  25. System.out.println(zset_key_001);
  26. }
  27. }
4.5)添加任务

①:拷贝mybatis-plus生成的文件,mapper

②:创建task类,用于接收添加任务的参数

  1. package com.heima.model.schedule.dtos;
  2. import lombok.Data;
  3. import java.io.Serializable;
  4. @Data
  5. public class Task implements Serializable {
  6. /**
  7. * 任务id
  8. */
  9. private Long taskId;
  10. /**
  11. * 类型
  12. */
  13. private Integer taskType;
  14. /**
  15. * 优先级
  16. */
  17. private Integer priority;
  18. /**
  19. * 执行id
  20. */
  21. private long executeTime;
  22. /**
  23. * task参数
  24. */
  25. private byte[] parameters;
  26. }

③:创建TaskService

  1. package com.heima.schedule.service;
  2. import com.heima.model.schedule.dtos.Task;
  3. /**
  4. * 对外访问接口
  5. */
  6. public interface TaskService {
  7. /**
  8. * 添加任务
  9. * @param task 任务对象
  10. * @return 任务id
  11. */
  12. public long addTask(Task task) ;
  13. }

实现:

  1. package com.heima.schedule.service.impl;
  2. import java.util.Calendar;
  3. import java.util.Date;
  4. @Service
  5. @Transactional
  6. @Slf4j
  7. public class TaskServiceImpl implements TaskService {
  8. /**
  9. * 添加延迟任务
  10. *
  11. * @param task
  12. * @return
  13. */
  14. @Override
  15. public long addTask(Task task) {
  16. //1.添加任务到数据库中
  17. boolean success = addTaskToDb(task);
  18. if (success) {
  19. //2.添加任务到redis
  20. addTaskToCache(task);
  21. }
  22. return task.getTaskId();
  23. }
  24. @Autowired
  25. private CacheService cacheService;
  26. /**
  27. * 把任务添加到redis中
  28. *
  29. * @param task
  30. */
  31. private void addTaskToCache(Task task) {
  32. String key = task.getTaskType() + "_" + task.getPriority();
  33. //获取5分钟之后的时间 毫秒值
  34. Calendar calendar = Calendar.getInstance();
  35. calendar.add(Calendar.MINUTE, 5);
  36. long nextScheduleTime = calendar.getTimeInMillis();
  37. //2.1 如果任务的执行时间小于等于当前时间,存入list
  38. if (task.getExecuteTime() <= System.currentTimeMillis()) {
  39. cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
  40. } else if (task.getExecuteTime() <= nextScheduleTime) {
  41. //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
  42. cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
  43. }
  44. }
  45. @Autowired
  46. private TaskinfoMapper taskinfoMapper;
  47. @Autowired
  48. private TaskinfoLogsMapper taskinfoLogsMapper;
  49. /**
  50. * 添加任务到数据库中
  51. *
  52. * @param task
  53. * @return
  54. */
  55. private boolean addTaskToDb(Task task) {
  56. boolean flag = false;
  57. try {
  58. //保存任务表
  59. Taskinfo taskinfo = new Taskinfo();
  60. BeanUtils.copyProperties(task, taskinfo);
  61. taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
  62. taskinfoMapper.insert(taskinfo);
  63. //设置taskID
  64. task.setTaskId(taskinfo.getTaskId());
  65. //保存任务日志数据
  66. TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
  67. BeanUtils.copyProperties(taskinfo, taskinfoLogs);
  68. taskinfoLogs.setVersion(1);
  69. taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
  70. taskinfoLogsMapper.insert(taskinfoLogs);
  71. flag = true;
  72. } catch (Exception e) {
  73. e.printStackTrace();
  74. }
  75. return flag;
  76. }
  77. }

ScheduleConstants常量类

  1. package com.heima.common.constants;
  2. public class ScheduleConstants {
  3. //task状态
  4. public static final int SCHEDULED=0; //初始化状态
  5. public static final int EXECUTED=1; //已执行状态
  6. public static final int CANCELLED=2; //已取消状态
  7. public static String FUTURE="future_"; //未来数据key前缀
  8. public static String TOPIC="topic_"; //当前数据key前缀
  9. }

④:测试

4.6)取消任务

在TaskService中添加方法

  1. /**
  2. * 取消任务
  3. * @param taskId 任务id
  4. * @return 取消结果
  5. */
  6. public boolean cancelTask(long taskId);

实现

  1. /**
  2. * 取消任务
  3. * @param taskId
  4. * @return
  5. */
  6. @Override
  7. public boolean cancelTask(long taskId) {
  8. boolean flag = false;
  9. //删除任务,更新日志
  10. Task task = updateDb(taskId,ScheduleConstants.EXECUTED);
  11. //删除redis的数据
  12. if(task != null){
  13. removeTaskFromCache(task);
  14. flag = true;
  15. }
  16. return false;
  17. }
  18. /**
  19. * 删除redis中的任务数据
  20. * @param task
  21. */
  22. private void removeTaskFromCache(Task task) {
  23. String key = task.getTaskType()+"_"+task.getPriority();
  24. if(task.getExecuteTime()<=System.currentTimeMillis()){
  25. cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
  26. }else {
  27. cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
  28. }
  29. }
  30. /**
  31. * 删除任务,更新任务日志状态
  32. * @param taskId
  33. * @param status
  34. * @return
  35. */
  36. private Task updateDb(long taskId, int status) {
  37. Task task = null;
  38. try {
  39. //删除任务
  40. taskinfoMapper.deleteById(taskId);
  41. TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
  42. taskinfoLogs.setStatus(status);
  43. taskinfoLogsMapper.updateById(taskinfoLogs);
  44. task = new Task();
  45. BeanUtils.copyProperties(taskinfoLogs,task);
  46. task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
  47. }catch (Exception e){
  48. log.error("task cancel exception taskid={}",taskId);
  49. }
  50. return task;
  51. }

测试

4.7)消费任务

在TaskService中添加方法

  1. /**
  2. * 按照类型和优先级来拉取任务
  3. * @param type
  4. * @param priority
  5. * @return
  6. */
  7. public Task poll(int type,int priority);

实现

  1. /**
  2. * 按照类型和优先级拉取任务
  3. * @return
  4. */
  5. @Override
  6. public Task poll(int type,int priority) {
  7. Task task = null;
  8. try {
  9. String key = type+"_"+priority;
  10. String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
  11. if(StringUtils.isNotBlank(task_json)){
  12. task = JSON.parseObject(task_json, Task.class);
  13. //更新数据库信息
  14. updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
  15. }
  16. }catch (Exception e){
  17. e.printStackTrace();
  18. log.error("poll task exception");
  19. }
  20. return task;
  21. }
4.8)未来数据定时刷新
4.8.1)reids key值匹配

方案1:keys 模糊匹配

keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞

方案2:scan

SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。

代码案例:

  1. @Test
  2. public void testKeys(){
  3. Set<String> keys = cacheService.keys("future_*");
  4. System.out.println(keys);
  5. Set<String> scan = cacheService.scan("future_*");
  6. System.out.println(scan);
  7. }
4.8.2)reids管道

普通redis客户端和服务器交互模式 性能很低

Pipeline请求模型

官方测试结果数据对比

测试案例对比:

  1. //耗时6151
  2. @Test
  3. public void testPiple1(){
  4. long start =System.currentTimeMillis();
  5. for (int i = 0; i <10000 ; i++) {
  6. Task task = new Task();
  7. task.setTaskType(1001);
  8. task.setPriority(1);
  9. task.setExecuteTime(new Date().getTime());
  10. cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
  11. }
  12. System.out.println("耗时"+(System.currentTimeMillis()- start));
  13. }
  14. @Test
  15. public void testPiple2(){
  16. long start = System.currentTimeMillis();
  17. //使用管道技术
  18. List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
  19. @Nullable
  20. @Override
  21. public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
  22. for (int i = 0; i <10000 ; i++) {
  23. Task task = new Task();
  24. task.setTaskType(1001);
  25. task.setPriority(1);
  26. task.setExecuteTime(new Date().getTime());
  27. redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
  28. }
  29. return null;
  30. }
  31. });
  32. System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
  33. }
4.8.3)未来数据定时刷新-功能完成

在TaskService中添加方法

  1. @Scheduled(cron = "0 */1 * * * ?")//定时 (每分钟执行一次
  2. //{秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
  3. public void refresh() {
  4. System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");
  5. // 获取所有未来数据集合的key值
  6. Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
  7. for (String futureKey : futureKeys) { // future_250_250
  8. String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
  9. //获取该组key下当前需要消费的任务数据
  10. Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
  11. if (!tasks.isEmpty()) {
  12. //将这些任务数据添加到消费者队列中
  13. cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
  14. System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
  15. }
  16. }
  17. }

引导类中添加开启任务调度注解:@EnableScheduling

4.9)分布式锁解决集群下的方法抢占执行
4.9.1)问题描述

启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法

4.9.2)分布式锁

分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性

解决方案:

4.9.3)redis分布式锁

sexnx (SET if Not eXists)命令在指定的 key 不存在时,为 key 设置指定的值

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功

  • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败

  • 客户端A执行代码完成,删除锁

  • 客户端B在等待一段时间后再去请求设置key的值,设置成功

  • 客户端B执行代码完成,删除锁

4.9.4)在工具类CacheService中添加方法
  1. /**
  2. * 加锁
  3. *
  4. * @param name
  5. * @param expire
  6. * @return
  7. */
  8. public String tryLock(String name, long expire) {
  9. name = name + "_lock";
  10. String token = UUID.randomUUID().toString();
  11. RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
  12. RedisConnection conn = factory.getConnection();
  13. try {
  14. //参考redis命令:
  15. //set key value [EX seconds] [PX milliseconds] [NX|XX]
  16. Boolean result = conn.set(
  17. name.getBytes(),
  18. token.getBytes(),
  19. Expiration.from(expire, TimeUnit.MILLISECONDS),
  20. RedisStringCommands.SetOption.SET_IF_ABSENT //NX
  21. );
  22. if (result != null && result)
  23. return token;
  24. } finally {
  25. RedisConnectionUtils.releaseConnection(conn, factory,false);
  26. }
  27. return null;
  28. }

修改未来数据定时刷新的方法,如下:

  1. /**
  2. * 未来数据定时刷新
  3. */
  4. @Scheduled(cron = "0 */1 * * * ?")
  5. public void refresh(){
  6. String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
  7. if(StringUtils.isNotBlank(token)){
  8. log.info("未来数据定时刷新---定时任务");
  9. //获取所有未来数据的集合key
  10. Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
  11. for (String futureKey : futureKeys) {//future_100_50
  12. //获取当前数据的key topic
  13. String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
  14. //按照key和分值查询符合条件的数据
  15. Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
  16. //同步数据
  17. if(!tasks.isEmpty()){
  18. cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
  19. log.info("成功的将"+futureKey+"刷新到了"+topicKey);
  20. }
  21. }
  22. }
  23. }
4.10)数据库同步到redis
  1. @Scheduled(cron = "0 */5 * * * ?")
  2. @PostConstruct
  3. public void reloadData() {
  4. clearCache();
  5. log.info("数据库数据同步到缓存");
  6. Calendar calendar = Calendar.getInstance();
  7. calendar.add(Calendar.MINUTE, 5);
  8. //查看小于未来5分钟的所有任务
  9. List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
  10. if(allTasks != null && allTasks.size() > 0){
  11. for (Taskinfo taskinfo : allTasks) {
  12. Task task = new Task();
  13. BeanUtils.copyProperties(taskinfo,task);
  14. task.setExecuteTime(taskinfo.getExecuteTime().getTime());
  15. addTaskToCache(task);
  16. }
  17. }
  18. }
  19. private void clearCache(){
  20. // 删除缓存中未来数据集合和当前消费者队列的所有key
  21. Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
  22. Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
  23. cacheService.delete(futurekeys);
  24. cacheService.delete(topickeys);
  25. }

5)延迟队列解决精准时间发布文章

5.1)延迟队列服务提供对外接口

提供远程的feign接口,在heima-leadnews-feign-api编写类如下:

  1. package com.heima.apis.schedule;
  2. import org.springframework.web.bind.annotation.RequestBody;
  3. @FeignClient("leadnews-schedule")
  4. public interface IScheduleClient {
  5. /**
  6. * 添加任务
  7. * @param task 任务对象
  8. * @return 任务id
  9. */
  10. @PostMapping("/api/v1/task/add")
  11. public ResponseResult addTask(@RequestBody Task task);
  12. /**
  13. * 取消任务
  14. * @param taskId 任务id
  15. * @return 取消结果
  16. */
  17. @GetMapping("/api/v1/task/cancel/{taskId}")
  18. public ResponseResult cancelTask(@PathVariable("taskId") long taskId);
  19. /**
  20. * 按照类型和优先级来拉取任务
  21. * @param type
  22. * @param priority
  23. * @return
  24. */
  25. @GetMapping("/api/v1/task/poll/{type}/{priority}")
  26. public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority);
  27. }

在heima-leadnews-schedule微服务下提供对应的实现

  1. package com.heima.schedule.feign;
  2. import org.springframework.web.bind.annotation.*;
  3. @RestController
  4. public class ScheduleClient implements IScheduleClient {
  5. @Autowired
  6. private TaskService taskService;
  7. /**
  8. * 添加任务
  9. * @param task 任务对象
  10. * @return 任务id
  11. */
  12. @PostMapping("/api/v1/task/add")
  13. @Override
  14. public ResponseResult addTask(@RequestBody Task task) {
  15. return ResponseResult.okResult(taskService.addTask(task));
  16. }
  17. /**
  18. * 取消任务
  19. * @param taskId 任务id
  20. * @return 取消结果
  21. */
  22. @GetMapping("/api/v1/task/cancel/{taskId}")
  23. @Override
  24. public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
  25. return ResponseResult.okResult(taskService.cancelTask(taskId));
  26. }
  27. /**
  28. * 按照类型和优先级来拉取任务
  29. * @param type
  30. * @param priority
  31. * @return
  32. */
  33. @GetMapping("/api/v1/task/poll/{type}/{priority}")
  34. @Override
  35. public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
  36. return ResponseResult.okResult(taskService.poll(type,priority));
  37. }
  38. }
5.2)发布文章集成添加延迟队列接口

在创建WmNewsTaskService

  1. package com.heima.wemedia.service;
  2. import com.heima.model.wemedia.pojos.WmNews;
  3. public interface WmNewsTaskService {
  4. /**
  5. * 添加任务到延迟队列中
  6. * @param id 文章的id
  7. * @param publishTime 发布的时间 可以做为任务的执行时间
  8. */
  9. public void addNewsToTask(Integer id, Date publishTime);
  10. }

实现:

  1. package com.heima.wemedia.service.impl;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. @Slf4j
  5. public class WmNewsTaskServiceImpl implements WmNewsTaskService {
  6. @Autowired
  7. private IScheduleClient scheduleClient;
  8. /**
  9. * 添加任务到延迟队列中
  10. * @param id 文章的id
  11. * @param publishTime 发布的时间 可以做为任务的执行时间
  12. */
  13. @Override
  14. @Async
  15. public void addNewsToTask(Integer id, Date publishTime) {
  16. log.info("添加任务到延迟服务中----begin");
  17. Task task = new Task();
  18. task.setExecuteTime(publishTime.getTime());
  19. task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
  20. task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
  21. WmNews wmNews = new WmNews();
  22. wmNews.setId(id);
  23. task.setParameters(ProtostuffUtil.serialize(wmNews));
  24. scheduleClient.addTask(task);
  25. log.info("添加任务到延迟服务中----end");
  26. }
  27. }

枚举类:

  1. package com.heima.model.common.enums;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Getter;
  4. @Getter
  5. @AllArgsConstructor
  6. public enum TaskTypeEnum {
  7. NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
  8. REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
  9. private final int taskType; //对应具体业务
  10. private final int priority; //业务不同级别
  11. private final String desc; //描述信息
  12. }
序列化工具对比
  • JdkSerialize:java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组

  • Protostuff:google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo

拷贝资料中的两个类到heima-leadnews-utils下

Protostuff需要引导依赖:

  1. <dependency>
  2. <groupId>io.protostuff</groupId>
  3. <artifactId>protostuff-core</artifactId>
  4. <version>1.6.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>io.protostuff</groupId>
  8. <artifactId>protostuff-runtime</artifactId>
  9. <version>1.6.0</version>
  10. </dependency>

修改发布文章代码:

把之前的异步调用修改为调用延迟任务

  1. @Autowired
  2. private WmNewsTaskService wmNewsTaskService;
  3. /**
  4. * 发布修改文章或保存为草稿
  5. * @param dto
  6. * @return
  7. */
  8. @Override
  9. public ResponseResult submitNews(WmNewsDto dto) {
  10. //0.条件判断
  11. if(dto == null || dto.getContent() == null){
  12. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
  13. }
  14. //1.保存或修改文章
  15. WmNews wmNews = new WmNews();
  16. //属性拷贝 属性名词和类型相同才能拷贝
  17. BeanUtils.copyProperties(dto,wmNews);
  18. //封面图片 list---> string
  19. if(dto.getImages() != null && dto.getImages().size() > 0){
  20. //[1dddfsd.jpg,sdlfjldk.jpg]--> 1dddfsd.jpg,sdlfjldk.jpg
  21. String imageStr = StringUtils.join(dto.getImages(), ",");
  22. wmNews.setImages(imageStr);
  23. }
  24. //如果当前封面类型为自动 -1
  25. if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
  26. wmNews.setType(null);
  27. }
  28. saveOrUpdateWmNews(wmNews);
  29. //2.判断是否为草稿 如果为草稿结束当前方法
  30. if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){
  31. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  32. }
  33. //3.不是草稿,保存文章内容图片与素材的关系
  34. //获取到文章内容中的图片信息
  35. List<String> materials = ectractUrlInfo(dto.getContent());
  36. saveRelativeInfoForContent(materials,wmNews.getId());
  37. //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片
  38. saveRelativeInfoForCover(dto,wmNews,materials);
  39. //审核文章
  40. // wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
  41. wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());
  42. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  43. }
5.3)消费任务进行审核文章

WmNewsTaskService中添加方法


<code class="language-plaintext hljs">/**
 * 消费延迟队列数据
 */
public void scanNewsByTask();</code>

实现

  1. @Autowired
  2. private WmNewsAutoScanServiceImpl wmNewsAutoScanService;
  3. /**
  4. * 消费延迟队列数据
  5. */
  6. @Scheduled(fixedRate = 1000)
  7. @Override
  8. @SneakyThrows
  9. public void scanNewsByTask() {
  10. log.info("文章审核---消费任务执行---begin---");
  11. ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
  12. if(responseResult.getCode().equals(200) && responseResult.getData() != null){
  13. String json_str = JSON.toJSONString(responseResult.getData());
  14. Task task = JSON.parseObject(json_str, Task.class);
  15. byte[] parameters = task.getParameters();
  16. WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
  17. System.out.println(wmNews.getId()+"-----------");
  18. wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
  19. }
  20. log.info("文章审核---消费任务执行---end---");
  21. }

在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling


06kafka及异步通知文章上下架

1)自媒体文章上下架

需求分析

2)kafka概述

消息中间件对比

消息中间件对比-选择建议

消息中间件

建议

Kafka

追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务

RocketMQ

可靠性要求很高的金互联网领域,稳定性高,经历了多次阿里双11考验

RabbitMQ

性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ

kafka介绍

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统

kafka官网:Apache Kafka

kafka介绍-名词解释

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)

  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)

  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)

  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

3)kafka安装配置

Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

  • Docker安装zookeeper

下载镜像:

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
  • Docker安装kafka

下载镜像:

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

  1. docker run -d --name kafka \
  2. --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
  3. --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
  4. --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
  5. --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  6. --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
  7. --net=host wurstmeister/kafka:2.12-2.3.1

云主机无法使用--net

4)kafka入门

  • 生产者发送消息,多个消费者只能有一个消费者接收到消息

  • 生产者发送消息,多个消费者都可以接收到消息

(1)创建kafka-demo项目,导入依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. </dependency>

(2)生产者发送消息

  1. package com.heima.kafka.sample;
  2. import java.util.Properties;
  3. /**
  4. * 生产者
  5. */
  6. public class ProducerQuickStart {
  7. public static void main(String[] args) {
  8. //1.kafka的配置信息
  9. Properties properties = new Properties();
  10. //kafka的连接地址
  11. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
  12. //发送失败,失败的重试次数
  13. properties.put(ProducerConfig.RETRIES_CONFIG,5);
  14. //消息key的序列化器
  15. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  16. //消息value的序列化器
  17. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  18. //2.生产者对象
  19. KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
  20. //封装发送的消息
  21. ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");
  22. //3.发送消息
  23. producer.send(record);
  24. //4.关闭消息通道,必须关闭,否则消息发送不成功
  25. producer.close();
  26. }
  27. }

(3)消费者接收消息

  1. package com.heima.kafka.sample;
  2. import java.util.Properties;
  3. /**
  4. * 消费者
  5. */
  6. public class ConsumerQuickStart {
  7. public static void main(String[] args) {
  8. //1.添加kafka的配置信息
  9. Properties properties = new Properties();
  10. //kafka的连接地址
  11. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
  12. //消费者组
  13. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
  14. //消息的反序列化器
  15. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  16. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  17. //2.消费者对象
  18. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
  19. //3.订阅主题
  20. consumer.subscribe(Collections.singletonList("itheima-topic"));
  21. //当前线程一直处于监听状态
  22. while (true) {
  23. //4.获取消息
  24. ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
  25. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  26. System.out.println(consumerRecord.key());
  27. System.out.println(consumerRecord.value());
  28. }
  29. }
  30. }
  31. }

总结

  • 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)同一个组

  • 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)多个组

分区机制—topic剖析

5)kafka高可用设计

5.1)集群
  • Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成

  • 这样如果集群中某一台机器宕机,其他机器上的 Broker 也依然能够对外提供服务。这其实就是 Kafka 提供高可用的手段之一

5.2)备份机制(Replication)

Kafka 中消息的备份又叫做 副本(Replica)

Kafka 定义了两类副本:

  • 领导者副本(Leader Replica)

  • 追随者副本(Follower Replica)

备份机制—同步方式

ISR(in-sync replica)需要同步复制保存的follower

如果leader失效后,需要选出新的leader,选举的原则如下:

第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步

第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取

极端情况,就是所有副本都失效了,这时有两种方案

第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定

第二:选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整

6)kafka生产者详解

6.1)发送类型
  • 同步发送

使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

  1. RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
  2. System.out.println(recordMetadata.offset());
  • 异步发送

用send()方法,并指定一个回调函数,服务器在返回响应时调用函数

  1. //异步消息发送
  2. producer.send(kvProducerRecord, new Callback() {
  3. @Override
  4. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  5. if(e != null){
  6. System.out.println("记录异常信息到日志表中");
  7. }
  8. System.out.println(recordMetadata.offset());
  9. }
  10. });
6.2)参数详解
  • ack确认机制

代码的配置方式:

  1. //ack配置 消息确认机制
  2. prop.put(ProducerConfig.ACKS_CONFIG,"all");

参数的选择说明

确认机制

说明

acks=0

生产者在成功写入消息之前不会等待(不需要)任何来自服务器的响应,消息有丢失的风险,但是速度最快

acks=1(默认值)

只要集群Leader节点收到消息,生产者就会收到一个来自服务器的成功响应

acks=all

只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应

  • retries 重试次数

生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms

代码中配置方式:

  1. //重试次数
  2. prop.put(ProducerConfig.RETRIES_CONFIG,10);
  • 消息压缩

默认情况下, 消息发送时不会被压缩。

代码中配置方式:

  1. //数据压缩
  2. prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");

压缩算法

说明

snappy

占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用

lz4

占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观

gzip

占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法

使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。

7)kafka消费者详解

7.1)消费者组
  • 消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体

  • 一个发布在Topic上消息被分发给此消费者组中的一个消费者

  • 所有的消费者都在一个组中,那么这就变成了queue模型 消息队列 一对一

  • 所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型 一对多消费者

7.2)消息有序性

应用场景:

  • 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致

  • 充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序

topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区

7.3)提交和偏移量

kafka不会像其他JMS队列那样需要得到消费者的确认,消费者可以使用kafka来追踪消息在分区的位置(偏移量)

消费者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡

正常的情况

如果消费者2挂掉以后,会发生再均衡,消费者2负责的分区会被其他消费者进行消费

再均衡后不可避免会出现一些问题

问题一:

如果提交偏移量2小于客户端处理的最后一个消息10的偏移量,那么处于两个偏移量之间的消息就会被重复处理

问题二:

如果提交的偏移量5大于客户端最后一个消息11的偏移量,那么处于两个偏移量之间的消息将会丢失

如果想要解决这些问题,还要知道目前kafka提交偏移量的方式

提交偏移量的方式有两种,分别是自动提交偏移量和手动提交

  • 自动提交偏移量

当enable.auto.commit被设置为true提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去

  • 手动提交 ,当enable.auto.commit被设置为false可以有以下三种提交方式

  • 提交当前偏移量(同步提交)

  • 异步提交

  • 同步和异步组合提交

1.提交当前偏移量(同步提交)

把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。

只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。

  1. while (true){
  2. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  3. for (ConsumerRecord<String, String> record : records) {
  4. System.out.println(record.value());
  5. System.out.println(record.key());
  6. try {
  7. consumer.commitSync();//同步提交当前最新的偏移量
  8. }catch (CommitFailedException e){
  9. System.out.println("记录提交失败的异常:"+e);
  10. }
  11. }
  12. }
2.异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API :commitAsync()

  1. while (true){
  2. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  3. for (ConsumerRecord<String, String> record : records) {
  4. System.out.println(record.value());
  5. System.out.println(record.key());
  6. }
  7. consumer.commitAsync(new OffsetCommitCallback() {
  8. @Override
  9. public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
  10. if(e!=null){
  11. System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
  12. }
  13. }
  14. });
  15. }
3.同步和异步组合提交

异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试

相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖

举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费

  1. try {
  2. while (true){
  3. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  4. for (ConsumerRecord<String, String> record : records) {
  5. System.out.println(record.value());
  6. System.out.println(record.key());
  7. }
  8. consumer.commitAsync();
  9. }
  10. }catch (Exception e){+
  11. e.printStackTrace();
  12. System.out.println("记录错误信息:"+e);
  13. }finally {
  14. try {
  15. consumer.commitSync();
  16. }finally {
  17. consumer.close();
  18. }
  19. }

8)springboot集成kafka

8.1)入门

1.导入spring-kafka依赖信息

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <!-- kafkfa -->
  7. <dependency>
  8. <groupId>org.springframework.kafka</groupId>
  9. <artifactId>spring-kafka</artifactId>
  10. <exclusions>
  11. <exclusion>
  12. <groupId>org.apache.kafka</groupId>
  13. <artifactId>kafka-clients</artifactId>
  14. </exclusion>
  15. </exclusions>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.kafka</groupId>
  19. <artifactId>kafka-clients</artifactId>
  20. </dependency>
  21. <dependency>
  22. <groupId>com.alibaba</groupId>
  23. <artifactId>fastjson</artifactId>
  24. </dependency>
  25. </dependencies>

2.在resources下创建文件application.yml

  1. server:
  2. port: 9991
  3. spring:
  4. application:
  5. name: kafka-demo
  6. kafka:
  7. bootstrap-servers: 192.168.200.130:9092
  8. producer:
  9. retries: 10
  10. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  12. consumer:
  13. group-id: ${spring.application.name}-test
  14. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  15. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.消息生产者

  1. package com.heima.kafka.controller;
  2. import org.springframework.web.bind.annotation.RestController;
  3. @RestController
  4. public class HelloController {
  5. @Autowired
  6. private KafkaTemplate<String,String> kafkaTemplate;
  7. @GetMapping("/hello")
  8. public String hello(){
  9. kafkaTemplate.send("itcast-topic","黑马程序员");
  10. return "ok";
  11. }
  12. }

4.消息消费者

  1. package com.heima.kafka.listener;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Component;
  4. import org.springframework.util.StringUtils;
  5. @Component
  6. public class HelloListener {
  7. @KafkaListener(topics = "itcast-topic")
  8. public void onMessage(String message){
  9. if(!StringUtils.isEmpty(message)){
  10. System.out.println(message);
  11. }
  12. }
  13. }
8.2)传递消息为对象

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

  • 发送消息

  1. @GetMapping("/hello")
  2. public String hello(){
  3. User user = new User();
  4. user.setUsername("xiaowang");
  5. user.setAge(18);
  6.    
  7. kafkaTemplate.send("user-topic", JSON.toJSONString(user));
  8. return "ok";
  9. }
  • 接收消息

  1. package com.heima.kafka.listener;
  2. import org.springframework.util.StringUtils;
  3. @Component
  4. public class HelloListener {
  5. @KafkaListener(topics = "user-topic")
  6. public void onMessage(String message){
  7. if(!StringUtils.isEmpty(message)){
  8. User user = JSON.parseObject(message, User.class);
  9. System.out.println(user);
  10. }
  11. }
  12. }

9)自媒体文章上下架功能完成

9.1)需求分析
  • 已发表且已上架的文章可以下架

  • 已发表且已下架的文章可以上架

9.2)流程说明
9.3)接口定义

说明

接口路径

/api/v1/news/down_or_up

请求方式

POST

参数

DTO

响应结果

ResponseResult

DTO

  1. @Data
  2. public class WmNewsDto {
  3. private Integer id;
  4. /**
  5. * 是否上架 0 下架 1 上架
  6. */
  7. private Short enable;
  8. }

ResponseResult

9.4)自媒体文章上下架-功能实现

9.4.1)接口定义

在heima-leadnews-wemedia工程下的WmNewsController新增方法

  1. @PostMapping("/down_or_up")
  2. public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
  3. return null;
  4. }

在WmNewsDto中新增enable属性 ,完整的代码如下:

  1. package com.heima.model.wemedia.dtos;
  2. import lombok.Data;
  3. import java.util.Date;
  4. import java.util.List;
  5. @Data
  6. public class WmNewsDto {
  7. private Integer id;
  8. /**
  9. * 标题
  10. */
  11. private String title;
  12. /**
  13. * 频道id
  14. */
  15. private Integer channelId;
  16. /**
  17. * 标签
  18. */
  19. private String labels;
  20. /**
  21. * 发布时间
  22. */
  23. private Date publishTime;
  24. /**
  25. * 文章内容
  26. */
  27. private String content;
  28. /**
  29. * 文章封面类型 0 无图 1 单图 3 多图 -1 自动
  30. */
  31. private Short type;
  32. /**
  33. * 提交时间
  34. */
  35. private Date submitedTime;
  36. /**
  37. * 状态 提交为1 草稿为0
  38. */
  39. private Short status;
  40. /**
  41. * 封面图片列表 多张图以逗号隔开
  42. */
  43. private List<String> images;
  44. /**
  45. * 上下架 0 下架 1 上架
  46. */
  47. private Short enable;
  48. }

9.4.2)业务层编写

在WmNewsService新增方法

  1. /**
  2. * 文章的上下架
  3. * @param dto
  4. * @return
  5. */
  6. public ResponseResult downOrUp(WmNewsDto dto);

实现方法

  1. /**
  2. * 文章的上下架
  3. * @param dto
  4. * @return
  5. */
  6. @Override
  7. public ResponseResult downOrUp(WmNewsDto dto) {
  8. //1.检查参数
  9. if(dto.getId() == null){
  10. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
  11. }
  12. //2.查询文章
  13. WmNews wmNews = getById(dto.getId());
  14. if(wmNews == null){
  15. return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");
  16. }
  17. //3.判断文章是否已发布
  18. if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){
  19. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架");
  20. }
  21. //4.修改文章enable
  22. if(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){
  23. update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable())
  24. .eq(WmNews::getId,wmNews.getId()));
  25. }
  26. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  27. }

9.4.3)控制器

  1. @PostMapping("/down_or_up")
  2. public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
  3. return wmNewsService.downOrUp(dto);
  4. }

9.4.4)测试

9.5)消息通知article端文章上下架

9.5.1)在heima-leadnews-common模块下导入kafka依赖

  1. <!-- kafkfa -->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.kafka</groupId>
  8. <artifactId>kafka-clients</artifactId>
  9. </dependency>

9.5.2)在自媒体端的nacos配置中心配置kafka的生产者

  1. spring:
  2. kafka:
  3. bootstrap-servers: 192.168.200.130:9092
  4. producer:
  5. retries: 10
  6. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  7. value-serializer: org.apache.kafka.common.serialization.StringSerializer

9.5.3)在自媒体端文章上下架后发送消息

  1. //发送消息,通知article端修改文章配置
  2. if(wmNews.getArticleId() != null){
  3. Map<String,Object> map = new HashMap<>();
  4. map.put("articleId",wmNews.getArticleId());
  5. map.put("enable",dto.getEnable());
  6. kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
  7. }

常量类:

  1. public class WmNewsMessageConstants {
  2. public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
  3. }

9.5.4)在article端的nacos配置中心配置kafka的消费者

  1. spring:
  2. kafka:
  3. bootstrap-servers: 192.168.200.130:9092
  4. consumer:
  5. group-id: ${spring.application.name}
  6. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  7. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

9.5.5)在article端编写监听,接收数据

  1. package com.heima.article.listener;
  2. import com.alibaba.fastjson.JSON;
  3. import com.heima.article.service.ApArticleConfigService;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.commons.lang3.StringUtils;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.kafka.annotation.KafkaListener;
  8. import org.springframework.stereotype.Component;
  9. import java.util.Map;
  10. @Component
  11. @Slf4j
  12. public class ArtilceIsDownListener {
  13. @Autowired
  14. private ApArticleConfigService apArticleConfigService;
  15. @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
  16. public void onMessage(String message){
  17. if(StringUtils.isNotBlank(message)){
  18. Map map = JSON.parseObject(message, Map.class);
  19. apArticleConfigService.updateByMap(map);
  20. log.info("article端文章配置修改,articleId={}",map.get("articleId"));
  21. }
  22. }
  23. }

9.5.6)修改ap_article_config表的数据

新建ApArticleConfigService

  1. package com.heima.article.service;
  2. import com.baomidou.mybatisplus.extension.service.IService;
  3. import com.heima.model.article.pojos.ApArticleConfig;
  4. import java.util.Map;
  5. public interface ApArticleConfigService extends IService<ApArticleConfig> {
  6. /**
  7. * 修改文章配置
  8. * @param map
  9. */
  10. public void updateByMap(Map map);
  11. }

实现类:

  1. package com.heima.article.service.impl;
  2. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  3. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  4. import com.heima.article.mapper.ApArticleConfigMapper;
  5. import com.heima.article.service.ApArticleConfigService;
  6. import com.heima.model.article.pojos.ApArticleConfig;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.stereotype.Service;
  9. import org.springframework.transaction.annotation.Transactional;
  10. import java.util.Map;
  11. @Service
  12. @Slf4j
  13. @Transactional
  14. public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService {
  15. /**
  16. * 修改文章配置
  17. * @param map
  18. */
  19. @Override
  20. public void updateByMap(Map map) {
  21. //0 下架 1 上架
  22. Object enable = map.get("enable");
  23. boolean isDown = true;
  24. if(enable.equals(1)){
  25. isDown = false;
  26. }
  27. //修改文章配置
  28. update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown));
  29. }
  30. }
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号