赞
踩
@Configuration @EnableAsync @EnableRabbit @Slf4j public class MqConfig { @Autowired RabbitProperties properties; @Bean public ConnectionFactory rabbitConnectionFactory(@Value("${rabbit.uri}") String uriStr){ log.info("rabbit配置为:{}", uriStr); CachingConnectionFactory connectionFactory = new CachingConnectionFactory(URI.create(uriStr)); connectionFactory.setChannelCacheSize(200); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); return rabbitAdmin; } @Bean public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory(); simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory); simpleRabbitListenerContainerFactory.setConcurrentConsumers(Runtime.getRuntime().availableProcessors()); simpleRabbitListenerContainerFactory.setAdviceChain( RetryInterceptorBuilder .stateless() .recoverer(new RejectAndDontRequeueRecoverer()) .retryOperations(rabbitRetryTemplate()) .build() ); return simpleRabbitListenerContainerFactory; } @Bean public RetryTemplate rabbitRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); // 设置监听(不是必须) retryTemplate.registerListener(new RetryListener() { @Override public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) { // 执行之前调用 (返回false时会终止执行) return true; } @Override public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) { // 重试结束的时候调用 (最后一次重试 ) } @Override public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) { // 异常 都会调用 log.error("-----第{}次调用", retryContext.getRetryCount()); } }); // 个性化处理异常和重试 (不是必须) /* Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>(); //设置重试异常和是否重试 retryableExceptions.put(AmqpException.class, true); //设置重试次数和要重试的异常 SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(5,retryableExceptions);*/ retryTemplate.setBackOffPolicy(backOffPolicyByProperties()); retryTemplate.setRetryPolicy(retryPolicyByProperties()); return retryTemplate; } @Bean public ExponentialBackOffPolicy backOffPolicyByProperties() { ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds(); long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds(); double multiplier = properties.getListener().getSimple().getRetry().getMultiplier(); // 重试间隔 backOffPolicy.setInitialInterval(initialInterval * 1000); // 重试最大间隔 backOffPolicy.setMaxInterval(maxInterval * 1000); // 重试间隔乘法策略 backOffPolicy.setMultiplier(multiplier); return backOffPolicy; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } @Bean public SimpleRetryPolicy retryPolicyByProperties() { SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts(); retryPolicy.setMaxAttempts(maxAttempts); return retryPolicy; } }
在rabbitListenerContainerFactory中设置消息处理策略和重试操作
simpleRabbitListenerContainerFactory.setAdviceChain(
RetryInterceptorBuilder
.stateless()
.recoverer(new RejectAndDontRequeueRecoverer())
.retryOperations(rabbitRetryTemplate())
.build()
);
消息处理策略:
重试机制
设置监听、重试间隔机制、重试机制
@Bean public RetryTemplate rabbitRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); // 设置监听(不是必须) retryTemplate.registerListener(new RetryListener() { @Override public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) { // 执行之前调用 (返回false时会终止执行) return true; } @Override public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) { // 重试结束的时候调用 (最后一次重试 ) } @Override public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) { // 异常 都会调用 log.error("-----第{}次调用", retryContext.getRetryCount()); } }); // 个性化处理异常和重试 (不是必须) /* Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>(); //设置重试异常和是否重试 retryableExceptions.put(AmqpException.class, true); //设置重试次数和要重试的异常 SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(5,retryableExceptions);*/ retryTemplate.setBackOffPolicy(backOffPolicyByProperties()); retryTemplate.setRetryPolicy(retryPolicyByProperties()); return retryTemplate; }
设置重试间隔机制
@Bean
public ExponentialBackOffPolicy backOffPolicyByProperties() {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds();
long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds();
double multiplier = properties.getListener().getSimple().getRetry().getMultiplier();
// 重试间隔
backOffPolicy.setInitialInterval(initialInterval * 1000);
// 重试最大间隔
backOffPolicy.setMaxInterval(maxInterval * 1000);
// 重试间隔乘法策略
backOffPolicy.setMultiplier(multiplier);
return backOffPolicy;
}
设置重试次数
@Bean
public SimpleRetryPolicy retryPolicyByProperties() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts();
retryPolicy.setMaxAttempts(maxAttempts);
return retryPolicy;
}
后续补充说明mq接收消息功能的实现
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。