赞
踩
spring-boot-starter-actuator是Spring Boot提供的一个用于监控和管理应用程序的模块
用于查看应用程序的健康状况、审计信息、指标和其他有用的信息。这些端点可以帮助你监控应用程序的运行状态、性能指标和健康状况。
如果项目中已经有了其他的监控和管理工具,不需要使用Spring Boot Actuator提供的功能,可以在引入redisson依赖时将它排除:
<!-- redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<exclusions>
<!-- Keep spring-boot-starter-actuator from being pulled in through this starter;
     the surrounding text says monitoring is handled by other tools. -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</exclusion>
</exclusions>
</dependency>
添加延迟队列时使用,监测扫描时也会用这个工具类进行获取消息
package cn.creatoo.common.redis.queue;

import cn.creatoo.common.core.utils.StringUtils;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * Helper for working with Redisson delayed queues.
 *
 * <p>Every operation resolves the blocking deque named by {@code queueCode} and the delayed
 * queue Redisson associates with it. Producers call {@link #addDelayQueue}; consumers poll
 * with {@link #getDelayQueue}.
 */
@Component
@ConditionalOnBean({RedissonClient.class})
public class RedisDelayQueueUtil {

    private static final Logger log = LoggerFactory.getLogger(RedisDelayQueueUtil.class);

    @Resource
    private RedissonClient redissonClient;

    /**
     * Schedules a value on a delayed queue.
     *
     * @param value     value to enqueue
     * @param delay     delay before the value is offered to the destination deque
     * @param timeUnit  unit of {@code delay}
     * @param queueCode name (Redis key) of the destination queue
     * @param <T>       type of the value
     * @return {@code false} when {@code queueCode} is blank or {@code value} is {@code null};
     *         {@code true} once the value has been offered
     * @throws RuntimeException if the offer fails; the original failure is kept as the cause
     */
    public <T> boolean addDelayQueue(@NonNull T value, long delay, @NonNull TimeUnit timeUnit, @NonNull String queueCode) {
        if (StringUtils.isBlank(queueCode) || Objects.isNull(value)) {
            return false;
        }
        try {
            RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
            RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
            delayedQueue.offer(value, delay, timeUnit);
            // The delayed queue is deliberately not destroyed here; the original author left
            // the destroy() call disabled.
            log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒");
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged, and keep
            // it as the cause so callers can see what actually went wrong.
            log.error("(添加延时队列失败) {}", e.getMessage(), e);
            throw new RuntimeException("(添加延时队列失败)", e);
        }
        return true;
    }

    /**
     * Polls the destination deque of a delayed queue without blocking.
     *
     * @param queueCode name (Redis key) of the queue
     * @param <T>       type the caller expects the value to have (unchecked)
     * @return the head of the deque, or {@code null} when {@code queueCode} is blank or no
     *         value is currently available
     * @throws InterruptedException declared for backward compatibility with existing callers;
     *                              the non-blocking poll used here does not throw it
     */
    public <T> T getDelayQueue(@NonNull String queueCode) throws InterruptedException {
        if (StringUtils.isBlank(queueCode)) {
            return null;
        }
        RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueCode);
        // NOTE(review): the returned delayed queue is not used, but the call is kept exactly as
        // in the original. Presumably obtaining it makes this node take part in moving expired
        // elements into the deque - confirm against the Redisson documentation before removing.
        redissonClient.getDelayedQueue(blockingDeque);
        return blockingDeque.poll();
    }

    /**
     * Removes a pending value from a delayed queue.
     *
     * @param o         value to remove (values should be unique within one queue)
     * @param queueCode name (Redis key) of the queue
     * @return {@code true} if the value was removed; {@code false} when it was not present,
     *         {@code queueCode} is blank or {@code o} is {@code null}
     */
    public boolean removeDelayedQueue(@NonNull Object o, @NonNull String queueCode) {
        if (StringUtils.isBlank(queueCode) || Objects.isNull(o)) {
            return false;
        }
        RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        return delayedQueue.remove(o);
    }
}
package cn.creatoo.system.handler;
/**
* Handler for values that come out of a delayed queue.
*
* <p>FileRedisDelayQueueRunner looks implementations up by Spring bean name (the
* {@code beanId} of FileRedisDelayQueueEnum) and calls {@link #execute} with each value it
* polls from the corresponding queue.
*
* @param <T> type of the queued value the handler processes
*/
public interface RedisDelayQueueHandle<T> {
/**
* Runs the delayed business logic for one queued value.
*
* @param t the value taken from the delayed queue
*/
void execute(T t);
}
实现队列执行器接口,在这里写延迟要做的业务逻辑
package cn.creatoo.system.handler.impl;

import cn.creatoo.common.core.domain.vo.WaterVo;
import cn.creatoo.system.api.RemoteFileService;
import cn.creatoo.system.handler.RedisDelayQueueHandle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Delayed-queue handler registered as the {@code exposeLinkCloudDelay} bean.
 *
 * <p>Converts the queued map into a {@link WaterVo} and passes it to
 * {@link RemoteFileService#waterLink}.
 */
@Component("exposeLinkCloudDelay")
public class ExposeLinkCloudDelay implements RedisDelayQueueHandle<Map> {

    @Autowired
    private RemoteFileService remoteFileService;

    /**
     * Processes one delayed task.
     *
     * @param map task payload; must contain {@code dataId} (parsable as long),
     *            {@code fileLink} and {@code type} (parsable as int)
     * @throws IllegalArgumentException if the payload is {@code null}, a required key is
     *                                  missing, or a numeric value cannot be parsed
     *                                  ({@link NumberFormatException})
     */
    @Override
    public void execute(Map map) {
        if (map == null) {
            throw new IllegalArgumentException("Delayed task payload must not be null");
        }
        long dataId = Long.parseLong(requiredValue(map, "dataId"));

        WaterVo waterVo = new WaterVo();
        waterVo.setFileLink(requiredValue(map, "fileLink"));
        waterVo.setType(Integer.parseInt(requiredValue(map, "type")));
        waterVo.setDataId(dataId);

        remoteFileService.waterLink(waterVo);
    }

    /** Returns the string form of a mandatory payload entry, naming the key when it is absent. */
    private static String requiredValue(Map<?, ?> payload, String key) {
        Object value = payload.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Delayed task payload is missing required key '" + key + "'");
        }
        return value.toString();
    }
}
package cn.creatoo.common.core.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

/**
 * Delayed-queue business types: each constant ties a Redis queue key to the Spring bean that
 * handles values taken from that queue.
 *
 * <p>Created 2024/3/21 14:52.
 *
 * @author shang tf
 */
@Getter
@AllArgsConstructor
public enum FileRedisDelayQueueEnum {

    EXPOSE_LINK_DELAY("EXPOSE_LINK_DELAY", "资源链接处理", "exposeLinkDelay"),
    EXPOSE_LINK_CLOUD_DELAY("EXPOSE_LINK_CLOUD_DELAY", "资源链接处理", "exposeLinkCloudDelay"),
    COMPRESSED_LINK_DELAY("COMPRESSED_LINK_DELAY", "文件压缩处理", "compressedLinkDelay"),
    UPLOAD_TO_CLOUD_DELAY("UPLOAD_TO_CLOUD_DELAY", "资源上传消费端", "uploadToCloudDelay"),
    GET_HASHCODE_DELAY("GET_HASHCODE_DELAY", "资源hash值获取", "getHashcodeDelay"),
    // NOTE(review): unlike the other constants, this code does not match the constant name
    // (it omits "_TO_"). It is left unchanged because it is the Redis key in use.
    UPLOAD_FILE_TO_CABINET("UPLOAD_FILE_CABINET", "异步添加文件到数据柜", "uploadFileCabinet");

    /** Redis key of the delayed queue. */
    private final String code;

    /** Human-readable description. */
    private final String name;

    /** Name of the Spring bean implementing the business logic; resolved from the application context. */
    private final String beanId;
}
package cn.creatoo.system.handler.impl;

import cn.creatoo.common.core.enums.FileRedisDelayQueueEnum;
import cn.creatoo.common.redis.queue.RedisDelayQueueUtil;
import cn.creatoo.system.handler.RedisDelayQueueHandle;
import com.alibaba.fastjson2.JSON;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Starts the delayed-queue polling loop for file processing and owns the thread pool on which
 * the queue handlers run.
 *
 * <p>Created 2024/3/14 10:45.
 *
 * @author shang tf
 */
@Slf4j
@Component
public class FileRedisDelayQueueRunner implements CommandLineRunner {

    /** Pause between two passes over all queues, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 500L;

    @Autowired
    private RedisDelayQueueUtil redisDelayQueueUtil;

    @Autowired
    private ApplicationContext context;

    @Autowired
    private ThreadPoolTaskExecutor ptask;

    @Value("${file-thread-pool.core-pool-size:1}")
    private int corePoolSize;

    @Value("${file-thread-pool.maximum-pool-size:1}")
    private int maximumPoolSize;

    private ThreadPoolExecutor executorService;

    /**
     * Creates the handler thread pool. Done in a post-construct callback rather than a field
     * initializer so the injected pool sizes are available.
     */
    @PostConstruct
    public void init() {
        executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactoryBuilder().setNameFormat("delay-queue-%d").build());
    }

    /** Stops accepting new handler tasks when the bean is destroyed so the pool threads can end. */
    @PreDestroy
    public void shutdown() {
        if (executorService != null) {
            executorService.shutdown();
        }
    }

    /**
     * Submits the polling loop to {@code ptask} and returns immediately.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) {
        ptask.execute(this::pollLoop);
        log.info("(FileRedission延迟队列监测启动成功)");
    }

    /** Polls every queue in turn, then sleeps, until the polling thread is interrupted. */
    private void pollLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                for (FileRedisDelayQueueEnum queueEnum : FileRedisDelayQueueEnum.values()) {
                    pollQueue(queueEnum);
                }
                TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                log.error("(FileRedission延迟队列监测异常中断) {}", e.getMessage());
                // Restore the flag and stop: an interrupt is how the owning executor asks this
                // loop to end; swallowing it would make the loop impossible to shut down.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Takes at most one value from the given queue and hands it to the matching handler bean.
     * Runtime failures (Redis errors, missing handler bean, rejected task) are logged and
     * contained so that one bad queue cannot terminate the whole polling loop.
     */
    private void pollQueue(FileRedisDelayQueueEnum queueEnum) throws InterruptedException {
        try {
            Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());
            if (value == null) {
                return;
            }
            log.info("----------------value:{}", JSON.toJSONString(value));
            @SuppressWarnings("unchecked") // handlers are registered per queue; payload type is not checked here
            RedisDelayQueueHandle<Object> handler =
                    (RedisDelayQueueHandle<Object>) context.getBean(queueEnum.getBeanId());
            executorService.execute(() -> runHandler(queueEnum, handler, value));
        } catch (RuntimeException e) {
            log.error("(FileRedission延迟队列监测异常) 队列键:{}", queueEnum.getCode(), e);
        }
    }

    /** Runs one handler and logs its failure instead of letting it vanish in the pool thread. */
    private void runHandler(FileRedisDelayQueueEnum queueEnum, RedisDelayQueueHandle<Object> handler, Object value) {
        try {
            handler.execute(value);
        } catch (RuntimeException e) {
            log.error("(FileRedission延迟队列任务执行失败) 队列键:{}", queueEnum.getCode(), e);
        }
    }
}
使用时,在需要延时的地方注入RedisDelayQueueUtil,调用addDelayQueue方法添加延迟任务。
// Payload for the delayed task. ExposeLinkCloudDelay.execute reads "dataId", "fileLink" and
// "type"; "remark" is not read by that handler.
Map<String, String> map = new HashMap<>();
map.put("dataId", examineVo.getId().toString());
map.put("fileLink", resourceLink);
map.put("type", resourceType.toString());
map.put("remark", "资源链接处理");
// Run the logic in exposeLinkCloudDelay after 5 seconds
redisDelayQueueUtil.addDelayQueue(map, 5, TimeUnit.SECONDS, FileRedisDelayQueueEnum.EXPOSE_LINK_CLOUD_DELAY.getCode());
package cn.creatoo.common.redis.queue;

