赞
踩
什么是延迟任务?
定时任务:有固定周期的,有明确的触发时间
延迟任务:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟
应用场景
场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消
场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止
技术选型
1、DelayQueue
JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法
getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。
compareTo方法:用于排序,确定元素出队列的顺序。
2、RabbitMQ实现延迟任务
TTL:Time To Live (消息存活时间)
死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)
3、redis实现
zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序
问题思路
1.为什么任务需要存储在数据库中?
延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。
2.为什么redis中使用两种数据类型,list和zset?
原因一:list存储立即执行的任务,zset存储未来的数据
原因二:任务量过大以后,zset的性能会下降
时间复杂度:执行时间(次数)随着数据规模增长的变化趋势
操作redis中的list命令LPUSH:时间复杂度:O(1)
操作redis中的zset命令zadd:时间复杂度:O(M*Iog(n))
3.在添加zset数据的时候,为什么需要预加载?
任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。
表结构
taskinfo 任务表
taskinfo_logs 任务日志表
①创建task类,用于接收添加任务的参数
- package com.heima.model.schedule.dtos;
-
- import lombok.Data;
-
- import java.io.Serializable;
-
- @Data
- public class Task implements Serializable {
-
- /**
- * 任务id
- */
- private Long taskId;
- /**
- * 类型
- */
- private Integer taskType;
-
- /**
- * 优先级
- */
- private Integer priority;
-
- /**
- * 执行id
- */
- private long executeTime;
-
- /**
- * task参数
- */
- private byte[] parameters;
-
- }

②创建TaskService
- package com.heima.schedule.service;
-
- import com.heima.model.schedule.dtos.Task;
-
- /**
- * 对外访问接口
- */
- public interface TaskService {
-
- /**
- * 添加任务
- * @param task 任务对象
- * @return 任务id
- */
- public long addTask(Task task) ;
-
- }

