当前位置:   article > 正文

java实现redis订阅发布_java redis 消息发布订阅模式

java redis 消息发布订阅模式

一:什么是订阅发布

什么是redis的发布订阅(pub/sub)?   Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为相似。 

     同样,Redis的pub/sub是一种消息通信模式,主要的目的是解除消息发布者和消息订阅者之间的耦合,  Redis作为一个pub/sub的server, 在订阅者和发布者之间起到了消息路由的功能。

  简单来讲,这里面还有个channel的概念,这里就是频道的意思,比如你订阅了银行的频道,当你的资金发生变动时,银行就会通过它的频道给你发送信息,在这里,你是属于被动接收的,而不是向银行索要信息,这个例子中,你就是sub(订阅者),而银行就是pub(发布者)。

二、运用场景:

一直都认为会一样技术之前,都必须先明白这样一种技术在哪些地方会被用到,不能盲目的学东西。

  看到发布订阅的特性,用来做一个简单的实时聊天系统再适合不过了。这是其中之一,当然这样的东西,我们开发中很少涉及到。再举一个常用的,在我们的分布式架构中,常常会遇到读写分离的场景,在写入的过程中,就可以使用redis发布订阅,使得写入值及时发布到各个读的程序中,就保证数据的完整一致性。再比如,在一个博客网站中,有100个粉丝订阅了你,当你发布新文章,就可以推送消息给粉丝们拉。总之场景很多,需要去挖掘。

三、回顾java如何操作redis:

redis是一种缓存数据库,它也是C/S的结构,也就是客户端和服务端,一般来说,在java中,我们通常使用 jedis(客户端)去操作redis(服务端),这其中操作的时候,两者之间肯定要建立连接,就像数据库链接一样,在关系型数据库中,我们一般都维护一个连接池,以达到链接的复用,来省去建立连接和关闭连接的时间。所以在jedis中,同样也存在一个jedispool(jedis连接池)的概念,我们都是从池中去取连接使用。

四、代码实例:

需要导入的jar包:

1、redis的properties配置文件

  1. #***************** jedis configuration *********************
  2. #redis ip
  3. redis.ip=127.0.0.1
  4. #redis port
  5. redis.port=6379
  6. #redis passWord
  7. redis.passWord=123456
  8. #over time setup
  9. redis.timeout=3000
  10. #************************ jedis pool configuration *******************
  11. #jedis Maximum number of active connections
  12. jedis.pool.maxActive=100
  13. #jedis Maximum number of free connections
  14. jedis.pool.maxIdle=50
  15. #When the jedis pool returns without a connection object, it waits for the maximum time, in milliseconds, for a connection to be available.
  16. #If the wait time is exceeded, a JedisConnectionException is thrown directly
  17. jedis.pool.maxWait=1500
  18. # When retrieving a connection from the pool, check whether it is valid
  19. jedis.pool.testOnBorrow=true
  20. # Check for validity when returning the connection
  21. jedis.pool.testOnReturn=true

2、读取配置文件工具类

  1. package com.li.util;
  2. import java.io.IOException;
  3. import java.util.Properties;
  4. public class DataSourceUtil {
  5. private static Properties properties = new Properties();
  6. static {
  7. try {
  8. //加载properties配置文件
  9. properties.load(DataSourceUtil.class.getResourceAsStream("/redis.properties"));
  10. } catch (IOException e) {
  11. // TODO Auto-generated catch block
  12. System.out.println("properties文件加载报错");
  13. e.printStackTrace();
  14. }
  15. }
  16. public static String getKey(String key) {
  17. return properties.getProperty(key);
  18. }
  19. }