import cn.creatoo.common.core.utils.StringUtils;
import org.redisson.api.RBoundedBlockingQueue;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Objects;

/**
 * Helper for Redisson bounded blocking queues (no delay involved).
 */
@Component
@ConditionalOnBean({RedissonClient.class})
public class RedisBlockQueueUtil {

    private static final Logger log = LoggerFactory.getLogger(RedisBlockQueueUtil.class);

    /** Capacity requested for every queue before a value is put into it. */
    private static final int QUEUE_CAPACITY = 10000;

    @Resource
    private RedissonClient redissonClient;

    /**
     * Appends a value to a bounded blocking queue, blocking while the queue is full.
     *
     * @param value     value to enqueue
     * @param queueCode name (Redis key) of the queue
     * @param <T>       type of the value
     * @return {@code false} when {@code queueCode} is blank or {@code value} is {@code null};
     *         {@code true} once the value has been put
     * @throws RuntimeException if the put fails or the thread is interrupted while waiting;
     *                          the original failure is kept as the cause
     */
    public <T> boolean addQueue(@NonNull T value, @NonNull String queueCode) {
        if (StringUtils.isBlank(queueCode) || Objects.isNull(value)) {
            return false;
        }
        try {
            RBoundedBlockingQueue<T> queue = redissonClient.getBoundedBlockingQueue(queueCode);
            queue.trySetCapacity(QUEUE_CAPACITY);
            queue.put(value);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("(添加redisson队列失败)", e);
        } catch (Exception e) {
            throw new RuntimeException("(添加redisson队列失败)", e);
        }
        return true;
    }