实现类
- package com.heima.schedule.service.impl;
-
- import com.alibaba.fastjson.JSON;
- import com.heima.common.constants.ScheduleConstants;
- import com.heima.common.redis.CacheService;
- import com.heima.model.schedule.dtos.Task;
- import com.heima.model.schedule.pojos.Taskinfo;
- import com.heima.model.schedule.pojos.TaskinfoLogs;
- import com.heima.schedule.mapper.TaskinfoLogsMapper;
- import com.heima.schedule.mapper.TaskinfoMapper;
- import com.heima.schedule.service.TaskService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.BeanUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
-
- import java.util.Calendar;
- import java.util.Date;
-
- @Service
- @Transactional
- @Slf4j
- public class TaskServiceImpl implements TaskService {
- /**
- * 添加延迟任务
- *
- * @param task
- * @return
- */
- @Override
- public long addTask(Task task) {
- //1.添加任务到数据库中
-
- boolean success = addTaskToDb(task);
-
- if (success) {
- //2.添加任务到redis
- addTaskToCache(task);
- }
-
-
- return task.getTaskId();
- }
-
- @Autowired
- private CacheService cacheService;
-
- /**
- * 把任务添加到redis中
- *
- * @param task
- */
- private void addTaskToCache(Task task) {
-
- String key = task.getTaskType() + "_" + task.getPriority();
-
- //获取5分钟之后的时间 毫秒值
- Calendar calendar = Calendar.getInstance();
- calendar.add(Calendar.MINUTE, 5);
- long nextScheduleTime = calendar.getTimeInMillis();
-
- //2.1 如果任务的执行时间小于等于当前时间,存入list
- if (task.getExecuteTime() <= System.currentTimeMillis()) {
- cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
- } else if (task.getExecuteTime() <= nextScheduleTime) {
- //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
- cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
- }
-
-
- }
-
- @Autowired
- private TaskinfoMapper taskinfoMapper;
-
- @Autowired
- private TaskinfoLogsMapper taskinfoLogsMapper;
-
- /**
- * 添加任务到数据库中
- *
- * @param task
- * @return
- */
- private boolean addTaskToDb(Task task) {
-
- boolean flag = false;
-
- try {
- //保存任务表
- Taskinfo taskinfo = new Taskinfo();
- BeanUtils.copyProperties(task, taskinfo);
- taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
- taskinfoMapper.insert(taskinfo);
-
- //设置taskID
- task.setTaskId(taskinfo.getTaskId());
-
- //保存任务日志数据
- TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
- BeanUtils.copyProperties(taskinfo, taskinfoLogs);
- taskinfoLogs.setVersion(1);
- taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
- taskinfoLogsMapper.insert(taskinfoLogs);
-
- flag = true;
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- return flag;
- }
- }

ScheduleConstants常量类
- package com.heima.common.constants;
-
- public class ScheduleConstants {
-
- //task状态
- public static final int SCHEDULED=0; //初始化状态
-
- public static final int EXECUTED=1; //已执行状态
-
- public static final int CANCELLED=2; //已取消状态
-
- public static String FUTURE="future_"; //未来数据key前缀
-
- public static String TOPIC="topic_"; //当前数据key前缀
- }
在TaskService中添加方法
- /**
- * 取消任务
- * @param taskId 任务id
- * @return 取消结果
- */
- public boolean cancelTask(long taskId);
实现
- /**
- * 取消任务
- * @param taskId
- * @return
- */
- @Override
- public boolean cancelTask(long taskId) {
-
- boolean flag = false;
-
- //删除任务,更新日志
- Task task = updateDb(taskId,ScheduleConstants.EXECUTED);
-
- //删除redis的数据
- if(task != null){
- removeTaskFromCache(task);
- flag = true;
- }
-
-
-
- return false;
- }
-
- /**
- * 删除redis中的任务数据
- * @param task
- */
- private void removeTaskFromCache(Task task) {
-
- String key = task.getTaskType()+"_"+task.getPriority();
-
- if(task.getExecuteTime()<=System.currentTimeMillis()){
- cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
- }else {
- cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
- }
- }
-
- /**
- * 删除任务,更新任务日志状态
- * @param taskId
- * @param status
- * @return
- */
- private Task updateDb(long taskId, int status) {
- Task task = null;
- try {
- //删除任务
- taskinfoMapper.deleteById(taskId);
-
- TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
- taskinfoLogs.setStatus(status);
- taskinfoLogsMapper.updateById(taskinfoLogs);
-
- task = new Task();
- BeanUtils.copyProperties(taskinfoLogs,task);
- task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
- }catch (Exception e){
- log.error("task cancel exception taskid={}",taskId);
- }
-
- return task;
-
- }

①在TaskService中添加方法
- /**
- * 按照类型和优先级来拉取任务
- * @param type
- * @param priority
- * @return
- */
- public Task poll(int type,int priority);
实现
- /**
- * 按照类型和优先级拉取任务
- * @return
- */
- @Override
- public Task poll(int type,int priority) {
- Task task = null;
- try {
- String key = type+"_"+priority;
- String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
- if(StringUtils.isNotBlank(task_json)){
- task = JSON.parseObject(task_json, Task.class);
- //更新数据库信息
- updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
- }
- }catch (Exception e){
- e.printStackTrace();
- log.error("poll task exception");
- }
-
- return task;
- }

方案1:keys 模糊匹配
keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞
方案2:scan
SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。
普通redis客户端和服务器交互模式
Pipeline请求模型
①在TaskService中添加方法
- @Scheduled(cron = "0 */1 * * * ?")
- public void refresh() {
- System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");
-
- // 获取所有未来数据集合的key值
- Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
- for (String futureKey : futureKeys) { // future_250_250
-
- String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
- //获取该组key下当前需要消费的任务数据
- Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
- if (!tasks.isEmpty()) {
- //将这些任务数据添加到消费者队列中
- cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
- System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
- }
- }
- }

②在引导类中添加开启任务调度注解:`@EnableScheduling`
问题描述
启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法
分布式锁
分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。
redis实现分布式锁
sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。
这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作
客户端A请求服务器设置key的值,如果设置成功就表示加锁成功
客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败
客户端A执行代码完成,删除锁
客户端B在等待一段时间后再去请求设置key的值,设置成功
客户端B执行代码完成,删除锁
实现
在工具类CacheService中添加方法
- /**
- * 加锁
- *
- * @param name
- * @param expire
- * @return
- */
- public String tryLock(String name, long expire) {
- name = name + "_lock";
- String token = UUID.randomUUID().toString();
- RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
- RedisConnection conn = factory.getConnection();
- try {
-
- //参考redis命令:
- //set key value [EX seconds] [PX milliseconds] [NX|XX]
- Boolean result = conn.set(
- name.getBytes(),
- token.getBytes(),
- Expiration.from(expire, TimeUnit.MILLISECONDS),
- RedisStringCommands.SetOption.SET_IF_ABSENT //NX
- );
- if (result != null && result)
- return token;
- } finally {
- RedisConnectionUtils.releaseConnection(conn, factory,false);
- }
- return null;
- }

修改未来数据定时刷新的方法,如下:
- /**
- * 未来数据定时刷新
- */
- @Scheduled(cron = "0 */1 * * * ?")
- public void refresh(){
-
- String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
- if(StringUtils.isNotBlank(token)){
- log.info("未来数据定时刷新---定时任务");
-
- //获取所有未来数据的集合key
- Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
- for (String futureKey : futureKeys) {//future_100_50
-
- //获取当前数据的key topic
- String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
-
- //按照key和分值查询符合条件的数据
- Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
-
- //同步数据
- if(!tasks.isEmpty()){
- cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
- log.info("成功的将"+futureKey+"刷新到了"+topicKey);
- }
- }
- }
- }

- @Scheduled(cron = "0 */5 * * * ?")
- @PostConstruct
- public void reloadData() {
- clearCache();
- log.info("数据库数据同步到缓存");
- Calendar calendar = Calendar.getInstance();
- calendar.add(Calendar.MINUTE, 5);
-
- //查看小于未来5分钟的所有任务
- List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
- if(allTasks != null && allTasks.size() > 0){
- for (Taskinfo taskinfo : allTasks) {
- Task task = new Task();
- BeanUtils.copyProperties(taskinfo,task);
- task.setExecuteTime(taskinfo.getExecuteTime().getTime());
- addTaskToCache(task);
- }
- }
- }
-
- private void clearCache(){
- // 删除缓存中未来数据集合和当前消费者队列的所有key
- Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
- Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
- cacheService.delete(futurekeys);
- cacheService.delete(topickeys);
- }

①延迟队列服务提供对外接口
提供远程的feign接口,在heima-leadnews-feign-api编写类如下:
- package com.heima.apis.schedule;
-
- import com.heima.model.common.dtos.ResponseResult;
- import com.heima.model.schedule.dtos.Task;
- import org.springframework.cloud.openfeign.FeignClient;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestBody;
-
- @FeignClient("leadnews-schedule")
- public interface IScheduleClient {
-
- /**
- * 添加任务
- * @param task 任务对象
- * @return 任务id
- */
- @PostMapping("/api/v1/task/add")
- public ResponseResult addTask(@RequestBody Task task);
-
- /**
- * 取消任务
- * @param taskId 任务id
- * @return 取消结果
- */
- @GetMapping("/api/v1/task/cancel/{taskId}")
- public ResponseResult cancelTask(@PathVariable("taskId") long taskId);
-
- /**
- * 按照类型和优先级来拉取任务
- * @param type
- * @param priority
- * @return
- */
- @GetMapping("/api/v1/task/poll/{type}/{priority}")
- public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority);
- }

