当前位置:   article > 正文

springboot整合RabbitMQ,实现快速入门。_springboot实现rabbitmq入门教程

springboot实现rabbitmq入门教程

RabbitMQ的简单介绍:    

     首先,我们先了解一下RabbitMQ的含义。RabbitMQ是一个开源的消息中间件,它实现了高级消息队列协议(AMQP)用于进行异步通信。

这里简单的介绍一下异步通信:异步通信是一种通信模式,其中发送方和接收方的操作不是同步进行的。在异步通信中,发送方向接收方发送消息,然后继续执行其他操作,而不必等待接收方的响应。接收方在接收到消息后,可以处理消息,并在处理完成后向发送方发送响应。(举个例子:就好比你在微信上向一个好友发了一条消息,你的好友不一定要立即回复你,而是他有时间了再回复你,你也不是一直守着等他回复你。)

RabbitMQ基于消息队列的模式,通过将消息发送到队列中,然后由消费者从队列中取出并处理这些消息。主要的概念如下:

  1. Producer(生产者):负责发送消息到RabbitMQ的队列中。

  2. Consumer(消费者):从RabbitMQ的队列中接收并处理消息。

  3. Queue(队列):用于存储消息的缓冲区,生产者将消息发送到队列中,消费者从队列中接收消息。

  4. Exchange(交换机):接收来自生产者的消息,并根据路由规则将消息发送到一个或多个队列中。

  5. Binding(绑定):用于将交换机和队列进行绑定,指定消息的路由规则。

  6. Routing Key(路由键):用于将消息从交换机路由到特定的队列。

你要先对着这些概念有个印象,接下来让我们用编码的方式来进行实现。

环境及项目的搭建:

本次我们使用JDK17、spring boot3.0、RabbitMQ3.9.11

本次采用父子工程的形式,一个父项目下面有两个子模块。父项目是一个空的maven项目,子模块为:生产者模块(Producer)和消费者模块(Consumer)两个子模块都是spring boot项目。在两个子模块中引入RabbitMQ的依赖AMQP,

AMQP是一种通信协议(类似于HTTP请求协议)RabbitMQ遵从AMQP,而spring boot整合过AMQP,所以只要引入AMQP的依赖就可以实现RabbitMQ,两个子模块的pom.xml文件如下:

  <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
<!--        AMQP-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

接下来,我们使用docker来创建RabbitMQ容器(默认已经安装过docker了):

docker run -d --name my-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
 

运行这条命令就可以创建一个docker容器,解释一下我们会用到的两个端口,

5672:是RabbitMQ默认的AMQP协议端口,用于客户端应用程序与RabbitMQ服务之间的通信。(服务端)

15672:是RabbitMQ的Web管理界面的默认端口,用于通过浏览器访问RabbitMQ的管理界面(客户端)

使用http://虚拟机地址:15672  就可以访问到。第一次登录,用户名和密码,你可以任意填,但是填入之后就会保存,下一次登录,就需要用到这次填的用户名和密码。

这就是登录成功的页面,接下来,我们根据这个用户来进行一系列的操作。

代码的实现:

在我们两个子项目的配置文件中加入关于RabbitMQ的配置信息:

spring:
  rabbitmq:
    port: 5672 
    host: 192.168.231.110
    username: zhangqiao
    password: 123456
    virtual-host: /

host:你的虚拟机地址

post:RabbitMQ的服务端口号,一般为5672

username:用户名

password:密码

 virtual-host:虚拟主机号,类似于分组,用来隔离不同的使用场景,每个虚拟主机都是一个独立的消息代理

接下来我先解释一下前面提到的6个概念,

生产者:见名知意,就是生产消息的一方。对应的模块为Producer,作用是发送消息

消费者:接收消息的一方,对应的模块为Consumer,接收Producer发送的消息。

注意:生产者和消费者的身份是相对的,A要对B发消息,A是生产者,B是消费者。不能单指某模块为生产者或消费者,且这两者的身份因为业务的不同,随时可以发生调换。

Queue(队列):用于存储消息的缓冲区,生产者将消息发送到队列中,消费者从队列中接收消息。但是我们一般不这样做,这样太直接了,而且也不能在进行其他的操作。这个时候我们会引入一个中间量:Exchange(交换机)。

Exchange(交换机):接收来自生产者的消息,并根据路由规则将消息发送到一个或多个队列中。

Binding(绑定):用于将交换机和队列进行绑定,指定消息的路由规则。

Routing Key(路由键):用于将消息从交换机路由到特定的队列。(并不是交换机与队列绑定了,就一定会将消息发送给队列。绑定只是一个前提,还要满足路由键(某些指定的条件)才行)

现在,我想你已经了解了它们之间的关系,生产者将消息发送给交换机,交换机与指定的对列进行绑定,并通过路由键判定交换机是否发送消息给队列。消费者监听队列,就能获得生产者发送的消息。