    /**
     * Returns the head of the queue without removing it.
     *
     * @param queueCode name (Redis key) of the queue
     * @param <T>       type the caller expects the value to have (unchecked)
     * @return the head of the queue, or {@code null} when {@code queueCode} is blank or the
     *         queue is empty
     * @throws InterruptedException declared for backward compatibility with existing callers
     */
    public <T> T getQueuePeek(@NonNull String queueCode) throws InterruptedException {
        if (StringUtils.isBlank(queueCode)) {
            return null;
        }
        RQueue<T> queue = redissonClient.getBoundedBlockingQueue(queueCode);
        return queue.peek();
    }

    /**
     * Removes and returns the head of the queue, waiting until a value is available.
     *
     * @param queueCode name (Redis key) of the queue
     * @param <T>       type the caller expects the value to have (unchecked)
     * @return the removed head, or {@code null} when {@code queueCode} is blank
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public <T> T getQueueTake(@NonNull String queueCode) throws InterruptedException {
        if (StringUtils.isBlank(queueCode)) {
            return null;
        }
        RBoundedBlockingQueue<T> queue = redissonClient.getBoundedBlockingQueue(queueCode);
        return queue.take();
    }
}
package cn.creatoo.common.core.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

/**
 * Business queues backed by Redis; each constant carries the Redis key of its queue.
 */
@Getter
@AllArgsConstructor
public enum RedisQueueEnum {

