赞
踩
RocketMQ集成Spring Boot,使用@RocketMQMessageListener监听消息时,接收消息的实体类上使用fastjson的@JSONField注解无法实现下划线自动转驼峰
RocketMQAutoConfiguration通过import的方式向容器中注入MessageConverterConfiguration类
/**
 * Auto-configuration entry point of the RocketMQ Spring Boot integration.
 *
 * <p>What the annotations below declare:
 * <ul>
 *   <li>{@code RocketMQProperties} is enabled as a configuration-properties class;</li>
 *   <li>the configuration only applies when {@code MQAdmin} is on the classpath;</li>
 *   <li>it is conditional on the {@code rocketmq.name-server} property and still matches
 *       when that property is absent ({@code matchIfMissing = true});</li>
 *   <li>it imports {@code MessageConverterConfiguration}, {@code ListenerContainerConfiguration},
 *       {@code ExtProducerResetConfiguration}, {@code ExtConsumerResetConfiguration} and
 *       {@code RocketMQTransactionConfiguration};</li>
 *   <li>it is ordered after {@code MessageConverterConfiguration} and before
 *       {@code RocketMQTransactionConfiguration}.</li>
 * </ul>
 */
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({MQAdmin.class})
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration implements ApplicationContextAware {
// The class body is elided in this excerpt.
...
}
MessageConverterConfiguration类会注入一个类型为RocketMQMessageConverter的bean
/**
 * Contributes the default {@link RocketMQMessageConverter} bean.
 *
 * <p>Because of {@code @ConditionalOnMissingBean}, this configuration is skipped when the
 * application already defines its own {@code RocketMQMessageConverter} bean.
 */
@Configuration
@ConditionalOnMissingBean(RocketMQMessageConverter.class)
class MessageConverterConfiguration {

    /**
     * Creates the converter with its default, no-argument construction.
     *
     * @return a freshly constructed {@code RocketMQMessageConverter}
     */
    @Bean
    public RocketMQMessageConverter createRocketMQMessageConverter() {
        final RocketMQMessageConverter defaultConverter = new RocketMQMessageConverter();
        return defaultConverter;
    }
}
RocketMQMessageConverter类在加载(初始化)时会执行静态代码块,初始化两个boolean类型的静态常量JACKSON_PRESENT、FASTJSON_PRESENT,只要项目中引入了相关的包,对应的常量就会被赋值为true
JACKSON相关类(两个类必须同时存在):com.fasterxml.jackson.databind.ObjectMapper、com.fasterxml.jackson.core.JsonGenerator
FASTJSON相关类(两个类必须同时存在):com.alibaba.fastjson.JSON、com.alibaba.fastjson.support.config.FastJsonConfig
在构造方法中会构造一个元素类型为MessageConverter的List集合,集合中会添加JSON转换的处理器;List集合是有序的,JACKSON优先于FASTJSON加入到集合中;
public class RocketMQMessageConverter { private static final boolean JACKSON_PRESENT; private static final boolean FASTJSON_PRESENT; static { ClassLoader classLoader = RocketMQMessageConverter.class.getClassLoader(); JACKSON_PRESENT = ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader) && ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", classLoader); FASTJSON_PRESENT = ClassUtils.isPresent("com.alibaba.fastjson.JSON", classLoader) && ClassUtils.isPresent("com.alibaba.fastjson.support.config.FastJsonConfig", classLoader); } private final CompositeMessageConverter messageConverter; public RocketMQMessageConverter() { List<MessageConverter> messageConverters = new ArrayList<>(); ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter(); byteArrayMessageConverter.setContentTypeResolver(null); messageConverters.add(byteArrayMessageConverter); messageConverters.add(new StringMessageConverter()); if (JACKSON_PRESENT) { messageConverters.add(new MappingJackson2MessageConverter()); } if (FASTJSON_PRESENT) { try { messageConverters.add( (MessageConverter)ClassUtils.forName( "com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter", ClassUtils.getDefaultClassLoader()).newInstance()); } catch (ClassNotFoundException | IllegalAccessException | InstantiationException ignored) { //ignore this exception } } messageConverter = new CompositeMessageConverter(messageConverters); } public MessageConverter getMessageConverter() { return messageConverter; } }
在监听消费消息的时候,进入onMessage方法之前,会先走到DefaultRocketMQListenerContainer的内部类DefaultMessageListenerConcurrently(它实现了MessageListenerConcurrently接口)的consumeMessage方法,最终会调用handleMessage方法来处理消息
/**
 * Concurrent message listener that hands each received message to {@code handleMessage}.
 */
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