在heima-leadnews-schedule微服务下提供对应的实现
- package com.heima.schedule.feign;
-
- import com.heima.apis.schedule.IScheduleClient;
- import com.heima.model.common.dtos.ResponseResult;
- import com.heima.model.schedule.dtos.Task;
- import com.heima.schedule.service.TaskService;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.*;
-
-
- @RestController
- public class ScheduleClient implements IScheduleClient {
-
- @Autowired
- private TaskService taskService;
-
- /**
- * 添加任务
- * @param task 任务对象
- * @return 任务id
- */
- @PostMapping("/api/v1/task/add")
- @Override
- public ResponseResult addTask(@RequestBody Task task) {
- return ResponseResult.okResult(taskService.addTask(task));
- }
-
- /**
- * 取消任务
- * @param taskId 任务id
- * @return 取消结果
- */
- @GetMapping("/api/v1/task/cancel/{taskId}")
- @Override
- public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
- return ResponseResult.okResult(taskService.cancelTask(taskId));
- }
-
- /**
- * 按照类型和优先级来拉取任务
- * @param type
- * @param priority
- * @return
- */
- @GetMapping("/api/v1/task/poll/{type}/{priority}")
- @Override
- public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
- return ResponseResult.okResult(taskService.poll(type,priority));
- }
- }

②发布文章集成添加延迟队列接口
在创建WmNewsTaskService
- package com.heima.wemedia.service;
-
- import com.heima.model.wemedia.pojos.WmNews;
-
-
- public interface WmNewsTaskService {
-
- /**
- * 添加任务到延迟队列中
- * @param id 文章的id
- * @param publishTime 发布的时间 可以做为任务的执行时间
- */
- public void addNewsToTask(Integer id, Date publishTime);
-
-
- }