    FLOW_RECORD("redissionQueue:FLOW_RECORD", "流量流水"),
    USER_LOGIN_RECORD("redissionQueue:USER_LOGIN_RECORD", "用户登录流水"),
    USER_REGISTER_RECORD("redissionQueue:USER_REGISTER_RECORD", "用户注册流水"),
    SMS_SEND_RECORD("redissionQueue:SMS_SEND_RECORD", "短信流水");

    /** Redis key of the queue. */
    private final String code;

    /** Human-readable description. */
    private final String name;
}
package cn.creatoo.system.handler.impl;

import cn.creatoo.common.core.enums.RedisQueueEnum;
import cn.creatoo.common.core.utils.StringUtils;
import cn.creatoo.common.mongodb.model.FlowStatistics;
import cn.creatoo.common.mongodb.model.MessageSendRecord;
import cn.creatoo.common.mongodb.model.UserLogin;
import cn.creatoo.common.mongodb.model.UserRegister;
import cn.creatoo.common.redis.queue.RedisBlockQueueUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Starts the polling loop for the plain (non-delayed) Redis queues and stores the records it
 * takes from them in MongoDB.
 */
@Slf4j
@Component
public class RedisQueueRunner implements CommandLineRunner {

    /** Pause between two passes over all queues, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 500L;

    @Autowired
    private RedisBlockQueueUtil redisBlockQueueUtil;

    @Autowired
    private ThreadPoolTaskExecutor ptask;

    @Resource
    private MongoTemplate mongoTemplate;

    @Value("${prodHost.mall}")
    private String mallHost;

    ThreadPoolExecutor executorService = new ThreadPoolExecutor(4, 8, 30, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(1000),
            new ThreadFactoryBuilder().setNameFormat("queue-%d").build());

    /** Stops accepting new consumer tasks when the bean is destroyed so the pool threads can end. */
    @PreDestroy
    public void shutdown() {
        executorService.shutdown();
    }

