当前位置:   article > 正文

【技术应用】springboot+redis+lua实现延迟消息_lua 脚本 zset 延时队列 springboot

lua 脚本 zset 延时队列 springboot

【技术应用】springboot+redis+lua实现延迟消息

一、前言

今天开发了一个短信提醒的功能,工作流程大概是:医生工作室收到了用户的咨询订单,如果医生在10分钟没有处理订单,系统会发送短信提醒医生及时处理订单。

二、方案分析

看到这个业务首先想到的实现方式两种
1)定时线程周期性查询订单信息;
2)通过延迟消息实现;

如果对准确性和性能要求比较高,选择延迟消息是一个比较合适的方式,但是延迟消息要考虑数据持久化和引入第三方中间件的问题,由于我们项目环境中有redis组件,所以延迟消息在不引入其它组件的前提下使用redis实现延迟消息功能是比较合适的;

redis实现延迟消息的方式有两种
1)监听redis过期key值,key的有效期作为延迟时间;
2)通过zset类型实现;

由于第一种方式"redis监听过期"的延迟时间可能存在较大偏差的问题,我们选择使用zset实现延迟消息
同时,redis不能保证原子性,我们继续结合lua脚本实现延迟消息;

注:由于redis用的比较多,接下来还会写几篇redis+lua实现业务功能的文章

三、关键代码

1. lua脚本

zrangebyscore命令:zrangebyscore 返回有序集合中指定分数区间的成员列表,有序集成员按分数值递增(从小到大)次序排列;

zrem命令:用于移除有序集中的一个或多个成员,不存在的成员将被忽略;

---
--- Created by Mars.
---
local zkey = KEYS[1]
local maxsco = ARGV[1]
--redis.call('zadd',zkey,0,ARGV[1])
local zrange = redis.call('zrangebyscore',zkey,0,maxsco,'LIMIT',0,1)
if next(zrange) ~= nil and #zrange > 0
then
	local rmnum = redis.call('zrem',zkey,unpack(zrange))
	if(rmnum > 0)
	then
		return zrange
	end
else
	return {}
end
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2. lua脚本初始化:LuaConfig类

package com.sk.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;


import java.util.List;

@Configuration
public class LuaConfig {

	@Bean
	public DefaultRedisScript<List> defaultRedisScript(){
		DefaultRedisScript<List> redisScript = new DefaultRedisScript<>();
		redisScript.setResultType(List.class);
		redisScript.setScriptSource(new ResourceScriptSource(new  ClassPathResource("lua/asyncmsg.lua")));
		return redisScript;
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

3. 延迟消息业务类

package com.sk.service;

import java.util.Set;

public interface RedisDelayMsgService {

	void msgEnQueue(Object data, int delayTime,String orderDelayQueue);

	Set<Object> msgDequeue(String orderDelayQueue);

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

4. 延迟消息业务类实现

package com.sk.service.impl;

import com.sk.service.RedisDelayMsgService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@Service
public class RedisDelayMsgServiceImpl implements RedisDelayMsgService {

	@Autowired
	private RedisTemplate redisTemplate;

	@Autowired
	private DefaultRedisScript redisScript;

	@Override
	public void msgEnQueue(Object data, int delayTime,String orderDelayQueue) {
		delayTime +=System.currentTimeMillis();
		redisTemplate.opsForZSet().add(orderDelayQueue, data, delayTime);
	}

	@Override
	public Set<Object> msgDequeue(String orderDelayQueue) {
		List execute = (List) redisTemplate.execute(redisScript, Collections.singletonList(orderDelayQueue), System.currentTimeMillis());
		if (execute != null && execute.size() > 0){
			return new HashSet<>(execute);
		}
		return null;
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

5. 监听获取延迟消息

package com.sk.init;

import com.sk.service.RedisDelayMsgService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Slf4j
@Configuration
public class OrderMsgTask implements CommandLineRunner {

	@Resource
	private RedisDelayMsgService redisDelayMsgService;

	@Override
	public void run(String... args) throws Exception {

		// 创建 ScheduledThreadPool 线程池
		ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
		log.info("OrderMsgTask -> run() 方法添加任务:" + LocalDateTime.now());
		threadPool.schedule(new Runnable() {
			@Override
			public void run() {

				Set<Object> msgSet = redisDelayMsgService.msgDequeue();
				if(null != msgSet){
					Iterator it = msgSet.iterator();
					while(it.hasNext()) {
						String orderMsg = (String) it.next();
						log.info("------redis--延迟消息,orderMsg:{}",orderMsg);
					}
				}
				System.out.println("执行 schedule 方法:" + LocalDateTime.now());
			}

		}, 3, TimeUnit.SECONDS); // 3s 之后执行
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/600945
推荐阅读
相关标签
  

闽ICP备14008679号