当前位置:   article > 正文

黑马头条项目详解(三)

黑马头条

五、文章定时发布----延迟任务

什么是延迟任务?

定时任务:有固定周期的,有明确的触发时间

延迟任务:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟 

应用场景

场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消

场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止

技术选型

1、DelayQueue

JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。

DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法

getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。

compareTo方法:用于排序,确定元素出队列的顺序。

2、RabbitMQ实现延迟任务

TTL:Time To Live (消息存活时间)

死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)

3、redis实现

zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序

 5.1、redis实现延迟任务的思路

问题思路

1.为什么任务需要存储在数据库中?

延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。

2.为什么redis中使用两种数据类型,list和zset?

原因一:list存储立即执行的任务,zset存储未来的数据
原因二:任务量过大以后,zset的性能会下降
时间复杂度:执行时间(次数)随着数据规模增长的变化趋势
操作redis中的list命令LPUSH:时间复杂度:O(1)
操作redis中的zset命令zadd:时间复杂度:O(M*Iog(n))

3.在添加zset数据的时候,为什么需要预加载?

任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。

5.2、表结构

表结构

taskinfo 任务表

taskinfo_logs 任务日志表 

 5.3、延迟任务---添加任务

①创建task类,用于接收添加任务的参数

  1. package com.heima.model.schedule.dtos;
  2. import lombok.Data;
  3. import java.io.Serializable;
  4. @Data
  5. public class Task implements Serializable {
  6. /**
  7. * 任务id
  8. */
  9. private Long taskId;
  10. /**
  11. * 类型
  12. */
  13. private Integer taskType;
  14. /**
  15. * 优先级
  16. */
  17. private Integer priority;
  18. /**
  19. * 执行id
  20. */
  21. private long executeTime;
  22. /**
  23. * task参数
  24. */
  25. private byte[] parameters;
  26. }

②创建TaskService

  1. package com.heima.schedule.service;
  2. import com.heima.model.schedule.dtos.Task;
  3. /**
  4. * 对外访问接口
  5. */
  6. public interface TaskService {
  7. /**
  8. * 添加任务
  9. * @param task 任务对象
  10. * @return 任务id
  11. */
  12. public long addTask(Task task) ;
  13. }

实现类

  1. package com.heima.schedule.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.heima.common.constants.ScheduleConstants;
  4. import com.heima.common.redis.CacheService;
  5. import com.heima.model.schedule.dtos.Task;
  6. import com.heima.model.schedule.pojos.Taskinfo;
  7. import com.heima.model.schedule.pojos.TaskinfoLogs;
  8. import com.heima.schedule.mapper.TaskinfoLogsMapper;
  9. import com.heima.schedule.mapper.TaskinfoMapper;
  10. import com.heima.schedule.service.TaskService;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.springframework.beans.BeanUtils;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.stereotype.Service;
  15. import org.springframework.transaction.annotation.Transactional;
  16. import java.util.Calendar;
  17. import java.util.Date;
  18. @Service
  19. @Transactional
  20. @Slf4j
  21. public class TaskServiceImpl implements TaskService {
  22. /**
  23. * 添加延迟任务
  24. *
  25. * @param task
  26. * @return
  27. */
  28. @Override
  29. public long addTask(Task task) {
  30. //1.添加任务到数据库中
  31. boolean success = addTaskToDb(task);
  32. if (success) {
  33. //2.添加任务到redis
  34. addTaskToCache(task);
  35. }
  36. return task.getTaskId();
  37. }
  38. @Autowired
  39. private CacheService cacheService;
  40. /**
  41. * 把任务添加到redis中
  42. *
  43. * @param task
  44. */
  45. private void addTaskToCache(Task task) {
  46. String key = task.getTaskType() + "_" + task.getPriority();
  47. //获取5分钟之后的时间 毫秒值
  48. Calendar calendar = Calendar.getInstance();
  49. calendar.add(Calendar.MINUTE, 5);
  50. long nextScheduleTime = calendar.getTimeInMillis();
  51. //2.1 如果任务的执行时间小于等于当前时间,存入list
  52. if (task.getExecuteTime() <= System.currentTimeMillis()) {
  53. cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
  54. } else if (task.getExecuteTime() <= nextScheduleTime) {
  55. //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
  56. cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
  57. }
  58. }
  59. @Autowired
  60. private TaskinfoMapper taskinfoMapper;
  61. @Autowired
  62. private TaskinfoLogsMapper taskinfoLogsMapper;
  63. /**
  64. * 添加任务到数据库中
  65. *
  66. * @param task
  67. * @return
  68. */
  69. private boolean addTaskToDb(Task task) {
  70. boolean flag = false;
  71. try {
  72. //保存任务表
  73. Taskinfo taskinfo = new Taskinfo();
  74. BeanUtils.copyProperties(task, taskinfo);
  75. taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
  76. taskinfoMapper.insert(taskinfo);
  77. //设置taskID
  78. task.setTaskId(taskinfo.getTaskId());
  79. //保存任务日志数据
  80. TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
  81. BeanUtils.copyProperties(taskinfo, taskinfoLogs);
  82. taskinfoLogs.setVersion(1);
  83. taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
  84. taskinfoLogsMapper.insert(taskinfoLogs);
  85. flag = true;
  86. } catch (Exception e) {
  87. e.printStackTrace();
  88. }
  89. return flag;
  90. }
  91. }