    /**
     * Submits the polling loop to {@code ptask} and returns immediately.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        ptask.execute(this::pollLoop);
        log.info("(Redission队列监测启动成功)");
    }

    /** Checks every queue in turn, then sleeps, until the polling thread is interrupted. */
    private void pollLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                for (RedisQueueEnum queueEnum : RedisQueueEnum.values()) {
                    pollQueue(queueEnum);
                }
                TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                log.error("(Redission队列监测异常中断) {}", e.getMessage());
                // Restore the flag and stop: an interrupt is how the owning executor asks this
                // loop to end; swallowing it would make the loop impossible to shut down.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Peeks at one queue and, if it holds a value, schedules a consumer task for it. Runtime
     * failures (Redis errors, rejected task) are logged and contained so that they cannot
     * terminate the whole polling loop.
     *
     * <p>NOTE(review): the value is only peeked here and taken later on a pool thread. If that
     * task has not run before the next pass, the same head is peeked again and a second task
     * is submitted, whose blocking take then waits for a further element. Closing this gap
     * needs a non-blocking poll in RedisBlockQueueUtil.
     */
    private void pollQueue(RedisQueueEnum queueEnum) throws InterruptedException {
        try {
            Object value = redisBlockQueueUtil.getQueuePeek(queueEnum.getCode());
            if (value != null) {
                executorService.execute(() -> consume(queueEnum));
            }
        } catch (RuntimeException e) {
            log.error("(Redission队列监测异常) 队列键:{}", queueEnum.getCode(), e);
        }
    }

    /** Takes one record from the given queue and stores it in the matching MongoDB collection. */
    private void consume(RedisQueueEnum queueEnum) {
        String code = queueEnum.getCode();
        try {
            switch (queueEnum) {
                case FLOW_RECORD: {
                    // The record is taken off the queue but not persisted: the original
                    // processing is disabled and kept below for reference.
                    FlowStatistics flowStatistics = redisBlockQueueUtil.getQueueTake(code);
                    /*
                    if (flowStatistics != null && StringUtils.isNotBlank(flowStatistics.getUrl())) {
                        mongoTemplate.insert(flowStatistics, "pv_" + new SimpleDateFormat("yyyy").format(new Date()));
                        // add home-page statistics cache
                        bdAdminHomeService.addDetailCache(flowStatistics);
                        if (StringUtils.isNotBlank(flowStatistics.getUrl())) {
                            bdStatcountService.browseByUrl(flowStatistics.getUrl());
                        }
                    }
                    */
                    break;
                }
                case USER_LOGIN_RECORD: {
                    UserLogin userLogin = redisBlockQueueUtil.getQueueTake(code);
                    mongoTemplate.insert(userLogin, "user_login_" + currentYear());
                    break;
                }
                case USER_REGISTER_RECORD: {
                    UserRegister userRegister = redisBlockQueueUtil.getQueueTake(code);
                    mongoTemplate.insert(userRegister, "user_register");
                    break;
                }
                case SMS_SEND_RECORD: {
                    MessageSendRecord sendRecord = redisBlockQueueUtil.getQueueTake(code);
                    mongoTemplate.insert(sendRecord, "sms_send_" + currentYear());
                    break;
                }
                default:
                    break;
            }
        } catch (InterruptedException e) {
            log.error("(Redission队列监测异常中断) {}", e.getMessage());
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            // Without this the failure would only surface through the pool thread's default
            // uncaught-exception handling, bypassing the application log.
            log.error("(Redission队列消息处理失败) 队列键:{}", code, e);
        }
    }

    /** Four-digit current year, used as the suffix of the per-year collections. */
    private static String currentYear() {
        return new SimpleDateFormat("yyyy").format(new Date());
    }
}
这个是直接执行,没有延迟的功能
// Put the registration record on the USER_REGISTER_RECORD queue; RedisQueueRunner takes it
// from there and inserts it into the "user_register" collection.
redisBlockQueueUtil.addQueue(userRegister, RedisQueueEnum.USER_REGISTER_RECORD.getCode());
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。