当前位置:   article > 正文

Spring Boot集成RabbitMQ快速入门Demo

Spring Boot集成RabbitMQ快速入门Demo

1.什么是RabbitMQ?

RabbitMQ是一款使用Erlang语言开发的,基于AMQP协议的消息中间件,作为一款优秀的消息系统,RabbitMQ有高并发、可扩展等优势,并适用于大型系统中各个模块之间的通信。

RabbitMQ的特点为:

  • 持久化、传输确认、发布确认等功能保证消息可靠

  • 支持多种消息分发模式,处理更加灵活

  • 提供可视化管理界面,使用方便

  • 支持集群部署,保证服务高可用

2.RabbitMQ环境搭建

  1. version: '3'
  2. services:
  3. rabbitmq:
  4. image: registry.cn-hangzhou.aliyuncs.com/zhengqing/rabbitmq:3.7.8-management # 原镜像`rabbitmq:3.7.8-management` 【 注:该版本包含了web控制页面 】
  5. container_name: rabbitmq # 容器名为'rabbitmq'
  6. hostname: my-rabbit
  7. restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
  8. environment: # 设置环境变量,相当于docker run命令中的-e
  9. TZ: Asia/Shanghai
  10. LANG: en_US.UTF-8
  11. RABBITMQ_DEFAULT_VHOST: my_vhost # 主机名
  12. RABBITMQ_DEFAULT_USER: admin # 登录账号
  13. RABBITMQ_DEFAULT_PASS: admin # 登录密码
  14. volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
  15. - "./rabbitmq/data:/var/lib/rabbitmq"
  16. ports: # 映射端口
  17. - "5672:5672"
  18. - "15672:15672"

运行

docker-compose -f docker-compose-rabbitmq.yml -p rabbitmq up -d

web管理端:http://127.0.0.1:15672 登录账号密码:admin/admin

27a7e34328fda7e39066d77e99a7d4a8.png

3.代码工程

实验目的:实现通过rabbitmq发送和接收消息

pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>springboot-demo</artifactId>
  7. <groupId>com.et</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>rabbitmq</artifactId>
  12. <properties>
  13. <maven.compiler.source>8</maven.compiler.source>
  14. <maven.compiler.target>8</maven.compiler.target>
  15. </properties>
  16. <dependencies>
  17. <dependency>
  18. <groupId>org.springframework.boot</groupId>
  19. <artifactId>spring-boot-starter-web</artifactId>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-autoconfigure</artifactId>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework.boot</groupId>
  27. <artifactId>spring-boot-starter-test</artifactId>
  28. <scope>test</scope>
  29. </dependency>
  30. <dependency>
  31. <groupId>org.springframework.boot</groupId>
  32. <artifactId>spring-boot-starter-amqp</artifactId>
  33. </dependency>
  34. <dependency>
  35. <groupId>org.springframework.boot</groupId>
  36. <artifactId>spring-boot-starter-amqp</artifactId>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.projectlombok</groupId>
  40. <artifactId>lombok</artifactId>
  41. </dependency>
  42. </dependencies>
  43. </project>

application.properties

  1. server.port=8088
  2. #rabbitmq
  3. spring.rabbitmq.host=localhost
  4. spring.rabbitmq.port=5672
  5. spring.rabbitmq.username=admin
  6. spring.rabbitmq.password=admin
  7. spring.rabbitmq.virtual-host=my_vahost

config

简单使用

  1. package com.et.rabbitmq.config;
  2. import org.springframework.amqp.core.Queue;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class RabbitConfig {
  7. @Bean
  8. public Queue Queue() {
  9. return new Queue("hello");
  10. }
  11. }

topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列 首先对topic规则配置,这里使用两个队列来测试

  1. package com.et.rabbitmq.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class TopicRabbitConfig {
  10. public final static String TOPIC_ONE = "topic.one";
  11. public final static String TOPIC_TWO = "topic.two";
  12. public final static String TOPIC_EXCHANGE = "topicExchange";
  13. @Bean
  14. public Queue queue_one(){
  15. return new Queue(TOPIC_ONE);
  16. }
  17. @Bean
  18. public Queue queue_two(){
  19. return new Queue(TOPIC_TWO);
  20. }
  21. @Bean
  22. TopicExchange exchange(){
  23. return new TopicExchange(TOPIC_EXCHANGE);
  24. }
  25. @Bean
  26. Binding bindingExchangeOne(Queue queue_one, TopicExchange exchange){
  27. return BindingBuilder.bind(queue_one).to(exchange).with("topic.one");
  28. }
  29. @Bean
  30. Binding bindingExchangeTwo(Queue queue_two, TopicExchange exchange){
  31. //# 表示零个或多个词
  32. //* 表示一个词
  33. return BindingBuilder.bind(queue_two).to(exchange).with("topic.#");
  34. }
  35. }

Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

  1. package com.et.rabbitmq.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class FanoutRabbitConfig {
  10. @Bean
  11. public Queue AMessage() {
  12. return new Queue("fanout.A");
  13. }
  14. @Bean
  15. public Queue BMessage() {
  16. return new Queue("fanout.B");
  17. }
  18. @Bean
  19. public Queue CMessage() {
  20. return new Queue("fanout.C");
  21. }
  22. @Bean
  23. FanoutExchange fanoutExchange() {
  24. return new FanoutExchange("fanoutExchange");
  25. }
  26. @Bean
  27. Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
  28. return BindingBuilder.bind(AMessage).to(fanoutExchange);
  29. }
  30. @Bean
  31. Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
  32. return BindingBuilder.bind(BMessage).to(fanoutExchange);
  33. }
  34. @Bean
  35. Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
  36. return BindingBuilder.bind(CMessage).to(fanoutExchange);
  37. }
  38. }

receiver

  1. package com.et.rabbitmq.receiver;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.messaging.handler.annotation.Payload;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.stereotype.Service;
  8. @Service
  9. @Slf4j
  10. public class HelloReceiver {
  11. @RabbitListener(queues = "hello")
  12. public void process(String hello) {
  13. System.out.println("Receiver : " + hello);
  14. }
  15. @RabbitListener(queues = {"topic.one"})
  16. public void receiveTopic1(@Payload String fileBody) {
  17. log.info("topic1:" + fileBody);
  18. }
  19. @RabbitListener(queues = {"topic.two"})
  20. public void receiveTopic2(@Payload String fileBody) {
  21. log.info("topic2:" + fileBody);
  22. }
  23. @RabbitListener(queues = {"fanout.A"})
  24. public void fanoutA(@Payload String fileBody) {
  25. log.info("fanoutA:" + fileBody);
  26. }
  27. @RabbitListener(queues = {"fanout.B"})
  28. public void fanoutB(@Payload String fileBody) {
  29. log.info("fanoutB:" + fileBody);
  30. }
  31. @RabbitListener(queues = {"fanout.C"})
  32. public void fanoutC(@Payload String fileBody) {
  33. log.info("fanoutC:" + fileBody);
  34. }
  35. }

sender

  1. package com.et.rabbitmq.sender;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Date;
  6. @Component
  7. public class HelloSender {
  8. @Autowired
  9. private AmqpTemplate rabbitTemplate;
  10. public void send() {
  11. String context = "hello " + new Date();
  12. System.out.println("Sender : " + context);
  13. this.rabbitTemplate.convertAndSend("hello", context);
  14. }
  15. }
  1. package com.et.rabbitmq.sender;
  2. import com.et.rabbitmq.config.TopicRabbitConfig;
  3. import org.springframework.amqp.core.AmqpTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class TopicSender {
  8. @Autowired
  9. private AmqpTemplate rabbitTemplate;
  10. //两个消息接受者都可以收到
  11. public void send_one() {
  12. String context = "Hi, I am message one";
  13. System.out.println("Sender : " + context);
  14. this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.one",context);
  15. }
  16. //只有TopicReceiverTwo都可以收到
  17. public void send_two() {
  18. String context = "Hi, I am message two";
  19. System.out.println("Sender : " + context);
  20. this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.two",context);
  21. }
  22. }

DemoApplication.java

  1. package com.et.quartz;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class DemoApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(DemoApplication.class, args);
  8. }
  9. }

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • https://github.com/Harries/springboot-demo

4.测试

简单使用

  1. @Test
  2. public void hello() throws Exception {
  3. helloSender.send();
  4. Thread.sleep(50000);
  5. }

Topic Exchange

  1. @Test
  2. public void topicOne() throws Exception {
  3. topicSender.send_one();
  4. Thread.sleep(50000);
  5. }
  6. @Test
  7. public void topicTwo() throws Exception {
  8. topicSender.send_two();
  9. Thread.sleep(50000);
  10. }

Fanout Exchange

  1. @Test
  2. public void sendFanout() throws InterruptedException {
  3. String context = "hi, fanout msg ";
  4. System.out.println("Sender : " + context);
  5. this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
  6. Thread.sleep(50000);
  7. }

5.参考连接

  • https://www.rabbitmq.com/

  • https://spring.io/projects/spring-amqp

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/490515
推荐阅读
相关标签
  

闽ICP备14008679号