当前位置:   article > 正文

Redis消息队列、阻塞队列、延时队列_redis 队列

redis 队列

目录

 一、普通队列

使用redis的命令来模拟普通队列

使用lpush命令生产消息:

使用rpop命令消费消息:

使用Java代码来实现普通队列:

生产者SingleProducer

消费者SingleConsumer:

二、Redis阻塞队列

使用redis的brpop命令来模拟阻塞队列

Java阻塞队列生产者实现如下:

Java阻塞队列消费者实现如下:

三、Redis延迟队列

下面使用redis的zset来模拟延时队列

命令生产者:

命令消费者:

使用Java代码来实现普通队列:

生产者DelayProducer :

消费者DelayConsumer :

应用场景


redis 队列的优点是轻量级,业务足够简单时不需要使用rabbitMq这样专业的消息中间件;缺点是弹出队列中的元素时,即使该消息处理失败也无法再次进行消费

Redis队列 List

 一、普通队列

可以直接使用Redis的list数据类型实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。

  • lpush+rpop:左进右出的队列
  • rpush+lpop:左出右进的队列

使用redis的命令来模拟普通队列

使用lpush命令生产消息:

  1. >lpush queue:single 1
  2. "1"
  3. >lpush queue:single 2
  4. "2"
  5. >lpush queue:single 3
  6. "3"

使用rpop命令消费消息:

  1. >rpop queue:single
  2. "1"
  3. >rpop queue:single
  4. "2"
  5. >rpop queue:single
  6. "3"

使用Java代码来实现普通队列:

生产者SingleProducer

  1. package com.morris.redis.demo.queue.single;
  2. import redis.clients.jedis.Jedis;
  3. /**
  4. * 生产者
  5. */
  6. public class SingleProducer {
  7. public static final String SINGLE_QUEUE_NAME = "queue:single";
  8. public static void main(String[] args) {
  9. Jedis jedis = new Jedis();
  10. for (int i = 0; i < 100; i++) {
  11. jedis.lpush(SINGLE_QUEUE_NAME, "hello " + i);
  12. }
  13. jedis.close();
  14. }
  15. }

消费者SingleConsumer:

  1. package com.morris.redis.demo.queue.single;
  2. import redis.clients.jedis.Jedis;
  3. import java.util.Objects;
  4. import java.util.concurrent.TimeUnit;
  5. /**
  6. * 消费者
  7. */
  8. public class SingleConsumer {
  9. public static void main(String[] args) throws InterruptedException {
  10. Jedis jedis = new Jedis();
  11. while (true) {
  12. String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
  13. if(Objects.nonNull(message)) {
  14. System.out.println(message);
  15. } else {
  16. TimeUnit.MILLISECONDS.sleep(500);
  17. }
  18. }
  19. }
  20. }

上面的代码已经基本实现了普通队列的生产与消费,但是上述的例子中消息的消费者存在两个问题:

  1. 普通的redis队列,为了实现业务,通常会使用while进行循环,这样的话没有消息时依旧会频繁的执行循环,造成cpu的空转,所以一般会在代码中增加sleep来解决该问题,但因此又会造成消息延迟问题。
  2. 如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

二、Redis阻塞队列

redis队列提供了 “阻塞式” 拉取消息的命令:BRPOP / BLPOP,这里的 B 指的是阻塞(Block)。
如果队列为空,消费者在拉取消息时就「阻塞等待」,一旦有新消息过来,就通知消费者立即处理新消息。

使用redis的brpop命令来模拟阻塞队列

>brpop queue:single 30

使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL

