赞
踩
SpringCloud 大型系列课程正在制作中,欢迎大家关注与提意见。
程序员每天的CV 与 板砖,也要知其所以然,本系列课程可以帮助初学者学习 SpringBooot 项目开发 与 SpringCloud 微服务系列项目开发
本文章是系列文章 ,每节文章都有对应的代码,每节的源码都是在上一节的基础上配置而来,对应的视频讲解课程正在火速录制中。
RabbitMQ是一个开源的AMQP实现,AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
消息中间件主要用于组件之间的解耦和通讯。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性和安全。
RabbitMQ 服务器端用 Erlang 语言编写,支持多种客户端,如:Java、Python、Ruby、.NET、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。
常用于在分布式系统中存储转发消息,具有很高的易用性和可用性。
开发与使用 RabbitMQ ,电脑环境或者服务器上需要安装 RabbitMQ 服务。
在 SpringBoot 项目中的 pom.xml 中添加依赖如下
<!-- 消息队列-->
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.0.4</version>
</dependency>
然后在 application.yml 中添加RabbitMQ配置信息,根据自己安装的RabbitMQ配置,如我这里的
spring:
rabbitmq:
host: localhost # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: guest # 用户名
password: guest # 密码
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
新建一个RabbitMQ配置类,并添加一个名为 “testQueue1” 队列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue testQueue1() {
return new Queue("testQueue1");
}
}
编写一个消息发布者,并编写一个发送方法,通过AmqpTemplate往"testQueue1"发送消息。
import java.text.SimpleDateFormat; import java.util.Date; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @Slf4j public class RabbitProducer { @Autowired private AmqpTemplate rabbitTemplate; public void sendDemoQueue() { Date date = new Date(); String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date); log.info("[testQueue1] 发送基本消息: " + dateString); // 第一个参数为刚刚定义的队列名称 this.rabbitTemplate.convertAndSend("testQueue1", dateString); } }
编写一个消息消费者,通过@RabbitListener(queues = “testQueue1”)注解监听"testQueue1"队列,并用@RabbitHandler注解相关方法,这样在在队列收到消息之后,交友@RabbitHandler注解的方法进行处理。
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "testQueue1") public class TestQueueConsumer { /** * 消息消费 * @RabbitHandler 代表此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[testQueue1] 消费者接收到消息: " + msg); } }
编写一个测试请求接口
@Api(tags = "消息队列测试") @RestController() @RequestMapping("/test/rabbit") @Slf4j public class RabbitMqController { @Autowired private RabbitProducer rabbitProducer; @GetMapping(value = "/testQueue1") @ApiOperation(value = "testQueue1 队列 ") public Object testQueue1() { rabbitProducer.sendDemoQueue(); return "发送消息成功"; } }
启动项目 使用 postman 访问
http://localhost:8899/test/rabbit/testQueue1
idea 中的日志控制台中也可以查看到相应的信息
利用topic模式可以实现模糊匹配,在RabbitConfig中配置topic队列跟交换器
@Configuration public class RabbitConfig { @Bean public Queue topiocA() { return new Queue("testTopiocA"); } /** * 定义个topic交换器 * * @return */ @Bean TopicExchange topicExchange() { // 定义一个名为fanoutExchange的fanout交换器 return new TopicExchange("testTopicExchange"); } /** * 将定义的topicA队列与topicExchange交换机绑定 * * @return */ @Bean public Binding bindingTopicExchangeWithA() { return BindingBuilder .bind(topiocA()) .to(topicExchange()) .with("topic.msg");//routingKey 路由 } }
然后再定义一个 testTopiocB 消息队列 ,并绑定上述创建的 topicExchange 交换机。
@Bean public Queue topiocB() { return new Queue("testTopiocB"); } /** * 将定义的topicB队列与topicExchange交换机绑定 * * @return */ @Bean public Binding bindingTopicExchangeWithB() { return BindingBuilder .bind(topiocB()) .to(topicExchange()) .with("topic.#"); }
到这里相当于是 两个消息队列 testTopiocA 、testTopiocB,一个 testTopicExchange 交换机。
也就是说队列 testTopiocA 中能取到的 testTopiocB中也能取到 ,testTopiocB中可以取到的 ,testTopiocA中不一定能取到。
然后定义 TopicAConsumer 与 TopicBConsumer 消费者
@Component
@RabbitListener(queues = "testTopiocA")//消息队列名称
public class TopicAConsumer {
/**
* 消息消费
* @RabbitHandler 代表此方法为接受到消息后的处理方法
*/
@RabbitHandler
public void recieved(String msg) {
System.out.println("testTopiocA 接收到消息:" + msg);
}
}
@Component
@RabbitListener(queues = "testTopiocB")//消息队列名称
public class TopicBConsumer {
/**
* 消息消费
* @RabbitHandler 代表此方法为接受到消息后的处理方法
*/
@RabbitHandler
public void recieved(String msg) {
System.out.println("testTopiocB 接收到消息:" + msg);
}
}
测试发送消息,同时向 A 与 B 发送消息
public void sendTopicAB() {
Date date = new Date();
String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
dateString = "测试 Topic 消息 " + dateString;
System.out.println(dateString);
// 注意 第一个参数是我们交换机的名称 ,
// 第二个参数是routerKey topic.msg,
// 第三个是你要发送的消息
// 这条信息将会被 testTopiocA testTopiocB 接收
this.rabbitTemplate.convertAndSend("testTopicExchange", "topic.msg", dateString);
}
只向B中发送消息
public void sendTopicTopicB() {
Date date = new Date();
String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
dateString = "测试 Topic 消息 :" + dateString;
System.out.println(dateString);
// 注意 第一个参数是我们交换机的名称
// 第二个参数是routerKey ,
// 第三个是你要发送的消息
// 这条信息将会被 testTopiocB 队列接收
this.rabbitTemplate.convertAndSend("testTopicExchange", "topic.good.msg", dateString);
}
当然更多匹配规则大家可以自定义
Fanout其实就是广播模式,只要跟它绑定的队列都会通知并且接受到消息。
其核心实现就是 定义 FanoutExchange 交换器,然后将需要接收消息的队列与其绑定。
//=================== fanout广播模式 ==================== @Bean public Queue fanoutA() { return new Queue("fanout.a"); } @Bean public Queue fanoutB() { return new Queue("fanout.b"); } /** * 定义个fanout交换器 * * @return */ @Bean FanoutExchange fanoutExchange() { // 定义一个名为fanoutExchange的fanout交换器 return new FanoutExchange("fanoutExchange"); } /** * 将定义的fanoutA队列与fanoutExchange交换机绑定 * * @return */ @Bean public Binding bindingExchangeWithA() { return BindingBuilder.bind(fanoutA()).to(fanoutExchange()); } /** * 将定义的fanoutB队列与fanoutExchange交换机绑定 * * @return */ @Bean public Binding bindingExchangeWithB() { return BindingBuilder.bind(fanoutB()).to(fanoutExchange()); }
本文章是系列文章 ,每节文章都有对应的代码,每节的源码都是在上一节的基础上配置而来,对应的视频讲解课程正在火速录制中。
项目源码在这里 :https://gitee.com/android.long/spring-boot-study/tree/master/biglead-api-09-rabbitmq
有兴趣可以关注一下公众号:biglead
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。