ScheduleConstants常量类

  1. package com.heima.common.constants;
  2. public class ScheduleConstants {
  3. //task状态
  4. public static final int SCHEDULED=0; //初始化状态
  5. public static final int EXECUTED=1; //已执行状态
  6. public static final int CANCELLED=2; //已取消状态
  7. public static String FUTURE="future_"; //未来数据key前缀
  8. public static String TOPIC="topic_"; //当前数据key前缀
  9. }

5.4、延迟任务---取消任务

在TaskService中添加方法

  1. /**
  2. * 取消任务
  3. * @param taskId 任务id
  4. * @return 取消结果
  5. */
  6. public boolean cancelTask(long taskId);

 实现

  1. /**
  2. * 取消任务
  3. * @param taskId
  4. * @return
  5. */
  6. @Override
  7. public boolean cancelTask(long taskId) {
  8. boolean flag = false;
  9. //删除任务,更新日志
  10. Task task = updateDb(taskId,ScheduleConstants.EXECUTED);
  11. //删除redis的数据
  12. if(task != null){
  13. removeTaskFromCache(task);
  14. flag = true;
  15. }
  16. return false;
  17. }
  18. /**
  19. * 删除redis中的任务数据
  20. * @param task
  21. */
  22. private void removeTaskFromCache(Task task) {
  23. String key = task.getTaskType()+"_"+task.getPriority();
  24. if(task.getExecuteTime()<=System.currentTimeMillis()){
  25. cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
  26. }else {
  27. cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
  28. }
  29. }
  30. /**
  31. * 删除任务,更新任务日志状态
  32. * @param taskId
  33. * @param status
  34. * @return
  35. */
  36. private Task updateDb(long taskId, int status) {
  37. Task task = null;
  38. try {
  39. //删除任务
  40. taskinfoMapper.deleteById(taskId);
  41. TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
  42. taskinfoLogs.setStatus(status);
  43. taskinfoLogsMapper.updateById(taskinfoLogs);
  44. task = new Task();
  45. BeanUtils.copyProperties(taskinfoLogs,task);
  46. task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
  47. }catch (Exception e){
  48. log.error("task cancel exception taskid={}",taskId);
  49. }
  50. return task;
  51. }

5.5、延迟任务---消费任务

①在TaskService中添加方法

  1. /**
  2. * 按照类型和优先级来拉取任务
  3. * @param type
  4. * @param priority
  5. * @return
  6. */
  7. public Task poll(int type,int priority);

