当前位置:   article > 正文

redis实现延迟队列_redisson 延时队列 原理

redisson 延时队列 原理

前言:redis实现延迟队列该怎么做?在这里我分享一下

一、Redis实现延迟队列

  1. 失效监听
  2. redisson实现发布订阅延迟

二、redis失效监听事件

集成KeyExpirationEventMessageListener类实现redis失效监听事件

三、此种实现面临的问题

  1. redis的失效监听事件会存在一定的时间差,并且当数据量越大时,误差会越大。
  2. redis的失效监听事件会将所有key失效都会通知到onMessage,如果针对一个key,分布式业务的场景下,会出现重复消费的问题。(可以增加分布式锁的实现,但是redisson分布式锁提供了另一种延迟队列的实现方式)

四、开发准备

redis需要在服务端开启配置,打开redis服务的配置文件 添加notify-keyspace-events Ex

  • 相关参数如下:
K:keyspace事件,事件以__keyspace@<db>__为前缀进行发布;        
E:keyevent事件,事件以__keyevent@<db>__为前缀进行发布;        
g:一般性的,非特定类型的命令,比如del,expire,rename等;       
$:字符串特定命令;        
l:列表特定命令;        
s:集合特定命令;        
h:哈希特定命令;        
z:有序集合特定命令;        
x:过期事件,当某个键过期并删除时会产生该事件;        
e:驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件;        
A:g$lshzxe的别名,因此”AKE”意味着所有事件。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

五、基础实现

  1. 加入依赖
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  1. 可正常连接存取redis数据之后,创建监听类RedisKeyExpirationListener继承KeyExpirationEventMessageListener,重写onMessage方法。(key失效之后,会发出onMessage方法,之呢个获取失效的key值,不能获取key对应的value值)。
import com.test01.scrm.service.member.api.common.MemberStatusEnum;
import com.test01.scrm.service.member.provider.service.base.IBaseMemberService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

/**
 * @author lwl
 */
