赞
踩
redis的zset它结合了set和list的特点
1、集合内元素不会重复
2、元素以有序的方式排列
zset中的元素都会关联一个分数score,内部将通过这个score对集合元素进行的排序。
虽然zset集合中元素不会重复,但score可以重复。
如果有两个score相同的元素,将按照元素的字典序进行排序
score保证了队列中的消息有序性
延迟队列的实现:
将数据存到redis的zset中并指定score(double),zset会对score进行排序,让最早消费的数据位于最前,拿最前的数据跟当前时间比较,时间到了则消费
redis
作为消息队列的优缺点ack
,消息确认机制,存在消息丢失的可能redis
作为延时消息队列如果是简单的日志推送,消息推送等,可以使用redis
队列。
生产者
- @Slf4j
- @Component
- public class MessageProvider {
-
-
- private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();
-
-
-
-
- @Lazy
- @Resource
- MsgMapper msgMapper;
-
-
- @Lazy
- @Resource
- RedisUtil redisUtil;//这里就不放出来了,大家都有的
-
- private static String USER_CHANNEL = "随便_CHANNEL_";
-
- public static final String KEY_PREFIX ="你的前缀_msg:";
-
-
- /**
- * @Description: 发送消息添加到延迟队列
- * @param delay 延迟时间(排序的score)
- * @Author: fan
- * @Date: 2024/7/2 15:55
- */
- public void sendMessage(Long id,Long taskId,String messageContent, Long delay,String channel) {
- //消息体格式,可根据自己需要调整,这里就不放出来了
- Message message = new Message();
- String msgId = USER_CHANNEL + id;
- long time = System.currentTimeMillis();
- LocalDateTime dateTime = Instant.ofEpochMilli(time).atZone(ZoneOffset.ofHours(8)).toLocalDateTime();
- message.setCreateTime(dateTime);
- message.setDelayTime(delay);
- message.setBody(messageContent);
- message.setMsgId(msgId);
- message.setStatus(ImmobilizationEnum.not_fixed_broadcast.getCode());
- message.setChannel(USER_CHANNEL + channel);
- Boolean b = false;
- try {
- //推送到队列
- b = pushQUeue(message);
- } catch (Exception e) {
- log.error("[sendMessage_Exception异常] e={}", e);
- } finally {
- String value = redisUtil.getKey(YOUR_KEY_PREFIX + msgId);
- if (StringUtil.isEmpty(value )) {
- //如果没有则插入数据库,代码补偿
- Message msg = msgMapper.selectByMsgId(msgId);
- if (msg == null) {
- msgMapper.insert(message);
- redisUtil.setKey(YOUR_KEY_PREFIX + msgId, UUID.toString, 20);
- }
- }
- }
- }
-
-
- public Boolean pushQUeue(Message queueManager){
- Boolean b = false;
- try {
- String messageStr = mapper.writeValueAsString(queueManager);
- b = redisUtil.addZset(YOUR_QUEUE_NAME_KEY, messageStr, queueManager.getDelayTime());
- redisUtil.expire(YOUR_QUEUE_NAME_KEY, 20, TimeUnit.SECONDS);
- } catch (Exception e) {
- log.error("[push_Exception异常] e={} ",e);
- }
- return b;
- }
-
- public List<Message> pullZset(){
- long currentTimeMillis = System.currentTimeMillis();
- List<Message> messageList = new ArrayList<>();
- try{
- Set<String> strings = redisUtil.rangeByScore(YOUR_QUEUE_NAME_KEY, 0, Long.MAX_VALUE);
-
- if (CollectionUtils.isEmpty(strings)) {
- return null;
- }
- messageList = strings.stream().map(msg -> {
- Message message = null;
- try {
- message = mapper.readValue(msg,Message.class);
- } catch (Exception e) {
- log.error("[pull_Exception异常] e:{}",e);
- }
- return message;
- }).collect(Collectors.toList());
- } catch (Exception e) {
- log.error("[pull_Exception异常] Exception={} ", e);
- } finally {
- if (CollectionUtils.isEmpty(messageList)) {
- //如果缓存没有则从数据库取并下发到队列
- messageList = MsgMapper.selectByStatus("你的查询条件,这里按状态,具体SQL就不贴了");
- }
-
- }
- return messageList;
- }
-
-
-
- }
消费者
- //消费者方法
- public void delayingQueueConsumer(){
- List<Message> msgList = pullZset();
- if (!CollectionUtils.isEmpty(msgList)) {
- long current = getCurrentTime();
- for (int i = 0; i < msgList.size(); i++) {
- Message msg = msgList.get(i);
- //到点执行
- if (current >= msg.getDelayTime()) {
- //你的业务
- }
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。