实现

  1. /**
  2. * 按照类型和优先级拉取任务
  3. * @return
  4. */
  5. @Override
  6. public Task poll(int type,int priority) {
  7. Task task = null;
  8. try {
  9. String key = type+"_"+priority;
  10. String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
  11. if(StringUtils.isNotBlank(task_json)){
  12. task = JSON.parseObject(task_json, Task.class);
  13. //更新数据库信息
  14. updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
  15. }
  16. }catch (Exception e){
  17. e.printStackTrace();
  18. log.error("poll task exception");
  19. }
  20. return task;
  21. }

5.6、延迟任务---未来数据定时刷新,zset同步至list

 redis的key值匹配

方案1:keys 模糊匹配

keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞

方案2:scan

SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。

redis通道

普通redis客户端和服务器交互模式

Pipeline请求模型

功能实现

 ①在TaskService中添加方法

  1. @Scheduled(cron = "0 */1 * * * ?")
  2. public void refresh() {
  3. System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");
  4. // 获取所有未来数据集合的key值
  5. Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
  6. for (String futureKey : futureKeys) { // future_250_250
  7. String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
  8. //获取该组key下当前需要消费的任务数据
  9. Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
  10. if (!tasks.isEmpty()) {
  11. //将这些任务数据添加到消费者队列中
  12. cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
  13. System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
  14. }
  15. }
  16. }

②在引导类中添加开启任务调度注解:`@EnableScheduling`

5.7、分布式锁解决集群下的方法抢占执行

问题描述

启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法 

分布式锁

分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。  

redis实现分布式锁

sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。 

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功

  • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败

  • 客户端A执行代码完成,删除锁

  • 客户端B在等待一段时间后再去请求设置key的值,设置成功

  • 客户端B执行代码完成,删除锁

实现

在工具类CacheService中添加方法

  1. /**
  2. * 加锁
  3. *
  4. * @param name
  5. * @param expire
  6. * @return
  7. */
  8. public String tryLock(String name, long expire) {
  9. name = name + "_lock";
  10. String token = UUID.randomUUID().toString();
  11. RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
  12. RedisConnection conn = factory.getConnection();
  13. try {
  14. //参考redis命令:
  15. //set key value [EX seconds] [PX milliseconds] [NX|XX]
  16. Boolean result = conn.set(
  17. name.getBytes(),
  18. token.getBytes(),
  19. Expiration.from(expire, TimeUnit.MILLISECONDS),
  20. RedisStringCommands.SetOption.SET_IF_ABSENT //NX
  21. );
  22. if (result != null && result)
  23. return token;
  24. } finally {
  25. RedisConnectionUtils.releaseConnection(conn, factory,false);
  26. }
  27. return null;
  28. }

修改未来数据定时刷新的方法,如下:

  1. /**
  2. * 未来数据定时刷新
  3. */
  4. @Scheduled(cron = "0 */1 * * * ?")
  5. public void refresh(){
  6. String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
  7. if(StringUtils.isNotBlank(token)){
  8. log.info("未来数据定时刷新---定时任务");
  9. //获取所有未来数据的集合key
  10. Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
  11. for (String futureKey : futureKeys) {//future_100_50
  12. //获取当前数据的key topic
  13. String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
  14. //按照key和分值查询符合条件的数据
  15. Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
  16. //同步数据
  17. if(!tasks.isEmpty()){
  18. cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
  19. log.info("成功的将"+futureKey+"刷新到了"+topicKey);
  20. }
  21. }
  22. }
  23. }

5.8、数据库任务定时同步到redis

  1. @Scheduled(cron = "0 */5 * * * ?")
  2. @PostConstruct
  3. public void reloadData() {
  4. clearCache();
  5. log.info("数据库数据同步到缓存");
  6. Calendar calendar = Calendar.getInstance();
  7. calendar.add(Calendar.MINUTE, 5);
  8. //查看小于未来5分钟的所有任务
  9. List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
  10. if(allTasks != null && allTasks.size() > 0){
  11. for (Taskinfo taskinfo : allTasks) {
  12. Task task = new Task();
  13. BeanUtils.copyProperties(taskinfo,task);
  14. task.setExecuteTime(taskinfo.getExecuteTime().getTime());
  15. addTaskToCache(task);
  16. }
  17. }
  18. }
  19. private void clearCache(){
  20. // 删除缓存中未来数据集合和当前消费者队列的所有key
  21. Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
  22. Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
  23. cacheService.delete(futurekeys);
  24. cacheService.delete(topickeys);
  25. }

 5.9、延迟队列解决精准时间发布文章

