赞
踩
二.RabbitMQ的工作原理
2.1 RabbitMQ的基本结构
2.2 组成部分说明
2.3 生产者发送消息流程
2.4 消费者接收消息流程
三.SpringBoot 整合实现RabbitMQ
3.1创建mq-rabbitmq-producer(生产者)发送消息
3.1.1pom.xml中添加相关的依赖
3.1.2 配置application.yml
3.1.3 配置RabbitMQ常量类
3.1.4 创建RabbitMQConfig配置类
3.1.5 创建生产者用于发送消息
3.1.6 创建一个类,用于模拟测试
3.2创建mq-rabbitmq-consumer(消费者)消费消息
3.2.1pom.xml中添加相关的依赖
3.2.2 配置application.yml
3.2.3 配置RabbitMQ常量类
3.2.4 创建RabbitMQConfig配置类
3.2.5 创建消费者消息监听
3.2.6 启动项目,监听器监听到生产者发送的消息,自动消费消息
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
1.生产者和Broker建立TCP连接。
2.生产者和Broker建立通道。
3.生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4.Exchange将消息转发到指定的Queue(队列)
1.消费者和Broker建立TCP连接
2.消费者和Broker建立通道
3.消费者监听指定的Queue(队列)
4.当有消息到达Queue时Broker默认将消息推送给消费者。
创建2个springboot项目,一个 mq-rabbitmq-producer(生产者),一个mq-rabbitmq-consumer(消费者)。
<!--添加AMQP的启动器-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
server: port: 8080 spring: application: name: mq-rabbitmq-producer #rabbitmq配置 rabbitmq: host: localhost port: 5672 #注意:guest用户只能链接本地服务器 比如localhost 不可以连接远程服务器 username: guest password: guest #虚拟主机 一台机器可能有很多虚拟主机 这里选择默认配置 / 即可 virtual-host: / #支持发布返回 publisher-returns: true listener: # Routing 路由模型(交换机类型:direct) direct: #消息确认:手动签收 acknowledge-mode: manual #当前监听容器数 concurrency: 1 #最大数 max-concurrency: 10 #是否支持重试 retry: enabled: true #重试次数5,超过5次抛出异常 max-attempts: 5 #重试间隔 3s max-interval: 3000
采用 Routing 路由模型(交换机类型:direct)方式,实现RabbitMQ消息队列。
配置直连交换机名称、消息队列名称、routingkey
package com.example.mqrabbitmqproducer.util.rabbitmq; /** * RabbitMQ RoutingKey 常量工具类 * @author qzz */ public class RabbitMQConstantUtil { /** * 交换机名称 */ public static final String DIRECT_EXCHANGE = "directExchange"; /** * 取消订单 队列名称 routingkey */ public static final String CANCEL_ORDER = "cancel-order"; /** * 自动确认订单 队列名称\routingkey */ public static final String CONFIRM_ORDER = "confirm-order"; }
注意:这里把消息队列名称和routingkey设置为同名。
rabbitmq配置类:配置Exchange、Queue、以及绑定交换机
package com.example.mqrabbitmqproducer.util.rabbitmq.config; import com.example.mqrabbitmqproducer.util.rabbitmq.RabbitMQConstantUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbitmq配置类:配置Exchange、Queue、以及绑定交换机 * @author qzz */ @Configuration @EnableRabbit public class RabbitMQConfig { private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class); @Autowired private RabbitTemplate rabbitTemplate; @Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ //SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); /** * 比较常用的 Converter 就是 Jackson2JsonMessageConverter,在发送消息时,它会先将自定义的消息类序列化成json格式, * 再转成byte构造 Message,在接收消息时,会将接收到的 Message 再反序列化成自定义的类 */ factory.setMessageConverter(new Jackson2JsonMessageConverter()); //开启手动ACK factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } @Bean public AmqpTemplate amqpTemplate(){ rabbitTemplate.setEncoding("UTF-8"); rabbitTemplate.setMandatory(true); /** * ReturnsCallback消息没有正确到达队列时触发回调,如果正确到达队列不执行 * config : 需要开启rabbitmq发送失败回退 * yml配置publisher-returns: true * 或rabbitTemplate.setMandatory(true);设置为true */ rabbitTemplate.setReturnsCallback(returnedMessage -> { String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId(); byte[] message = returnedMessage.getMessage().getBody(); Integer replyCode = returnedMessage.getReplyCode(); String replyText = returnedMessage.getReplyText(); String exchange = returnedMessage.getExchange(); String routingKey = returnedMessage.getRoutingKey(); log.info("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}", new String(message),messageId,replyCode,replyText,exchange,routingKey); }); return rabbitTemplate; } /** * 声明直连交换机 支持持久化 * @return */ @Bean(RabbitMQConstantUtil.DIRECT_EXCHANGE) public Exchange directExchange(){ return ExchangeBuilder.directExchange(RabbitMQConstantUtil.DIRECT_EXCHANGE).durable(true).build(); } /** * 取消订单 消息队列 * @return */ @Bean(RabbitMQConstantUtil.CANCEL_ORDER) public Queue cancelOrderQueue(){ return new Queue(RabbitMQConstantUtil.CANCEL_ORDER,true,false,true); } /** * 把取消订单消息队列绑定到交换机上 * @param queue * @param directExchange * @return */ @Bean public Binding cancelOrderBinding(@Qualifier(RabbitMQConstantUtil.CANCEL_ORDER) Queue queue, @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){ //RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名 return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CANCEL_ORDER).noargs(); } /** * 自动确认订单 消息队列 * @return */ @Bean(RabbitMQConstantUtil.CONFIRM_ORDER) public Queue confirmOrderQueue(){ return new Queue(RabbitMQConstantUtil.CONFIRM_ORDER,true,false,true); } /** * 把自动确认订单消息队列绑定到交换机上 * @param queue * @param directExchange * @return */ @Bean public Binding confirmOrderBinding(@Qualifier(RabbitMQConstantUtil.CONFIRM_ORDER) Queue queue, @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){ //RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名 return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CONFIRM_ORDER).noargs(); } }
package com.example.mqrabbitmqproducer.util.rabbitmq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; /** * Routing 路由模型(交换机类型:direct) * 消息生成者 * @author qzz */ @Component public class DirectSender { private static final Logger log = LoggerFactory.getLogger(DirectSender.class); @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 * @param routingKey * @param msg */ public void send (String routingKey,String msg){ Message message = MessageBuilder.withBody(msg.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8") .setMessageId(UUID.randomUUID()+"").build(); log.info("【发送者】消息内容【{}】 交换机【{}】 路由【{}】 消息ID【{}】",msg,RabbitMQConstantUtil.DIRECT_EXCHANGE ,routingKey,message.getMessageProperties().getMessageId()); rabbitTemplate.convertAndSend(RabbitMQConstantUtil.DIRECT_EXCHANGE,routingKey,message); } }
package com.example.mqrabbitmqproducer.controller; import com.alibaba.fastjson.JSONObject; import com.example.mqrabbitmqproducer.util.rabbitmq.DirectSender; import com.example.mqrabbitmqproducer.util.rabbitmq.RabbitMQConstantUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; /** * 模拟测试消息发送 * @author qzz */ @RestController @RequestMapping("/order") public class TestRabbitMQSendMsg { /** * rabbitMQ消息发送 */ @Autowired private DirectSender directSender; /** * 测试取消订单,发送消息 */ @GetMapping("/cancel") public void cancel(){ //取消订单逻辑省略 //取消订单,发送消息 Map<String, Object> map = new HashMap<>(); map.put("order_number","4364756867987989"); map.put("product_id","1"); directSender.send(RabbitMQConstantUtil.CANCEL_ORDER, JSONObject.toJSONString(map)); } /** * 测试自动确认订单,发送消息 */ @GetMapping("/confirm") public void confirm(){ //自动确认订单,发送消息 String order_number="4364756867987989"; directSender.send(RabbitMQConstantUtil.CONFIRM_ORDER, order_number); } }
启动项目,进行测试:
(1)在postman中输入 http://localhost:8080/order/cancel,进行测试:
(2)在postman中输入 http://localhost:8080/order/confirm,进行测试:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--添加AMQP的启动器-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
server:
port: 8083
spring:
application:
name: mq-rabbitmq-consumer
#rabbitmq配置
rabbitmq:
host: localhost
port: 5672
#注意:guest用户只能链接本地服务器 比如localhost 不可以连接远程服务器
username: guest
password: guest
#虚拟主机 一台机器可能有很多虚拟主机 这里选择默认配置 / 即可
virtual-host: /
配置直连交换机名称、消息队列名称、routingkey
package com.example.mqrabbitmqconsumer.util.rabbitmq; /** * RabbitMQ RoutingKey 常量工具类 * @author qzz */ public class RabbitMQConstantUtil { /** * 交换机名称 */ public static final String DIRECT_EXCHANGE = "directExchange"; /** * 取消订单 队列名称 \routingkey */ public static final String CANCEL_ORDER = "cancel-order"; /** * 自动确认订单 队列名称\routingkey */ public static final String CONFIRM_ORDER = "confirm-order"; }
注意:这里把消息队列名称和routingkey设置为同名。
rabbitmq配置类:配置Exchange、Queue、以及绑定交换机
package com.example.mqrabbitmqconsumer.util.rabbitmq.config; import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbitmq配置类:配置Exchange、Queue、以及绑定交换机 * @author qzz */ @Configuration @EnableRabbit public class RabbitMQConfig { private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class); @Autowired private RabbitTemplate rabbitTemplate; @Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ //SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); /** * 比较常用的 Converter 就是 Jackson2JsonMessageConverter,在发送消息时,它会先将自定义的消息类序列化成json格式, * 再转成byte构造 Message,在接收消息时,会将接收到的 Message 再反序列化成自定义的类 */ factory.setMessageConverter(new Jackson2JsonMessageConverter()); //开启手动ACK factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } @Bean public AmqpTemplate amqpTemplate(){ rabbitTemplate.setEncoding("UTF-8"); rabbitTemplate.setMandatory(true); /** * ReturnsCallback消息没有正确到达队列时触发回调,如果正确到达队列不执行 * config : 需要开启rabbitmq发送失败回退 * yml配置publisher-returns: true * 或rabbitTemplate.setMandatory(true);设置为true */ rabbitTemplate.setReturnsCallback(returnedMessage -> { String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId(); byte[] message = returnedMessage.getMessage().getBody(); Integer replyCode = returnedMessage.getReplyCode(); String replyText = returnedMessage.getReplyText(); String exchange = returnedMessage.getExchange(); String routingKey = returnedMessage.getRoutingKey(); log.info("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}", new String(message),messageId,replyCode,replyText,exchange,routingKey); }); return rabbitTemplate; } /** * 声明直连交换机 支持持久化 * @return */ @Bean(RabbitMQConstantUtil.DIRECT_EXCHANGE) public Exchange directExchange(){ return ExchangeBuilder.directExchange(RabbitMQConstantUtil.DIRECT_EXCHANGE).durable(true).build(); } /** * 取消订单 消息队列 * @return */ @Bean(RabbitMQConstantUtil.CANCEL_ORDER) public Queue cancelOrderQueue(){ return new Queue(RabbitMQConstantUtil.CANCEL_ORDER,true,false,true); } /** * 把取消订单消息队列绑定到交换机上 * @param queue * @param directExchange * @return */ @Bean public Binding cancelOrderBinding(@Qualifier(RabbitMQConstantUtil.CANCEL_ORDER) Queue queue, @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){ //RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名 return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CANCEL_ORDER).noargs(); } /** * 自动确认订单 消息队列 * @return */ @Bean(RabbitMQConstantUtil.CONFIRM_ORDER) public Queue confirmOrderQueue(){ return new Queue(RabbitMQConstantUtil.CONFIRM_ORDER,true,false,true); } /** * 把自动确认订单消息队列绑定到交换机上 * @param queue * @param directExchange * @return */ @Bean public Binding confirmOrderBinding(@Qualifier(RabbitMQConstantUtil.CONFIRM_ORDER) Queue queue, @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){ //RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名 return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CONFIRM_ORDER).noargs(); } }
(1)监听取消订单
package com.example.mqrabbitmqconsumer.listener; import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 监听取消订单 * @author qzz */ @Component public class RabbitMQCancelOrderListener { private static final Logger log = LoggerFactory.getLogger(RabbitMQCancelOrderListener.class); /** * 接受消息 * @param channel * @param message * @throws Exception */ @RabbitHandler @RabbitListener(queues = RabbitMQConstantUtil.CANCEL_ORDER) public void receiverMsg(Channel channel, Message message) throws Exception { //body 即消息体 String msg = new String(message.getBody()); String messageId = message.getMessageProperties().getMessageId(); log.info("【消费者】 消息内容:【{}】。messageId 【{}】",msg, messageId); try{ //如果有业务逻辑,则在这里编写 //告诉服务器收到这条消息 无需再发了 否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ log.error("消息处理出现异常:{}",e.getMessage()); //告诉消息服务器 消息处理异常,消息需要重新再次发送! channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true); } } }
(2)监听自动确认订单
package com.example.mqrabbitmqconsumer.listener; import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 监听自动确认订单 * @author qzz */ @Component public class RabbitMQConfirmOrderListener { private static final Logger log = LoggerFactory.getLogger(RabbitMQConfirmOrderListener.class); /** * 接受消息 * @param channel * @param message * @throws Exception */ @RabbitHandler @RabbitListener(queues = RabbitMQConstantUtil.CONFIRM_ORDER) public void receiverMsg(Channel channel, Message message) throws Exception { //body 即消息体 String msg = new String(message.getBody()); String messageId = message.getMessageProperties().getMessageId(); log.info("【消费者】 消息内容:【{}】。messageId 【{}】",msg, messageId); try{ //如果有业务逻辑,则在这里编写 //告诉服务器收到这条消息 无需再发了 否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ log.error("消息处理出现异常:{}",e.getMessage()); //告诉消息服务器 消息处理异常,消息需要重新再次发送! channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。