赞
踩
TopicExchange型交换机,会根据RouteKey将消息路由至匹配队列(推模式下,消息将被推送至消费者的监听函数处理),消息流转和路由的示意图如下。
RabbitMQ的高可靠保障主要在投递、持久化、消费三个方面,分别对应于生产者、消息队列、消费者,具体部流程如下图所示。
为防止消息在未发送到Broker前就由于网络原因或其他情况导致丢失,生产者端需要保障消息成功送达中间件,手段一般有两种,一是使用事务机制,但这种做法是非异步的,吞吐量性能较差;另一种是确认机制,本例使用确认机制。
生产端高可靠只能让生产者知道自己投递的消息是否成果到达交换机或某个队列,但是无法知道消费者有没有成功消费。
// 设置消息入队失败回调 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { log.debug(s); } }); // 设置消息发送至交换机回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message m,int i,String s,String q,String q) { log.debug(s); } }); MessageProperties messageProperties = new MessageProperties(); // 开启消息持久化 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setReceivedDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息ID messageProperties.setMessageId(UUID.randomUUID().toString()); // 设置消息格式 messageProperties.setContentType("application/json"); Message message = new Message("msgBody".getContent().getBytes(), messageProperties); // 发送消息 rabbitTemplate.send("exchangeName", "routingKey", message);
为防止消息中间件在重启或宕机等场景丢失消息,开启消息和队列的持久化设置。主要有两个步骤:
/************** * 交换机配置 **************/ @Bean public TopicExchange exchange() { return new TopicExchange(EXCHANGE_NAME); } /************** * 队列配置 **************/ @Bean public Queue queue() { // 参数2true代表持久化队列 return new Queue(QUEUE_NAME, true, false, false); } @Bean public Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); }
为防止消息不能成功送达消费者,需要使用消费确认机制,消费者在订阅队列时将acknowledge-mode设为manual,在消息被成功消费后主动响应。
此处为推模式:即队列把消息推送给监听者
@Component @RabbitListener(queues = QUEUE_NAME, containerFactory = "rabbitListenerContainerFactory") public class MsgListener { @Autowired private MsgService msgService; @RabbitHandler public void processStatisticMessage(Message message, Channel channel, Map content) throws Exception { MessageProperties msgProperties = message.getMessageProperties(); String messageId = msgProperties.getMessageId(); String routingKey = msgProperties.getReceivedRoutingKey(); long deliveryTag = msgProperties.getDeliveryTag(); // 幂等判断,拒绝重复消息 if (msgService.hasDuplicateMsgId(messageId)) { channel.basicReject(deliveryTag, false); return; } try { // 消费消息 msgService.process(content, routingKey); channel.basicAck(deliveryTag, false); } catch (Exception e) { channel.basicNack(deliveryTag, false, false); } } }
@Bean public CachingConnectionFactory cachingConnectionFactory() { CachingConnectionFactory factory = new CachingConnectionFactory(HOST, PORT); // 设置用户名密码 factory.setUsername(rabbitHostConfig.getUsername()); factory.setPassword(rabbitHostConfig.getPassword()); // 设置虚拟主机 factory.setVirtualHost(VIRTUAL_HOST); // 消息送达交换机确认 factory.setPublisherConfirms(true); factory.setPublisherReturns(true); // 设置缓存模式 factory.setCacheMode(CacheMode.CHANNEL); // 设置缓存数 factory.setChannelCacheSize(8); return factory; } @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(cachingConnectionFactory); rabbitTemplate.setMandatory(true); return rabbitTemplate; }
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
// 消息转换器
factory.setMessageConverter(new Jackson2JsonMessageConverter());
// ack模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 消费者并发数
factory.setConcurrentConsumers(8);
// 预取消息数
factory.setPrefetchCount(8);
return factory;
}
在消息生产时,使用UUID构建唯一msgId。
在消息消费前,通过判断Redis中msgId来确认是否重复消费;在消息成功消费后,将msgId保存至Redis并设置过期时间。
在消息消费重试机制上,摒弃了requeue模式,因为requeue会和幂等性消费产生冲突,最致命的是requeue可能产生循环,导致队列被阻塞。
采用Spring自带的Retry可以在消费端业务逻辑里重试,避免消息重新入队导致的各种问题。这里将回退策略设置为指数型,最多重试3次,初始间隔6秒,每次间隔10倍。
需在启动类加上注解
@EnableRetry
@Override
@Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(maxDelay = 600000L, delay = 6000L, multiplier = 10))
public void process(Map content, String type) throws Exception {
log.info("process msg:{}", content);
}
消息消费时,若重试超限,将消息转发至死信队列,以备人工排查定位或后期做补偿机制。
@Bean(name = "deadQueue") public Queue deadQueue() { Map<String, Object> args = new HashMap<String, Object>(); // 设置队列长度 // args.put("x-max-length", 1000); // 设置死信队列 args.put("x-dead-letter-exchange", "dlx.exchange"); // 持久化队列 return new Queue("dlx.queue", true, false, false, args); } @Bean(name = "deadExchange") public TopicExchange deadExchange() { return new TopicExchange("dlx.exchange"); } @Bean(name = "deadBinding") public Binding deadBinding(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("dead.#"); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。