①延迟队列服务提供对外接口 

提供远程的feign接口,在heima-leadnews-feign-api编写类如下: 

  1. package com.heima.apis.schedule;
  2. import com.heima.model.common.dtos.ResponseResult;
  3. import com.heima.model.schedule.dtos.Task;
  4. import org.springframework.cloud.openfeign.FeignClient;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.PathVariable;
  7. import org.springframework.web.bind.annotation.PostMapping;
  8. import org.springframework.web.bind.annotation.RequestBody;
  9. @FeignClient("leadnews-schedule")
  10. public interface IScheduleClient {
  11. /**
  12. * 添加任务
  13. * @param task 任务对象
  14. * @return 任务id
  15. */
  16. @PostMapping("/api/v1/task/add")
  17. public ResponseResult addTask(@RequestBody Task task);
  18. /**
  19. * 取消任务
  20. * @param taskId 任务id
  21. * @return 取消结果
  22. */
  23. @GetMapping("/api/v1/task/cancel/{taskId}")
  24. public ResponseResult cancelTask(@PathVariable("taskId") long taskId);
  25. /**
  26. * 按照类型和优先级来拉取任务
  27. * @param type
  28. * @param priority
  29. * @return
  30. */
  31. @GetMapping("/api/v1/task/poll/{type}/{priority}")
  32. public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority);
  33. }

 在heima-leadnews-schedule微服务下提供对应的实现

  1. package com.heima.schedule.feign;
  2. import com.heima.apis.schedule.IScheduleClient;
  3. import com.heima.model.common.dtos.ResponseResult;
  4. import com.heima.model.schedule.dtos.Task;
  5. import com.heima.schedule.service.TaskService;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.*;
  8. @RestController
  9. public class ScheduleClient implements IScheduleClient {
  10. @Autowired
  11. private TaskService taskService;
  12. /**
  13. * 添加任务
  14. * @param task 任务对象
  15. * @return 任务id
  16. */
  17. @PostMapping("/api/v1/task/add")
  18. @Override
  19. public ResponseResult addTask(@RequestBody Task task) {
  20. return ResponseResult.okResult(taskService.addTask(task));
  21. }
  22. /**
  23. * 取消任务
  24. * @param taskId 任务id
  25. * @return 取消结果
  26. */
  27. @GetMapping("/api/v1/task/cancel/{taskId}")
  28. @Override
  29. public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
  30. return ResponseResult.okResult(taskService.cancelTask(taskId));
  31. }
  32. /**
  33. * 按照类型和优先级来拉取任务
  34. * @param type
  35. * @param priority
  36. * @return
  37. */
  38. @GetMapping("/api/v1/task/poll/{type}/{priority}")
  39. @Override
  40. public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
  41. return ResponseResult.okResult(taskService.poll(type,priority));
  42. }
  43. }

②发布文章集成添加延迟队列接口

在创建WmNewsTaskService

  1. package com.heima.wemedia.service;
  2. import com.heima.model.wemedia.pojos.WmNews;
  3. public interface WmNewsTaskService {
  4. /**
  5. * 添加任务到延迟队列中
  6. * @param id 文章的id
  7. * @param publishTime 发布的时间 可以做为任务的执行时间
  8. */
  9. public void addNewsToTask(Integer id, Date publishTime);
  10. }