Java阻塞队列生产者实现如下:

  1. package com.cxh;
  2. import org.junit.jupiter.api.Test;
  3. import redis.clients.jedis.Jedis;
  4. import java.util.concurrent.TimeUnit;
  5. /**
  6. * 生产者类
  7. * 生产者每隔600ms生成一条消息
  8. * */
  9. class MessageProducer extends Thread{
  10. public static final String MESSAGE_KEY = "message:queue";
  11. private volatile int count;
  12. public void putMessage(String mess){
  13. Jedis jedis = new Jedis("127.0.0.1", 6379);
  14. /* jedis.auth("123456");*/
  15. Long size = jedis.lpush(MESSAGE_KEY, mess);
  16. System.out.println("Put " + Thread.currentThread().getName() + " put message " + count);
  17. count++;
  18. }
  19. @Override
  20. public synchronized void run() {
  21. for(int i = 0 ; i < 1; i++){
  22. putMessage("message" + count);
  23. try {
  24. Thread.sleep(600);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }
  30. }

Java阻塞队列消费者实现如下:

  1. package com.cxh.Component;
  2. import org.springframework.boot.CommandLineRunner;
  3. import org.springframework.stereotype.Component;
  4. import redis.clients.jedis.Jedis;
  5. import java.util.Arrays;
  6. import java.util.List;
  7. @Component
  8. public class component implements CommandLineRunner {
  9. public static final String MESSAGE_KEY = "message:queue";
  10. @Override
  11. public void run(String... args) throws Exception {
  12. /* //todo: 需要执行的方法
  13. System.out.println(Arrays.toString(args));*/
  14. System.out.println("comsumer 111");
  15. Jedis jedis = new Jedis("127.0.0.1", 6379);
  16. // String message = jedis.rpop(MESSAGE_KEY);
  17. // System.out.println("Pop " + Thread.currentThread().getName() + "comsumer message = " + message); //多线程的时候使用
  18. while (true){
  19. List<String> message = jedis.brpop(0, MESSAGE_KEY);
  20. System.out.println(message.toString());
  21. }
  22. }
  23. }

上面的代码已经实现了延迟队列的生产与消费,需要注意的是:

  1. 无法实现一次生产多次消费(使用 pub/sub 发布订阅模式,可以实现 1:N 的消息队列,即一次生产,多端消费
  2. 阻塞时间结束后代码会继续向下执行
  3. 如果设置的超时时间太长,这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,之后 Redis Server 会强制把这个客户端踢下线。所以,客户端要有处理机制。
    实际项目中redis连接超时时间远大于20s,因此正常情况不会出现redis超时问题。以防万一增加redis异常捕获,出现异常时杀掉当前进程,同时supervisord会自动重新拉起该进程

三、Redis延迟队列

zset 会按 score 进行排序,如果 score 代表想要执行时间的时间戳。在某个时间将它插入zset集合中,它会按照时间戳大小进行排序,也就是对执行时间前后进行排序。

起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,可以达到延时执行的目的。

下面使用redis的zset来模拟延时队列

命令生产者:

  1. 127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3
  2. (integer) 0

命令消费者:

  1. 127.0.0.1:6379> zrange queue:delay 0 0 withscores
  2. 1) "order1"
  3. 2) "1"
  4. 127.0.0.1:6379> zrem queue:delay order1
  5. (integer) 1

使用Java代码来实现普通队列:

生产者DelayProducer :

  1. package com.morris.redis.demo.queue.delay;
  2. import redis.clients.jedis.Jedis;
  3. import java.util.Date;
  4. import java.util.Random;
  5. /**
  6. * 生产者
  7. */
  8. public class DelayProducer {
  9. public static final String DELAY_QUEUE_NAME = "queue:delay";
  10. public static void main(String[] args) {
  11. Jedis jedis = new Jedis();
  12. long now = new Date().getTime();
  13. Random random = new Random();
  14. for (int i = 0; i < 10; i++) {
  15. int second = random.nextInt(30); // 随机订单失效时间
  16. jedis.zadd(DELAY_QUEUE_NAME, now + second * 1000, "order"+i);
  17. }
  18. jedis.close();
  19. }
  20. }

消费者DelayConsumer :

  1. package com.morris.redis.demo.queue.delay;
  2. import redis.clients.jedis.Jedis;
  3. import redis.clients.jedis.Tuple;
  4. import java.util.Date;
  5. import java.util.List;
  6. import java.util.Set;
  7. import java.util.concurrent.TimeUnit;
  8. /**
  9. * 消费者
  10. */
  11. public class DelayConsumer {
  12. public static void main(String[] args) throws InterruptedException {
  13. Jedis jedis = new Jedis();
  14. while (true) {
  15. long now = new Date().getTime();
  16. Set<Tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
  17. if(tupleSet.isEmpty()) {
  18. TimeUnit.MILLISECONDS.sleep(500);
  19. } else {
  20. for (Tuple tuple : tupleSet) {
  21. Double score = tuple.getScore();
  22. long time = score.longValue();
  23. if(time < now) {
  24. jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement());
  25. System.out.println("order[" + tuple.getElement() +"] is timeout at " + time);
  26. } else {
  27. TimeUnit.MILLISECONDS.sleep(500);
  28. }
  29. break;
  30. }
  31. }
  32. }
  33. }
  34. }

应用场景

延时队列在项目中的应用还是比较多的,尤其像电商类平台:

  1. 12306 下单成功后,在半个小时内没有支付,自动取消订单。
  2. 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存。
  3. 淘宝新建商户一个月内还没上传商品信息,将冻结商铺等。
  4. 会议预定系统,在预定会议开始前半小时通知所有预定该会议的用户。
  5. 安全工单超过 24 小时未处理,则自动拉企业群提醒相关责任人。
  6. 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。
  7. 外卖平台发送订餐通知,下单成功后 60s 给用户推送短信。
     
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/479614
推荐阅读
相关标签
  

闽ICP备14008679号