赞
踩
此篇文章会涉及到SpringBoot RocketMQ源码,后面会单独出一篇详细介绍源码
本次主要使用RocketMQ消息监听器后置注册方案,k8s层面也可以解决,但是不在本次讨论范围之内
RocketMQ消息监听器后置注册:就是在SpringBoot启动完成可以正常接收外部接口请求时再动态注册消息加监听器
本篇文章初衷如下
主要解决
前置文章参考
import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; /** * RocketMQ注册拦截,自行注册,加快容器启动速度 * * @author tianxincoord@163.com * @since 2022/8/15 */ @Component @Aspect @Order(1) @Slf4j public class RocketMqSkipRegisterAop { /** * 通过切入afterSingletonsInstantiated拦截注册 */ @Around("execution(* org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration.afterSingletonsInstantiated())") public Object init(ProceedingJoinPoint joinPoint) { log.info("RocketMQ开启代理,默认注册已跳过"); /// 不调用原生执行方法,直接返回null,跳过方法调用 // Object proceed = joinPoint.proceed(); // return proceed return null; } }
由于ListenerContainerConfiguration注册类方法被切面拦截,所以可以复制一个这个类出来保留里面的方法,然后调整一下参数等信息,根据自己需要,也可以直接使用实例中的代码
示例类代码和ListenerContainerConfiguration基本一致,去掉继承关系和接口实现,当做普通类
import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQReplyListener; import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; import org.springframework.aop.scope.ScopedProxyUtils; import org.springframework.beans.factory.support.BeanDefinitionValidationException; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.env.StandardEnvironment; import org.springframework.util.StringUtils; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; /** * RocketMQ队列监听器 * * @author tianxincoord@163.com * @since 2022/09/23 */ public class RocketMqListenerRegistry { private final static Logger log = LoggerFactory.getLogger( org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration.class); private final ConfigurableApplicationContext applicationContext; private final AtomicLong counter = new AtomicLong(0); private final StandardEnvironment environment; private final RocketMQProperties rocketMqProperties; private final RocketMQMessageConverter rocketMqMessageConverter; public RocketMqListenerRegistry(ConfigurableApplicationContext applicationContext, RocketMQMessageConverter rocketMqMessageConverter, StandardEnvironment environment, RocketMQProperties rocketMqProperties) { this.applicationContext = applicationContext; this.rocketMqMessageConverter = rocketMqMessageConverter; this.environment = environment; this.rocketMqProperties = rocketMqProperties; } public void afterSingletonsInstantiated() { Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class) .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); beans.forEach(this::registerContainer); } public void registerContainer(String beanName, Object bean) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom( bean.getClass())) { throw new IllegalStateException( clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName()); } if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom( bean.getClass())) { throw new IllegalStateException( clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName()); } RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup()); String topic = this.environment.resolvePlaceholders(annotation.topic()); boolean listenerEnabled = (boolean) rocketMqProperties.getConsumer().getListeners() .getOrDefault(consumerGroup, Collections.EMPTY_MAP).getOrDefault(topic, true); if (!listenerEnabled) { log.debug( "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic); return; } validate(annotation); String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext; genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { container.start(); } catch (Exception e) { log.error("Started container failed. {}", container, e); throw new RuntimeException(e); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); } private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setRocketMQMessageListener(annotation); String nameServer = environment.resolvePlaceholders(annotation.nameServer()); nameServer = StringUtils.isEmpty(nameServer) ? rocketMqProperties.getNameServer() : nameServer; String accessChannel = environment.resolvePlaceholders(annotation.accessChannel()); container.setNameServer(nameServer); if (!StringUtils.isEmpty(accessChannel)) { container.setAccessChannel(AccessChannel.valueOf(accessChannel)); } container.setTopic(environment.resolvePlaceholders(annotation.topic())); String tags = environment.resolvePlaceholders(annotation.selectorExpression()); if (!StringUtils.isEmpty(tags)) { container.setSelectorExpression(tags); } container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); if (RocketMQListener.class.isAssignableFrom(bean.getClass())) { container.setRocketMQListener((RocketMQListener) bean); } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { container.setRocketMQReplyListener((RocketMQReplyListener) bean); } container.setMessageConverter(rocketMqMessageConverter.getMessageConverter()); container.setName(name); return container; } private void validate(RocketMQMessageListener annotation) { if (annotation.consumeMode() == ConsumeMode.ORDERLY && annotation.messageModel() == MessageModel.BROADCASTING) { throw new BeanDefinitionValidationException( "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!"); } } }
在启动类中单独在SpringApplication.run(xxx.class, args);之后开始注册。SpringApplication.run执行完成才表示整个SpringBoot服务真正意义上的启动完成
import com.codecoord.rocketmq.config.RocketMqListenerRegistry; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.StandardEnvironment; /** * RocketMQ启动类 */ @Slf4j @SpringBootApplication public class RocketMqApplication { public static void main(String[] args) { /* * 指定使用的日志框架,否则将会报错 * RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap). * RocketMQLog:WARN Please initialize the logger system properly. */ System.setProperty("rocketmq.client.logUseSlf4j", "true"); ConfigurableApplicationContext run = SpringApplication.run(RocketMqApplication.class, args); log.info("SpringBoot RocketMQ服务启动完成"); ConfigurableEnvironment environment = run.getEnvironment(); registerRocketMq(environment, run); } private static void registerRocketMq(ConfigurableEnvironment environment, ConfigurableApplicationContext applicationContext) { try { log.info("开始注册RocketMQ"); RocketMQMessageConverter messageConverter = applicationContext.getBean(RocketMQMessageConverter.class); RocketMQProperties mqProperties = applicationContext.getBean(RocketMQProperties.class); RocketMqListenerRegistry registry = new RocketMqListenerRegistry(applicationContext, messageConverter, (StandardEnvironment) environment, mqProperties); // 手工调用此方法完成原来应该完成的注册调用 registry.afterSingletonsInstantiated(); log.info("RocketMQ注册成功"); } catch (Exception e) { // 此处可以整合消息通知,当注册失败时发送企业微信、钉钉、邮件等告警 log.error("RocketMQ注册异常", e); e.printStackTrace(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。