赞
踩
今天开发了一个短信提醒的功能,工作流程
大概是:医生工作室收到了用户的咨询订单,如果医生在10分钟没有处理订单,系统会发送短信提醒
医生及时处理订单。
看到这个业务首先想到的实现方式有两种
:
1)定时线程
周期性查询订单信息;
2)通过延迟消息
实现;
如果对准确性和性能要求比较高,选择延迟消息
是一个比较合适的方式,但是延迟消息要考虑数据持久化和引入第三方中间件的问题,由于我们项目环境中有redis组件,所以延迟消息在不引入其它组件的前提下使用redis实现延迟消息功能是比较合适的;
redis实现延迟消息的方式有两种
:
1)监听redis过期key值
,key的有效期作为延迟时间;
2)通过zset类型
实现;
由于第一种方式"redis监听过期"的延迟时间可能存在较大偏差的问题,我们选择使用zset实现延迟消息
;
同时,redis不能保证原子性
,我们继续结合lua脚本
实现延迟消息;
注:由于redis用的比较多,接下来还会写几篇redis+lua实现业务功能的文章
1. lua脚本
zrangebyscore命令
:zrangebyscore 返回有序集合中指定分数区间的成员列表,有序集成员按分数值递增(从小到大)次序排列;
zrem命令
:用于移除有序集中的一个或多个成员,不存在的成员将被忽略;
---
--- Created by Mars.
---
local zkey = KEYS[1]
local maxsco = ARGV[1]
--redis.call('zadd',zkey,0,ARGV[1])
local zrange = redis.call('zrangebyscore',zkey,0,maxsco,'LIMIT',0,1)
if next(zrange) ~= nil and #zrange > 0
then
local rmnum = redis.call('zrem',zkey,unpack(zrange))
if(rmnum > 0)
then
return zrange
end
else
return {}
end
2. lua脚本初始化:LuaConfig类
package com.sk.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import java.util.List;
@Configuration
public class LuaConfig {
@Bean
public DefaultRedisScript<List> defaultRedisScript(){
DefaultRedisScript<List> redisScript = new DefaultRedisScript<>();
redisScript.setResultType(List.class);
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/asyncmsg.lua")));
return redisScript;
}
}
3. 延迟消息业务类
package com.sk.service;
import java.util.Set;
public interface RedisDelayMsgService {
void msgEnQueue(Object data, int delayTime,String orderDelayQueue);
Set<Object> msgDequeue(String orderDelayQueue);
}
4. 延迟消息业务类实现
package com.sk.service.impl;
import com.sk.service.RedisDelayMsgService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Service
public class RedisDelayMsgServiceImpl implements RedisDelayMsgService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private DefaultRedisScript redisScript;
@Override
public void msgEnQueue(Object data, int delayTime,String orderDelayQueue) {
delayTime +=System.currentTimeMillis();
redisTemplate.opsForZSet().add(orderDelayQueue, data, delayTime);
}
@Override
public Set<Object> msgDequeue(String orderDelayQueue) {
List execute = (List) redisTemplate.execute(redisScript, Collections.singletonList(orderDelayQueue), System.currentTimeMillis());
if (execute != null && execute.size() > 0){
return new HashSet<>(execute);
}
return null;
}
}
5. 监听获取延迟消息
package com.sk.init;
import com.sk.service.RedisDelayMsgService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
@Configuration
public class OrderMsgTask implements CommandLineRunner {
@Resource
private RedisDelayMsgService redisDelayMsgService;
@Override
public void run(String... args) throws Exception {
// 创建 ScheduledThreadPool 线程池
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
log.info("OrderMsgTask -> run() 方法添加任务:" + LocalDateTime.now());
threadPool.schedule(new Runnable() {
@Override
public void run() {
Set<Object> msgSet = redisDelayMsgService.msgDequeue();
if(null != msgSet){
Iterator it = msgSet.iterator();
while(it.hasNext()) {
String orderMsg = (String) it.next();
log.info("------redis--延迟消息,orderMsg:{}",orderMsg);
}
}
System.out.println("执行 schedule 方法:" + LocalDateTime.now());
}
}, 3, TimeUnit.SECONDS); // 3s 之后执行
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。