实现
- package com.heima.wemedia.service.impl;
-
- import com.heima.apis.schedule.IScheduleClient;
- import com.heima.model.common.enums.TaskTypeEnum;
- import com.heima.model.schedule.dtos.Task;
- import com.heima.model.wemedia.pojos.WmNews;
- import com.heima.utils.common.ProtostuffUtil;
- import com.heima.wemedia.service.WmNewsTaskService;
- import lombok.SneakyThrows;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Service;
-
-
- @Service
- @Slf4j
- public class WmNewsTaskServiceImpl implements WmNewsTaskService {
-
-
- @Autowired
- private IScheduleClient scheduleClient;
-
- /**
- * 添加任务到延迟队列中
- * @param id 文章的id
- * @param publishTime 发布的时间 可以做为任务的执行时间
- */
- @Override
- @Async
- public void addNewsToTask(Integer id, Date publishTime) {
-
- log.info("添加任务到延迟服务中----begin");
-
- Task task = new Task();
- task.setExecuteTime(publishTime.getTime());
- task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
- task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
- WmNews wmNews = new WmNews();
- wmNews.setId(id);
- task.setParameters(ProtostuffUtil.serialize(wmNews));
-
- scheduleClient.addTask(task);
-
- log.info("添加任务到延迟服务中----end");
-
- }
-
- }

枚举类
- package com.heima.model.common.enums;
-
- import lombok.AllArgsConstructor;
- import lombok.Getter;
-
- @Getter
- @AllArgsConstructor
- public enum TaskTypeEnum {
-
- NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
- REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
- private final int taskType; //对应具体业务
- private final int priority; //业务不同级别
- private final String desc; //描述信息
- }
修改发布文章代码:
把之前的异步调用修改为调用延迟任务
- @Autowired
- private WmNewsTaskService wmNewsTaskService;
-
- /**
- * 发布修改文章或保存为草稿
- * @param dto
- * @return
- */
- @Override
- public ResponseResult submitNews(WmNewsDto dto) {
-
- //0.条件判断
- if(dto == null || dto.getContent() == null){
- return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
- }
-
- //1.保存或修改文章
-
- WmNews wmNews = new WmNews();
- //属性拷贝 属性名词和类型相同才能拷贝
- BeanUtils.copyProperties(dto,wmNews);
- //封面图片 list---> string
- if(dto.getImages() != null && dto.getImages().size() > 0){
- //[1dddfsd.jpg,sdlfjldk.jpg]--> 1dddfsd.jpg,sdlfjldk.jpg
- String imageStr = StringUtils.join(dto.getImages(), ",");
- wmNews.setImages(imageStr);
- }
- //如果当前封面类型为自动 -1
- if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
- wmNews.setType(null);
- }
-
- saveOrUpdateWmNews(wmNews);
-
- //2.判断是否为草稿 如果为草稿结束当前方法
- if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){
- return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
- }
-
- //3.不是草稿,保存文章内容图片与素材的关系
- //获取到文章内容中的图片信息
- List<String> materials = ectractUrlInfo(dto.getContent());
- saveRelativeInfoForContent(materials,wmNews.getId());
-
- //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片
- saveRelativeInfoForCover(dto,wmNews,materials);
-
- //审核文章
- // wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
- wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());
-
- return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
-
- }

③消费任务进行审核文章
WmNewsTaskService中添加方法
- /**
- * 消费延迟队列数据
- */
- public void scanNewsByTask();
实现
- @Autowired
- private WmNewsAutoScanServiceImpl wmNewsAutoScanService;
-
- /**
- * 消费延迟队列数据
- */
- @Scheduled(fixedRate = 1000)
- @Override
- @SneakyThrows
- public void scanNewsByTask() {
-
- log.info("文章审核---消费任务执行---begin---");
-
- ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
- if(responseResult.getCode().equals(200) && responseResult.getData() != null){
- String json_str = JSON.toJSONString(responseResult.getData());
- Task task = JSON.parseObject(json_str, Task.class);
- byte[] parameters = task.getParameters();
- WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
- System.out.println(wmNews.getId()+"-----------");
- wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
- }
- log.info("文章审核---消费任务执行---end---");
- }

