赞
踩
@Bean(name = "connectionFactory") @Primary public ConnectionFactory normalConnectionFactory( @Value("${spring.rabbitmq.username}") String username, @Value("${spring.rabbitmq.password}") String password, @Value("${spring.rabbitmq.addresses}") String address) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(address); connectionFactory.setUsername(username); connectionFactory.setPassword(password); // connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); connectionFactory.setExecutor(createThreadPool(10, 20, "mq-connection-", "mq-connection-group")); return connectionFactory; }
或者配置文件里配置
spring:
# RabbitMQ 配置项,对应 RabbitProperties 配置类
rabbitmq:
publisher-confirm-type: correlated
publisher-confirm-type属性有三个可选值:
开启simple模式需要在invoke方法中一起执行 rabbitTemplate.waitForConfirms
同时也会收到回调,回调后结束阻塞,同时可以获取到返回结果。
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ System.out.println(correlationData.toString() + "发送成功"); }else { System.out.println(correlationData.toString() + "发送失败, 原因: " + cause); } } }; rabbitTemplate.setConfirmCallback(confirmCallback); Boolean invoke = rabbitTemplate.invoke(operations -> { rabbitTemplate.convertAndSend("direct_exchange", "ROUTING_KEY_01", message, correlationData); return rabbitTemplate.waitForConfirms(1000l); });
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ System.out.println(correlationData.toString() + "发送成功"); }else { System.out.println(correlationData.toString() + "发送失败, 原因: " + cause); } } }; rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.convertAndSend("direct_exchange", "ROUTING_KEY_01", message, correlationData); // correlationData.getFuture().get(); sleep(1000*60); System.out.println("发送消息boot mq hello Direct成功");
可以看出来,在开启publisher-confirm的情况下,如果不自行实现ConfirmCallback的逻辑,也无法做到保证消息成功发送。
可以在发送消息时更新为发送中。
收到callback更新为发送成功,或者发送失败。
对于发送失败的安排重试,可以在消息头加上重试次数记录重试次数,达到指定次数,更新为发送失败。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。