赞
踩
目录
redis 队列的优点是轻量级,业务足够简单时不需要使用rabbitMq这样专业的消息中间件;缺点是弹出队列中的元素时,即使该消息处理失败也无法再次进行消费
Redis队列 List
可以直接使用Redis的list数据类型实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。
- >lpush queue:single 1
- "1"
- >lpush queue:single 2
- "2"
- >lpush queue:single 3
- "3"
- >rpop queue:single
- "1"
- >rpop queue:single
- "2"
- >rpop queue:single
- "3"
- package com.morris.redis.demo.queue.single;
-
- import redis.clients.jedis.Jedis;
-
- /**
- * 生产者
- */
- public class SingleProducer {
-
- public static final String SINGLE_QUEUE_NAME = "queue:single";
-
- public static void main(String[] args) {
- Jedis jedis = new Jedis();
- for (int i = 0; i < 100; i++) {
- jedis.lpush(SINGLE_QUEUE_NAME, "hello " + i);
- }
- jedis.close();
- }
- }
- package com.morris.redis.demo.queue.single;
-
- import redis.clients.jedis.Jedis;
-
- import java.util.Objects;
- import java.util.concurrent.TimeUnit;
-
- /**
- * 消费者
- */
- public class SingleConsumer {
-
- public static void main(String[] args) throws InterruptedException {
- Jedis jedis = new Jedis();
- while (true) {
- String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
- if(Objects.nonNull(message)) {
- System.out.println(message);
- } else {
- TimeUnit.MILLISECONDS.sleep(500);
- }
- }
- }
- }
上面的代码已经基本实现了普通队列的生产与消费,但是上述的例子中消息的消费者存在两个问题:
redis队列提供了 “阻塞式” 拉取消息的命令:BRPOP / BLPOP,这里的 B 指的是阻塞(Block)。
如果队列为空,消费者在拉取消息时就「阻塞等待」,一旦有新消息过来,就通知消费者立即处理新消息。
>brpop queue:single 30
使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL
- package com.cxh;
-
- import org.junit.jupiter.api.Test;
- import redis.clients.jedis.Jedis;
-
- import java.util.concurrent.TimeUnit;
-
- /**
- * 生产者类
- * 生产者每隔600ms生成一条消息
- * */
- class MessageProducer extends Thread{
- public static final String MESSAGE_KEY = "message:queue";
- private volatile int count;
-
- public void putMessage(String mess){
- Jedis jedis = new Jedis("127.0.0.1", 6379);
- /* jedis.auth("123456");*/
- Long size = jedis.lpush(MESSAGE_KEY, mess);
- System.out.println("Put " + Thread.currentThread().getName() + " put message " + count);
- count++;
- }
- @Override
- public synchronized void run() {
- for(int i = 0 ; i < 1; i++){
- putMessage("message" + count);
- try {
- Thread.sleep(600);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- package com.cxh.Component;
-
- import org.springframework.boot.CommandLineRunner;
- import org.springframework.stereotype.Component;
- import redis.clients.jedis.Jedis;
-
- import java.util.Arrays;
- import java.util.List;
-
- @Component
- public class component implements CommandLineRunner {
- public static final String MESSAGE_KEY = "message:queue";
- @Override
- public void run(String... args) throws Exception {
- /* //todo: 需要执行的方法
- System.out.println(Arrays.toString(args));*/
- System.out.println("comsumer 111");
- Jedis jedis = new Jedis("127.0.0.1", 6379);
- // String message = jedis.rpop(MESSAGE_KEY);
- // System.out.println("Pop " + Thread.currentThread().getName() + "comsumer message = " + message); //多线程的时候使用
- while (true){
- List<String> message = jedis.brpop(0, MESSAGE_KEY);
- System.out.println(message.toString());
- }
- }
-
- }
上面的代码已经实现了延迟队列的生产与消费,需要注意的是:
zset 会按 score 进行排序,如果 score 代表想要执行时间的时间戳。在某个时间将它插入zset集合中,它会按照时间戳大小进行排序,也就是对执行时间前后进行排序。
起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,可以达到延时执行的目的。
- 127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3
- (integer) 0
- 127.0.0.1:6379> zrange queue:delay 0 0 withscores
- 1) "order1"
- 2) "1"
- 127.0.0.1:6379> zrem queue:delay order1
- (integer) 1
- package com.morris.redis.demo.queue.delay;
-
- import redis.clients.jedis.Jedis;
-
- import java.util.Date;
- import java.util.Random;
-
- /**
- * 生产者
- */
- public class DelayProducer {
-
- public static final String DELAY_QUEUE_NAME = "queue:delay";
-
- public static void main(String[] args) {
- Jedis jedis = new Jedis();
- long now = new Date().getTime();
- Random random = new Random();
- for (int i = 0; i < 10; i++) {
- int second = random.nextInt(30); // 随机订单失效时间
- jedis.zadd(DELAY_QUEUE_NAME, now + second * 1000, "order"+i);
- }
- jedis.close();
- }
- }
- package com.morris.redis.demo.queue.delay;
-
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.Tuple;
-
- import java.util.Date;
- import java.util.List;
- import java.util.Set;
- import java.util.concurrent.TimeUnit;
-
- /**
- * 消费者
- */
- public class DelayConsumer {
-
- public static void main(String[] args) throws InterruptedException {
- Jedis jedis = new Jedis();
- while (true) {
- long now = new Date().getTime();
- Set<Tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
- if(tupleSet.isEmpty()) {
- TimeUnit.MILLISECONDS.sleep(500);
- } else {
- for (Tuple tuple : tupleSet) {
- Double score = tuple.getScore();
- long time = score.longValue();
- if(time < now) {
- jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement());
- System.out.println("order[" + tuple.getElement() +"] is timeout at " + time);
- } else {
- TimeUnit.MILLISECONDS.sleep(500);
- }
- break;
- }
- }
- }
- }
- }
延时队列在项目中的应用还是比较多的,尤其像电商类平台:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。