当前位置:   article > 正文

基于Redis实现的延迟消息队列_springmvc+redis实现消息队列及延迟队列

springmvc+redis实现消息队列及延迟队列

本文以redis为数据结构基础,配合Spring管理机制,使用java实现了一个轻量级、可配置的消息队列。适合的项目特点:

  • Spring框架管理对象
  • 有消息需求,但不想维护mq中间件
  • 有使用redis
  • 对消息持久化并没有很苛刻的要求

需要使用rabbitmq实现延迟消息请参考这里


1. 设计方案

设计主要包含以下几点:

  • 将整个Redis当做消息池,以kv形式存储消息
  • 使用ZSET做优先队列,按照score维持优先级
  • 使用LIST结构,以先进先出的方式消费
  • zset和list存储消息地址(对应消息池的每个key)
  • 自定义路由对象,存储zset和list名称,以点对点的方式将消息从zset路由到正确的list
  • 使用定时器维持路由
  • 根据TTL规则实现消息延迟

    基于Redis有消息队列设计方案

2. 代码实现

2.1 技术说明

示例使用Springboot,gradle,redis,jdk8。

2.2 核心代码

核心代码主要包含消息对象Message,路由器Route和消息队列RedisMQ。

2.2.1 Message对象

package git.yampery.msmq;

/**
 * @decription Message
 * <p>封装消息元数据</p>
 * @author Yampery
 * @date 2017/11/2 15:50
 */
public class Message {

    /**
     * 消息主题
     */
    private String topic;
    /**
     * 消息id
     */
    private String id;
    /**
     * 消息延迟
     */
    private long delay;
    /**
     * 消息优先级
     */
    private int priority;
    /**
     * 消息存活时间
     */
    private int ttl;
    /**
     * 消息体,对应业务内容
     */
    private String body;
    /**
     * 创建时间,如果只有优先级没有延迟,可以设置创建时间为0
     * 用来消除时间的影响
     */
    private long createTime;
    /**
     * 消息状态(延迟-0;待发送-1;已发送-2;发送失败-3)
     */
    private int status;

    /**
     * getset略...
     */
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

2.2.2 Route(消息路由器)

package git.yampery.msmq;

/**
 * @decription Route
 * <p>消息路由器,主要控制将消息从指定的队列路由到待消费的list<br>
 * 通过这种方式实现自定义延迟以及优先级发送</p>
 * @author Yampery
 * @date 2017/11/3 14:33
 */
public class Route {

    /**
     * 存放消息的队列
     */
    private String queue;

    /**
     * 待消费的列表
     */
    private String list;

    public Route(String queue, String list) {
        this.queue = queue;
        this.list = list;
    }

    /**
     * getset略...
     */
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

2.2.3 RedisMq(消息队列)

package git.yampery.msmq;

import git.yampery.utils.JedisUtils;
import org.springframework.scheduling.annotation.Scheduled;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * @decription RedisMQ
 * <p>基于redis的消息队列</p>
 * <p>将整个redis作为消息池存储消息体,以ZSET为消息队列,LIST作为待消费列表<br>
 * 用Spring定时器作为监听器,每次监听ZSET中指定数量的消息<br>
 * 根据SCORE确定是否达到发送要求,如果达到,利用消息路由{@link Route}将消息路由到待消费list</p>
 * @author Yampery
 * @date 2017/11/2 15:49
 */
public class RedisMQ {

    /**
     * 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link Message}
     * 的消息体body作为值存储
     */
    private static final String MSG_POOL = "Message:Pool:";
    /**
     * 默认监听数量,对应监听zset队列前多少个元素
     */
    private static final int DEFAUT_MONITOR = 10;
    @Resource private JedisUtils jedisUtils;


    /**
     * 每次监听queue中元素的数量,可配置
     */
    private int monitorCount = DEFAUT_MONITOR;


    /**
     * 消息路由
     */
    private List<Route> routes;

    /**
     * 存入消息池
     * @param message
     * @return
     */
    public boolean addMsgPool(Message message) {

        if (null != message) {
            return jedisUtils.setex(MSG_POOL + message.getId(), message.getBody(), message.getTtl());
        }
        return false;
    }

    /**
     * 从消息池中删除消息
     * @param id
     * @return
     */
    public boolean deMsgPool(String id) {

        return jedisUtils.del(MSG_POOL + id);
    }

