赞
踩
什么是redis的发布订阅(pub/sub)? Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为相似。
同样,Redis的pub/sub是一种消息通信模式,主要的目的是解除消息发布者和消息订阅者之间的耦合, Redis作为一个pub/sub的server, 在订阅者和发布者之间起到了消息路由的功能。
简单来讲,这里面还有个channel的概念,这里就是频道的意思,比如你订阅了银行的频道,当你的资金发生变动时,银行就会通过它的频道给你发送信息,在这里,你是属于被动接收的,而不是向银行索要信息,这个例子中,你就是sub(订阅者),而银行就是pub(发布者)。
一直都认为会一样技术之前,都必须先明白这样一种技术在哪些地方会被用到,不能盲目的学东西。
看到发布订阅的特性,用来做一个简单的实时聊天系统再适合不过了。这是其中之一,当然这样的东西,我们开发中很少涉及到。再举一个常用的,在我们的分布式架构中,常常会遇到读写分离的场景,在写入的过程中,就可以使用redis发布订阅,使得写入值及时发布到各个读的程序中,就保证数据的完整一致性。再比如,在一个博客网站中,有100个粉丝订阅了你,当你发布新文章,就可以推送消息给粉丝们拉。总之场景很多,需要去挖掘。
redis是一种缓存数据库,它也是C/S的结构,也就是客户端和服务端,一般来说,在java中,我们通常使用 jedis(客户端)去操作redis(服务端),这其中操作的时候,两者之间肯定要建立连接,就像数据库链接一样,在关系型数据库中,我们一般都维护一个连接池,以达到链接的复用,来省去建立连接和关闭连接的时间。所以在jedis中,同样也存在一个jedispool(jedis连接池)的概念,我们都是从池中去取连接使用。
需要导入的jar包:
1、redis的properties配置文件
-
- #***************** jedis configuration *********************
- #redis ip
- redis.ip=127.0.0.1
- #redis port
- redis.port=6379
- #redis passWord
- redis.passWord=123456
- #over time setup
- redis.timeout=3000
- #************************ jedis pool configuration *******************
- #jedis Maximum number of active connections
- jedis.pool.maxActive=100
- #jedis Maximum number of free connections
- jedis.pool.maxIdle=50
- #When the jedis pool returns without a connection object, it waits for the maximum time, in milliseconds, for a connection to be available.
- #If the wait time is exceeded, a JedisConnectionException is thrown directly
- jedis.pool.maxWait=1500
- # When retrieving a connection from the pool, check whether it is valid
- jedis.pool.testOnBorrow=true
- # Check for validity when returning the connection
- jedis.pool.testOnReturn=true
2、读取配置文件工具类
- package com.li.util;
-
- import java.io.IOException;
- import java.util.Properties;
-
- public class DataSourceUtil {
-
- private static Properties properties = new Properties();
-
- static {
- try {
- //加载properties配置文件
- properties.load(DataSourceUtil.class.getResourceAsStream("/redis.properties"));
- } catch (IOException e) {
- // TODO Auto-generated catch block
- System.out.println("properties文件加载报错");
- e.printStackTrace();
- }
- }
- public static String getKey(String key) {
- return properties.getProperty(key);
- }
-
- }
3、redis连接池
-
- package com.li.util;
-
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- import redis.clients.jedis.JedisPoolConfig;
-
- public class RedisPoolUtil {
- /**
- * Jedis连接池使用步骤如下:
- 1->获取Jedis实例需要从JedisPool中获取;
- 2->用完Jedis实例需要返还给JedisPool;
- 3->如果Jedis在使用过程中出错,则也需要还给JedisPool;
- */
-
- private static JedisPool jedisPool = null;
-
- //不允许通过new创建该类的实例
- private RedisPoolUtil() {
-
- }
-
- //初始化redis连接池
- public static void initalPool() {
- //创建JedisPool连接池实例
- JedisPoolConfig config = new JedisPoolConfig();
- //设置连接池各项配置
- //1、可用连接实例的最大数目,如果赋值为-1,表示不限制.
- config.setMaxTotal(Integer.parseInt(DataSourceUtil.getKey("jedis.pool.maxActive")));
- //2、控制一个Pool最多有多少个状态为idle(空闲的)jedis实例,默认值也是8
- config.setMaxIdle(Integer.parseInt(DataSourceUtil.getKey("jedis.pool.maxIdle")));
- //3、等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时/如果超过等待时间,则直接抛出异常
- config.setMaxWaitMillis(Long.valueOf(DataSourceUtil.getKey("jedis.pool.maxWait")));
- //4、从池中获取连接的时候,是否进行有效检查
- config.setTestOnBorrow(Boolean.valueOf(DataSourceUtil.getKey("jedis.pool.testOnBorrow")));
- //5、归还连接的时候,是否进行有效检查
- config.setTestOnReturn(Boolean.valueOf(DataSourceUtil.getKey("jedis.pool.testOnReturn")));
-
- String ip = DataSourceUtil.getKey("redis.ip");
- int port = Integer.valueOf(DataSourceUtil.getKey("redis.port"));
- int timeout = Integer.valueOf(DataSourceUtil.getKey("redis.timeout"));
- String password = DataSourceUtil.getKey("redis.passWord");
- //根据配置实例化jedis池
- jedisPool = new JedisPool(config,ip,port,timeout,password);
-
- }
-
- //获得连接
- public static Jedis getConnect() {
- Jedis jedis = null;
- try {
- if(jedisPool != null) {
- jedis = jedisPool.getResource();
- return jedis;
- }else {
- System.out.println("jedisPool连接池为null");
- return null;
- }
- } catch (Exception e) {
- // TODO: handle exception
- System.out.println(e);
- return null;
- }
- }
- //归还连接
- public static void closeConnect(Jedis jedis){
- if(jedis!=null){
- jedis.close();
- }
- }
- //关闭连接池
- public static void closePool() {
- if(jedisPool != null) {
- jedisPool.close();
- }
- }
-
-
- }
-
-
4、redis发布者
- package com.li.day1;
-
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
-
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- /***
- * redis发布订阅---发布
- * @author Administrator
- *
- */
- public class Publisher extends Thread{
-
- private final Jedis jedis;
- private final String channel;
- public Publisher(Jedis jedis,String channel) {
- super();
- this.jedis = jedis;
- this.channel = channel;
- }
-
- @Override
- public void run() {
- // TODO Auto-generated method stub
- BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
- //一直监听控制台的输入,并发布到redis服务端
- while(true) {
- String message = null;
- try {
- //获取本地输入数据
- message = reader.readLine();
- System.out.println(channel+"="+message);
- //判断是否结束,是则关闭连接,否则发布数据
- if(!"quit".equals(message)) {
- jedis.publish(channel, message);
- }else {
- break;
- }
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
-
- }
-
-
- }
5、redis发布订阅消息监听器
- package com.li.day1;
-
- import redis.clients.jedis.JedisPubSub;
-
- /***
- * redis发布订阅---客户端订阅监听
- * redis发布订阅消息监听器
- * @author Administrator
- *
- */
- public class RedisMsgPubSubListener extends JedisPubSub{
- /*
- * 监听到订阅频道接受到消息时的回调
- */
- @Override
- public void onMessage(String channel, String message) {
- // TODO Auto-generated method stub
- System.out.println(" 监听到订阅频道接受到消息时的回调 :"+channel+" : "+message);
- super.onMessage(channel, message);
- }
- /*
- * 监听到订阅模式接受到消息时的回调
- */
- @Override
- public void onPMessage(String pattern, String channel, String message) {
- // TODO Auto-generated method stub
- System.out.println("监听到订阅模式接受到消息时的回调 : "+channel+" : "+message);
- super.onPMessage(pattern, channel, message);
- }
- /*
- * 订阅频道时的回调
- */
- @Override
- public void onPSubscribe(String pattern, int subscribedChannels) {
- // TODO Auto-generated method stub
- System.out.println("订阅频道时的回调 :"+pattern+" : "+subscribedChannels);
- super.onPSubscribe(pattern, subscribedChannels);
- }
- /*
- * 取消订阅频道时的回调
- */
- @Override
- public void onPUnsubscribe(String pattern, int subscribedChannels) {
- // TODO Auto-generated method stub
- System.out.println("取消订阅频道时的回调 :"+pattern+" : "+subscribedChannels);
- super.onPUnsubscribe(pattern, subscribedChannels);
- }
- /*
- * 订阅频道模式时的回调
- */
- @Override
- public void onSubscribe(String channel, int subscribedChannels) {
- // TODO Auto-generated method stub
- System.out.println("订阅模式时的回调: "+channel+":"+subscribedChannels);
- super.onSubscribe(channel, subscribedChannels);
- }
- /*
- * 取消订阅模式时的回调
- */
- @Override
- public void onUnsubscribe(String channel, int subscribedChannels) {
- // TODO Auto-generated method stub
- System.out.println(" 取消订阅模式时的回调: "+channel+":"+subscribedChannels);
- super.onUnsubscribe(channel, subscribedChannels);
- }
-
- }
6、redis订阅者
- package com.li.day1;
-
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- /***
- * redis发布订阅---订阅
- * @author Administrator
- *
- */
- public class Subscriber extends Thread{
- private final Jedis jedis;
- private final String channel;
-
-
- public Subscriber(Jedis jedis, String channel) {
- super();
- this.jedis = jedis;
- this.channel = channel;
- }
-
-
- @Override
- public void run() {
- // TODO Auto-generated method stub
- try {
- //创建redis发布订阅消息监听器
- RedisMsgPubSubListener redisMsgPubSubListener = new RedisMsgPubSubListener();
- //通过监听器的api去订阅,入参是订阅者和频道名
- jedis.subscribe(redisMsgPubSubListener, channel);
- }catch (Exception e) {
- System.out.println("订阅消息报错:"+e);
- // TODO: handle exception
- } finally {
- // TODO: handle finally clause
- if(jedis != null) {
- jedis.close();
- }
- }
-
- }
-
-
-
- }
7、测试类
- package com.li.day1;
-
- import com.li.util.RedisPoolUtil;
-
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- import redis.clients.jedis.JedisPoolConfig;
-
- public class PubSubMain {
-
- public static void main(String[] args) {
- // TODO Auto-generated method stub
- //频道
- String channel = "myChannel";
- //初始化redisPool连接池
- RedisPoolUtil.initalPool();
- //订阅者
- Subscriber Subscriber = new Subscriber(RedisPoolUtil.getConnect(), channel);
- Subscriber.start();
- //发布者
- Publisher publisher = new Publisher(RedisPoolUtil.getConnect(),channel);
- publisher.start();
- }
-
- }
8、测试结果
参考文章:https://www.cnblogs.com/xinde123/p/8489054.html
https://www.liangzl.com/get-article-detail-124999.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。