3、redis连接池 

  1. package com.li.util;
  2. import redis.clients.jedis.Jedis;
  3. import redis.clients.jedis.JedisPool;
  4. import redis.clients.jedis.JedisPoolConfig;
  5. public class RedisPoolUtil {
  6. /**
  7. * Jedis连接池使用步骤如下:
  8. 1->获取Jedis实例需要从JedisPool中获取;
  9. 2->用完Jedis实例需要返还给JedisPool;
  10. 3->如果Jedis在使用过程中出错,则也需要还给JedisPool;
  11. */
  12. private static JedisPool jedisPool = null;
  13. //不允许通过new创建该类的实例
  14. private RedisPoolUtil() {
  15. }
  16. //初始化redis连接池
  17. public static void initalPool() {
  18. //创建JedisPool连接池实例
  19. JedisPoolConfig config = new JedisPoolConfig();
  20. //设置连接池各项配置
  21. //1、可用连接实例的最大数目,如果赋值为-1,表示不限制.
  22. config.setMaxTotal(Integer.parseInt(DataSourceUtil.getKey("jedis.pool.maxActive")));
  23. //2、控制一个Pool最多有多少个状态为idle(空闲的)jedis实例,默认值也是8
  24. config.setMaxIdle(Integer.parseInt(DataSourceUtil.getKey("jedis.pool.maxIdle")));
  25. //3、等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时/如果超过等待时间,则直接抛出异常
  26. config.setMaxWaitMillis(Long.valueOf(DataSourceUtil.getKey("jedis.pool.maxWait")));
  27. //4、从池中获取连接的时候,是否进行有效检查
  28. config.setTestOnBorrow(Boolean.valueOf(DataSourceUtil.getKey("jedis.pool.testOnBorrow")));
  29. //5、归还连接的时候,是否进行有效检查
  30. config.setTestOnReturn(Boolean.valueOf(DataSourceUtil.getKey("jedis.pool.testOnReturn")));
  31. String ip = DataSourceUtil.getKey("redis.ip");
  32. int port = Integer.valueOf(DataSourceUtil.getKey("redis.port"));
  33. int timeout = Integer.valueOf(DataSourceUtil.getKey("redis.timeout"));
  34. String password = DataSourceUtil.getKey("redis.passWord");
  35. //根据配置实例化jedis池
  36. jedisPool = new JedisPool(config,ip,port,timeout,password);
  37. }
  38. //获得连接
  39. public static Jedis getConnect() {
  40. Jedis jedis = null;
  41. try {
  42. if(jedisPool != null) {
  43. jedis = jedisPool.getResource();
  44. return jedis;
  45. }else {
  46. System.out.println("jedisPool连接池为null");
  47. return null;
  48. }
  49. } catch (Exception e) {
  50. // TODO: handle exception
  51. System.out.println(e);
  52. return null;
  53. }
  54. }
  55. //归还连接
  56. public static void closeConnect(Jedis jedis){
  57. if(jedis!=null){
  58. jedis.close();
  59. }
  60. }
  61. //关闭连接池
  62. public static void closePool() {
  63. if(jedisPool != null) {
  64. jedisPool.close();
  65. }
  66. }
  67. }

4、redis发布者

  1. package com.li.day1;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. import redis.clients.jedis.Jedis;
  6. import redis.clients.jedis.JedisPool;
  7. /***
  8. * redis发布订阅---发布
  9. * @author Administrator
  10. *
  11. */
  12. public class Publisher extends Thread{
  13. private final Jedis jedis;
  14. private final String channel;
  15. public Publisher(Jedis jedis,String channel) {
  16. super();
  17. this.jedis = jedis;
  18. this.channel = channel;
  19. }
  20. @Override
  21. public void run() {
  22. // TODO Auto-generated method stub
  23. BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
  24. //一直监听控制台的输入,并发布到redis服务端
  25. while(true) {
  26. String message = null;
  27. try {
  28. //获取本地输入数据
  29. message = reader.readLine();
  30. System.out.println(channel+"="+message);
  31. //判断是否结束,是则关闭连接,否则发布数据
  32. if(!"quit".equals(message)) {
  33. jedis.publish(channel, message);
  34. }else {
  35. break;
  36. }
  37. } catch (IOException e) {
  38. // TODO Auto-generated catch block
  39. e.printStackTrace();
  40. }
  41. }
  42. }
  43. }

