赞
踩
A:需求说明:
B:实现思路:
C:代码实现
Message消息封装类
- @Data
- public class Message {
-
- /**
- * 消息id
- */
- private String id;
- /**
- * 消息延迟/毫秒
- */
- private long delay;
-
- /**
- * 消息存活时间
- */
- private int ttl;
- /**
- * 消息体,对应业务内容
- */
- private String body;
- /**
- * 创建时间,如果只有优先级没有延迟,可以设置创建时间为0
- * 用来消除时间的影响
- */
- private long createTime;
-
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
2.基于redis的消息队列
- @Component
- public class RedisMQ {
-
- /**
- * 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link Message}
- * 的消息体body作为值存储
- */
- public static final String MSG_POOL = "Message:Pool:";
- /**
- * zset队列 名称 queue
- */
- public static final String QUEUE_NAME = "Message:Queue:";
-
- private static final int SEMIH = 30*60;
-
-
-
- @Autowired
- private RedisService redisService;
-
- /**
- * 存入消息池
- * @param message
- * @return
- */
- public boolean addMsgPool(Message message) {
-
- if (null != message) {
- return redisService.setExp(MSG_POOL + message.getId(), message.getBody(), Long.valueOf(message.getTtl() + SEMIH));
- }
- return false;
- }
-
- /**
- * 从消息池中删除消息
- * @param id
- * @return
- */
- public void deMsgPool(String id) {
- redisService.remove(MSG_POOL + id);
- }
-
- /**
- * 向队列中添加消息
- * @param key
- * @param score 优先级
- * @param val
- * @return 返回消息id
- */
- public void enMessage(String key, long score, String val) {
- redisService.zsset(key,val,score);
- }
-
- /**
- * 从队列删除消息
- * @param id
- * @return
- */
- public boolean deMessage(String key, String id) {
- return redisService.zdel(key, id);
- }
-
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
3Redis操作工具类,这个工具类比较多方法,就不贴在这里了(https://blog.csdn.net/u010096717/article/details/83783865)
4.编写消息发送(生产者)
- @Component
- public class MessageProvider {
-
- static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
-
-
- private static int delay = 30;//30秒,可自己动态传入
-
- @Resource
- private RedisMQ redisMQ;
-
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- //改造成redis
- public void sendMessage(String messageContent) {
- try {
- if (messageContent != null){
- String seqId = UUID.randomUUID().toString();
- // 将有效信息放入消息队列和消息池中
- Message message = new Message();
- // 可以添加延迟配置
- message.setDelay(delay*1000);
- message.setCreateTime(System.currentTimeMillis());
- message.setBody(messageContent);
- message.setId(seqId);
- // 设置消息池ttl,防止长期占用
- message.setTtl(delay + 360);
- redisMQ.addMsgPool(message);
- //当前时间加上延时的时间,作为score
- Long delayTime = message.getCreateTime() + message.getDelay();
- String d = sdf.format(message.getCreateTime());
- System.out.println("当前时间:" + d+",消费的时间:" + sdf.format(delayTime));
- redisMQ.enMessage(RedisMQ.QUEUE_NAME,delayTime, message.getId());
- }else {
- logger.warn("消息内容为空!!!!!");
- }
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
5.消息消费者
- @Component
- public class RedisMQConsumer {
-
- @Resource
- private RedisMQ redisMQ;
-
- @Autowired
- private RedisService redisService;
-
- @Autowired
- private MessageProvider provider;
-
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
-
- /**
- * 消息队列监听器<br>
- *
- */
- @Scheduled(cron = "*/1 * * * * *")
- public void monitor() {
- Set<String> set = redisService.rangeByScore(RedisMQ.QUEUE_NAME, 0, System.currentTimeMillis());
- if (null != set) {
- long current = System.currentTimeMillis();
- for (String id : set) {
- long score = redisService.getScore(RedisMQ.QUEUE_NAME, id).longValue();
- if (current >= score) {
- // 已超时的消息拿出来消费
- String str = "";
- try {
- str = redisService.get(RedisMQ.MSG_POOL + id);
- System.out.println("消费了:" + str+ ",消费的时间:" + sdf.format(System.currentTimeMillis()));
- } catch (Exception e) {
- e.printStackTrace();
- //如果出了异常,则重新放回队列
- System.out.println("消费异常,重新回到队列");
- provider.sendMessage(str);
- } finally {
- redisMQ.deMessage(RedisMQ.QUEUE_NAME, id);
- redisMQ.deMsgPool(id);
- }
- }
- }
- }
- }
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
6.配置信息
- <!--1依赖引入-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </dependency>
-
-
- 2yml配置
- spring:
- redis:
- database: 1
- host: 127.0.0.1
- port: 6379
以上代码已经实现了延迟消费功能,现在来测试一下,调用MessageProvider的sendMessage方法,我设定了30秒
可以看到结果
因为我们是用定时器去轮询的,会出现误差
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。