赞
踩
又称Broker, 接受客户端的连接,实现AMQP实体服务,这里指RabbitMQ 服务器
连接,应用程序与Broker的网络连接。
交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种。
Exchange 和 Queue 之间的虚拟连接,binding 中可以包含 routing key。
路由规则,虚拟机可用它来确定如何路由一个特定消息,即交换机绑定到 Queue 的键。
也称为Message Queue,消息队列,保存消息并将它们转发给消费者。
2.2.7.RELEASE
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
消息重要时,生产者和消费者都要创建队列。避免造成消息延迟或丢失。
消息不重要时,消费者创建队列。避免造成在没有消费者时,生产者发送的消息在MQ积压。
1.生产者生产消息后投递到队列之前,先将消息信息保存到数据库,状态为发送中。
2.队列将消息投递到Exchange,并触发ConfirmCallback回调。
3.判断ConfirmCallback回调返回的ack信息,为true时删除DB中对应的信息,为false时修改状态为发送失败。
4.Exchange通过匹配Routing key将消息转发到指定的队列中。
5.没有匹配的队列时通过ReturnCallback通知生产者,记录日志或保存到死信队列。
6.启动定时任务将数据库中ConfirmCallback失败的消息重新投递到Exchange,为防止死循环应该设置一个最大重试次数。
spring:
rabbitmq:
addresses: ***.***.***.***
port: 5672
username: admin
password: ******
#开启 confirm 确认机制
publisher-confirm-type: correlated
#开启 return 确认机制
publisher-returns: true
template:
#设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
mandatory: true
@Configuration public class RabbitMqConfig { @Autowired private MsgSendReturnCallBack msgSendReturnCallBack; @Autowired private MsgSendConfirmCallBack msgSendConfirmCallBack; /** * 消息确认,生产者→mq * * @param connectionFactory * @return */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setConfirmCallback(msgSendConfirmCallBack); template.setReturnCallback(msgSendReturnCallBack); return template; } }
/** * @Author: World哥 * @Description: * @Date: Create in 15:42 2020/1/13 */ @Slf4j @Service public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MsgBroberService msgBroberService; /** * 发送消息 * * @param obj 消息对象 */ public void send(String exchange, String routingKey, Object obj) { String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); String msgJson = JsonMapper.INSTANCE.toJson(obj); Message message = MessageBuilder.withBody(msgJson.getBytes()).build(); // 消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // json数据 message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON); // 消息落库 MsgBrober msgBrober = MsgBrober.builder() .corrId(uuid) .message(msgJson) .status(MsgBroberConstant.SENDING_STATUS) .exchange(exchange) .rountingKey(routingKey) .build(); msgBroberService.save(msgBrober); try { rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationId); log.info("消息正在发送。。。"); } catch (Exception e) { log.error("消息发送失败, e={}", e.getMessage()); msgBroberService.update(Wrappers.<MsgBrober>lambdaUpdate() .set(MsgBrober::getReason, e.getMessage()) .set(MsgBrober::getStatus, MsgBroberConstant.FAIL_STATUS) .eq(MsgBrober::getCorrId, uuid)); } } public void send(MsgBrober msgBrober) { CorrelationData correlationId = new CorrelationData(msgBrober.getCorrId()); Message message = MessageBuilder.withBody(msgBrober.getMessage().getBytes()).build(); // 消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // json数据 message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON); rabbitTemplate.convertAndSend(msgBrober.getExchange(), msgBrober.getRountingKey(), message, correlationId); } }
/** * @ClassName MsgSendConfirmCallBack * @Description 消息发送到交换机确认机制回调 * @Author World哥 * @Date 2019/8/5 15:49 * @Version 1.0 **/ @Slf4j @Component public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback { @Autowired private MsgBroberService msgBroberService; @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("消息已成功投递,MsgSendConfirmCallBack, 回调id={}, ack={}, returnedMessage={}",correlationData, ack, correlationData.getReturnedMessage()); msgBroberService.remove(Wrappers.<MsgBrober>lambdaQuery().eq(MsgBrober::getCorrId, correlationData.getId())); } else { log.error("MsgSendConfirmCallBack, 回调id={}, ack={}, returnedMessage={}", correlationData, ack, correlationData.getReturnedMessage()); msgBroberService.update(Wrappers.<MsgBrober>lambdaUpdate() .set(MsgBrober::getStatus, MsgBroberConstant.FAIL_STATUS) .set(MsgBrober::getReason, cause) .eq(MsgBrober::getCorrId, correlationData.getId())); } } }
/** * @Author: World哥 * @Description:消息从交换器发送到对应队列失败时触发,指定的routingKey找不到队列时会触发 * @Date: Create in 16:03 2020/1/13 */ @Slf4j @Component public class MsgSendReturnCallBack implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routing) { log.error("消息主题 message={}", message); log.error("回复代码 reply={}", replyCode); log.error("回复信息 replyText={}", replyText); log.error("使用的 exchange={}", exchange); log.error("路由 routing={}", routing); } }
@Data @Builder @NoArgsConstructor @AllArgsConstructor @TableName(value = "m_msg_broker") public class MsgBrober { @TableId(type = IdType.AUTO) private int id; private int retry; // 尝试投递次数 private int status; // 状态,0-消息正在发送;1-消息发送成功;2-消息发送失败 private String reason; // 失败原因 private String corrId; // 消息唯一标识 private String message;// 消息体 private String exchange;// 交换器 private String rountingKey; //路由键 private Date createTime; private Date updateTime; }
-- ---------------------------- -- Table structure for m_msg_broker -- ---------------------------- DROP TABLE IF EXISTS `m_msg_broker`; CREATE TABLE `m_msg_broker` ( `id` int(11) NOT NULL AUTO_INCREMENT, `retry` tinyint(4) NOT NULL DEFAULT 0 COMMENT '尝试投递次数', `status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '此时的消息的状态', `reason` varchar(1000) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '失败原因', `corr_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '消息唯一标识', `message` varchar(10000) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '消息体', `exchange` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '交换器', `rounting_key` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '' COMMENT '路由键', `create_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0), `update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0), PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `msg_broker_corr_id_uindex`(`corr_id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'rabbit消息' ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1;
/** * @Author: World哥 * @Description: * @Date: Create in 15:08 2020/1/14 */ public class MsgBroberConstant { public static final int SENDING_STATUS = 0; //消息正在发送 public static final int SUCCESS_STATUS = 1; // 消息发送成功 public static final int FAIL_STATUS = 2; // 消息发送失败 public static final int NOT_FOUNT = -1; //数据不存在 public static final int FIRST_RETRY = 1; //第一次重试 public static final int MAX_RETRY = 3; //重试上限 }
/**
* @Author: World哥
* @Description:
* @Date: Create in 10:50 2020/7/3
*/
public class RabbitConstant {
public static final String TEST_EXCHANGE = "test_exchange.direct";
/**
* 车辆入场信息队列
*/
public static final String CARIN_QUEUE = "carin";
public static final String CARIN_KEY = "carin_key";
}
/** * @Author: World哥 * @Description: * @Date: Create in 16:00 2020/1/14 */ @Slf4j @EnableScheduling @Component public class RetryMessageTasker { @Autowired private MsgBroberService msgBroberService; @Autowired private MessageProducer messageProducer; @Scheduled(initialDelay = 5 * 1000L, fixedDelay = 10 * 1000L) public void retrySendFailMessage() { List<MsgBrober> list = msgBroberService.list( Wrappers.<MsgBrober>lambdaQuery() .eq(MsgBrober::getStatus, MsgBroberConstant.FAIL_STATUS) .lt(MsgBrober::getRetry, MsgBroberConstant.MAX_RETRY) ); if (!CollectionUtils.isEmpty(list)) { list.stream().forEach( msg -> { try { messageProducer.send(msg); log.info("失败消息重发中。。。"); } catch (Exception e) { log.error("消息重发失败"); msg.setReason(e.getMessage()); } // 更新发送次数 msg.setRetry(msg.getRetry() + 1); msgBroberService.updateById(msg); }); } } }
spring:
rabbitmq:
addresses: ***.***.***.***
port: 5672
username: admin
password: *****
listener:
simple:
acknowledge-mode: auto #消费端自动确认
retry:
enabled: true #开启消费端重试机制
max-attempts: 4 #最大重试次数,包含首次消费消息
initial-interval: 3000 #每次间隔时间
multiplier: 2 #应用于前一重试间隔的乘法器。
default-requeue-rejected: true # 重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)
@Slf4j @Component public class MqConsumer { /** * QueueBinding将Queue绑定到Exchange,通过key匹配 * durable=true时设置EXCHANGE和QUEUE为持久的, * exclusive:将一个Queue声明成为排他性的, * ①只对首次声明它的连接(Connection)可见。 * ②会在其连接断开的时候自动删除。而不管这个队列是否被声明成持久性的(Durable =true)。 * 也就是说即使客户端程序将一个排他性的队列声明成了Durable的,只要调用了连接的Close方法或者客户端程序退出了, * RabbitMQ都会删除这个队列。注意这里是连接断开的时候,而不是通道断开。这个其实前一点保持一致,只区别连接而非通道。 * autoDelete订阅该队列的消费者下线后,该队列会自动删除 * @param body */ @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(name = RabbitConstant.TEST_EXCHANGE, type = ExchangeTypes.DIRECT,durable = "true", autoDelete = "false"), key = RabbitConstant.TEST_KEY, value = @Queue(name = RabbitConstant.TEST_QUEUE, durable = "true", exclusive = "false", autoDelete = "false") ), concurrency = "1-10") public void test(@Payload String body) { log.info("接收到消息,body={}", body); }
spring:
rabbitmq:
addresses: ***.***.***.***
port: 5672
username: admin
password: *****
listener:
simple:
acknowledge-mode: manual
@RabbitListener(bindings = @QueueBinding( exchange = @Exchange(name = RabbitConstant.TEST_EXCHANGE, type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"), key = RabbitConstant.TEST_KEY, value = @Queue(name = RabbitConstant.TEST_QUEUE, durable = "true", exclusive = "false", autoDelete = "false") ), concurrency = "1-10") public void test(Channel channel,@Payload String body, @Header(AmqpHeaders.DELIVERY_TAG) long tag,@Header(AmqpHeaders.REDELIVERED) boolean redelivered ) throws IOException { log.info("接收到消息,body={}", body); try { int i = 10 / 0; /** * 无异常就确认消息 * basicAck(long deliveryTag, boolean multiple) * deliveryTag:取出来当前消息在队列中的的索引; * multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认 * deliveryTag为5及其以下的消息;一般设置为false */ channel.basicAck(tag, true); } catch (Exception e) { if (redelivered) { log.error("消息已重复处理失败,拒绝再次接收,{}", body); // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列 channel.basicReject(tag, false); } else { log.error("carIn message Error", e); /** * 有异常就绝收消息 * basicNack(long deliveryTag, boolean multiple, boolean requeue) * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者; * false:将消息丢弃 */ channel.basicNack(tag, false, true); } } }
通过acknowledge-mode进行消费确认配置,可选值有none、auto、manual。
有ack的模式下,需要考虑setDefaultRequeueRejected(false),否则当消费消息抛出异常没有catch住时,这条消息会被rabbitmq放回到queue头部,再被推送过来,然后再抛异常再放回…死循环了。设置false的作用是抛异常时不放回,而是直接丢弃,所以可能需要对这条消息做处理,以免丢失。
只启动生产者,此时MQ上不会创建Exchange
通过测试接口将消息发送到TEST_EXCHANGE = “test_exchange.direct”;
可以看到,第一次发送消息返回404 NOT_FOUND - no exchange ‘test_exchange.direct’ in vhost ‘/’
间隔5s后进行了第一次重试,然后间隔10s又进行了两次重试。
查看数据库保存的错误信息,重试此数已经达到了3次,此时该消息不再尝试投递,避免故障未排除时程序产生大量消息一直重试造成资源消耗。
启动消费者后,修改重试次数为1,该消息重新发送投递。
启动消费者,修改生产者Exchange 的routingkey为其它值后重启。
通过测试可以发现,消息已经成功投递到了Exchange,但是MsgSendReturnCallBack返回了错误信息
将routingkey恢复后重启生产者,再次测试,两端都成功了。
@RabbitListener底层会使用AOP拦截,代码正常结束则会自动提交事务,但是如果有异常抛出,则会自动实现补偿,消息会一直缓存到消息服务器中,一直到不抛出异常。
需要注意的是,无论是手动重试还是自动重试,都应该指定重试次数,以免程序出现死循环,造成资源消耗。消费失败的消息会重新回到队列的头部,造成后面的消息积压。
消费者接收到消息后,手动制造一个异常
第一次消费失败后又重试了三次,达到了配置的最大重试次数后不再重试,直接丢弃。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。