赞
踩
目录
@RabbitListener和@RabbitHandler接收消费消息
同步方式消耗150ms
异步方式消耗(等待注册邮件和注册短信中耗时最久的服务执行完响应给用户)100ms
耗时55ms,将注册邮件和注册短信服务写入消息,无需关注这两个服务,需要的时候去消息里面拿就行。
如果库存系统升级,订单系统业务跟着升级
订单系统只需写入消息,库存系统去订阅就行
JMS(Java Message Service) | AMQP(Advanced Message Queuing Protocol) | |
定义 | Java api | 网络线级协议 |
跨语言 | 否 | 是 |
跨平台 | 否 | 是 |
Model | 提供两种消息模型: 、Peer-2-Peer 、Pub/sub | 提供了五种消息模型: 、direct exchange 、fanout exchange 、topic change 、headers exchange 、system exchange 本质来讲,后四种和JMS的pub/sub模型没有太大差别 仅是在路由机制上做了更详细的划分; |
支持消息类 型 | 多种消息类型: TextMessage MapMessage BytesMessage StreamMessage ObjectMessage Message (只有消息头和属性) | byte[] 当实际应用时,有复杂的消息,可以将消息序列化后发 送。 |
综合评价 | JMS 定义了JAVA API层面的标准;在java体系中, 多个client均可以通过JMS进行交互,不需要应用修 改代码,但是其对跨平台的支持较差; | AMQP定义了wire-level层的协议标准;天然具有跨平 台、跨语言特性。 |
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。路由键是有队列设置的,用来和交换见建立绑定关系的,在点对点中必须指定路由键,发布订阅模式可以不用指定。生成者发送消息的交换机需要携带路由键,在点对点模式中携带的路由键和交换机绑定的key相同时才会在相应队列获取到消息,发布订阅模式,如果队列没有指定路由键,生产者也无需携带路由键。
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有4种类型:direct(默认),fanout,topic,和headers,不同类型的Exchange转发消息的策略有所区别
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面,等待消费者连接到这个队列将其取走。
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。
Exchange和Queue的绑定可以是多对多的关系。
Connection
网络连接,比如一个TCP连接。
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都 是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个mini版的RabbitMQ 服务器,拥 有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时 指定,RabbitMQ 默认的vhost是/。
Broker
表示消息队列服务器实体
、
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
4369,25672(Erlang发现&集群端口)
5672,5671(AMQP端口)
15672 (web管理后台端口)
61613,61614(STOMP协议端口)
1883,8883(MQTT协议端口)
https://www.rabbitmq.com/networking.html
生产者发送消息到交换机,消费者从队列中获取信息,交换机会绑定队列
Exchange 类型
目前headers弃用,只学习direct、fanout、topic三种交换机
direct点对点交换机:如果路由键和binding中的bindingkey一致,交换机将消息发到对应的队列中。
fonout发布订阅交换机:(可以没有路由键,因为他是广播机制,只要消息发给交换机,交换机绑定的队列全部收到)交换机绑定的所有队列都可以拿到消息
topic发布订阅:按照特定规则,交换机绑定的满足规则的队列拿到消息。
- @Test
- public void createExchange() {
- DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
- // 创建交换机
- amqpAdmin.declareExchange(directExchange);
- log.info("Exchange[{}]创建成功",directExchange.getName());
- }
- @Test
- public void createQueue() {
- Queue queue = new Queue("hello-java-queue", true, false, false);
- // 创建队列
- amqpAdmin.declareQueue(queue);
- log.info("Queue[{}]创建成功",queue.getName());
- }
-
- @Test
- public void createBinding() {
- // String destination 目的地, DestinationType destinationType 目的地类型
- // , String exchange 交换机, String routingKey,路由键
- // Map<String, Object> arguments
- Binding binding = new Binding("hello-java-queue",
- Binding.DestinationType.QUEUE,
- "hello-java-exchange",
- "hello.java",
- null);
- amqpAdmin.declareBinding(binding);
- log.info("Binding[{}]创建成功",binding.getRoutingKey());
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- @Test
- public void sendMsg() {
-
- //String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor,
- // CorrelationData correlationData
- // 1、发送的消息如果是对象,需要将对象序列化写出去,实现Serializable接口
- // 2、出了实现Serializable接口进行jdk序列化以外,对象还可以转为json进行传递(配置MessageConverter消息转换器)
- // 发送的都是同一类型消息
- // for (int i = 0; i < 10; i++) {
- // OrderEntity entity = new OrderEntity();
- // entity.setId((long) i);
- // entity.setCreateTime(new Date());
- // entity.setGrowth(99);
- // rabbitTemplate.convertAndSend("hello-java-exchange",
- // "hello.java",
- // entity);
- // log.info("消息发送完成[{}]",entity);
- // }
- // 发送不同类型的消息 这个时候使用@RabbitListener不能达到重载效果,需要配合@RabbitHandler使用
- for (int i = 0; i < 10; i++) {
- if(i%2==0) {
- OrderEntity entity = new OrderEntity();
- entity.setId((long) i);
- entity.setCreateTime(new Date());
- entity.setGrowth(99);
- rabbitTemplate.convertAndSend("hello-java-exchange",
- "hello.java",
- entity);
- log.info("消息发送完成[{}]",entity);
- }else{
- OrderReturnReasonEntity entity = new OrderReturnReasonEntity();
- entity.setId((long) i);
- entity.setCreateTime(new Date());
- rabbitTemplate.convertAndSend("hello-java-exchange",
- "hello.java",
- entity);
- log.info("消息发送完成[{}]",entity);
- }
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
前期准备:接收消息要在启动类上添加@EnableRabbit注解,
@RabbitListener和可以作用在类和方法上
@RabbitHandler作用在方法上
只使用@RabbitListener也可作用在方法上也可接受消息,不过只能接收同种类型消息,配合@RabbitHandler进行重载可以接受多种不同消息。
- @RabbitListener(queues = {"hello-java-queue"})
- @Service("orderService")
- public class OrderServiceImpl extends ServiceImpl<OrderDao, OrderEntity> implements OrderService {
- // @RabbitListener(queues = {"hello-java-queue"})
- public void receiveMsg(Message message,OrderEntity entity) {
- byte[] messageBody = message.getBody();
- System.out.println("接收到的消息"+message+"==内容是"+entity);
- }
- @RabbitHandler
- public void receiveMsg(Message message, OrderEntity entity) {
- System.out.println("接收到的消息" + entity);
- }
- @RabbitHandler
- public void receiveMsg2(Message message, OrderReturnReasonEntity entity) {
- System.out.println("接收到的消息" + entity);
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
1、在application.properties中配置:
- # 开启发送端确认(消息代理broker确认收到消息)
- spring.rabbitmq.publisher-confirms=true
2、定制rabbitTemplate,设置confirmCallback
- /**
- * 定制 RabbitTemplate
- */
- @PostConstruct // MyRabbitmqConfig对象创建完成调用该方法设置ConfirmCallback
- public void initRabbitTemplate() {
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- /**
- * 只要消息抵达消息代理broker ack即为true
- * @param correlationData 当前消息唯一关联数据(id)
- * @param ack 消息是否成功发出
- * @param cause 失败的原因
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println("confirm...=>CorrelationData["+correlationData+"]=>ack["+ack+"]=>cause["+cause+"]");
- }
- });
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
1、在application.properties中配置:
- # 开启发送端消息抵达队列确认
- spring.rabbitmq.publisher-returns=true
- # 只要抵达队列,以异步的方式优先回调我们这个returnconfirm (可以不做配置)
- spring.rabbitmq.template.mandatory=true
2、定制rabbitTemplate,设置ReturnCallback
- /**
- * 定制RabbitTemplate
- * 1、服务收到消息就会回调
- * 1、spring.rabbitmq.publisher-confirms: true
- * 2、设置确认回调
- * 2、消息正确抵达队列就会进行回调
- * 1、spring.rabbitmq.publisher-returns: true
- * spring.rabbitmq.template.mandatory: true
- * 2、设置确认回调ReturnCallback
- *
- * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
- *
- */
- @PostConstruct // MyRabbitmqConfig对象创建完成调用该方法设置ConfirmCallback
- public void initRabbitTemplate() {
- //设置broker收到消息确认回调
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- /**
- * 只要消息抵达消息代理broker ack即为true
- * @param correlationData 当前消息唯一关联数据(id)
- * @param ack 消息是否成功发出
- * @param cause 失败的原因
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println("confirm...=>CorrelationData["+correlationData+"]=>ack["+ack+"]=>cause["+cause+"]");
- }
- });
-
- /**
- * 只要消息没有投递给指定的队列,就触发这个失败回调
- * message:投递失败的消息详细信息
- * replyCode:回复的状态码
- * replyText:回复的文本内容
- * exchange:当时这个消息发给哪个交换机
- * routingKey:当时这个消息用哪个路邮键
- */
- rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText,
- String exchange, String routingKey)->{
- System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
- "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
- });
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
/** * 定制RabbitTemplate * 1、服务收到消息就会回调 * 1、spring.rabbitmq.publisher-confirms: true * 2、设置确认回调 * 2、消息正确抵达队列就会进行回调 * 1、spring.rabbitmq.publisher-returns: true * spring.rabbitmq.template.mandatory: true * 2、设置确认回调ReturnCallback * * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息) * 1、默认是自动确认的,只要消息接收到,客户端就会自动确认,broker服务端就会删除这个消息 * 问题:我们收到很多消息,自动回复给服务端ack,只有一个消息处理成功,宕机了。发生消息丢失(其他消息还没消费,消息就从broker中删除了) * 解决方法手动确认# 开启手动确认spring.rabbitmq.listener.simple.acknowledge-mode=manual * 手动确认只要我们没有明确告诉mq,消息被消费,没有ack,消息就一直保持unacked状态,即使生产者服务宕机,消息也会一直保存在broker队列中,不被删除,此时为ready(可用)状态 * 2、手动确认如何确认消费消息 * // 消费确认消息 * channel.basicAck(deliveryTag,false); 业务执行成功,消费消息 * // 拒绝消费消息 * channel.basicNack(deliveryTag,false,true); 业务执行失败,拒绝消费,可以重新放入队列给被别人消费,也可直接丢弃 * channel.basicReject(deliveryTag,true); * */ @PostConstruct // MyRabbitmqConfig对象创建完成调用该方法设置ConfirmCallback public void initRabbitTemplate() { //设置broker收到消息确认回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 只要消息抵达消息代理broker ack即为true * @param correlationData 当前消息唯一关联数据(id) * @param ack 消息是否成功发出 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm...=>CorrelationData["+correlationData+"]=>ack["+ack+"]=>cause["+cause+"]"); } }); /** * 只要消息没有投递给指定的队列,就触发这个失败回调 * message:投递失败的消息详细信息 * replyCode:回复的状态码 * replyText:回复的文本内容 * exchange:当时这个消息发给哪个交换机 * routingKey:当时这个消息用哪个路邮键 */ rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)->{ System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" + "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]"); }); }
手动ack
- @RabbitHandler
- public void receiveMsg(Message message, OrderEntity entity, Channel channel) {
- System.out.println("接收到的消息" + entity);
- System.out.println("消息处理完成=>"+entity.getId());
- // channel内顺序自增的
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- System.out.println("deliveryTag==>"+deliveryTag);
- // 签收消息 multiple是否批量
- try {
- if(deliveryTag%2==0) {
- // 消费确认消息
- channel.basicAck(deliveryTag,false);
- System.out.println("消费了消息"+deliveryTag);
- }else{
- // 拒绝消费消息,退回给broker消息代理服务器,requeue为true消息服务器重新入队,等待消息被消费确认
- // requeue为false,消息被丢弃
- channel.basicNack(deliveryTag,false,true);
- System.out.println("取消消费"+deliveryTag+"消息,重新入队");
- // 和basicNack类似
- channel.basicReject(deliveryTag,true);
- }
- } catch (IOException e) {
- // 网络中断
- e.printStackTrace();
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。