实现

  1. package com.heima.wemedia.service.impl;
  2. import com.heima.apis.schedule.IScheduleClient;
  3. import com.heima.model.common.enums.TaskTypeEnum;
  4. import com.heima.model.schedule.dtos.Task;
  5. import com.heima.model.wemedia.pojos.WmNews;
  6. import com.heima.utils.common.ProtostuffUtil;
  7. import com.heima.wemedia.service.WmNewsTaskService;
  8. import lombok.SneakyThrows;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.scheduling.annotation.Async;
  12. import org.springframework.stereotype.Service;
  13. @Service
  14. @Slf4j
  15. public class WmNewsTaskServiceImpl implements WmNewsTaskService {
  16. @Autowired
  17. private IScheduleClient scheduleClient;
  18. /**
  19. * 添加任务到延迟队列中
  20. * @param id 文章的id
  21. * @param publishTime 发布的时间 可以做为任务的执行时间
  22. */
  23. @Override
  24. @Async
  25. public void addNewsToTask(Integer id, Date publishTime) {
  26. log.info("添加任务到延迟服务中----begin");
  27. Task task = new Task();
  28. task.setExecuteTime(publishTime.getTime());
  29. task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
  30. task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
  31. WmNews wmNews = new WmNews();
  32. wmNews.setId(id);
  33. task.setParameters(ProtostuffUtil.serialize(wmNews));
  34. scheduleClient.addTask(task);
  35. log.info("添加任务到延迟服务中----end");
  36. }
  37. }

枚举类

  1. package com.heima.model.common.enums;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Getter;
  4. @Getter
  5. @AllArgsConstructor
  6. public enum TaskTypeEnum {
  7. NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
  8. REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
  9. private final int taskType; //对应具体业务
  10. private final int priority; //业务不同级别
  11. private final String desc; //描述信息
  12. }

修改发布文章代码:

把之前的异步调用修改为调用延迟任务

  1. @Autowired
  2. private WmNewsTaskService wmNewsTaskService;
  3. /**
  4. * 发布修改文章或保存为草稿
  5. * @param dto
  6. * @return
  7. */
  8. @Override
  9. public ResponseResult submitNews(WmNewsDto dto) {
  10. //0.条件判断
  11. if(dto == null || dto.getContent() == null){
  12. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
  13. }
  14. //1.保存或修改文章
  15. WmNews wmNews = new WmNews();
  16. //属性拷贝 属性名词和类型相同才能拷贝
  17. BeanUtils.copyProperties(dto,wmNews);
  18. //封面图片 list---> string
  19. if(dto.getImages() != null && dto.getImages().size() > 0){
  20. //[1dddfsd.jpg,sdlfjldk.jpg]--> 1dddfsd.jpg,sdlfjldk.jpg
  21. String imageStr = StringUtils.join(dto.getImages(), ",");
  22. wmNews.setImages(imageStr);
  23. }
  24. //如果当前封面类型为自动 -1
  25. if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
  26. wmNews.setType(null);
  27. }
  28. saveOrUpdateWmNews(wmNews);
  29. //2.判断是否为草稿 如果为草稿结束当前方法
  30. if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){
  31. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  32. }
  33. //3.不是草稿,保存文章内容图片与素材的关系
  34. //获取到文章内容中的图片信息
  35. List<String> materials = ectractUrlInfo(dto.getContent());
  36. saveRelativeInfoForContent(materials,wmNews.getId());
  37. //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片
  38. saveRelativeInfoForCover(dto,wmNews,materials);
  39. //审核文章
  40. // wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
  41. wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());
  42. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  43. }

③消费任务进行审核文章

WmNewsTaskService中添加方法

  1. /**
  2. * 消费延迟队列数据
  3. */
  4. public void scanNewsByTask();

实现

  1. @Autowired
  2. private WmNewsAutoScanServiceImpl wmNewsAutoScanService;
  3. /**
  4. * 消费延迟队列数据
  5. */
  6. @Scheduled(fixedRate = 1000)
  7. @Override
  8. @SneakyThrows
  9. public void scanNewsByTask() {
  10. log.info("文章审核---消费任务执行---begin---");
  11. ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
  12. if(responseResult.getCode().equals(200) && responseResult.getData() != null){
  13. String json_str = JSON.toJSONString(responseResult.getData());
  14. Task task = JSON.parseObject(json_str, Task.class);
  15. byte[] parameters = task.getParameters();
  16. WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
  17. System.out.println(wmNews.getId()+"-----------");
  18. wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
  19. }
  20. log.info("文章审核---消费任务执行---end---");
  21. }