    /**
     * Processes the batch in order and stops at the first failure.
     *
     * @param msgs    the messages delivered in this batch
     * @param context the consume context; on failure its delay level for the next
     *                consume attempt is set from {@code delayLevelWhenNextConsume}
     * @return {@code RECONSUME_LATER} as soon as one message fails,
     *         {@code CONSUME_SUCCESS} when every message was handled
     */
    @SuppressWarnings("unchecked")
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt current : msgs) {
            if (!tryHandle(current)) {
                context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /** Handles one message, logging timing or the failure; returns whether it succeeded. */
    private boolean tryHandle(MessageExt current) {
        log.debug("received msg: {}", current);
        try {
            final long startedAt = System.currentTimeMillis();
            handleMessage(current);
            final long elapsedMillis = System.currentTimeMillis() - startedAt;
            log.debug("consume {} cost: {} ms", current.getMsgId(), elapsedMillis);
            return true;
        } catch (Exception e) {
            log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", current.getMsgId(), current.getTopic(), current.getReconsumeTimes(), e);
            return false;
        }
    }
}
handleMessage方法中会调用onMessage方法,而在调用onMessage之前会先调用doConvertMessage方法把消息体反序列化为对象
/**
 * Dispatches a received message to whichever listener is configured.
 *
 * <p>With a plain listener, the converted payload is passed to {@code onMessage}. With a
 * reply listener, the value it returns is wrapped in a message, turned into a RocketMQ
 * reply message and sent asynchronously; the send outcome is only logged. When neither
 * listener is set, nothing happens.
 *
 * @param messageExt the message received from the broker
 * @throws MQClientException    propagated from the reply-producing calls
 * @throws RemotingException    propagated from the reply-producing calls
 * @throws InterruptedException propagated from the reply-producing calls
 */
private void handleMessage(MessageExt messageExt)
    throws MQClientException, RemotingException, InterruptedException {
    if (rocketMQListener != null) {
        rocketMQListener.onMessage(doConvertMessage(messageExt));
        return;
    }
    if (rocketMQReplyListener == null) {
        return;
    }

    Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
    Message<?> replyEnvelope = MessageBuilder.withPayload(replyContent).build();
    org.apache.rocketmq.common.message.Message replyMessage =
        MessageUtil.createReplyMessage(messageExt, convertToBytes(replyEnvelope));

    DefaultMQProducer replyProducer =
        consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
    replyProducer.setSendMsgTimeout(replyTimeout);
    replyProducer.send(replyMessage, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                log.debug("Consumer replies message success.");
            } else {
                log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
            }
        }

        @Override
        public void onException(Throwable e) {
            log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
        }
    });
}
private Object doConvertMessage(MessageExt messageExt) { if (Objects.equals(messageType, MessageExt.class) || Objects.equals(messageType, org.apache.rocketmq.common.message.Message.class)) { return messageExt; } else { String str = new String(messageExt.getBody(), Charset.forName(charset)); if (Objects.equals(messageType, String.class)) { return str; } else { // If msgType not string, use objectMapper change it. try { if (messageType instanceof Class) { //if the messageType has not Generic Parameter return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) messageType); } else { //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint". //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter. return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) messageType).getRawType(), methodParameter); } } catch (Exception e) { log.info("convert failed. str:{}, msgType:{}", str, messageType); throw new RuntimeException("cannot convert message to " + messageType, e); } } } }
doConvertMessage方法里面最终会调用到fromMessage方法,fromMessage方法中会去遍历前面初始化bean的时候构建的MessageConverter集合;JACKSON在集合中排在FASTJSON前面,使用JACKSON方式处理完成后(返回值不为null)就会直接返回result,然后进入到onMessage方法,FASTJSON的转换器根本不会被执行,而JACKSON并不识别fastjson的@JSONField注解
/**
 * Asks each registered converter in turn and returns the first non-null result.
 *
 * <p>Iteration stops at the first converter that yields a value, so later converters
 * are never consulted for that message.
 *
 * @param message     the message to convert
 * @param targetClass the type to convert the payload to
 * @return the first non-null conversion result, or {@code null} if every converter
 *         returned {@code null}
 */
@Override
@Nullable
public Object fromMessage(Message<?> message, Class<?> targetClass) {
    for (MessageConverter candidate : getConverters()) {
        Object converted = candidate.fromMessage(message, targetClass);
        if (converted == null) {
            // This converter produced nothing; try the next one.
            continue;
        }
        return converted;
    }
    return null;
}
至此解释了为什么@JSONField注解在反序列化的时候不起作用
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。