当前位置:   article > 正文

jedis的publish/subscribe_jedis pub 阻塞如何处理

jedis pub 阻塞如何处理
首先使用redis客户端来进行publish与subscribe的功能是否能够正常运行。

 打开redis服务器

 
 
  1. [root@localhost ~]# redis-server /opt/redis-2.4.10/redis.conf  
  2. [7719] 16 Apr 11:37:22 # Warning: 32 bit instance detected but no memory limit set. Setting 3.5 GB maxmemory limit with 'noeviction' policy now. 
  3. [7719] 16 Apr 11:37:22 * Server started, Redis version 2.4.10 
  4. [7719] 16 Apr 11:37:22 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect. 

 打开一个客户端订阅一个news.sports的channel。

 
 
  1. [root@localhost ~]# redis-cli 
  2. redis 127.0.0.1:6379> subscribe news.sports 
  3. Reading messages... (press Ctrl-C to quit) 
  4. 1) "subscribe" 
  5. 2) "news.sports" 
  6. 3) (integer) 1 

 可以看到已经开始了监听,向news.sports channel发布一条消息

 
 
  1. [root@localhost ~]# redis-cli             
  2. redis 127.0.0.1:6379> publish news.sports "kaka is back" 
  3. (integer) 1 

 订阅的客户端顺利收到消息

 
 
  1. redis 127.0.0.1:6379> subscribe news.sports 
  2. Reading messages... (press Ctrl-C to quit) 
  3. 1) "subscribe" 
  4. 2) "news.sports" 
  5. 3) (integer) 1 
  6. 1) "message" 
  7. 2) "news.sports" 
  8. 3) "kaka is back" 

 接下来使用jedis来进行发布/订阅的验证

 发布消息是通过jedis.publish(String channel, String message)来发布的,其实就是往redis服务器发布一条publish命令。

 
 
  1. public void publish(final byte[] channel, final byte[] message) { 
  2.        sendCommand(PUBLISH, channel, message); 
  3.    } 

 订阅消息是通过jedis.subscribe(JedisPub pub,String channel)来进行的,channel好理解,那么JedisPub是什么呢。

 看源码吧。

 Jedis订阅方法的源码为

 
 
  1. public void subscribe(JedisPubSub jedisPubSub, String... channels) { 
  2.       checkIsInMulti(); 
  3.       connect(); 
  4.       client.setTimeoutInfinite(); 
  5.       jedisPubSub.proceed(client, channels); 
  6.       client.rollbackTimeout(); 
  7.   } 

可以看到,主要是通过jedisPubSub.proceed(client, channels);来进行订阅的。看proceed方法。

 
 
  1. public void proceed(Client client, String... channels) { 
  2.        this.client = client; 
  3.        client.subscribe(channels); 
  4.        client.flush(); 
  5.        process(client); 
  6.    } 

追踪client.subscribe(channels)可以看到,

 
 
  1. public void subscribe(final byte[]... channels) { 
  2.        sendCommand(SUBSCRIBE, channels); 
  3.    } 

其只是向服务器发送了一个subcribe的命令而已。

那么要了解jedisPubSub的作用,只能看process方法了。简单看process其实是一个do...while循环

 
 
  1. private void process(Client client) { 
  2.        do { 
  3.            
  4.        } while (isSubscribed()); 
  5.    } 

我们可以猜测正是靠着这个循环来不断的读取服务器那边传到来的订阅的消息。

