当前位置:   article > 正文

SpringBoot 整合 RabbitMQ 消息队列【SpringBoot系列11】_springboot使用rabbitmq消息队列

springboot使用rabbitmq消息队列

SpringCloud 大型系列课程正在制作中,欢迎大家关注与提意见。
程序员每天的CV 与 板砖,也要知其所以然,本系列课程可以帮助初学者学习 SpringBooot 项目开发 与 SpringCloud 微服务系列项目开发

1 项目准备
  1. 创建SpringBoot基础项目
  2. SpringBoot项目集成mybatis
  3. SpringBoot 集成 Druid 数据源【SpringBoot系列3】
  4. SpringBoot MyBatis 实现分页查询数据【SpringBoot系列4】
  5. SpringBoot MyBatis-Plus 集成 【SpringBoot系列5】
  6. SpringBoot mybatis-plus-generator 代码生成器 【SpringBoot系列6】
  7. SpringBoot MyBatis-Plus 分页查询 【SpringBoot系列7】
  8. SpringBoot 集成Redis缓存 以及实现基本的数据缓存【SpringBoot系列8】
  9. SpringBoot 整合 Spring Security 实现安全认证【SpringBoot系列9】
  10. SpringBoot Security认证 Redis缓存用户信息【SpringBoot系列10】本文章 基于这个项目来开发

本文章是系列文章 ,每节文章都有对应的代码,每节的源码都是在上一节的基础上配置而来,对应的视频讲解课程正在火速录制中。

RabbitMQ是一个开源的AMQP实现,AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

消息中间件主要用于组件之间的解耦和通讯。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性和安全。

RabbitMQ 服务器端用 Erlang 语言编写,支持多种客户端,如:Java、Python、Ruby、.NET、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。

常用于在分布式系统中存储转发消息,具有很高的易用性和可用性。

开发与使用 RabbitMQ ,电脑环境或者服务器上需要安装 RabbitMQ 服务。

在 SpringBoot 项目中的 pom.xml 中添加依赖如下

 <!-- 消息队列-->
 <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
     <version>3.0.4</version>
 </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

然后在 application.yml 中添加RabbitMQ配置信息,根据自己安装的RabbitMQ配置,如我这里的

spring:
  
  rabbitmq:
    host: localhost # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: guest # 用户名
    password: guest # 密码
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

