赞
踩
延时队列可以通过Redis的zset(有序列表)来实现。
将消息序列化成一个字符串作为zset的value
这个消息的到期处理时间作为score
然后用多个线程轮训zset获取到期的任务进行处理
多个线程处理是为了保障可用性,万一挂了一个线程还有其他线程可以处理。
package com.ryz2593.happy.study.redis; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import redis.clients.jedis.Jedis; import java.lang.reflect.Type; import java.util.Set; import java.util.UUID; /** * @author ryz2593 */ public class RedisDelayingQueue<T> { static class TaskItem<T> { public String id; public T msg; } //fastjson序列化对象中存在generic类型时,需要使用TypeReference private Type TaskType = new TypeReference<TaskItem<T>>() { }.getType(); private Jedis jedis; private String queueKey; public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void delay(T msg) { TaskItem<T> task = new TaskItem<T>(); //分配唯一的uuid task.id = UUID.randomUUID().toString(); task.msg = msg; //fastjson 序列化 String s = JSON.toJSONString(task); //塞入延时队列, 5s 后再试 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); } public void loop() { while (!Thread.interrupted()) { //只取一条 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); if (values.isEmpty()) { try { //歇会继续 Thread.sleep(500); } catch (InterruptedException e) { break; } continue; } String s = values.iterator().next(); //抢到了 if (jedis.zrem(queueKey, s) > 0) { //fastjson反序列化 TaskItem<T> task = JSON.parseObject(s, TaskType); this.handleMsg(task.msg); } } } private void handleMsg(T msg) { System.out.println(msg); } public static void main(String[] args) { Jedis jedis = new Jedis("localhost"); final RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo"); Thread producer = new Thread() { @Override public void run() { for (int i = 0; i < 10; i++) { queue.delay("codehole" + i); } } }; Thread consumer = new Thread() { @Override public void run() { queue.loop(); } }; producer.start(); consumer.start(); try { producer.join(); Thread.sleep(6000); consumer.interrupt(); consumer.join(); } catch (InterruptedException e) { } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。