在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling
需求
思路
会产生耦合
使用MQ可以解耦
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统
producer:发布消息的对象称之为主题生产者(Kafka topic producer)
topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
Docker安装zookeeper
下载镜像
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
Docker安装kafka
下载镜像
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
- docker run -d --name kafka \
- --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
- --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
- --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
- --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
- --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
- --net=host wurstmeister/kafka:2.12-2.3.1
生产者发送消息
生产者发送消息
- package com.heima.kafka.sample;
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.util.Properties;
-
- /**
- * 生产者
- */
- public class ProducerQuickStart {
-
- public static void main(String[] args) {
- //1.kafka的配置信息
- Properties properties = new Properties();
- //kafka的连接地址
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
- //发送失败,失败的重试次数
- properties.put(ProducerConfig.RETRIES_CONFIG,5);
- //消息key的序列化器
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
- //消息value的序列化器
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
-
- //2.生产者对象
- KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
-
- //封装发送的消息
- /**
- 第一个参数topic;第二个参数消息的key;第三个参数消息的value
- */
- ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");
-
- //3.发送消息
- producer.send(record);
-
- //4.关闭消息通道,必须关闭,否则消息发送不成功
- producer.close();
- }
-
- }

消费者接收消息
- package com.heima.kafka.sample;
-
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.Properties;
-
- /**
- * 消费者
- */
- public class ConsumerQuickStart {
-
- public static void main(String[] args) {
- //1.添加kafka的配置信息
- Properties properties = new Properties();
- //kafka的连接地址
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
- //消费者组
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
- //消息的反序列化器
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-
- //2.消费者对象
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
-
- //3.订阅主题
- consumer.subscribe(Collections.singletonList("itheima-topic"));
-
- //当前线程一直处于监听状态
- while (true) {
- //4.获取消息
- ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
- for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
- System.out.println(consumerRecord.key());
- System.out.println(consumerRecord.value());
- }
- }
-
- }
-
- }

同一个消费者组下的消费者只有一个能消费到topic
分区
分区策略
集群
备份机制
ISR(in-sync replica)需要同步复制保存的follower
如果leader失效后,需要选出新的leader,选举的原则如下:
第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的
第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取
极端情况,就是所有副本都失效了,这时有两种方案
第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定
第二:选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整
发送类型
同步发送
使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功
- RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
- System.out.println(recordMetadata.offset());
异步发送
调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数
- //异步消息发送
- producer.send(kvProducerRecord, new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if(e != null){
- System.out.println("记录异常信息到日志表中");
- }
- System.out.println(recordMetadata.offset());
- }
- });
参数详解
消费者组
消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体
一个发布在Topic上消息被分发给此消费者组中的一个消费者
所有的消费者都在一个组中,那么这就变成了queue模型
所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型
消息有序性
应用场景:
即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致
充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序
topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。
①导入spring-kafka依赖信息
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <!-- kafkfa -->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- </dependency>
- </dependencies>

②在resources下创建文件application.yml
- server:
- port: 9991
- spring:
- application:
- name: kafka-demo
- kafka:
- bootstrap-servers: 192.168.200.130:9092
- producer:
- retries: 10
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- consumer:
- group-id: ${spring.application.name}-test
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
③消息生产者
- package com.heima.kafka.controller;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- public class HelloController {
-
- @Autowired
- private KafkaTemplate<String,String> kafkaTemplate;
-
- @GetMapping("/hello")
- public String hello(){
- kafkaTemplate.send("itcast-topic","黑马程序员");
- return "ok";
- }
- }

④消息消费者
- package com.heima.kafka.listener;
-
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
- import org.springframework.util.StringUtils;
-
- @Component
- public class HelloListener {
-
- @KafkaListener(topics = "itcast-topic")
- public void onMessage(String message){
- if(!StringUtils.isEmpty(message)){
- System.out.println(message);
- }
-
- }
- }

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式
方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍
方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式
发送消息
- @GetMapping("/hello")
- public String hello(){
- User user = new User();
- user.setUsername("xiaowang");
- user.setAge(18);
-
- kafkaTemplate.send("user-topic", JSON.toJSONString(user));
-
- return "ok";
- }
接收消息
- package com.heima.kafka.listener;
-
- import com.alibaba.fastjson.JSON;
- import com.heima.kafka.pojo.User;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
- import org.springframework.util.StringUtils;
-
- @Component
- public class HelloListener {
-
- @KafkaListener(topics = "user-topic")
- public void onMessage(String message){
- if(!StringUtils.isEmpty(message)){
- User user = JSON.parseObject(message, User.class);
- System.out.println(user);
- }
-
- }
- }