新建一个RabbitMQ配置类,并添加一个名为 “testQueue1” 队列

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    
    @Bean
    public Queue testQueue1() {
        return new Queue("testQueue1");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

编写一个消息发布者,并编写一个发送方法,通过AmqpTemplate往"testQueue1"发送消息。

import java.text.SimpleDateFormat;
import java.util.Date;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class RabbitProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendDemoQueue() {
        Date date = new Date();
        String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
        log.info("[testQueue1] 发送基本消息: " + dateString);
        // 第一个参数为刚刚定义的队列名称
        this.rabbitTemplate.convertAndSend("testQueue1", dateString);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

编写一个消息消费者,通过@RabbitListener(queues = “testQueue1”)注解监听"testQueue1"队列,并用@RabbitHandler注解相关方法,这样在在队列收到消息之后,交友@RabbitHandler注解的方法进行处理。

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@RabbitListener(queues = "testQueue1")
public class TestQueueConsumer {

    /**
     * 消息消费
     * @RabbitHandler 代表此方法为接受到消息后的处理方法
     */
    @RabbitHandler
    public void recieved(String msg) {
        System.out.println("[testQueue1] 消费者接收到消息: " + msg);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

编写一个测试请求接口

@Api(tags = "消息队列测试")
@RestController()
@RequestMapping("/test/rabbit")
@Slf4j
public class RabbitMqController {
    @Autowired
    private RabbitProducer rabbitProducer;

    @GetMapping(value = "/testQueue1")
    @ApiOperation(value = "testQueue1 队列 ")
    public Object testQueue1() {
        rabbitProducer.sendDemoQueue();
        return "发送消息成功";
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

启动项目 使用 postman 访问
http://localhost:8899/test/rabbit/testQueue1
在这里插入图片描述
idea 中的日志控制台中也可以查看到相应的信息
在这里插入图片描述

2 Topic主题模式

利用topic模式可以实现模糊匹配,在RabbitConfig中配置topic队列跟交换器

@Configuration
public class RabbitConfig {
    @Bean
    public Queue topiocA() {
        return new Queue("testTopiocA");
    }

    /**
     * 定义个topic交换器
     *
     * @return
     */
    @Bean
    TopicExchange topicExchange() {
        // 定义一个名为fanoutExchange的fanout交换器
        return new TopicExchange("testTopicExchange");
    }

    /**
     * 将定义的topicA队列与topicExchange交换机绑定
     *
     * @return
     */
    @Bean
    public Binding bindingTopicExchangeWithA() {
        return BindingBuilder
                .bind(topiocA())
                .to(topicExchange())
                .with("topic.msg");//routingKey 路由
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

然后再定义一个 testTopiocB 消息队列 ,并绑定上述创建的 topicExchange 交换机。

    @Bean
    public Queue topiocB() {
        return new Queue("testTopiocB");
    }
     /**
     * 将定义的topicB队列与topicExchange交换机绑定
     *
     * @return
     */
    @Bean
    public Binding bindingTopicExchangeWithB() {
        return BindingBuilder
                .bind(topiocB())
                .to(topicExchange())
                .with("topic.#");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

到这里相当于是 两个消息队列 testTopiocA 、testTopiocB,一个 testTopicExchange 交换机。

  • testTopiocA 队列会取交换机中 topic.msg 中精准匹配的消息
  • testTopiocB 队列会取交换机中 topic. 匹配的消息

也就是说队列 testTopiocA 中能取到的 testTopiocB中也能取到 ,testTopiocB中可以取到的 ,testTopiocA中不一定能取到。

然后定义 TopicAConsumer 与 TopicBConsumer 消费者

@Component
@RabbitListener(queues = "testTopiocA")//消息队列名称
public class TopicAConsumer {

    /**
     * 消息消费
     * @RabbitHandler 代表此方法为接受到消息后的处理方法
     */
    @RabbitHandler
    public void recieved(String msg) {
        System.out.println("testTopiocA 接收到消息:" + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
@Component
@RabbitListener(queues = "testTopiocB")//消息队列名称
public class TopicBConsumer {

    /**
     * 消息消费
     * @RabbitHandler 代表此方法为接受到消息后的处理方法
     */
    @RabbitHandler
    public void recieved(String msg) {
        System.out.println("testTopiocB 接收到消息:" + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

测试发送消息,同时向 A 与 B 发送消息

public void sendTopicAB() {
    Date date = new Date();
    String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
    dateString = "测试 Topic 消息 " + dateString;
    System.out.println(dateString);
    // 注意 第一个参数是我们交换机的名称 ,
    // 第二个参数是routerKey topic.msg,
    // 第三个是你要发送的消息
    // 这条信息将会被 testTopiocA  testTopiocB 接收
    this.rabbitTemplate.convertAndSend("testTopicExchange", "topic.msg", dateString);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

在这里插入图片描述
只向B中发送消息

 public void sendTopicTopicB() {
     Date date = new Date();
     String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
     dateString = "测试 Topic 消息 :" + dateString;
     System.out.println(dateString);
     // 注意 第一个参数是我们交换机的名称
     // 第二个参数是routerKey ,
     // 第三个是你要发送的消息
     // 这条信息将会被 testTopiocB 队列接收
     this.rabbitTemplate.convertAndSend("testTopicExchange", "topic.good.msg", dateString);
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

在这里插入图片描述
当然更多匹配规则大家可以自定义

3 Fanout广播模式

Fanout其实就是广播模式,只要跟它绑定的队列都会通知并且接受到消息。
其核心实现就是 定义 FanoutExchange 交换器,然后将需要接收消息的队列与其绑定。

    //=================== fanout广播模式  ====================

    @Bean
    public Queue fanoutA() {
        return new Queue("fanout.a");
    }

    @Bean
    public Queue fanoutB() {
        return new Queue("fanout.b");
    }

    /**
     * 定义个fanout交换器
     *
     * @return
     */
    @Bean
    FanoutExchange fanoutExchange() {
        // 定义一个名为fanoutExchange的fanout交换器
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * 将定义的fanoutA队列与fanoutExchange交换机绑定
     *
     * @return
     */
    @Bean
    public Binding bindingExchangeWithA() {
        return BindingBuilder.bind(fanoutA()).to(fanoutExchange());
    }

    /**
     * 将定义的fanoutB队列与fanoutExchange交换机绑定
     *
     * @return
     */
    @Bean
    public Binding bindingExchangeWithB() {
        return BindingBuilder.bind(fanoutB()).to(fanoutExchange());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

本文章是系列文章 ,每节文章都有对应的代码,每节的源码都是在上一节的基础上配置而来,对应的视频讲解课程正在火速录制中。
项目源码在这里 :https://gitee.com/android.long/spring-boot-study/tree/master/biglead-api-09-rabbitmq
有兴趣可以关注一下公众号:biglead

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/556958
推荐阅读
相关标签
  

闽ICP备14008679号