    /**
     * 像队列中添加消息
     * @param key
     * @param score 优先级
     * @param val
     * @return 返回消息id
     */
    public String enMessage(String key, long score, String val) {

        if (jedisUtils.zadd(key, score, val)) {
            return val;
        }
        return "";
    }

    /**
     * 从队列删除消息
     * @param id
     * @return
     */
    public boolean deMessage(String key, String id) {

        return jedisUtils.zdel(key, id);
    }

    /**
     * 消费
     * @return
     */
    public List<String> consume(String key) {

        long count = jedisUtils.countList(key);
        if (0 < count) {
            // 可根据需求做限制
            List<String> ids = jedisUtils.rangeList(key, 0, count - 1);
            if (ids != null) {
                List<String> result = new ArrayList<>();
                ids.forEach(l -> result.add(jedisUtils.get(MSG_POOL + l, "")));
                jedisUtils.removeListValue(key, ids);
                return result;
            } /// if end~
        }

        return null;
    }

    /**
     * 消息队列监听器<br>
     * 监听所有路由器,将消息队列中的消息路由到待消费列表
     */
    @Scheduled(cron="*/5 * * * * *")
    public void monitor() {
        // 获取消息路由
        int route_size;
        if (null == routes || 1 > (route_size = routes.size())) return;
        String queue, list;
        Set<String> set;
        for (int i = 0; i < route_size; i++) {
            queue = routes.get(i).getQueue();
            list = routes.get(i).getList();
            set = jedisUtils.getSoredSetByRange(queue, 0, monitorCount, true);
            if (null != set) {
                long current = System.currentTimeMillis();
                long score;
                for (String id : set) {
                     score = jedisUtils.getScore(queue, id).longValue();
                    if (current >= score) {
                        // 添加到list
                        if (jedisUtils.insertList(list, id)) {
                            // 删除queue中的元素
                            deMessage(queue, id);
                        } /// if end~
                    } /// if end~
                } /// for end~
            } /// if end~
        } /// for end~
    }

    public int getMonitorCount() {
        return monitorCount;
    }

    public void setMonitorCount(int monitorCount) {
        this.monitorCount = monitorCount;
    }

    public List<Route> getRoutes() {
        return routes;
    }