看主体

 
 
  1. List<Object> reply = client.getObjectMultiBulkReply(); 
  2.            final Object firstObj = reply.get(0); 
  3.            if (!(firstObj instanceof byte[])) { 
  4.                throw new JedisException("Unknown message type: " + firstObj); 
  5.            } 
  6.            final byte[] resp = (byte[]) firstObj; 
  7.            if (Arrays.equals(SUBSCRIBE.raw, resp)) { 
  8.                subscribedChannels = ((Long) reply.get(2)).intValue(); 
  9.                final byte[] bchannel = (byte[]) reply.get(1); 
  10.                final String strchannel = (bchannel == null) ? null 
  11.                        : SafeEncoder.encode(bchannel); 
  12.                onSubscribe(strchannel, subscribedChannels); 
  13.            } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) { 
  14.                subscribedChannels = ((Long) reply.get(2)).intValue(); 
  15.                final byte[] bchannel = (byte[]) reply.get(1); 
  16.                final String strchannel = (bchannel == null) ? null 
  17.                        : SafeEncoder.encode(bchannel); 
  18.                onUnsubscribe(strchannel, subscribedChannels); 
  19.            } else if (Arrays.equals(MESSAGE.raw, resp)) { 
  20.                final byte[] bchannel = (byte[]) reply.get(1); 
  21.                final byte[] bmesg = (byte[]) reply.get(2); 
  22.                final String strchannel = (bchannel == null) ? null 
  23.                        : SafeEncoder.encode(bchannel); 
  24.                final String strmesg = (bmesg == null) ? null : SafeEncoder 
  25.                        .encode(bmesg); 
  26.                onMessage(strchannel, strmesg); 
  27.            } else if (Arrays.equals(PMESSAGE.raw, resp)) { 
  28.                final byte[] bpattern = (byte[]) reply.get(1); 
  29.                final byte[] bchannel = (byte[]) reply.get(2); 
  30.                final byte[] bmesg = (byte[]) reply.get(3); 
  31.                final String strpattern = (bpattern == null) ? null 
  32.                        : SafeEncoder.encode(bpattern); 
  33.                final String strchannel = (bchannel == null) ? null 
  34.                        : SafeEncoder.encode(bchannel); 
  35.                final String strmesg = (bmesg == null) ? null : SafeEncoder 
  36.                        .encode(bmesg); 
  37.                onPMessage(strpattern, strchannel, strmesg); 
  38.            } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) { 
  39.                subscribedChannels = ((Long) reply.get(2)).intValue(); 
  40.                final byte[] bpattern = (byte[]) reply.get(1); 
  41.                final String strpattern = (bpattern == null) ? null 
  42.                        : SafeEncoder.encode(bpattern); 
  43.                onPSubscribe(strpattern, subscribedChannels); 
  44.            } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) { 
  45.                subscribedChannels = ((Long) reply.get(2)).intValue(); 
  46.                final byte[] bpattern = (byte[]) reply.get(1); 
  47.                final String strpattern = (bpattern == null) ? null 
  48.                        : SafeEncoder.encode(bpattern); 
  49.                onPUnsubscribe(strpattern, subscribedChannels); 
  50.            } else { 
  51.                throw new JedisException("Unknown message type: " + firstObj); 
  52.            } 

可以看到,通过client.getObjectMultiBulkReply()来得到返回来的消息。判断消息的类型来进行不同的操作。比如Arrays.equals(SUBSCRIBE.raw, resp)判断返回来的消息是订阅,subscribedChannels = ((Long) reply.get(2)).intValue();是取得消息,也是do...while判断循环的条件,也就是说这一次如果读到消息了,则进行下一次循环。那么onSubscribe(String channel, int subscribedChannels)究竟做了什么事,看开头

 
 
  1. public abstract void onMessage(String channel, String message); 
  2.  
  3.    public abstract void onPMessage(String pattern, String channel, 
  4.            String message); 
  5.  
  6.    public abstract void onSubscribe(String channel, int subscribedChannels); 
  7.  
  8.    public abstract void onUnsubscribe(String channel, int subscribedChannels); 
  9.  
  10.    public abstract void onPUnsubscribe(String pattern, int subscribedChannels); 
  11.  
  12.    public abstract void onPSubscribe(String pattern, int subscribedChannels); 

可以看到这是xetorthio留给我们的方法。onSubscrible是订阅时应该做些什么,onMessage就是有消息传来是做些什么,以此类推。