在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling

六、自媒体文章上下架

需求

思路

 

会产生耦合 

使用MQ可以解耦 

6.1、消息中间件

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统  

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)

  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)

  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)

  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

6.2、kafka安装配置

 Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

  • Docker安装zookeeper

下载镜像

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
  • Docker安装kafka

 下载镜像

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

  1. docker run -d --name kafka \
  2. --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
  3. --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
  4. --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
  5. --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  6. --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
  7. --net=host wurstmeister/kafka:2.12-2.3.1

6.3、kafka入门

生产者发送消息

 生产者发送消息

  1. package com.heima.kafka.sample;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import java.util.Properties;
  6. /**
  7. * 生产者
  8. */
  9. public class ProducerQuickStart {
  10. public static void main(String[] args) {
  11. //1.kafka的配置信息
  12. Properties properties = new Properties();
  13. //kafka的连接地址
  14. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
  15. //发送失败,失败的重试次数
  16. properties.put(ProducerConfig.RETRIES_CONFIG,5);
  17. //消息key的序列化器
  18. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  19. //消息value的序列化器
  20. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  21. //2.生产者对象
  22. KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
  23. //封装发送的消息
  24. /**
  25. 第一个参数topic;第二个参数消息的key;第三个参数消息的value
  26. */
  27. ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");
  28. //3.发送消息
  29. producer.send(record);
  30. //4.关闭消息通道,必须关闭,否则消息发送不成功
  31. producer.close();
  32. }
  33. }

消费者接收消息

  1. package com.heima.kafka.sample;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import java.time.Duration;
  7. import java.util.Collections;
  8. import java.util.Properties;
  9. /**
  10. * 消费者
  11. */
  12. public class ConsumerQuickStart {
  13. public static void main(String[] args) {
  14. //1.添加kafka的配置信息
  15. Properties properties = new Properties();
  16. //kafka的连接地址
  17. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
  18. //消费者组
  19. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
  20. //消息的反序列化器
  21. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  22. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  23. //2.消费者对象
  24. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
  25. //3.订阅主题
  26. consumer.subscribe(Collections.singletonList("itheima-topic"));
  27. //当前线程一直处于监听状态
  28. while (true) {
  29. //4.获取消息
  30. ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
  31. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  32. System.out.println(consumerRecord.key());
  33. System.out.println(consumerRecord.value());
  34. }
  35. }
  36. }
  37. }

 同一个消费者组下的消费者只有一个能消费到topic

分区

分区策略

6.4、kafka高可用设计

 集群

备份机制

ISR(in-sync replica)需要同步复制保存的follower

如果leader失效后,需要选出新的leader,选举的原则如下:

第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的

第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取

极端情况,就是所有副本都失效了,这时有两种方案

第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定

第二:选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整

6.5、kafka生产者详解

发送类型

同步发送

使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

  1. RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
  2. System.out.println(recordMetadata.offset());

 异步发送

调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数

  1. //异步消息发送
  2. producer.send(kvProducerRecord, new Callback() {
  3. @Override
  4. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  5. if(e != null){
  6. System.out.println("记录异常信息到日志表中");
  7. }
  8. System.out.println(recordMetadata.offset());
  9. }
  10. });

参数详解

 6.6、kafka消费者详解

消费者组

  • 消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体

  • 一个发布在Topic上消息被分发给此消费者组中的一个消费者

    • 所有的消费者都在一个组中,那么这就变成了queue模型

    • 所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型

消息有序性

应用场景:

  • 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致

  • 充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序

topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。  

6.7、springboot集成kafka

