赞
踩
首先我们得知道,当一个用户订阅了某个频道,那么他就会一直在那监听该频道是否发出消息,是一种阻塞的状态,所以,我们把订阅功能用多线程来实现,给每一个订阅者都分配一个线程。
在Redis中发布订阅功能是由publish、subscribe、unsubscribe、psubscribe组成的。
publish是发布,subscribe是订阅,unsubscribe是取消订阅,psubscribe是订阅模式,所谓模式也就是可以对应多个频道的“频道”,例如订阅了[abc]hello模式,就会收到ahello频道、bhello频道、chello频道的消息。
订阅者可以订阅一个或多个频道,发送者可以给指定的频道发布消息
subscribe channel [channel …],订阅一个或多个频道,例如订阅cctv1这个频道
publish channel message,另一个客户端可以发布信息,发布成功之后,我们可以看到订阅者收到了信息
ctrl + c退出
如果订阅一个模式
127.0.0.1:6379> PSUBSCRIBE [abc]hello
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "[abc]hello"
3) (integer) 1
不管是发布ahello、bhello、chello,都会收到消息
127.0.0.1:6379> PUBLISH ahello "hello world"
(integer) 1
127.0.0.1:6379> PUBLISH bhello "hello world"
(integer) 1
127.0.0.1:6379> PUBLISH chello "hello world"
(integer) 1
下面是接收
127.0.0.1:6379> pSUBSCRIBE [abc]hello
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "[abc]hello"
3) (integer) 1
1) "pmessage"
2) "[abc]hello"
3) "ahello"
4) "hello world"
1) "pmessage"
2) "[abc]hello"
3) "bhello"
4) "hello world"
1) "pmessage"
2) "[abc]hello"
3) "chello"
4) "hello world"
发布消息
发布消息这个方法比较简单,我们只需要指定像哪个频道发布什么内容就行了,这里我把他们封装到了一个util类下,用静态方法,方便使用。
public static void publishMsg(String channel, String message) {
// Jedis jedis = getJedis();
Jedis jedis = new Jedis("127.0.0.1",6379);
try {
jedis.publish(channel, message);
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
jedis.close();
}
}
订阅发布处理
这个类是用来处理订阅发布的,也就是当我们订阅了某个频道做什么处理,取消订阅某个频道做什么处理,收到了某条消息做什么处理。因为我把他们都封装到util类下了,所以这里用到内部类来实现。
public static JedisPubSub jedisPubSub = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
System.out.println("收到了" + channel + "频道发来消息:" + message);
System.out.println(channel + ":" + message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("订阅了" + channel + "频道");
System.out.println(channel + ":" + subscribedChannels);
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println("取消订阅了" + channel + "频道");
System.out.println(channel + ":" + subscribedChannels);
}
};
订阅频道
订阅频道,我们只需要指定订阅了哪个频道即可,这里需要注意的是,我们还需要指定,当做出与该频道相关的操作时(订阅、取消订阅、收到频道消息等)我们需要怎么处理,这也就用到了上面那个类,当然了,因为这是一个阻塞的线程,所以用多线程处理。
/**
* 接收消息。
* 该方法调用后,会一直执行下去。当有发布对应消息时,就会在jedisPubSub中接收到!
* @param channels
*/
public static void subscribeMsg(String channels) {
// Jedis jedis = getJedis();
Jedis jedis = new Jedis ("127.0.0.1", 6379);
new Thread(()->{
try {
jedis.subscribe(jedisPubSub, channels);
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
// jedis.close();
}
}).start();
}
测试
为了保证发布在订阅之后,这里睡眠100毫秒
public static void main(String[] args) {
JedisUtil.subscribeMsg("test");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
JedisUtil.publishMsg("test","hello");
JedisUtil.publishMsg("test","hello aaa");
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。