当前位置:   article > 正文

Redis发布与订阅_redis 发布订阅实际案例

redis 发布订阅实际案例

什么是发布与订阅

: redis发布订阅是一种消息通信通信模式,由发送者(pub)发送消息,订阅者(sub)接收消息。

如下图client2、4、5就是订阅着,订阅了channel1的消息。

在这里插入图片描述

channel1要发送消息时,这几个订阅者都会实时收到消息。

在这里插入图片描述

发布订阅的方式有哪几种知道吗?

答: 有两种,一种基于频道的,还有一种是基于匹配的模式:

基于频道的订阅案例

指令如下所示,可以看到订阅者可以订阅多个频道

subscribe channel [channel ...]
  • 1

所以我们开启一个redis客户端,订阅一个channel:sport的频道

# 客户端1 订阅 channel:sport
127.0.0.1:6379> SUBSCRIBE channel:sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:sport"
3) (integer) 1


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

发布消息的指令格式为:

publish channel message
  • 1

此时,我们再开启一个redis客户端,发布一条消息到channel:sport的频道

# 另一个客户端发送消息
127.0.0.1:6379> PUBLISH channel:sport "this is why we play"
(integer) 1
127.0.0.1:6379>

  • 1
  • 2
  • 3
  • 4
  • 5

此时刚刚订阅消息的redis客户端就会实时的收到这条消息

127.0.0.1:6379> SUBSCRIBE channel:sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:sport"
3) (integer) 1

# 如下便是实时收到的消息内容
1) "message"
2) "channel:sport" #频道
3) "this is why we play" # 消息内容

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

基于匹配模式的发布与订阅示例

有时候我们会订阅多个频道,我们不可能每次都去手动增加订阅的频道,例如我们当前订阅频道有:c1、c2、c3、c4、c5、c6、c7、c8。将来还可能出现c9等情况。我们不可能实时去添加订阅的频道。
观察上面的频道我们发现频道都是以c开头,后续的数字不断变化,所以我们完全可以使用模式匹配来实现频道订阅。

模式订阅和取消的命令为

psubscribe pattern [pattern...]
punsubscribe [pattern [pattern ...]]
  • 1
  • 2

关于parttern常见的匹配符有

1. *:表示任意占位符,例如c*,可以匹配c、c1、c111
2. ?*:匹配一个及以上个占位符
3. ?:表示匹配一个占位符
  • 1
  • 2
  • 3

我们希望订阅c1-c9的频道基于模式匹配我们就能够做到这一点

我们首先开启一个客户端,使用模式匹配发起订阅

# 订阅匹配cxx相关的模式
PSUBSCRIBE c?*
  • 1
  • 2

然后我们在开启另一个客户端,发送消息到c1频道

127.0.0.1:6379> PUBLISH c1 "this is c1 message"
(integer) 1
127.0.0.1:6379>


  • 1
  • 2
  • 3
  • 4
  • 5

刚刚订阅的客户端就会收到消息

127.0.0.1:6379> PSUBSCRIBE c?*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?*"
3) (integer) 1
1) "pmessage"
2) "c?*"
3) "c1"
4) "this is c1 message"

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

注意:当你订阅PSUBSCRIBE c?* c1订阅时,若另一个客户端发送消息到c1你会收到两条消息(原因会在后文源码解析时补充)。

如下便是PSUBSCRIBE c?* c1的收到PUBLISH c1 "this is c1 message"的消息内容

127.0.0.1:6379> PSUBSCRIBE c?* c1
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?*"
3) (integer) 1
1) "psubscribe"
2) "c1"
3) (integer) 2
1) "pmessage"
2) "c1"
3) "c1"
4) "this is c1 message"
1) "pmessage"
2) "c?*"
3) "c1"
4) "this is c1 message"

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

发布订阅的常见的使用场景和优缺点了解过嘛?

答: 使用场景如:聊天室、公告牌等需要实现消息解耦的场景都可以使用消息订阅发布,如下便是一个基于redis实现视频订阅的管理系统。

在这里插入图片描述

而优缺点也很明显:

缺点: 相较于KafkaRocketMQRedis的实现略显粗糙,无法实现消息堆积与回溯
优点: 实现简单,若不需要消息回溯等需求使用redis绰绰有余

redis是如何实现频道、模式发布与订阅的知道嘛?

答: 我们可以从源码的角度分析分析频道模式的发布与订阅:

频道发布订阅是基于字典这个数据结果,以频道作为key,用链表作为value存储所有订阅该频道的当订阅者,如下图,当channel1有消息时,程序就通过channel1订阅链,逐个给订阅者发送消息。

在这里插入图片描述

当如果客户端 client10086 执行命令 SUBSCRIBE channel1 channel2 channel3时,程序就会将这个订阅者挂到对应三个频道中。
同理,订阅者通过UNSUBSCRIBE 退订频道,程序也是通过pubsub_channels找到对应的链表将这个节点移除。

在这里插入图片描述

说完了频道的,我们再来说说redis匹配模式发布与订阅:

与频道订阅有所不同,匹配模式订阅数据结构如下所示,可以看到client123、client256订阅了tweet.shop.* ,订阅者的数据结构是一个记录客户端以及客户端订阅的模式频道的结构体,从源码我们可以看到这个结构体是这样的:

typedef struct pubsubPattern {
    redisClient *client; //指向订阅者
    robj *pattern;//指向匹配模式
} pubsubPattern;

  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述

从上图我们也可以看到redisServer 就是记录一个个客户端匹配模式的结构体,从源码我们就可以看到,它包含了一个pubsub_patterns的指针用于记录每个模式订阅者

struct redisServer {
    // ...
    list *pubsub_patterns;
    // ...
};

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

例如client10086 订阅broadcast.live.*,程序就会在redisServer所指向的链表中挂一个新的pubsub_patterns ,如下图所示:

在这里插入图片描述

基于spring boot集成redis演示发布与订阅

需求

我们希望一个用户订阅channel:sport,一个用户订阅channel:stock,而另一个用户两个都订阅

实现

首先pom配置引入相关依赖

<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

	</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

spring boot的application.yml配置redis配置

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:
    database: 0 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

接收redis消息行为接口

package com.example.demo;

import org.springframework.stereotype.Component;

/**
 * 发布消息接口类
 */
@Component
public interface RedisMsg {

     void receiveMessage(String message);
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

三个订阅者

/**
 * 订阅所有channel:*的消息
 */
public class RedisSubPChannel implements RedisMsg {


    public void receiveMessage(String message) {
        System.out.println("RedisSubPChannel收到消息消息" + message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
package com.example.demo;


/**
 * 订阅所有channel:sport最新消息
 */
public class RedisSubSport implements RedisMsg{


    public void receiveMessage(String message){

        System.out.println("RedisSubSport获取体育频道最新消息:"+message);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
package com.example.demo;

/**
 * 订阅所有channel:stock消息
 */
public class RedisSubStock implements RedisMsg {



    @Override
    public void receiveMessage(String message) {
        System.out.println("RedisSubStock获取channel:stock最新消息:"+message);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

redis配置类

@Configuration
@EnableCaching
public class RedisConfig{
    /**
     * Redis消息监听器容器
     * @param connectionFactory
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅三个频道
        container.addMessageListener(listenerAdapter(new RedisSubSport()),new PatternTopic("channel:sport"));
        container.addMessageListener(listenerAdapter(new RedisSubStock()),new PatternTopic("channel:stock"));
        container.addMessageListener(listenerAdapter(new RedisSubPChannel()),new PatternTopic("channel:*"));
        return container;
    }

    /**
     * 配置消息接收处理类
     * @param redisMsg  自定义消息接收类
     * @return
     */
    @Bean()
    @Scope("prototype")
    MessageListenerAdapter listenerAdapter(RedisMsg redisMsg) {
        //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
        //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
        return new MessageListenerAdapter(redisMsg, "receiveMessage");//注意2个通道调用的方法都要为receiveMessage
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

消息发送者

//定时器
@EnableScheduling
@Component
public class SenderController {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    //向redis消息队列index通道发布消息
    @Scheduled(fixedRate = 2000)
    public void sendMessage() {
        stringRedisTemplate.convertAndSend("channel:stock", "股票涨了"+Math.random()+"个百分点");
        stringRedisTemplate.convertAndSend("channel:sport",  "湖人队 No."+((int)(Math.random()*100)+1)+"选手得分");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

运行结果

RedisSubStock获取channel:stock最新消息:股票涨了0.46015187165884985个百分点
RedisSubPChannel收到消息消息股票涨了0.46015187165884985个百分点
RedisSubSport获取体育频道最新消息:湖人队 No.40选手得分
RedisSubPChannel收到消息消息湖人队 No.40选手得分
RedisSubStock获取channel:stock最新消息:股票涨了0.5293839028647649个百分点
RedisSubPChannel收到消息消息股票涨了0.5293839028647649个百分点
RedisSubSport获取体育频道最新消息:湖人队 No.26选手得分
RedisSubPChannel收到消息消息湖人队 No.26选手得分
RedisSubPChannel收到消息消息股票涨了0.6779431428770823个百分点
RedisSubStock获取channel:stock最新消息:股票涨了0.6779431428770823个百分点
RedisSubSport获取体育频道最新消息:湖人队 No.35选手得分
RedisSubPChannel收到消息消息湖人队 No.35选手得分
RedisSubStock获取channel:stock最新消息:股票涨了0.40332150203704个百分点
RedisSubPChannel收到消息消息股票涨了0.40332150203704个百分点
RedisSubSport获取体育频道最新消息:湖人队 No.14选手得分
RedisSubPChannel收到消息消息湖人队 No.14选手得分
RedisSubStock获取channel:stock最新消息:股票涨了0.8407461728769783个百分点
RedisSubPChannel收到消息消息股票涨了0.8407461728769783个百分点
RedisSubSport获取体育频道最新消息:湖人队 No.25选手得分
RedisSubPChannel收到消息消息湖人队 No.25选手得分
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

参考文献

Redis开发与运维

springboot入门–springboot集成redis实现消息发布订阅模式-双通道

Redis进阶 - 消息传递:发布订阅模式详解

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/472413
推荐阅读
相关标签
  

闽ICP备14008679号