①导入spring-kafka依赖信息

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <!-- kafkfa -->
  7. <dependency>
  8. <groupId>org.springframework.kafka</groupId>
  9. <artifactId>spring-kafka</artifactId>
  10. <exclusions>
  11. <exclusion>
  12. <groupId>org.apache.kafka</groupId>
  13. <artifactId>kafka-clients</artifactId>
  14. </exclusion>
  15. </exclusions>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.kafka</groupId>
  19. <artifactId>kafka-clients</artifactId>
  20. </dependency>
  21. <dependency>
  22. <groupId>com.alibaba</groupId>
  23. <artifactId>fastjson</artifactId>
  24. </dependency>
  25. </dependencies>

 ②在resources下创建文件application.yml

  1. server:
  2. port: 9991
  3. spring:
  4. application:
  5. name: kafka-demo
  6. kafka:
  7. bootstrap-servers: 192.168.200.130:9092
  8. producer:
  9. retries: 10
  10. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  12. consumer:
  13. group-id: ${spring.application.name}-test
  14. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  15. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

③消息生产者

  1. package com.heima.kafka.controller;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. @RestController
  7. public class HelloController {
  8. @Autowired
  9. private KafkaTemplate<String,String> kafkaTemplate;
  10. @GetMapping("/hello")
  11. public String hello(){
  12. kafkaTemplate.send("itcast-topic","黑马程序员");
  13. return "ok";
  14. }
  15. }

④消息消费者

  1. package com.heima.kafka.listener;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Component;
  4. import org.springframework.util.StringUtils;
  5. @Component
  6. public class HelloListener {
  7. @KafkaListener(topics = "itcast-topic")
  8. public void onMessage(String message){
  9. if(!StringUtils.isEmpty(message)){
  10. System.out.println(message);
  11. }
  12. }
  13. }

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

  • 发送消息

  1. @GetMapping("/hello")
  2. public String hello(){
  3. User user = new User();
  4. user.setUsername("xiaowang");
  5. user.setAge(18);
  6. kafkaTemplate.send("user-topic", JSON.toJSONString(user));
  7. return "ok";
  8. }
  • 接收消息  

  1. package com.heima.kafka.listener;
  2. import com.alibaba.fastjson.JSON;
  3. import com.heima.kafka.pojo.User;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.stereotype.Component;
  6. import org.springframework.util.StringUtils;
  7. @Component
  8. public class HelloListener {
  9. @KafkaListener(topics = "user-topic")
  10. public void onMessage(String message){
  11. if(!StringUtils.isEmpty(message)){
  12. User user = JSON.parseObject(message, User.class);
  13. System.out.println(user);
  14. }
  15. }
  16. }

6.8、自媒体文章上下架--自媒体端

流程图 

接口定义

 ①在heima-leadnews-wemedia工程下的WmNewsController新增方法

  1. @PostMapping("/down_or_up")
  2. public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
  3. return wmNewsService.downOrUp(dto);
  4. }

在WmNewsDto中新增enable属性 ,完整的代码如下:

  1. package com.heima.model.wemedia.dtos;
  2. import lombok.Data;
  3. import java.util.Date;
  4. import java.util.List;
  5. @Data
  6. public class WmNewsDto {
  7. private Integer id;
  8. /**
  9. * 标题
  10. */
  11. private String title;
  12. /**
  13. * 频道id
  14. */
  15. private Integer channelId;
  16. /**
  17. * 标签
  18. */
  19. private String labels;
  20. /**
  21. * 发布时间
  22. */
  23. private Date publishTime;
  24. /**
  25. * 文章内容
  26. */
  27. private String content;
  28. /**
  29. * 文章封面类型 0 无图 1 单图 3 多图 -1 自动
  30. */
  31. private Short type;
  32. /**
  33. * 提交时间
  34. */
  35. private Date submitedTime;
  36. /**
  37. * 状态 提交为1 草稿为0
  38. */
  39. private Short status;
  40. /**
  41. * 封面图片列表 多张图以逗号隔开
  42. */
  43. private List<String> images;
  44. /**
  45. * 上下架 0 下架 1 上架
  46. */
  47. private Short enable;
  48. }

②在WmNewsService新增方法

  1. /**
  2. * 文章的上下架
  3. * @param dto
  4. * @return
  5. */
  6. public ResponseResult downOrUp(WmNewsDto dto);