生产者与交换机、交换机与队列、队列与消费者;它们之间都是多对多的关系。

一个生产者可以绑定多个交换机,一个交换机也可以被多个生产者绑定。交换机与队列、队列与消费者之间亦是如此。

常用的有三种类型的交换机:

Direct、Topic、Fanout交换机,下面我们会进行详细介绍;

接下来在idea中编写代码:

首先,我们编写生产者(Producer)模块代码:按照我们之前的逻辑,生产者是要往交换机发送消息的,那么我们的首先拥有交换机才行。在RabbitMQ中创建交换机一般有两种常用方式,接下来我们挨个实现。

1、使用bean的方式新建交换机和队列,并指定绑定关系

@Configuration
public class RabbitMQConfig {

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("zhang.direct01");
    }

    @Bean
    public Queue defaultQueue() {
        return new Queue("zhang.queue01");
    }

    @Bean
    public Binding defaultBinding(Queue defaultQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(defaultQueue).to(directExchange).with("red");
    }
}
 

创建一个Direct交换机zhang.direct01,创建一个消息队列zhang.queue01,将他们进行绑定,指定他们之间的路由键为red。

2、使用注解建立交换机和队列,并指定绑定关系

@RabbitListener(bindings = @QueueBinding(
        value = @Queue("zhang.queue01"),
        exchange = @Exchange(value = "zhang.direct01",type = "direct"),
        key = "red"
))

这个注解必须要依托在方法上,不能独自使用。

@Queue("zhang.queue01"):新建一个zhang.queue01队列。(如果已存在,不做任何处理)

@Exchange(value = "zhang.direct01",type = "direct"):新建一个zhang.direct01交换机,类型为direct(如果已存在,不做任何处理)

key:路由键red。 

使用注解创建的交换机和队列是绑定的。(推荐使用第二种注解方式,简单、直白)

@Autowired
private RabbitTemplate rabbitTemplate;

    @Test
    void DirectSendMessage() {
    rabbitTemplate.convertAndSend("zhang.direct01","","张三,你好啊");
 rabbitTemplate.convertAndSend("zhang.direct01","red","张三,你好啊。路由键为red");
    }

引入rabbitTemplate,使用convertAndSend方法就能发送消息到交换机,zhang.direct01是交换机的名称,第二个参数是路由键,(也可以不设置,不设置时表示每个与交换机绑定的队列都能接收到消息)、第三个参数就是要发送的消息。

现在,我们已经创建了交换机zhang.direct01,创建了消息队列zhang.queue01,并将它们进行了绑定,指定他们的路由键为red。并且向zhang.direct01发送了两条消息,一条没有指定路由键,一条指定路由键为red。(没有指定路由键时,那么所有与之绑定的队列都能接受到消息)

现在,我们就可以在消费者模块(Consumer)编写代码,来监听队列了。

//    监听普通的消息
    @RabbitListener(queues = {"zhang.queue01"})
    public void  listenerQueue(String msg){
        System.out.println("消费者监听了队列zhang.queue01,接收到消息:"+msg);
    }

我们在一个bean中编写监听代码,使用@RabbitListener注解,并指定监听的消息队列为zhang.queue01,接收一个String类型的参数msg,那么队列中缓存的消息就会映射到msg中,在控制台进行输出,(这个参数类型要与生产者发送到交换机的消息类型一致,不然会报错)将这个模块运行起来就能一直监听zhang.queue01这个消息队列了。

如图所示,我们监听到了,zhang.queue01,这个对类中的所有消息。

现在,我们已经能发送和接收消息了,接下来,我们重点关注一下交换机和 路由键。

1、Direct Exchange(直连交换机): Direct Exchange 是最简单的交换机类型。它根据消息的路由键(Routing Key)将消息发送到与之完全匹配的队列。如果消息的路由键与某个队列绑定的路由键完全相同,那么消息将被发送到该队列。

2、Fanout Exchange(扇形交换机): Fanout Exchange 将消息广播到与之绑定的所有队列。它忽略消息的路由键,只需将消息发送到与之绑定的所有队列即可。

3、Topic Exchange(主题交换机): Topic Exchange 根据消息的路由键模式进行匹配和分发。它使用带有通配符的路由键进行匹配,支持两种通配符符号:*(匹配一个单词)和#(匹配零个或多个单词)。例如,路由键 "stock.usd.nyse" 可以匹配绑定了 "stock..*" 或者 "stock.#" 的队列。

这三种路由键是我们最常用到的,我们之前建立的就是Direct类型的交换机,接下来我们建立第二种交换机Fanout,并绑定队列

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "zhang.queue02",durable = "true"),
        exchange = @Exchange(value = "zhang.fanout",type = ExchangeTypes.FANOUT)
))
@Test
    void FanoutSendMessage() {
    rabbitTemplate.convertAndSend("zhang.fanout","","张三,你好啊,广播模式");
    }

