赞
踩
基于Redis的Redisson的分布式映射结构的RMap Java对象实现了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。与HashMap不同的是,RMap保持了元素的插入顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
除了同步接口外,还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。如果你想用Redis Map来保存你的POJO的话,可以考虑使用分布式实时对象(Live Object)服务。
在特定的场景下,映射缓存(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,可以使用Redisson提供的带有本地缓存功能的映射。
- RMap<String, SomeObject> map = redisson.getMap("anyMap");
- SomeObject prevObject = map.put("123", new SomeObject());
- SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
- SomeObject obj = map.remove("123");
-
- map.fastPut("321", new SomeObject());
- map.fastRemove("321");
-
- RFuture<SomeObject> putAsyncFuture = map.putAsync("321");
- RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");
-
- map.fastPutAsync("321", new SomeObject());
- map.fastRemoveAsync("321");
映射的字段锁的用法:
- RMap<MyKey, MyValue> map = redisson.getMap("anyMap");
- MyKey k = new MyKey();
- RLock keyLock = map.getLock(k);
- keyLock.lock();
- try {
- MyValue v = map.get(k);
- // 其他业务逻辑
- } finally {
- keyLock.unlock();
- }
-
- RReadWriteLock rwLock = map.getReadWriteLock(k);
- rwLock.readLock().lock();
- try {
- MyValue v = map.get(k);
- // 其他业务逻辑
- } finally {
- keyLock.readLock().unlock();
- }
Redisson提供了一系列的映射类型的数据结构,这些结构按特性主要分为三大类:
以下列表是Redisson提供的所有映射的名称及其特性:
接口名称 | RedissonClient | 本地缓存功能 | 数据分片功能 | 元素淘汰功能 |
RMap | getMap() | No | No | No |
RMapCache | getMapCache() | No | No | Yes |
RLocalCachedMap | getLocalCachedMap() | Yes | No | No |
RLocalCachedMap | getLocalCachedMapCache() | Yes | No | Yes |
RClusteredMap | getClusteredMap() | No | Yes | No |
RClusteredMapCache | getClusteredMapCache() | No | Yes | Yes |
RClusteredLocal | getClusteredLocal | Yes | Yes | No |
RClusteredLocal | getClusteredLocal | Yes | Yes | Yes |
除此以外,Redisson还提供了Spring Cache和JCache的实现。
Redisson的分布式的RMapCache Java对象在基于RMap的前提下实现了针对单个元素的淘汰机制。同时仍然保留了元素的插入顺序。由于RMapCache是基于RMap实现的,使它同时继承了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。Redisson提供的Spring Cache整合以及JCache正是基于这样的功能来实现的。
目前的Redis自身并不支持散列(Hash)当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理300个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到1小时之间。比如该次清理时删除了300条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。
- RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
- // 有效时间 ttl = 10分钟
- map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
- // 有效时间 ttl = 10分钟, 最长闲置时间 maxIdleTime = 10秒钟
- map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
-
- // 有效时间 = 3 秒钟
- map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
- // 有效时间 ttl = 40秒钟, 最长闲置时间 maxIdleTime = 10秒钟
- map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
在特定的场景下,映射(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,使用Redisson提供的带有本地缓存功能的分布式本地缓存映射RLocalCachedMapJava对象会是一个很好的选择。它同时实现了java.util.concurrent.ConcurrentMap和java.util.Map两个接口。本地缓存功能充分的利用了JVM的自身内存空间,对部分常用的元素实行就地缓存,这样的设计让读取操作的性能较分布式映射相比提高最多 45倍 。以下配置参数可以用来创建这个实例:
- LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
- // 用于淘汰清除本地缓存内的元素
- // 共有以下几种选择:
- // LFU - 统计元素的使用频率,淘汰用得最少(最不常用)的。
- // LRU - 按元素使用时间排序比较,淘汰最早(最久远)的。
- // SOFT - 元素用Java的WeakReference来保存,缓存元素通过GC过程清除。
- // WEAK - 元素用Java的SoftReference来保存, 缓存元素通过GC过程清除。
- // NONE - 永不淘汰清除缓存元素。
- .evictionPolicy(EvictionPolicy.NONE)
- // 如果缓存容量值为0表示不限制本地缓存容量大小
- .cacheSize(1000)
- // 以下选项适用于断线原因造成了未收到本地缓存更新消息的情况。
- // 断线重连的策略有以下几种:
- // CLEAR - 如果断线一段时间以后则在重新建立连接以后清空本地缓存
- // LOAD - 在服务端保存一份10分钟的作废日志
- // 如果10分钟内重新建立连接,则按照作废日志内的记录清空本地缓存的元素
- // 如果断线时间超过了这个时间,则将清空本地缓存中所有的内容
- // NONE - 默认值。断线重连时不做处理。
- .reconnectionStrategy(ReconnectionStrategy.NONE)
- // 以下选项适用于不同本地缓存之间相互保持同步的情况
- // 缓存同步策略有以下几种:
- // INVALIDATE - 默认值。当本地缓存映射的某条元素发生变动时,同时驱逐所有相同本地缓存映射内的该元素
- // UPDATE - 当本地缓存映射的某条元素发生变动时,同时更新所有相同本地缓存映射内的该元素
- // NONE - 不做任何同步处理
- .syncStrategy(SyncStrategy.INVALIDATE)
- // 每个Map本地缓存里元素的有效时间,默认毫秒为单位
- .timeToLive(10000)
- // 或者
- .timeToLive(10, TimeUnit.SECONDS)
- // 每个Map本地缓存里元素的最长闲置时间,默认毫秒为单位
- .maxIdle(10000)
- // 或者
- .maxIdle(10, TimeUnit.SECONDS);
- RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", options);
-
- String prevObject = map.put("123", 1);
- String currentObject = map.putIfAbsent("323", 2);
- String obj = map.remove("123");
-
- // 在不需要旧值的情况下可以使用fast为前缀的类似方法
- map.fastPut("a", 1);
- map.fastPutIfAbsent("d", 32);
- map.fastRemove("b");
-
- RFuture<String> putAsyncFuture = map.putAsync("321");
- RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");
-
- map.fastPutAsync("321", new SomeObject());
- map.fastRemoveAsync("321");
当不再使用Map本地缓存对象的时候应该手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。
- RLocalCachedMap<String, Integer> map = ...
- map.destroy();
代码范例:
- public void loadData(String cacheName, Map<String, String> data) {
- RLocalCachedMap<String, String> clearMap = redisson.getLocalCachedMap(cacheName,
- LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.INVALIDATE));
- RLocalCachedMap<String, String> loadMap = redisson.getLocalCachedMap(cacheName,
- LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.NONE));
-
- loadMap.putAll(data);
- clearMap.clearLocalCache();
- }
数据分片功能(Sharding)
Map数据分片是Redis集群模式下的一个功能。Redisson提供的分布式集群映射RClusteredMap Java对象也是基于RMap实现的。它同时实现了java.util.concurrent.ConcurrentMap和java.util.Map两个接口。在这里可以获取更多的内部信息。
- RClusteredMap<String, SomeObject> map = redisson.getClusteredMap("anyMap");
-
- SomeObject prevObject = map.put("123", new SomeObject());
- SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
- SomeObject obj = map.remove("123");
-
- map.fastPut("321", new SomeObject());
- map.fastRemove("321");
Redisson供了将映射中的数据持久化到外部储存服务的功能。主要场景有一下几种:
通俗的讲,如果一个被请求的数据不存在于Redisson的映射中的时候,Redisson将通过预先配置好的MapLoader对象加载数据。
在遇到映射中某条数据被更改时,Redisson会首先通过预先配置好的MapWriter对象写入到外部储存系统,然后再更新Redis内的数据。
对映射的数据的更改会首先写入到Redis,然后再使用异步的方式,通过MapWriter对象写入到外部储存系统。在并发环境下可以通过writeBehindThreads参数来控制写入线程的数量,已达到对外部储存系统写入并发量的控制。
以上策略适用于所有实现了RMap、RMapCache、RLocalCachedMap和RLocalCachedMapCache接口的对象。
- MapOptions<K, V> options = MapOptions.<K, V>defaults()
- .writer(myWriter)
- .loader(myLoader);
-
- RMap<K, V> map = redisson.getMap("test", options);
- // 或
- RMapCache<K, V> map = redisson.getMapCache("test", options);
- // 或
- RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
- // 或
- RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);
Redisson为所有实现了RMapCache或RLocalCachedMapCache接口的对象提供了监听以下事件的监听器:
事件 | 监听器 元素 添加 事件 | org.redisson.api.map.event.EntryCreatedListener
元素 过期 事件 | org.redisson.api.map.event.EntryExpiredListener
元素 删除 事件 | org.redisson.api.map.event.EntryRemovedListener
元素 更新 事件 | org.redisson.api.map.event.EntryUpdatedListener
使用范例:
- RMapCache<String, Integer> map = redisson.getMapCache("myMap");
- // 或
- RLocalCachedMapCache<String, Integer> map = redisson.getLocalCachedMapCache("myMap", options);
-
- int updateListener = map.addListener(new EntryUpdatedListener<Integer, Integer>() {
- @Override
- public void onUpdated(EntryEvent<Integer, Integer> event) {
- event.getKey(); // 字段名
- event.getValue() // 新值
- event.getOldValue() // 旧值
- // ...
- }
- });
-
- int createListener = map.addListener(new EntryCreatedListener<Integer, Integer>() {
- @Override
- public void onCreated(EntryEvent<Integer, Integer> event) {
- event.getKey(); // 字段名
- event.getValue() // 值
- // ...
- }
- });
-
- int expireListener = map.addListener(new EntryExpiredListener<Integer, Integer>() {
- @Override
- public void onExpired(EntryEvent<Integer, Integer> event) {
- event.getKey(); // 字段名
- event.getValue() // 值
- // ...
- }
- });
-
- int removeListener = map.addListener(new EntryRemovedListener<Integer, Integer>() {
- @Override
- public void onRemoved(EntryEvent<Integer, Integer> event) {
- event.getKey(); // 字段名
- event.getValue() // 值
- // ...
- }
- });
-
- map.removeListener(updateListener);
- map.removeListener(createListener);
- map.removeListener(expireListener);
- map.removeListener(removeListener);
Redisson提供了基于Redis的以LRU为驱逐策略的分布式LRU有界映射对象。顾名思义,分布式LRU有界映射允许通过对其中元素按使用时间排序处理的方式,主动移除超过规定容量限制的元素。
- RMapCache<String, String> map = redisson.getMapCache("map");
- // 尝试将该映射的最大容量限制设定为10
- map.trySetMaxSize(10);
-
- // 将该映射的最大容量限制设定或更改为10
- map.setMaxSize(10);
-
- map.put("1", "2");
- map.put("3", "3", 1, TimeUnit.SECONDS);
基于Redis的Redisson的分布式RMultimap Java对象允许Map中的一个字段值包含多个元素。 字段总数受Redis限制,每个Multimap最多允许有4 294 967 295个不同字段。
基于Set的Multimap不允许一个字段值包含有重复的元素。
- RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
- map.put(new SimpleKey("0"), new SimpleValue("1"));
- map.put(new SimpleKey("0"), new SimpleValue("2"));
- map.put(new SimpleKey("3"), new SimpleValue("4"));
-
- Set<SimpleValue> allValues = map.get(new SimpleKey("0"));
-
- List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
- Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
-
- Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
基于List的Multimap在保持插入顺序的同时允许一个字段下包含重复的元素。
- RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
- map.put(new SimpleKey("0"), new SimpleValue("1"));
- map.put(new SimpleKey("0"), new SimpleValue("2"));
- map.put(new SimpleKey("0"), new SimpleValue("1"));
- map.put(new SimpleKey("3"), new SimpleValue("4"));
-
- List<SimpleValue> allValues = map.get(new SimpleKey("0"));
-
- Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
- List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
-
- List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
Multimap对象的淘汰机制是通过不同的接口来实现的。它们是RSetMultimapCache接口和RListMultimapCache接口,分别对应的是Set和List的Multimaps。
所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理100个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到2小时之间。比如该次清理时删除了100条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。
RSetMultimapCache的使用范例:
- RSetMultimapCache<String, String> multimap = redisson.getSetMultimapCache("myMultimap");
- multimap.put("1", "a");
- multimap.put("1", "b");
- multimap.put("1", "c");
-
- multimap.put("2", "e");
- multimap.put("2", "f");
-
- multimap.expireKey("2", 10, TimeUnit.MINUTES);
基于Redis的Redisson的分布式Set结构的RSet Java对象实现了java.util.Set接口。通过元素的相互状态比较保证了每个元素的唯一性。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
- RSet<SomeObject> set = redisson.getSet("anySet");
- set.add(new SomeObject());
- set.remove(new SomeObject());
Redisson PRO版本中的Set对象还可以在Redis集群环境下支持单集合数据分片。
7.3.1. 集(Set)淘汰机制(Eviction)
基于Redis的Redisson的分布式RSetCache Java对象在基于RSet的前提下实现了针对单个元素的淘汰机制。由于RSetCache是基于RSet实现的,使它还集成了java.util.Set接口。
目前的Redis自身并不支持Set当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理100个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到2小时之间。比如该次清理时删除了100条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。
- RSetCache<SomeObject> set = redisson.getSetCache("anySet");
- // ttl = 10 seconds
- set.add(new SomeObject(), 10, TimeUnit.SECONDS);
Set数据分片是Redis集群模式下的一个功能。Redisson提供的分布式RClusteredSet Java对象也是基于RSet实现的。在这里可以获取更多的信息。
- RClusteredSet<SomeObject> set = redisson.getClusteredSet("anySet");
- set.add(new SomeObject());
- set.remove(new SomeObject());
除了RClusteredSet以外,Redisson还提供了另一种集群模式下的分布式集(Set),它不仅提供了透明的数据分片功能,还为每个元素提供了淘汰机制。RClusteredSetCache 类分别同时提供了RClusteredSet 和RSetCache 这两个接口的实现。当然这些都是基于java.util.Set的接口实现上的。
该功能仅限于Redisson PRO版本。
基于Redis的Redisson的分布式RSortedSet Java对象实现了java.util.SortedSet接口。在保证元素唯一性的前提下,通过比较器(Comparator)接口实现了对元素的排序。
- RSortedSet<Integer> set = redisson.getSortedSet("anySet");
- set.trySetComparator(new MyComparator()); // 配置元素比较器
- set.add(3);
- set.add(1);
- set.add(2);
-
- set.removeAsync(0);
- set.addAsync(5);
基于Redis的Redisson的分布式RScoredSortedSet Java对象是一个可以按插入时指定的元素评分排序的集合。它同时还保证了元素的唯一性。
- RScoredSortedSet<SomeObject> set = redisson.getScoredSortedSet("simple");
-
- set.add(0.13, new SomeObject(a, b));
- set.addAsync(0.251, new SomeObject(c, d));
- set.add(0.302, new SomeObject(g, d));
-
- set.pollFirst();
- set.pollLast();
-
- int index = set.rank(new SomeObject(g, d)); // 获取元素在集合中的位置
- Double score = set.getScore(new SomeObject(g, d)); // 获取元素的评分
基于Redis的Redisson的分布式RLexSortedSet Java对象在实现了java.util.Set<String>接口的同时,将其中的所有字符串元素按照字典顺序排列。它公式还保证了字符串元素的唯一性。
- RLexSortedSet set = redisson.getLexSortedSet("simple");
- set.add("d");
- set.addAsync("e");
- set.add("f");
-
- set.lexRangeTail("d", false);
- set.lexCountHead("e");
- set.lexRange("d", true, "z", false);
基于Redis的Redisson分布式列表(List)结构的RList Java对象在实现了java.util.List接口的同时,确保了元素插入时的顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
- RList<SomeObject> list = redisson.getList("anyList");
- list.add(new SomeObject());
- list.get(0);
- list.remove(new SomeObject());
基于Redis的Redisson分布式无界队列(Queue)结构的RQueue Java对象实现了java.util.Queue接口。尽管RQueue对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
- RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
- queue.add(new SomeObject());
- SomeObject obj = queue.peek();
- SomeObject someObj = queue.poll();
基于Redis的Redisson分布式无界双端队列(Deque)结构的RDeque Java对象实现了java.util.Deque接口。尽管RDeque对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
- RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
- queue.addFirst(new SomeObject());
- queue.addLast(new SomeObject());
- SomeObject obj = queue.removeFirst();
- SomeObject someObj = queue.removeLast();
基于Redis的Redisson分布式无界阻塞队列(Blocking Queue)结构的RBlockingQueue Java对象实现了java.util.concurrent.BlockingQueue接口。尽管RBlockingQueue对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
- RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
- queue.offer(new SomeObject());
-
- SomeObject obj = queue.peek();
- SomeObject someObj = queue.poll();
- SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。
基于Redis的Redisson分布式有界阻塞队列(Bounded Blocking Queue)结构的RBoundedBlockingQueue Java对象实现了java.util.concurrent.BlockingQueue接口。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。队列的初始容量(边界)必须在使用前设定好。
- RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
- // 如果初始容量(边界)设定成功则返回`真(true)`,
- // 如果初始容量(边界)已近存在则返回`假(false)`。
- queue.trySetCapacity(2);
-
- queue.offer(new SomeObject(1));
- queue.offer(new SomeObject(2));
- // 此时容量已满,下面代码将会被阻塞,直到有空闲为止。
- queue.put(new SomeObject());
-
- SomeObject obj = queue.peek();
- SomeObject someObj = queue.poll();
- SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。
基于Redis的Redisson分布式无界阻塞双端队列(Blocking Deque)结构的RBlockingDeque Java对象实现了java.util.concurrent.BlockingDeque接口。尽管RBlockingDeque对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
- RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque");
- deque.putFirst(1);
- deque.putLast(2);
- Integer firstValue = queue.takeFirst();
- Integer lastValue = queue.takeLast();
- Integer firstValue = queue.pollFirst(10, TimeUnit.MINUTES);
- Integer lastValue = queue.pollLast(3, TimeUnit.MINUTES);
poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。
基于Redis的Redisson分布式无界阻塞公平队列(Blocking Fair Queue)结构的RBlockingFairQueue Java对象在实现Redisson分布式无界阻塞队列(Blocking Queue)结构RBlockingQueue接口的基础上,解决了多个队列消息的处理者在复杂的网络环境下,网络延时的影响使“较远”的客户端最终收到消息数量低于“较近”的客户端的问题。从而解决了这种现象引发的个别处理节点过载的情况。
以分布式无界阻塞队列为基础,采用公平获取消息的机制,不仅保证了poll、pollFromAny、pollLastAndOfferFirstTo和take方法获取消息的先入顺序,还能让队列里的消息被均匀的发布到处在复杂分布式环境中的各个处理节点里。
- RBlockingFairQueue queue = redisson.getBlockingFairQueue("myQueue");
- queue.offer(new SomeObject());
-
- SomeObject obj = queue.peek();
- SomeObject someObj = queue.poll();
- SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
该功能仅限于Redisson PRO版本。
基于Redis的Redisson分布式无界阻塞公平双端队列(Blocking Fair Deque)结构的RBlockingFairDeque Java对象在实现Redisson分布式无界阻塞双端队列(Blocking Deque)结构RBlockingDeque接口的基础上,解决了多个队列消息的处理者在复杂的网络环境下,网络延时的影响使“较远”的客户端最终收到消息数量低于“较近”的客户端的问题。从而解决了这种现象引发的个别处理节点过载的情况。
以分布式无界阻塞双端队列为基础,采用公平获取消息的机制,不仅保证了poll、take、pollFirst、takeFirst、pollLast和takeLast方法获取消息的先入顺序,还能让队列里的消息被均匀的发布到处在复杂分布式环境中的各个处理节点里。
- RBlockingFairDeque deque = redisson.getBlockingFairDeque("myDeque");
- deque.offer(new SomeObject());
-
- SomeObject firstElement = queue.peekFirst();
- SomeObject firstElement = queue.pollFirst();
- SomeObject firstElement = queue.pollFirst(10, TimeUnit.MINUTES);
- SomeObject firstElement = queue.takeFirst();
-
- SomeObject lastElement = queue.peekLast();
- SomeObject lastElement = queue.pollLast();
- SomeObject lastElement = queue.pollLast(10, TimeUnit.MINUTES);
- SomeObject lastElement = queue.takeLast();
该功能仅限于Redisson PRO版本。
基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。
- RQueue<String> distinationQueue = ...
- RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
- // 10秒钟以后将消息发送到指定队列
- delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
- // 一分钟以后将消息发送到指定队列
- delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
在该对象不再需要的情况下,应该主动销毁。仅在相关的Redisson对象也需要关闭的时候可以不用主动销毁。
- RDelayedQueue<String> delayedQueue = ...
- delayedQueue.destroy();
基于Redis的Redisson分布式优先队列(Priority Queue)Java对象实现了java.util.Queue的接口。可以通过比较器(Comparator)接口来对元素排序。
- RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
- queue.trySetComparator(new MyComparator()); // 指定对象比较器
- queue.add(3);
- queue.add(1);
- queue.add(2);
-
- queue.removeAsync(0);
- queue.addAsync(5);
-
- queue.poll();
基于Redis的Redisson分布式优先双端队列(Priority Deque)Java对象实现了java.util.Deque的接口。可以通过比较器(Comparator)接口来对元素排序。
- RPriorityDeque<Integer> queue = redisson.getPriorityDeque("anyQueue");
- queue.trySetComparator(new MyComparator()); // 指定对象比较器
- queue.addLast(3);
- queue.addFirst(1);
- queue.add(2);
-
- queue.removeAsync(0);
- queue.addAsync(5);
-
- queue.pollFirst();
- queue.pollLast();
基于Redis的分布式无界优先阻塞队列(Priority Blocking Queue)Java对象的结构与java.util.concurrent.PriorityBlockingQueue类似。可以通过比较器(Comparator)接口来对元素排序。PriorityBlockingQueue的最大容量是4 294 967 295个元素。
- RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
- queue.trySetComparator(new MyComparator()); // 指定对象比较器
- queue.add(3);
- queue.add(1);
- queue.add(2);
-
- queue.removeAsync(0);
- queue.addAsync(5);
-
- queue.take();
当Redis服务端断线重连以后,或Redis服务端发生主从切换,并重新建立连接后,断线时正在使用poll,pollLastAndOfferFirstTo或take方法的对象Redisson将自动再次为其订阅相关的话题。
基于Redis的分布式无界优先阻塞双端队列(Priority Blocking Deque)Java对象实现了java.util.Deque的接口。addLast、 addFirst、push方法不能再这个对里使用。PriorityBlockingDeque的最大容量是4 294 967 295个元素。
- RPriorityBlockingDeque<Integer> queue = redisson.getPriorityBlockingDeque("anyQueue");
- queue.trySetComparator(new MyComparator()); // 指定对象比较器
- queue.add(2);
-
- queue.removeAsync(0);
- queue.addAsync(5);
-
- queue.pollFirst();
- queue.pollLast();
- queue.takeFirst();
- queue.takeLast();
当Redis服务端断线重连以后,或Redis服务端发生主从切换,并重新建立连接后,断线时正在使用poll,pollLastAndOfferFirstTo或take方法的对象Redisson将自动再次为其订阅相关的话题。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。