接下来可以写一个方法来发布和订阅消息了。

 
 
  1. package redis.client.jredis.tests; 
  2.  
  3. import java.util.Timer; 
  4. import java.util.TimerTask; 
  5.  
  6. import org.junit.Test; 
  7.  
  8. import redis.clients.jedis.Jedis; 
  9. import redis.clients.jedis.JedisPool; 
  10. import redis.clients.jedis.JedisPoolConfig; 
  11. import redis.clients.jedis.JedisPubSub; 
  12.  
  13. public class JedisTest extends JedisTestBase { 
  14.     JedisPool pool = null
  15.     /** 
  16.      * 测试发布验证 
  17.      */ 
  18.     @Test 
  19.     public void testPS(){ 
  20.         /** 
  21.         Jedis jedis = new Jedis("192.168.5.146",6379); 
  22.         jedis.set("name", "xiaoruoen"); 
  23.         jedis.publish("news.blog.title", "Hello,World"); 
  24.         //*/ 
  25.         final String host = "192.168.5.146"
  26.         JedisPoolConfig config = new JedisPoolConfig(); 
  27.         pool = new JedisPool(new JedisPoolConfig(),host); 
  28.         subscribe(new NewsListener(), "news.sports"); 
  29.         Timer timer = new Timer(); 
  30.         timer.schedule(new TimerTask() { 
  31.              
  32.             @Override 
  33.             public void run() { 
  34.                 // TODO Auto-generated method stub 
  35.                 publish("news.sports""{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}"); 
  36.             } 
  37.         }, 10003000); 
  38.          
  39.     } 
  40.      
  41.     public Jedis getResource(int dbnum){ 
  42.         Jedis jedis = pool.getResource(); 
  43.         jedis.select(dbnum); 
  44.         return jedis; 
  45.     } 
  46.      
  47.     /** 
  48.      *  
  49.      * @param channel 
  50.      * @param message ex:"{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}" 
  51.      */ 
  52.     public void publish(String channel,String message){ 
  53.         Jedis jedis = getResource(12); 
  54.         jedis.publish(channel, message); 
  55.         pool.returnResource(jedis); 
  56.     } 
  57.      
  58.     public void subscribe(JedisPubSub listener,String channel){ 
  59.         Jedis jedis = getResource(12); 
  60.         jedis.subscribe(listener, channel); 
  61.         pool.returnResource(jedis); 
  62.     } 
  63.  

 

 
 
  1. package redis.client.jredis.tests; 
  2.  
  3. import redis.clients.jedis.JedisPubSub; 
  4.  
  5. public class NewsListener extends JedisPubSub { 
  6.  
  7.     @Override 
  8.     public void onMessage(String channel, String message) { 
  9.         System.out.println("get message from"+channel+"   "+message); 
  10.     } 
  11.  
  12.     @Override 
  13.     public void onPMessage(String pattern, String channel, String message) { 
  14.         System.out.println("get message from"+channel+"   "+message); 
  15.     } 
  16.  
  17.     @Override 
  18.     public void onSubscribe(String channel, int subscribedChannels) { 
  19.         System.out.println("subscribe the channel:"+channel); 
  20.     } 
  21.  
  22.     @Override 
  23.     public void onUnsubscribe(String channel, int subscribedChannels) { 
  24.         System.out.println("get message from"+channel); 
  25.     } 
  26.  
  27.     @Override 
  28.     public void onPUnsubscribe(String pattern, int subscribedChannels) { 
  29.         System.out.println("get message from"+subscribedChannels); 
  30.     } 
  31.  
  32.     @Override 
  33.     public void onPSubscribe(String pattern, int subscribedChannels) { 
  34.         System.out.println("get message from"+subscribedChannels); 
  35.     } 
  36.  

发现只打印了一条数据subscribe the channel:news.sports

没按我们所期望的那样那所有发布的消息都打印出来。

到官网查看

https://github.com/xetorthio/jedis/wiki/AdvancedUsage

看到Note that subscribe is a blocking operation operation because it will poll Redis for responses on the thread that calls subscribe.可以看到subcribe是一个线程中的块操作。我猜测是在发布与接收的过程中,如果在同一线程里面进行操作,一边阻塞着流,另一边无法进行操作。于是将publish改写为另一线程启动。修改如下:

 
 
  1. public static void main(String[] args){ 
  2.         final String host = "192.168.5.146"
  3.         JedisPoolConfig config = new JedisPoolConfig(); 
  4.         pool = new JedisPool(new JedisPoolConfig(),host); 
  5.         Thread thread = new Thread(new Test().new PublishThread()); 
  6.         thread.start(); 
  7.         subscribe(new NewsListener(), "news.sports"); 
  8.        

 

 
 
  1. class PublishThread implements Runnable{ 
  2.         @Override 
  3.         public void run() { 
  4.             Timer timer = new Timer(); 
  5.             timer.schedule(new TimerTask() { 
  6.                  
  7.                 @Override 
  8.                 public void run() { 
  9.                     // TODO Auto-generated method stub 
  10.                     publish("news.sports", "{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}"); 
  11.                 } 
  12.             }, 1000, 3000); 
  13.         }  
  14.          
  15.     } 

最终发布订阅成功。

 
 
  1. Pool:redis.clients.jedis.JedisPool@18e2b22 
  2. subscribe the channel:news.sports 
  3. Pool:redis.clients.jedis.JedisPool@18e2b22 
  4. get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} 
  5. Pool:redis.clients.jedis.JedisPool@18e2b22 
  6. get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} 
  7. Pool:redis.clients.jedis.JedisPool@18e2b22 
  8. get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} 
  9. Pool:redis.clients.jedis.JedisPool@18e2b22 
  10. get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} 
  11. Pool:redis.clients.jedis.JedisPool@18e2b22 

 

本文出自 “若是人间” 博客,请务必保留此出处http://xiaoruoen.blog.51cto.com/4828946/835710

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/849463
推荐阅读
相关标签
  

闽ICP备14008679号