赞
踩
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛 消息中间件 1.提升异步通信,扩展解耦 2.消息代理和目的地 3.目的地形式:队列(queue),主题(topic) 4.点对点式:一个发送者一个接收者 5.发布订阅:发布者发送消息,多个接收者监听,同时收到消息 6.JMS java消息服务:ActiveMQ和HornetMQ是这种模式,如果不跨语言可以使用此服务 7.AMQP 高级消息队列协议:RabbitMQ,可跨语言 RabbitMQ: 核心概念: Message 消息:由消息头和体组成,消息头由一系列可选属性组成,routing-key(路由键),priority(优先权)等 Publisher 消息生产者,交换器发布消息的客户端 Exchange 交换器,接收消息并路由到服务器队列 Queue 消息队列 Binding 绑定,用于消息队列和交换器之间的关系 Connection 网络连接 Channel 信道,建立一条独立双向数据流通道,建立TCP通信开销大,所有开辟信道,复用一条TCP连接 Consumer 消费者,取得消息的客户端 Virtual Host虚拟主机,一批交换器和消息队列和相关对象 Broker 消息队列服务器实体 安装:docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management 整合RabbitMQ 1.引入pom文件spring-boot-starter-amaq 2.application.yml配置 3.测试RabbitMQ 1.AamqpAdmin:管理组件 2.RabbitTemplate:消息发送处理组件 4.RabbitMQ消息确认机制-可靠抵达 1.publisher confirmCallback 确认模式 2.publisher returnCallback 未投递到queue退回模式 3.consumer ack机制,手动配置
RabbitMQ配置
spring.rabbitmq.host=192.168.77.130
spring.rabbitmq.port=5672
虚拟主机配置
spring.rabbitmq.virtual-host=/
开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
#只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
#手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@Configuration public class MyRabbitConfig { private RabbitTemplate rabbitTemplate; @Primary @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setMessageConverter(messageConverter()); initRabbitTemplate(); return rabbitTemplate; } @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } // @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法 public void initRabbitTemplate() { /** * 1、只要消息抵达Broker就ack=true * correlationData:当前消息的唯一关联数据(这个是消息的唯一id) * ack:消息是否成功收到 * cause:失败的原因 */ //设置确认回调 rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> { System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]"); }); rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" + "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]"); }); }} ```java 在这里插入代码片
@Configuration public class MyRabbitMQConfig { /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */ /** * 死信队列 * * @return */@Bean public Queue orderDelayQueue() { /* Queue(String name, 队列名字 boolean durable, 是否持久化 boolean exclusive, 是否排他 boolean autoDelete, 是否自动删除 Map<String, Object> arguments) 属性 */ HashMap<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "order-event-exchange"); arguments.put("x-dead-letter-routing-key", "order.release.order"); arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟 Queue queue = new Queue("order.delay.queue", true, false, false, arguments); return queue; } /** * 普通队列 * * @return */ @Bean public Queue orderReleaseQueue() { Queue queue = new Queue("order.release.order.queue", true, false, false); return queue; } @Bean public Exchange orderEventExchange() { return new TopicExchange("order-event-exchange", true, false); } @Bean public Binding orderCreateBinding() { return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null); } @Bean public Binding orderReleaseBinding() { return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); } /** * 订单释放直接和库存释放进行绑定 * @return */ @Bean public Binding orderReleaseOtherBinding() { return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null); } @Bean public Queue orderSecKillOrrderQueue() { Queue queue = new Queue("order.seckill.order.queue", true, false, false); return queue; } @Bean public Binding orderSecKillOrrderQueueBinding() { Binding binding = new Binding( "order.seckill.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.seckill.order", null); return binding; }
## RabbitMQ延时队列(实现定时任务)
支付宝未付款,超过一定时间后,系统自动取消并释放占有物品
常见解决方案:spring的schedule定时任务轮询数据库
缺点:消耗内存,增加数据库压力 解决:rabbitMQ的消息TTL和死信Exchange结合
TTL就是消息的过期时间 ,MQ可以对队列和消息设置TTL,超过时间消息就死了称为死信
延时队列步骤就是先将消息放进队列设置30分钟过期,过期后消息会被丢进死信路由,死信路由再放进另一个队列,而微服务监听的就是这个队列
创建一个配置类,类创建两个队列queue,一个交换机exchange,两个绑定关系
消息丢失,积压,重复解决方案:
消息丢失:db创建一个表,每个消息做好日志记录,保存消息的详细信息,定期扫描数据库将失败的消息再发送一遍,设法消息重试发送
Broker尚未持久化完成,死机:publisher必须加入确认回调机制,确认成功的消息,修改数据库消息状态
自动ACK状态下,消费者收到消息,但没来得及消费:手动ACK,没消费成功就重新入队
消息重复:消息消费成功,但没返回ack,导致重新发送等各种情况:设计为幂等性,防重表,唯一标识。判断业务修改状态等
消息积压:消费者死机没消费等:上线更多的消费者或上线专门队列消费服务,将消息都取出来,记录数据库,离线慢慢处理
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。