Fanout忽略路由键,所以我们创建时,可以不写路由键。

在消费者模块监听zhang.queue02队列

//监听广播消息
    @RabbitListener(queues = {"zhang.queue02"})
public void  listenerBroadcast(String msg){
        System.out.println("消费者监听了队列zhang.queue02,接收到消息:"+msg);
    }

重新启动模块,就能监听到zhang.queue02的消息:

建立Topic 交换机,它与Direct交换机不同的是,Topic的路由键可以使用通配符,

  1. *(星号):匹配一个单词。例如,绑定键为 "stock.*" 的队列将匹配到路由键为 "stock.apple""stock.microsoft"等。

  2. #(井号):匹配零个或多个单词。例如,绑定键为 "stock.#" 的队列将匹配到路由键为 "stock""stock.apple""stock.apple.us"等。

接下来,我们来演示一下:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "zhang.queue03"),
        exchange = @Exchange(value = "zhang.topic01",type = ExchangeTypes.TOPIC),
        key = {"china.*","#.queue","mm.nn"}
))
@Test
    void TopicSendMessage() {
        rabbitTemplate.convertAndSend("zhang.topic01","mm.nn","张三,你好啊,mm.nn");
        rabbitTemplate.convertAndSend("zhang.topic01","mm.queue","张三,你好啊,mm.queue");
        rabbitTemplate.convertAndSend("zhang.topic01","china.nn","张三,你好啊,china.nn");
        rabbitTemplate.convertAndSend("zhang.topic01","mm.china","张三,你好啊,mm.china");
        rabbitTemplate.convertAndSend("zhang.topic01","queue.nn","张三,你好啊,queue.nn");
    }

我们创建了zhang.queue03队列,绑定了zhang.topic01,并制定了三个路由键。使用了通配符的形式。然后发送了五条消息,第四、五条消息不符合规则。因此不能接受到。

到这里,我们整合RabbitMQ实现快速入门就完成了。然后补充一点知识:

      如果发送的消息为对象类型时,由于Rabbit默认使用了JDK的序列化方式(底层会判断你传入的是不是message类型,如果不是会使用消息类型转化器转为消息类型),就算你收发双方都使用了同一种类型。那么,消费者在接收时也会报错,这是由于接收双发使用的消息类型转化器不一致。

生产者发送的消息类型为Student

消费者接收的消息类型也为Student

运行时报错,原因是:发送方与接收方的消息类型转换器不统一:

我们有两种解决方案,

<一>:自定义消息类型转换器,使接收方与发送方的消息类型转换器一致

导入json的依赖,MessageConverter 的类型为:

@Configuration
public class MessageConvent {
@Bean
    public MessageConverter jsonMessageConvert() {
    return new Jackson2JsonMessageConverter();
    }
}

在收发双方同时加入自定义消息类型转换器就可以了。

<二>:在发送消息时,将对象转化为字符串,接收方依旧使用String接收。

第一种方式更加正规一些,但是第二种方式简单一些。根据自己的需求选择适用于自己的方式。

一般来说:在生产者中,进行交换机的创建、消息队列的创建、绑定关系,与发送消息(推荐使用@RabbitListener注解)

在消费者中,只进行监听队列的绑定。。。。。

我在使用@RabbitListener注解时遇到了一些问题,在这里我记录一下,来警示自己。

我在使用@RabbitListener创建交换机与队列时,有时候会报错误,显示找不到队列。我感到很惊奇,明明@RabbitListener注解就是用来创建交换机和队列的,怎么会显示找不到队列呢?我之后请教了学校的老师后才发现,原来使用@RabbitListener创建交换机与队列时,有一些前置的条件要满足。

@RabbitListener 注解会自动声明队列、交换机和绑定。但是,它只会在第一次启动应用程序时执行,以确保队列、交换机和绑定的声明只发生一次。如果在后续的运行中,队列已经存在或者配置发生了变化,@RabbitListener 注解将不会再次声明队列。

因此,如果你在代码中使用 @RabbitListener注解监听的队列名称,但是该队列在 RabbitMQ 中并不存在,那么在第一次运行时会自动创建该队列。但如果在后续的运行中,你手动删除了该队列,或者更改了队列的配置,再次启动应用程序时@RabbitListener注解不会再次创建该队列。

解决该问题的方法是,在应用程序启动之前,在控制台(15672)手动创建队列或者在配置文件中配置队列的声明。这样可以确保队列的存在,并且与 @RabbitListener 注解中指定的队列名称一致。


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/495669
推荐阅读
相关标签
  

闽ICP备14008679号