实现

  1. /**
  2. * 文章的上下架
  3. * @param dto
  4. * @return
  5. */
  6. @Override
  7. public ResponseResult downOrUp(WmNewsDto dto) {
  8. //1.检查参数
  9. if(dto.getId() == null){
  10. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
  11. }
  12. //2.查询文章
  13. WmNews wmNews = getById(dto.getId());
  14. if(wmNews == null){
  15. return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");
  16. }
  17. //3.判断文章是否已发布
  18. if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){
  19. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架");
  20. }
  21. //4.修改文章enable
  22. if(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){
  23. update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable())
  24. .eq(WmNews::getId,wmNews.getId()));
  25. }
  26. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  27. }

 6.9、自媒体文章上下架--消息通知article端文章上下架

①在heima-leadnews-common模块下导入kafka依赖

  1. <!-- kafkfa -->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.kafka</groupId>
  8. <artifactId>kafka-clients</artifactId>
  9. </dependency>

②在自媒体端的nacos配置中心配置kafka的生产者

  1. spring:
  2. kafka:
  3. bootstrap-servers: 192.168.200.130:9092
  4. producer:
  5. retries: 10
  6. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  7. value-serializer: org.apache.kafka.common.serialization.StringSerializer

③在自媒体端文章上下架后发送消息

  1. //发送消息,通知article端修改文章配置
  2. if(wmNews.getArticleId() != null){
  3. Map<String,Object> map = new HashMap<>();
  4. map.put("articleId",wmNews.getArticleId());
  5. map.put("enable",dto.getEnable());
  6. kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
  7. }

常量类

  1. public class WmNewsMessageConstants {
  2. public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
  3. }

④在article端的nacos配置中心配置kafka的消费者

  1. spring:
  2. kafka:
  3. bootstrap-servers: 192.168.200.130:9092
  4. consumer:
  5. group-id: ${spring.application.name}
  6. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  7. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

⑤在article端编写监听,接收数据

  1. package com.heima.article.listener;
  2. import com.alibaba.fastjson.JSON;
  3. import com.heima.article.service.ApArticleConfigService;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.commons.lang3.StringUtils;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.kafka.annotation.KafkaListener;
  8. import org.springframework.stereotype.Component;
  9. import java.util.Map;
  10. @Component
  11. @Slf4j
  12. public class ArtilceIsDownListener {
  13. @Autowired
  14. private ApArticleConfigService apArticleConfigService;
  15. @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
  16. public void onMessage(String message){
  17. if(StringUtils.isNotBlank(message)){
  18. Map map = JSON.parseObject(message, Map.class);
  19. apArticleConfigService.updateByMap(map);
  20. log.info("article端文章配置修改,articleId={}",map.get("articleId"));
  21. }
  22. }
  23. }

⑥修改ap_article_config表的数据

新建ApArticleConfigService

  1. package com.heima.article.service;
  2. import com.baomidou.mybatisplus.extension.service.IService;
  3. import com.heima.model.article.pojos.ApArticleConfig;
  4. import java.util.Map;
  5. public interface ApArticleConfigService extends IService<ApArticleConfig> {
  6. /**
  7. * 修改文章配置
  8. * @param map
  9. */
  10. public void updateByMap(Map map);
  11. }

实现类

  1. package com.heima.article.service.impl;
  2. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  3. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  4. import com.heima.article.mapper.ApArticleConfigMapper;
  5. import com.heima.article.service.ApArticleConfigService;
  6. import com.heima.model.article.pojos.ApArticleConfig;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.stereotype.Service;
  9. import org.springframework.transaction.annotation.Transactional;
  10. import java.util.Map;
  11. @Service
  12. @Slf4j
  13. @Transactional
  14. public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService {
  15. /**
  16. * 修改文章配置
  17. * @param map
  18. */
  19. @Override
  20. public void updateByMap(Map map) {
  21. //0 下架 1 上架
  22. Object enable = map.get("enable");
  23. boolean isDown = true;
  24. if(enable.equals(1)){
  25. isDown = false;
  26. }
  27. //修改文章配置
  28. update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown));
  29. }
  30. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/988130
推荐阅读
相关标签
  

闽ICP备14008679号