5、redis发布订阅消息监听器

  1. package com.li.day1;
  2. import redis.clients.jedis.JedisPubSub;
  3. /***
  4. * redis发布订阅---客户端订阅监听
  5. * redis发布订阅消息监听器
  6. * @author Administrator
  7. *
  8. */
  9. public class RedisMsgPubSubListener extends JedisPubSub{
  10. /*
  11. * 监听到订阅频道接受到消息时的回调
  12. */
  13. @Override
  14. public void onMessage(String channel, String message) {
  15. // TODO Auto-generated method stub
  16. System.out.println(" 监听到订阅频道接受到消息时的回调 :"+channel+" : "+message);
  17. super.onMessage(channel, message);
  18. }
  19. /*
  20. * 监听到订阅模式接受到消息时的回调
  21. */
  22. @Override
  23. public void onPMessage(String pattern, String channel, String message) {
  24. // TODO Auto-generated method stub
  25. System.out.println("监听到订阅模式接受到消息时的回调 : "+channel+" : "+message);
  26. super.onPMessage(pattern, channel, message);
  27. }
  28. /*
  29. * 订阅频道时的回调
  30. */
  31. @Override
  32. public void onPSubscribe(String pattern, int subscribedChannels) {
  33. // TODO Auto-generated method stub
  34. System.out.println("订阅频道时的回调 :"+pattern+" : "+subscribedChannels);
  35. super.onPSubscribe(pattern, subscribedChannels);
  36. }
  37. /*
  38. * 取消订阅频道时的回调
  39. */
  40. @Override
  41. public void onPUnsubscribe(String pattern, int subscribedChannels) {
  42. // TODO Auto-generated method stub
  43. System.out.println("取消订阅频道时的回调 :"+pattern+" : "+subscribedChannels);
  44. super.onPUnsubscribe(pattern, subscribedChannels);
  45. }
  46. /*
  47. * 订阅频道模式时的回调
  48. */
  49. @Override
  50. public void onSubscribe(String channel, int subscribedChannels) {
  51. // TODO Auto-generated method stub
  52. System.out.println("订阅模式时的回调: "+channel+":"+subscribedChannels);
  53. super.onSubscribe(channel, subscribedChannels);
  54. }
  55. /*
  56. * 取消订阅模式时的回调
  57. */
  58. @Override
  59. public void onUnsubscribe(String channel, int subscribedChannels) {
  60. // TODO Auto-generated method stub
  61. System.out.println(" 取消订阅模式时的回调: "+channel+":"+subscribedChannels);
  62. super.onUnsubscribe(channel, subscribedChannels);
  63. }
  64. }

6、redis订阅者

  1. package com.li.day1;
  2. import redis.clients.jedis.Jedis;
  3. import redis.clients.jedis.JedisPool;
  4. /***
  5. * redis发布订阅---订阅
  6. * @author Administrator
  7. *
  8. */
  9. public class Subscriber extends Thread{
  10. private final Jedis jedis;
  11. private final String channel;
  12. public Subscriber(Jedis jedis, String channel) {
  13. super();
  14. this.jedis = jedis;
  15. this.channel = channel;
  16. }
  17. @Override
  18. public void run() {
  19. // TODO Auto-generated method stub
  20. try {
  21. //创建redis发布订阅消息监听器
  22. RedisMsgPubSubListener redisMsgPubSubListener = new RedisMsgPubSubListener();
  23. //通过监听器的api去订阅,入参是订阅者和频道名
  24. jedis.subscribe(redisMsgPubSubListener, channel);
  25. }catch (Exception e) {
  26. System.out.println("订阅消息报错:"+e);
  27. // TODO: handle exception
  28. } finally {
  29. // TODO: handle finally clause
  30. if(jedis != null) {
  31. jedis.close();
  32. }
  33. }
  34. }
  35. }

7、测试类

  1. package com.li.day1;
  2. import com.li.util.RedisPoolUtil;
  3. import redis.clients.jedis.Jedis;
  4. import redis.clients.jedis.JedisPool;
  5. import redis.clients.jedis.JedisPoolConfig;
  6. public class PubSubMain {
  7. public static void main(String[] args) {
  8. // TODO Auto-generated method stub
  9. //频道
  10. String channel = "myChannel";
  11. //初始化redisPool连接池
  12. RedisPoolUtil.initalPool();
  13. //订阅者
  14. Subscriber Subscriber = new Subscriber(RedisPoolUtil.getConnect(), channel);
  15. Subscriber.start();
  16. //发布者
  17. Publisher publisher = new Publisher(RedisPoolUtil.getConnect(),channel);
  18. publisher.start();
  19. }
  20. }

8、测试结果

参考文章:https://www.cnblogs.com/xinde123/p/8489054.html

https://www.liangzl.com/get-article-detail-124999.html

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/472425
推荐阅读
相关标签
  

闽ICP备14008679号