赞
踩
当遇到并发的客户端请求时,为了缓解服务端的处理压力,当请求对响应的处理的实时性要求不高时,可以实现一个异步的请求消息队列。
一种实现策略是使用redis的zset,将消息的到期处理时间作为score,然后用多个线程去轮训获取zset中的任务并进行处理。
需要提前考虑一个问题:
如何避免一个任务被多次处理?
一种解决方案是当多个线程获取到任务时,调用redis的zrem命令,将该任务从指定的zset中移除(利用了redis处理命令时是顺序执行的)。
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.TypeReference; import redis.clients.jedis.Jedis; import java.lang.reflect.Type; import java.util.List; import java.util.UUID; // 基于Redis实现的延迟队列 public class RedisDelayingQueue<T> { static class TaskItem<T> { public String id; public T msg; } // fastjson序列化对象时如果存在泛型,需要使用TypeReference private Type TaskType = new TypeReference<TaskItem<T>>(){}.getType(); private Jedis jedis; private String queueKey; public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } // 将任务添加到 zset 中 // 分数是延时的时间 public void delay(T msg) { TaskItem<T> task = new TaskItem<T>(); task.id = UUID.randomUUID().toString(); task.msg = msg; // 序列化任务 String s = JSON.toJSONString(task); jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); } public void loop() { while(!Thread.interrupted()) { // 从zset中取出一个任务 List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); if(values.isEmpty()) { try { Thread.sleep(500); } catch(InterruptedException e) { break; } continue; } String s = values.iterator().next(); if(jedis.zrem(queueKey, s) > 0) { TaskItem<T> task = JSON.parseObject(s, TaskType); this.handleMsg(task.msg); } } } public void handleMsg(T msg) { System.out.println(msg); } }
通过上面loop中代码,多个线程获取到values时,可能会被多个线程同时取到,然后再调用zrem命令去竞争的删除该值,所以会有很多无用的网络请求发送到redis。更容易想到的方案是将取值然后删除的操作变成原子性的,两种实现方案:
这种方案不太好,如果两个命令之间发生了网络错误或者延迟,将造成其它线程的阻塞
public void synchronizedLoop() { while(!Thread.interrupted()) { synchronized(this) { // 从zset中取出一个任务 List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); if(values.isEmpty()) { try { Thread.sleep(500); } catch(InterruptedException e) { break; } continue; } String s = values.iterator().next(); if(jedis.zrem(queueKey, s) > 0) { TaskItem<T> task = JSON.parseObject(s, TaskType); this.handleMsg(task.msg); } } } }
local key = KEYS[1]
local task = redis.call('ZPOPMIN', key)
if task and next(task) != nil then
redis.call('ZREM', key, task[1])
return task[1]
else
return nil
end
通过查阅文档发现,ZRANGEBYSCORE从6 版本开始已经过时了,所以这里使用ZPOPMIN来获取分数最小的value,可以达到相同的效果。
通过Jedis的eval函数,调用redis执行lua脚本的命令。
public void luaLoop() { while(!Thread.interrupted()) { Object ans = jedis.eval(script, 1, queueKey); if(ans != null) { String task = (String) ans; TaskItem<T> taskItem = JSON.parseObject(task, TaskType); this.handleMsg(taskItem.msg); }else{ try{ Thread.sleep(500); }catch(Exception e) { break; } } } }
为什么可以优化:
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class Main { public static void main(String[] args) { JedisPool jedisPool = new JedisPool("url-of-redis", 6379, "username", "pass"); Jedis jedis = jedisPool.getResource(); RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo"); // 创建一个线程充当生产者,并向redis中存10个异步任务 Thread producer = new Thread() { public void run() { for (int i = 0; i < 10; i++) { queue.delay("codehole" + i); } } }; // 创建一个线程充当消费者,不断从redis中取任务并执行 Thread consumer = new Thread() { public void run() { queue.luaLoop(); } }; producer.start(); consumer.start(); try { // 等待生产者线程执行结束 producer.join(); Thread.sleep(6000); consumer.interrupt(); consumer.join(); }catch(InterruptedException e) { e.printStackTrace(); } } }
这个问题是关于Jedis的问题,因为我通过上面的方式发起redis请求实际上是存在并发问题的,如果将上述代码中的延时去掉,这个问题发生的概率将大大发生,主要是因为Jedis不是线程安全的,换句话说,通过JedisPool获取redis连接的实例,并发访问是是通过同一个socket发送数据的。
这里使用时,最好是每个线程都用有一个Jedis的实例,避免数据竞争问题.这里只是用了两个线程,所以简单手动使用两个redis实例,如果有多个消费者存在的情况下,还是每个线程单独持有一个Jedis才能解决问题。
private Jedis readJedis;
private Jedis writeJedis;
本篇文章记录了使用zset实现一个简单异步队列的过程,然后对于第一次实现存在的一个问题,使用lua或者锁的方式优化网络IO。使用锁的方式会降低程序的并发度,所以一般使用lua脚本的方式来实现。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。