当前位置:   article > 正文

SpringBoot整合RabbitMQ实现消息的消息消费_springboot rabbitmq 消费者

springboot rabbitmq 消费者

什么是RabbitMQ?

是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。

组成部分

  • Broker:消息服务进程包含Exchange和Queue两个部分
    • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
      • Direct Exchange(point-to-point):直接交换器,生产者将消息丢给它后,它从与自己绑定的那些Queue里选择一个,然后直接将消息丢过去,其他的Queue就接不到这个消息了。 
      • Fanout Exchange(multicast):广播交换机,生产者将消息发送给所有的Queue。
      • Topic Exchange(publish-subscribe): 主题交换器,生产者将消息丢给它后,它就给与自己绑定的那些个关心这个消息Queue全部发送一份消息。 也就是发布-订阅模式。
    • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
    • RoutingKey路由key:用来控制交换器如何将消息发送给绑定的队列。
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

工作原理

 

生产者发送消息:

  1. 生产者和Broker进程建立TCP连接
  2. 生产者和Broker进程建立通道
  3. 生产者通过Channel信道将消息发送给Broker,由Exchange将消息进行转发

  4. Exchange将消息转发给指定的Queue

消费者消费消息:

  1. 消费者和Broker进程建立TCP连接
  2. 消费者和Broker进程建立通道
  3. 消费者监听指定的Queue
  4. 当有消息到达Queue时Broker默认将消息推送给消费者

安装

配置RabbitMQ 的系统环境,安装erlang

配置环境变量

  1. ERLANG_HOME = "安装路径"
  2. path = “%ERLANG_HOME%/bin”

输入erl 检查是否安装成功如下图:

安装RabbitMQ

安装好以后,配置环境变量

  1. RABBITMQ_HOME = "安装路径"
  2. path = “%RABBITMQ_HOME%/sbin”

运行 cmd 命令 输入 rabbitmq-server 启动成功,如下图:

 

访问 http://localhost:15672

 

输入用户名和密码,默认都是guest

 

好了接下来我们开始整合!!!

案例代码

搭建一个SpringBoot项目选择好rabbitmq

pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <parent>
  7. <groupId>com.SpringBoot</groupId>
  8. <artifactId>SpringBoot</artifactId>
  9. <version>0.0.1-SNAPSHOT</version>
  10. </parent>
  11. <groupId>com.demo</groupId>
  12. <artifactId>SpringBoot_RabbitMQ</artifactId>
  13. <properties>
  14. <maven.compiler.source>8</maven.compiler.source>
  15. <maven.compiler.target>8</maven.compiler.target>
  16. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  17. </properties>
  18. <dependencies>
  19. <dependency>
  20. <groupId>org.springframework.boot</groupId>
  21. <artifactId>spring-boot-starter-web</artifactId>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter-amqp</artifactId>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.springframework.boot</groupId>
  29. <artifactId>spring-boot-starter-test</artifactId>
  30. <scope>test</scope>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.projectlombok</groupId>
  34. <artifactId>lombok</artifactId>
  35. </dependency>
  36. </dependencies>
  37. </project>

配置 application.yml

  1. server:
  2. port: 8080
  3. spring:
  4. rabbitmq:
  5. host: 127.0.0.1
  6. port: 5672
  7. username: guest
  8. password: guest
  9. virtual-host: /

创建MQ配置类

  1. package com.demo.mq.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * @Program: SpringBoot
  10. * @ClassName RabbitMQConfig
  11. * @Author: liutao
  12. * @Description: 消息队列配置类
  13. * @Create: 2023-07-09 19:26
  14. * @Version 1.0
  15. **/
  16. @Configuration
  17. public class RabbitMQConfig {
  18. /**
  19. * 消息队列名称
  20. **/
  21. public static final String QUEEN_NAME = "queue";
  22. /**
  23. * 交换机名称
  24. **/
  25. public static final String EXCHANGE_NAME = "exchange";
  26. /**
  27. * 路由key
  28. **/
  29. public static final String ROUTING_KEY = "key";
  30. /**
  31. * @MethodName: queue
  32. * @description: 消息队列
  33. * @UpdateTime: 2023/7/9 20:20
  34. * @Return: org.springframework.amqp.core.Queue
  35. **/
  36. @Bean
  37. public Queue queue() {
  38. return new Queue(QUEEN_NAME);
  39. }
  40. /***
  41. * @MethodName: exchange
  42. * @description: 交换机
  43. * @UpdateTime: 2023/7/9 20:19
  44. * @Return: org.springframework.amqp.core.DirectExchange
  45. **/
  46. @Bean
  47. public DirectExchange exchange() {
  48. return new DirectExchange(EXCHANGE_NAME);
  49. }
  50. /**
  51. * @MethodName: binding
  52. * @description: 将消息队列 绑定到交换机
  53. * @UpdateTime: 2023/7/9 20:18
  54. * @Return: org.springframework.amqp.core.Binding
  55. **/
  56. @Bean
  57. public Binding binding() {
  58. return BindingBuilder
  59. .bind(queue())
  60. .to(exchange())
  61. .with(ROUTING_KEY);
  62. }
  63. }

创建生产者类

  1. package com.demo.mq;
  2. import com.demo.mq.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @Program: SpringBoot
  8. * @ClassName Producer
  9. * @Author: liutao
  10. * @Description: 消息生产者
  11. * @Create: 2023-07-09 19:51
  12. * @Version 1.0
  13. **/
  14. @Component
  15. public class Producer {
  16. @Autowired
  17. private RabbitTemplate rabbitTemplate;
  18. public void sendMessage(String message) {
  19. try {
  20. rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message);
  21. }catch (Exception e) {
  22. throw new RuntimeException("消息发布异常");
  23. }
  24. }
  25. }

创建消费者类

  1. package com.demo.mq;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @Program: SpringBoot
  7. * @ClassName Consumer
  8. * @Author: liutao
  9. * @Description: 消息消费者
  10. * @Create: 2023-07-09 19:41
  11. * @Version 1.0
  12. **/
  13. @Slf4j
  14. @Component
  15. public class Consumer {
  16. @RabbitListener(queues = "queue")
  17. public void receiveMessage(String message) {
  18. // 处理接收到的消息
  19. log.info("Received message:" + message);
  20. }
  21. }

web接口

  1. package com.demo.mq.controller;
  2. import com.demo.mq.Producer;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. /**
  7. * @Program: SpringBoot
  8. * @ClassName MqController
  9. * @Author: liutao
  10. * @Description: mq调用接口
  11. * @Create: 2023-07-09 19:58
  12. * @Version 1.0
  13. **/
  14. @RestController
  15. public class MqController {
  16. @Autowired
  17. private Producer producer;
  18. @GetMapping("/send")
  19. public String send(String message){
  20. producer.sendMessage(message);
  21. return "ok";
  22. }
  23. }

测试效果

启动服务 ,连接MQ成功

 

访问 http://localhost:8080/send?messgae="内容"

 

 

后台

一样我们也可以在mq管理板去测试 找到 Queues 点击queue 进去

 

 

后台

结尾

到这里我们今天的SpringBoot就学习完了!!see you!!

如果我的内容对你有用,欢迎点赞,关注 !!涛哥博客

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/386659
推荐阅读
相关标签
  

闽ICP备14008679号