当前位置:   article > 正文

Redis之zset在异步队列上的应用_redis异步队列

redis异步队列

当遇到并发的客户端请求时,为了缓解服务端的处理压力,当请求对响应的处理的实时性要求不高时,可以实现一个异步的请求消息队列。

一种实现策略是使用redis的zset,将消息的到期处理时间作为score,然后用多个线程去轮训获取zset中的任务并进行处理。

需要提前考虑一个问题:

如何避免一个任务被多次处理?

一种解决方案是当多个线程获取到任务时,调用redis的zrem命令,将该任务从指定的zset中移除(利用了redis处理命令时是顺序执行的)。

环境

  • JDK17
  • 两个jar包
    • Jedis
    • fastjson2

代码

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import redis.clients.jedis.Jedis;

import java.lang.reflect.Type;
import java.util.List;
import java.util.UUID;

// 基于Redis实现的延迟队列
public class RedisDelayingQueue<T> {
    static class TaskItem<T> {
        public String id;
        public T msg;
    }
    // fastjson序列化对象时如果存在泛型,需要使用TypeReference
    private Type TaskType = new TypeReference<TaskItem<T>>(){}.getType();

    private Jedis jedis;
    private String queueKey;

    public RedisDelayingQueue(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    // 将任务添加到 zset 中
    // 分数是延时的时间
    public void delay(T msg) {
        TaskItem<T> task = new TaskItem<T>();
        task.id = UUID.randomUUID().toString();
        task.msg = msg;
        // 序列化任务
        String s = JSON.toJSONString(task);
        jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s);
    }

    public void loop() {
        while(!Thread.interrupted()) {
            // 从zset中取出一个任务
            List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
            if(values.isEmpty()) {
                try {
                    Thread.sleep(500);
                } catch(InterruptedException e) {
                    break;
                }
                continue;
            }
            String s = values.iterator().next();
            if(jedis.zrem(queueKey, s) > 0) {
                TaskItem<T> task = JSON.parseObject(s, TaskType);
                this.handleMsg(task.msg);
            }
        }
    }

    public void handleMsg(T msg) {
        System.out.println(msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

优化

通过上面loop中代码,多个线程获取到values时,可能会被多个线程同时取到,然后再调用zrem命令去竞争的删除该值,所以会有很多无用的网络请求发送到redis。更容易想到的方案是将取值然后删除的操作变成原子性的,两种实现方案:

  • 通过对代码块进行加锁的方式
  • 利用redis中lua脚本的原子执行的特点

代码块加锁

这种方案不太好,如果两个命令之间发生了网络错误或者延迟,将造成其它线程的阻塞

    public void synchronizedLoop() {
        while(!Thread.interrupted()) {
            synchronized(this) {
                // 从zset中取出一个任务
                List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
                if(values.isEmpty()) {
                    try {
                        Thread.sleep(500);
                    } catch(InterruptedException e) {
                        break;
                    }
                    continue;
                }
                String s = values.iterator().next();
                if(jedis.zrem(queueKey, s) > 0) {
                    TaskItem<T> task = JSON.parseObject(s, TaskType);
                    this.handleMsg(task.msg);
                }
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

Lua脚本

local key = KEYS[1]
local task = redis.call('ZPOPMIN', key)
if task and next(task) != nil then
  redis.call('ZREM', key, task[1])
  return task[1]
else
  return nil
end
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

通过查阅文档发现,ZRANGEBYSCORE从6 版本开始已经过时了,所以这里使用ZPOPMIN来获取分数最小的value,可以达到相同的效果。

通过Jedis的eval函数,调用redis执行lua脚本的命令。

    public void luaLoop() {
        while(!Thread.interrupted()) {
            Object ans = jedis.eval(script, 1, queueKey);
            if(ans != null) {
                String task = (String) ans;
                TaskItem<T> taskItem = JSON.parseObject(task, TaskType);
                this.handleMsg(taskItem.msg);
            }else{
                try{
                    Thread.sleep(500);
                }catch(Exception e) {
                    break;
                }
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

为什么可以优化

  • 使用lua脚本的方式,使得一个线程如果zset中有任务都会成功获取任务,而不会多个线程同时拿到同一个任务,再去竞争删除,减少了无效的网络IO

测试程序

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class Main {
    public static void main(String[] args) {
        JedisPool jedisPool = new JedisPool("url-of-redis", 6379, "username", "pass");
        Jedis jedis = jedisPool.getResource();

        RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");

        // 创建一个线程充当生产者,并向redis中存10个异步任务
        Thread producer = new Thread() {
            public void run() {
                for (int i = 0; i < 10; i++) {
                    queue.delay("codehole" + i);
                }
            }
        };

        // 创建一个线程充当消费者,不断从redis中取任务并执行
        Thread consumer = new Thread() {
            public void run() {
                queue.luaLoop();
            }
        };

        producer.start();
        consumer.start();
        try {
            // 等待生产者线程执行结束
            producer.join();
            Thread.sleep(6000);
            consumer.interrupt();
            consumer.join();
        }catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

一些问题

这个问题是关于Jedis的问题,因为我通过上面的方式发起redis请求实际上是存在并发问题的,如果将上述代码中的延时去掉,这个问题发生的概率将大大发生,主要是因为Jedis不是线程安全的,换句话说,通过JedisPool获取redis连接的实例,并发访问是是通过同一个socket发送数据的。

这里使用时,最好是每个线程都用有一个Jedis的实例,避免数据竞争问题.这里只是用了两个线程,所以简单手动使用两个redis实例,如果有多个消费者存在的情况下,还是每个线程单独持有一个Jedis才能解决问题。

    private Jedis readJedis;
    private Jedis writeJedis;
  • 1
  • 2

总结

本篇文章记录了使用zset实现一个简单异步队列的过程,然后对于第一次实现存在的一个问题,使用lua或者锁的方式优化网络IO。使用锁的方式会降低程序的并发度,所以一般使用lua脚本的方式来实现。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/649526?site
推荐阅读
相关标签
  

闽ICP备14008679号