    public void setRoutes(List<Route> routes) {
        this.routes = routes;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162

2.2.4 RedisMq消息队列配置:

  • mq.properties文件
# 队列的监听数量
mq.monitor.count                 =30
# 队列一
mq.queue.first                  =queue:1
# 队列二
mq.queue.second                 =queue:2
# 消费列表一
mq.consumer.first               =list:1
# 消费列表二
mq.consumer.second              =list:2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • MqConfig.java文件
package git.yampery.config;

import git.yampery.msmq.RedisMQ;
import git.yampery.msmq.Route;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.util.ArrayList;
import java.util.List;

/**
 * @decription MqConfig
 * <p>消息队列配置</p>
 * @author Yampery
 * @date 2018/2/9 14:26
 *
 * 根据不同的架构可选择使用XML配置
 * ---------------------------------------------------
 *
    <bean id="redisMQ" class="git.yampery.msmq.RedisMQ">
        <property name="monitorCount" value="15"/>
        <property name="routes">
            <list>
                <bean class="git.yampery.msmq.Route">
                    <property name="queue" value="${mq.queue.first}"/>
                    <property name="list" value="${mq.consumer.first}"/>
                </bean>
                <bean class="git.yampery.msmq.Route">
                    <property name="queue" value="${mq.queue.second}"/>
                    <property name="list" value="${mq.consumer.second}"/>
                </bean>
            </list>
        </property>
    </bean>

 * ----------------------------------------------------
 */
@Configuration
public class MqConfig {

    @Bean(name = "redisMQ")
    @Primary
    public RedisMQ getRedisMq() {
        RedisMQ redisMQ = new RedisMQ();
        // 配置监听队列元素数量
        redisMQ.setMonitorCount(monitorCount);
        // 配置路由表
        redisMQ.setRoutes(routeList());
        return redisMQ;
    }

    /**
     * 返回路由表
     * @return
     */
    public List<Route> routeList() {
        List<Route> routeList = new ArrayList<>();
        Route routeFirst = new Route(queueFirst, listFirst);
        Route routeSecond = new Route(queueSecond, listSecond);
        routeList.add(routeFirst);
        routeList.add(routeSecond);
        return routeList;
    }

    @Value("${mq.monitor.count}")
    private int monitorCount;
    @Value("${mq.queue.first}")
    private String queueFirst;
    @Value("${mq.queue.second}")
    private String queueSecond;
    @Value("${mq.consumer.first}")
    private String listFirst;
    @Value("${mq.consumer.second}")
    private String listSecond;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79

如果使用的是xml配置 请参考:

<bean id="redisMQ" class="git.yampery.msmq.RedisMQ">
    <property name="monitorCount" value="15"/>
    <property name="routes">
        <list>
            <bean class="git.yampery.msmq.Route">
                <property name="queue" value="${mq.queue.first}"/>
                <property name="list" value="${mq.consumer.first}"/>
            </bean>
            <bean class="git.yampery.msmq.Route">
                <property name="queue" value="${mq.queue.second}"/>
                <property name="list" value="${mq.consumer.second}"/>
            </bean>
        </list>
    </property>
</bean>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

2.2.5 消费者

并没有内置消费者监听器来实现,可以直接使用定时器实现

package git.yampery.task;

import com.alibaba.fastjson.JSONObject;
import git.yampery.msmq.RedisMQ;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.util.List;

/**
 * @decription MsgTask
 * <p>发送消息</p>
 * @author Yampery
 * @date 2018/2/9 18:04
 */
@Component
public class MsgTask {

    @Resource private RedisMQ redisMQ;
    // @Value("${mq.list.first}") private String MQ_LIST_FIRST;

    @Scheduled(cron="*/5 * * * * *")
    public void sendMsg() {
        // 消费
        List<String> msgs = redisMQ.consume(redisMQ.getRoutes().get(0).getList());
        int len;
        if (null != msgs && 0 < (len = msgs.size())) {
            // 将每一条消息转为JSONObject
            JSONObject jObj;
            for (int i = 0; i < len; i++) {
                if (!StringUtils.isEmpty(msgs.get(i))) {
                    jObj = JSONObject.parseObject(msgs.get(i));
                    // 取出消息
                    System.out.println(jObj.toJSONString());
                }
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

4. 测试

测试设置20秒延迟,发布消息到queue:1,在list:1消费。

package git.yampery.mq;

import com.alibaba.fastjson.JSONObject;
import git.yampery.msmq.Message;
import git.yampery.msmq.RedisMQ;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * @decription TestMQ
 * <p>测试</p>
 * @author Yampery
 * @date 2018/2/9 18:43
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestMQ {

    @Resource
    private RedisMQ redisMQ;
    @Value("${mq.queue.first}")
    private String MQ_QUEUE_FIRST;

    @Test
    public void testMq() {

        JSONObject jObj = new JSONObject();
        jObj.put("msg", "这是一条短信");

        String seqId = UUID.randomUUID().toString();

        // 将有效信息放入消息队列和消息池中
        Message message = new Message();
        message.setBody(jObj.toJSONString());
        // 可以添加延迟配置
        message.setDelay(20);
        message.setTopic("SMS");
        message.setCreateTime(System.currentTimeMillis());
        message.setId(seqId);
        // 设置消息池ttl,防止长期占用
        message.setTtl(20 * 60);
        message.setStatus(0);
        message.setPriority(0);
        redisMQ.addMsgPool(message);
        redisMQ.enMessage(MQ_QUEUE_FIRST,
                message.getCreateTime() + message.getDelay() + message.getPriority(), message.getId());
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

3. 总结

文章利用redis已有的数据存储结构,实现了轻量级的消息队列,并未真正实现消息持久化。示例是针对点对点的消息路由方式,当然,也可以扩展成广播和主题的方式,不过,这样就得不偿失了,如果需求比较复杂,可靠性要求较高,反而不如直接维护rabbitmq之类的消息队列。
需要使用rabbitmq实现延迟消息请参考这里

4. 源码连接

文章并未贴出所有代码,gradle构建、jedisUtils以及一些配置等可以参考源码,源码只需要设置自己的redis配置即可。
https://github.com/Yampery/rdsmq.git

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/125692
推荐阅读
相关标签
  

闽ICP备14008679号