@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
    private final IBaseMemberService baseMemberService;


    private final static String MEMBER_LOCK_ACCOUNT_SUFFIX = ".lock_account";
    private final static String MEMBER_LOCK_ACCOUNT_DOMAIN_SUFFIX = "T";
    private final static String MEMBER_LOCK_ACCOUNT_MEMBER_SUFFIX = "M";
    private final static String MEMBER_REDISSON_LOCK = ".member_lock_redisson";
    private final static int WAIT_TIME = 5;
    private final static int LEASE_TIME = 10;

    public RedisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer, IBaseMemberService baseMemberService) {
        super(redisMessageListenerContainer);
        this.baseMemberService = baseMemberService;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        //获取失效的key
        String expiredKey = message.toString();
        log.info("================================get on message:{}====================", expiredKey);
        if (expiredKey.endsWith(MEMBER_LOCK_ACCOUNT_SUFFIX)) {
            log.info("================================on message:{}====================", expiredKey);
            try {
                log.info("=======待解锁账号解锁======expiredKey:{}", expiredKey);
                String tenantId = expiredKey.substring(expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_DOMAIN_SUFFIX) + 1, expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_MEMBER_SUFFIX));
                String memberId = expiredKey.substring(expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_MEMBER_SUFFIX) + 1, expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_SUFFIX));
                baseMemberService.updateAccount(Integer.parseInt(tenantId), Long.parseLong(memberId), MemberStatusEnum.NORMAL.getCode(), null);
            } catch (Exception exception) {
                log.info("auto unlock fail,expired key:{},exception:{}", expiredKey, exception.getMessage());
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  1. 创建一个配置类RedisConfig
/**
 * @author lwl
 */
@Configuration
public class RedisConfig {

    @Value("${redis.dbIndex}")
    private Integer dbIndex;

    private final String TOPIC = "__keyevent@" + dbIndex + "__:expired";
    private final RedisConnectionFactory redisConnectionFactory;

    public RedisConfig(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }


    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        //keyevent事件,事件以__keyevent@<db>__为前缀进行发布
        //db为redis第几个库 db2...
//        redisMessageListenerContainer.addMessageListener(redisKeyExpirationListener, new PatternTopic(TOPIC));
        return redisMessageListenerContainer;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

六、使用redisson实现延迟队列

由于延时队列持久化在redis中,所以机器宕机数据不会异常丢失,机器重启后,会正常消费队列中积累的任务

七、redisson实现延迟队列的原理

使用redis的zset有序性,轮询zset中的每个元素,到点后将内容迁移至待消费的队列

八、延迟队列配置

package com.test01.scrm.service.member.provider.config.redisson.delay;

import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author lwl
 * redisson延迟队列
 */
@Configuration
public class RedissonQueueConfig {

    private final String queueName = "queue";

    @Bean
    public RBlockingQueue<String> rBlockingQueue(@Qualifier("redissonSingle") RedissonClient redissonClient) {
        return redissonClient.getBlockingQueue(queueName);
    }

    @Bean(name = "rDelayedQueue")
    public RDelayedQueue<String> rDelayedQueue(@Qualifier("redissonSingle") RedissonClient redissonClient,
                                               @Qualifier("rBlockingQueue") RBlockingQueue<String> blockQueue) {
        return redissonClient.getDelayedQueue(blockQueue);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

定义队列使用接口

package com.test01.scrm.service.member.provider.config.redisson.delay;

import java.util.concurrent.TimeUnit;

/**
 * @author lwl
 */
public interface DelayQueue {

    /**
     * 发布
     *
     * @param object
     * @return
     */
    Boolean offer(Object object);

    /**
     * 带延迟功能的队列
     *
     * @param object
     * @param time
     * @param timeUnit
     */
    void offer(Object object, Long time, TimeUnit timeUnit);

    void offerAsync(Object object, Long time, TimeUnit timeUnit);

    Boolean offerAsync(Object object);
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

延迟队列实现

package com.test01.scrm.service.member.provider.config.redisson.delay;

import org.redisson.api.RDelayedQueue;
import org.redisson.api.RFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @author lwl
 */
@Component
public class RedissonDelayQueue implements DelayQueue {

    private static Logger log = LoggerFactory.getLogger(RedissonDelayQueue.class);

    @Resource(name = "rDelayedQueue")
    private RDelayedQueue<Object> rDelayedQueue;


    @Override
    public Boolean offer(Object object) {
        return rDelayedQueue.offer(object);
    }

    @Override
    public void offer(Object object, Long time, TimeUnit timeUnit) {
        rDelayedQueue.offer(object, time, timeUnit);
    }

    @Override
    public void offerAsync(Object object, Long time, TimeUnit timeUnit) {
        rDelayedQueue.offerAsync(object, time, timeUnit);
    }

    @Override
    public Boolean offerAsync(Object object) {
        boolean flag = false;
        RFuture<Boolean> rFuture = rDelayedQueue.offerAsync(object);
        try {
            flag = rFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            log.info("offerAsync exception:{}", e.getMessage());
            e.printStackTrace();
        }
        return flag;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

启动一个后台监控线程

package com.test01.scrm.service.member.provider.config.redisson.delay;

import org.redisson.api.RBlockingQueue;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * @author test01
 */
@Component
public class RedissonTask {
    @Resource(name = "rBlockingQueue")
    private RBlockingQueue<Object> rBlockingQueue;

    @PostConstruct
    public void take() {
        new Thread(() -> {
            while (true) {
                try {
                    System.out.println("=========================" + rBlockingQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

使用延迟队列发送

package com.test01.scrm.service.member.provider.impl;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mybatis.spring.annotation.MapperScan;
import org.redisson.api.RDelayedQueue;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles(value = "llh")
@MapperScan("com.test01.scrm.service.member.provider.mapper")
public class RDelayQueueTests {

    @Resource(name = "rDelayedQueue")
    private RDelayedQueue<Object> rDelayedQueue;

    @Test
    public void offerAsync() {

        rDelayedQueue.offerAsync("llh send message", 20, TimeUnit.SECONDS);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

九、疑问解答与加群交流学习

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/649466
推荐阅读
相关标签
  

闽ICP备14008679号