赞
踩
业务场景
需要动态获取Tag标签,进行消息的消费。
于是乎,查看@RocketMQMessageListener注解发现,似乎支持占位符表达式的写法
那我们就采用跟他一样的写法如
发送消息
此时就会发现,好像并没有收到消息。
解决步骤
- 找到@RocketMQMessageListener 的配置类
经过dubug发现配置类org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration
如下代码初始化我们的监听器设置监听器
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); String nameServer = environment.resolvePlaceholders(annotation.nameServer()); nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer; String accessChannel = environment.resolvePlaceholders(annotation.accessChannel()); container.setNameServer(nameServer); if (!StringUtils.isEmpty(accessChannel)) { container.setAccessChannel(AccessChannel.valueOf(accessChannel)); } container.setTopic(environment.resolvePlaceholders(annotation.topic())); container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener) bean); container.setObjectMapper(objectMapper); container.setName(name); // REVIEW ME, use the same clientId or multiple? return container; }
container.setRocketMQMessageListener(annotation);简单赋值
然后我们又发现该类实现了InitializingBean接口
实现afterPropertiesSet方法进行属性填充
真正解析赋值的是initRocketMQPushConsumer方法
其中我们发现会调用如下代码,解析注解中的占位符表达式,发现并没有我们的加载我们的selectorExpression属性,把我们的占位符当成字符串使用了。
解决方案
1. 修改ListenerContainerConfiguration的代码代码
beanBuilder.addPropertyValue("selectorExpress", this.environment.resolvePlaceholders(annotation.selectorExpression()));
2. 老老实实使用原生版
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。