流程图
接口定义
①在heima-leadnews-wemedia工程下的WmNewsController新增方法
- @PostMapping("/down_or_up")
- public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
- return wmNewsService.downOrUp(dto);
- }
在WmNewsDto中新增enable属性 ,完整的代码如下:
- package com.heima.model.wemedia.dtos;
-
- import lombok.Data;
-
- import java.util.Date;
- import java.util.List;
-
- @Data
- public class WmNewsDto {
-
- private Integer id;
- /**
- * 标题
- */
- private String title;
- /**
- * 频道id
- */
- private Integer channelId;
- /**
- * 标签
- */
- private String labels;
- /**
- * 发布时间
- */
- private Date publishTime;
- /**
- * 文章内容
- */
- private String content;
- /**
- * 文章封面类型 0 无图 1 单图 3 多图 -1 自动
- */
- private Short type;
- /**
- * 提交时间
- */
- private Date submitedTime;
- /**
- * 状态 提交为1 草稿为0
- */
- private Short status;
-
- /**
- * 封面图片列表 多张图以逗号隔开
- */
- private List<String> images;
-
- /**
- * 上下架 0 下架 1 上架
- */
- private Short enable;
- }

②在WmNewsService新增方法
- /**
- * 文章的上下架
- * @param dto
- * @return
- */
- public ResponseResult downOrUp(WmNewsDto dto);
实现
- /**
- * 文章的上下架
- * @param dto
- * @return
- */
- @Override
- public ResponseResult downOrUp(WmNewsDto dto) {
- //1.检查参数
- if(dto.getId() == null){
- return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
- }
-
- //2.查询文章
- WmNews wmNews = getById(dto.getId());
- if(wmNews == null){
- return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");
- }
-
- //3.判断文章是否已发布
- if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){
- return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架");
- }
-
- //4.修改文章enable
- if(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){
- update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable())
- .eq(WmNews::getId,wmNews.getId()));
- }
- return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
- }

①在heima-leadnews-common模块下导入kafka依赖
- <!-- kafkfa -->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </dependency>
②在自媒体端的nacos配置中心配置kafka的生产者
- spring:
- kafka:
- bootstrap-servers: 192.168.200.130:9092
- producer:
- retries: 10
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
③在自媒体端文章上下架后发送消息
- //发送消息,通知article端修改文章配置
- if(wmNews.getArticleId() != null){
- Map<String,Object> map = new HashMap<>();
- map.put("articleId",wmNews.getArticleId());
- map.put("enable",dto.getEnable());
- kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
- }
常量类
- public class WmNewsMessageConstants {
-
- public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
- }
④在article端的nacos配置中心配置kafka的消费者
- spring:
- kafka:
- bootstrap-servers: 192.168.200.130:9092
- consumer:
- group-id: ${spring.application.name}
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
⑤在article端编写监听,接收数据
- package com.heima.article.listener;
-
- import com.alibaba.fastjson.JSON;
- import com.heima.article.service.ApArticleConfigService;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
-
- import java.util.Map;
-
- @Component
- @Slf4j
- public class ArtilceIsDownListener {
-
- @Autowired
- private ApArticleConfigService apArticleConfigService;
-
- @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
- public void onMessage(String message){
- if(StringUtils.isNotBlank(message)){
- Map map = JSON.parseObject(message, Map.class);
- apArticleConfigService.updateByMap(map);
- log.info("article端文章配置修改,articleId={}",map.get("articleId"));
- }
- }
- }

⑥修改ap_article_config表的数据
新建ApArticleConfigService
- package com.heima.article.service;
-
- import com.baomidou.mybatisplus.extension.service.IService;
- import com.heima.model.article.pojos.ApArticleConfig;
-
- import java.util.Map;
-
- public interface ApArticleConfigService extends IService<ApArticleConfig> {
-
- /**
- * 修改文章配置
- * @param map
- */
- public void updateByMap(Map map);
- }
实现类
- package com.heima.article.service.impl;
-
- import com.baomidou.mybatisplus.core.toolkit.Wrappers;
- import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
- import com.heima.article.mapper.ApArticleConfigMapper;
- import com.heima.article.service.ApArticleConfigService;
- import com.heima.model.article.pojos.ApArticleConfig;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
-
- import java.util.Map;
-
- @Service
- @Slf4j
- @Transactional
- public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService {
-
-
- /**
- * 修改文章配置
- * @param map
- */
- @Override
- public void updateByMap(Map map) {
- //0 下架 1 上架
- Object enable = map.get("enable");
- boolean isDown = true;
- if(enable.equals(1)){
- isDown = false;
- }
- //修改文章配置
- update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown));
-
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。