赞
踩
compile 'org.springframework.amqp:spring-rabbit:1.7.5.RELEASE'
package com.autoyol.car.study.service.message;
public class RabbitMqConfirmProducer implements ConfirmCallback { private static final Logger log = LoggerFactory.getLogger(RabbitMqConfirmProducer.class); private RabbitTemplate confirmRabbitTemplate; public RabbitMqConfirmProducer(RabbitTemplate confirmRabbitTemplate) { this.confirmRabbitTemplate = confirmRabbitTemplate; } public void sendTopicMsg(String exchange, String routeKey, Object message) { String correlationDataId = UUID.randomUUID().toString(); if (message == null) { return; } this.confirmRabbitTemplate.setConfirmCallback(this); CorrelationData correlationData = new CorrelationData(correlationDataId); this.confirmRabbitTemplate.convertAndSend(exchange, routeKey, message, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("messageId is " + correlationData.getId() + ",send message success"); } else { log.error("messageId is " + correlationData.getId() + ",send message failed:" + cause); } } }
package com.autoyol.car.study.service.message;
/** * @Description: 消息序列化 * @menu: FastJsonMessageConverter * @Date: 2021/2/25 **/ public class JsonMessageConverter extends AbstractMessageConverter { private static Logger log = LoggerFactory.getLogger(JsonMessageConverter.class); public static final String DEFAULT_CHARSET = "UTF-8"; public JsonMessageConverter() { } @Override public Object fromMessage(Message message) throws MessageConversionException { String json = ""; try { json = new String(message.getBody(), DEFAULT_CHARSET); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return JsonUtil.fromJson(json, Object.class); } @Override protected Message createMessage(Object objectToConvert, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes; try { String jsonString = JsonUtil.toJson(objectToConvert); bytes = jsonString.getBytes(DEFAULT_CHARSET); } catch (UnsupportedEncodingException e) { throw new MessageConversionException("Failed to convert Message content", e); } messageProperties.setContentType("application/json"); messageProperties.setContentEncoding(DEFAULT_CHARSET); if (bytes != null) { messageProperties.setContentLength((long) bytes.length); } return new Message(bytes, messageProperties); } }
package com.autoyol.car.study.service.message;
@Configuration @Data public class RabbitMqConsumeConfig { private final static Logger LOGGER = LoggerFactory.getLogger(RabbitMqConsumeConfig.class); private final static Integer prefetchCount = 10; @Bean public MessageConverter jsonMessageConverter() { return new JsonMessageConverter(); } @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); return template; } @Bean @Qualifier(value = "confirmRabbitTemplate") @Scope("prototype") public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactory connectionFactory) { connectionFactory.setPublisherConfirms(true); RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(this.jsonMessageConverter()); return template; } @Bean public RabbitMqConfirmProducer rabbitMqConfirmProducer(CachingConnectionFactory connectionFactory) { RabbitMqConfirmProducer template = new RabbitMqConfirmProducer(confirmRabbitTemplate(connectionFactory)); return template; } @Bean("rabbitListenerContainerFactory") public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // 并发消费者数量 factory.setConcurrentConsumers(10); factory.setMaxConcurrentConsumers(50); factory.setAcknowledgeMode(AcknowledgeMode.AUTO); return factory; } @Bean("simpleRabbitListenerSimpleFactory") public SimpleRabbitListenerContainerFactory simpleRabbitListenerSimpleFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(this.jsonMessageConverter()); factory.setConcurrentConsumers(prefetchCount); // 并发消费者数量 factory.setConcurrentConsumers(10); factory.setMaxConcurrentConsumers(50); 
factory.setAcknowledgeMode(AcknowledgeMode.AUTO); return factory; } @Bean("confirmRabbitListenerContainerFactory") public SimpleRabbitListenerContainerFactory confirmRabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(jsonMessageConverter()); // 并发消费者数量 factory.setConcurrentConsumers(10); factory.setMaxConcurrentConsumers(50); /** 设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。*/ factory.setDefaultRequeueRejected(true); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setAdviceChain( RetryInterceptorBuilder .stateless() .recoverer(new RejectAndDontRequeueRecoverer()) .retryOperations(rabbitRetryTemplate()) .build() ); return factory; } @Bean public RetryTemplate rabbitRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); // 设置监听(不是必须) retryTemplate.registerListener(new RetryListener() { @Override public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) { // 执行之前调用 (返回false时会终止执行) return true; } @Override public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) { // 重试结束的时候调用 (最后一次重试 ) } @Override public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) { // 异常 都会调用 LOGGER.info("当前队列消费重试次数:{},异常信息:", retryContext.getRetryCount(), throwable); } }); // 个性化处理异常和重试 (不是必须) Map<Class<? 
extends Throwable>, Boolean> retryableExceptions = new HashMap<>(); //设置重试异常和是否重试 retryableExceptions.put(AmqpException.class, true); //设置重试次数和要重试的异常 SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(5, retryableExceptions); retryTemplate.setBackOffPolicy(backOffPolicyByProperties()); retryTemplate.setRetryPolicy(retryPolicy); retryTemplate.setThrowLastExceptionOnExhausted(true); return retryTemplate; } @Bean public ExponentialBackOffPolicy backOffPolicyByProperties() { ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); // 重试间隔 backOffPolicy.setInitialInterval(10 * 1000); // 重试最大间隔 backOffPolicy.setMaxInterval(300 * 1000); // 重试间隔乘法策略 backOffPolicy.setMultiplier(3); return backOffPolicy; } }
package com.autoyol.car.study.service.message;
/**
 * Listener for {@code weixin_queue}, bound to the {@code weixin_topic} topic exchange with
 * routing key {@code weixin_key}. It uses the {@code confirmRabbitListenerContainerFactory}
 * container factory and acknowledges each delivery explicitly on the channel.
 */
@Component
public class TemplateRabbitConsumer {

    private final static Logger LOGGER = LoggerFactory.getLogger(TemplateRabbitConsumer.class);

    public final static String topic = "weixin_topic";
    public final static String topic_key = "weixin_key";

    /**
     * Parses the message body as a {@code User}, logs it and acks the delivery.
     * If anything fails, the error is logged and the delivery is negatively acknowledged
     * without requeue, so it does not stay unacknowledged on the channel.
     *
     * @param message the received message
     * @param channel channel the message was delivered on, used for ack / nack
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "weixin_queue", durable = "true", autoDelete = "true"),
            exchange = @Exchange(value = topic, type = ExchangeTypes.TOPIC),
            key = topic_key),
            containerFactory = "confirmRabbitListenerContainerFactory")
    public void onMessageOfOfRabbit(Message message, Channel channel) {
        try {
            // Decode once, with an explicit charset rather than the platform default.
            String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            LOGGER.info("onMessageOfOfRabbit->>当前消息数据:{}", body);
            User user = JsonUtil.fromJson(body, User.class);
            LOGGER.info("onMessageOfOfRabbit->>当前车辆编号:{}", JsonUtil.toJson(user));
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            LOGGER.error("exception is ", e);
            nackWithoutRequeue(message, channel);
        }
    }

    // Negatively acknowledges a failed delivery; requeue=false so a message that cannot be
    // processed is not redelivered in a tight loop. Failures here are only logged.
    private void nackWithoutRequeue(Message message, Channel channel) {
        try {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } catch (Exception nackFailure) {
            LOGGER.error("failed to nack message", nackFailure);
        }
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。