赞
踩
目录
2、RabbitListenerContainerFactory
3、SimpleMessageListenerContainer
4、RabbitListenerAnnotationBeanPostProcessor
Endpoint为终端,像电脑、手机都是终端,他们都可以接受外部信息并响应,如手机来短信了就有提示。这里也用了终端的概念,例如:被@RabbitListener注解修饰方法也有终端的特点 可以接受外部信息并响应。
- public interface RabbitListenerEndpoint {
-
- /**
- * the id of this endpoint
- */
- String getId();
-
- /**
- * the group of this endpoint or null if not in a group.
- */
- String getGroup();
-
- /**
- *the concurrency of this endpoint.
- */
- String getConcurrency();
-
- /**
- * Override of the default autoStartup property.
- */
- Boolean getAutoStartup();
-
- /**
- * Setup the specified message listener container with the model
- * defined by this endpoint.
- */
- void setupListenerContainer(MessageListenerContainer listenerContainer);
-
- }
这里的终端有很多种,刚才说到 『被@RabbitListener注解修饰方法也是终端』 就是 MethodRabbitListenerEnpoint 方法终端。
接口方法中 void setupListenerContainer(MessageListenerContainer listenerContainer) 方法,中的MessageListenerContainer就是用来接收并处理消息的。
- 在抽象类中AbstractRabbitListenerEndpoint对setupListenerContainer方法进了实现:
- @Override
- public void setupListenerContainer(MessageListenerContainer listenerContainer) {
- ……
- setupMessageListener(listenerContainer);
- }
- private void setupMessageListener(MessageListenerContainer container) {
- //MessageListenerContainer为容器,容器是要装东西的,这里显然是装MessageListener。
- //createMessageListener(container)为抽象方法,用于子类覆盖。
- MessageListener messageListener = createMessageListener(container);
- container.setupMessageListener(messageListener);
-
- }
-
-
子类MethodRabbitListenerEndpoint中覆盖了createMessageListener。
-
- @Override
- protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
- //创建 messageListener
- MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
- //
- messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
-
- return messageListener;
-
- }
-
- protected MessagingMessageListenerAdapter createMessageListenerInstance() {
- return new MessagingMessageListenerAdapter(this.bean, this.method, this.returnExceptions, this.errorHandler);
- }
-
- protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) {
- InvocableHandlerMethod invocableHandlerMethod =
- this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
- return new HandlerAdapter(invocableHandlerMethod);
- }
-
-
RabbitListenerContainerFactory为:rabbit 监听器容器工厂。既然为 监听器容器工厂 那一定是生产监听器容器。
监听器容器工厂:
- public interface RabbitListenerContainerFactory<C extends MessageListenerContainer> {
-
- /**
- * Create a {@link MessageListenerContainer} for the given {@link RabbitListenerEndpoint}.
- */
- C createListenerContainer(RabbitListenerEndpoint endpoint);
-
- }
监听器容器:
- public interface MessageListenerContainer extends SmartLifecycle {
-
- /**
- * Setup the message listener to use. Throws an {@link IllegalArgumentException}
- * if that message listener type is not supported.
- */
- void setupMessageListener(Object messageListener);
-
- /**
- * @return the {@link MessageConverter} that can be used to
- * convert {@link org.springframework.amqp.core.Message}, if any.
- */
- MessageConverter getMessageConverter();
-
- }
监听器容器工厂:
在其抽象子类AbstractRabbitListenerContainerFactory中对 createListenerContainer 方法已做了实现。
我常用的是其子类SimpleRabbitListenerContainerFactory
- public class SimpleRabbitListenerContainerFactory
- extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer> {}
SimpleRabbitListenerContainerFactory工厂生产的是SimpleMessageListenerContainer
- public class AbstractRabbitListenerContainerFactory<C extends AbstractMessageListenerContainer> implements RabbitListenerContainerFactory<C>{
-
- //在使用SimpleRabbitListenerContainerFactory时,下面的 C 就是 SimpleMessageListenerContainer
- @Override
- public C createListenerContainer(RabbitListenerEndpoint endpoint) {
- //创建SimpleMessageListenerContainer
- C instance = createContainerInstance();
-
- if (this.connectionFactory != null) {
- instance.setConnectionFactory(this.connectionFactory);
- }
- if (this.errorHandler != null) {
- instance.setErrorHandler(this.errorHandler);
- }
- if (this.messageConverter != null) {
- instance.setMessageConverter(this.messageConverter);
- }
-
- ……
-
- //setupListenerContainer方法参数为 MessageListenerContainer类型,
- //SimpleMessageListenerContainer是MessageListenerContainer的子类。
- endpoint.setupListenerContainer(instance);
- if (instance.getMessageListener() instanceof AbstractAdaptableMessageListener) {
- AbstractAdaptableMessageListener messageListener = (AbstractAdaptableMessageListener) instance
- .getMessageListener();
- if (this.beforeSendReplyPostProcessors != null) {
- messageListener.setBeforeSendReplyPostProcessors(this.beforeSendReplyPostProcessors);
- }
- if (this.retryTemplate != null) {
- messageListener.setRetryTemplate(this.retryTemplate);
- if (this.recoveryCallback != null) {
- messageListener.setRecoveryCallback(this.recoveryCallback);
- }
- }
- }
- initializeContainer(instance, endpoint);
-
- return instance;
- }
- }
在被@RabbitListener标注的方法。该方法会被封装成HanderMethod。
- HandlerMethod{
- private final Object bean;
- private final Method method;
-
- public HandlerMethod(Object bean, Method method) {
- this.bean = bean;
- this.method = method;
- }
- }
我们常用其子类InvocableHandlerMethod。该类提供一个Object invoke(Message<?> message, Object... providedArgs) 方法,
可以通过该invoke方法调用handlerMethod 中的method:method.invoke(bean,args)
HandlerAdapter称为hander适配器,该适配器中有InvocableHandlerMethod、DelegatingInvocableHandler这两个hander方法。注:InvocableHandlerMethod上文刚说过。
- public class HandlerAdapter {
- private final InvocableHandlerMethod invokerHandlerMethod;
- private final DelegatingInvocableHandler delegatingHandler;
-
- public Object invoke(Message<?> message, Object... providedArgs) throws Exception
- {
- if (this.invokerHandlerMethod != null) {
- //InvocableHandlerMethod不为null,就调用invokerHandlerMethod.invoke方法。
- return this.invokerHandlerMethod.invoke(message, providedArgs);
-
- }else if (this.delegatingHandler.hasDefaultHandler()) {
- //……
- }else {
- //……
- }
- }
- }
这里可以简单的理解调用HandlerAdapter.invoke方法可以间接调用@RabbitListener修饰的方法。
消费端接收到mq的消息会调用ChannelAwareMessageListener 接口中的onMessage方法。
- public interface ChannelAwareMessageListener {
-
- /**
- * 回调函数处理接受的消息
- */
- void onMessage(Message message, Channel channel) throws Exception;
-
- }
下面要说的是其子类:MessagingMessageListenerAdapter
- public class MessagingMessageListenerAdapter extends AbstractAdaptableMessageListener {
-
- private HandlerAdapter handlerMethod;
-
-
- @Override
- public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception {
-
- //ChannelAwareMessageListener.onMessage()方法中的message是org.springframework.amqp.core.Message类型。
- //这里转换成org.springframework.messaging.Message类型。
- Message<?> message = toMessagingMessage(amqpMessage);
- try {
- Object result = invokeHandler(amqpMessage, channel, message);
- if (result != null) {
- handleResult(result, amqpMessage, channel, message);
- }
- }
- catch () {}
- }
-
- private Object invokeHandler(org.springframework.amqp.core.Message amqpMessage, Channel channel,
- Message<?> message) {
- try {
- return this.handlerMethod.invoke(message, amqpMessage, channel);
- }
- catch () {}
-
- }
-
- }
onMessage->invokehandler->this.handlerMethod.invoke 。this.handlerMethod为 HandlerAdapter类型。上面刚说完:调用HandlerAdapter.invoke方法可以间接调用@RabbitListener修饰的方法。
现在我们就已经梳洗清楚从onMessage到调用被@RabbitListener修饰的方法的整个流程
SimpleMessageListenerContainer 中有一个内部类AsyncMessageProcessingConsumer :
- //当终端接受到信息时,一个线程会执行该run方法。
- private final class AsyncMessageProcessingConsumer implements Runnable {
- @Override
- public void run() {
- //这个 while 可以无限的循环,
- while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
- boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
- }
- }
- }
run方法中调用了receiveAndExecute方法,方法名直译:接受并执行。
- SimpleMessageListenerContainer {
-
-
- //接受并执行
- private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {
-
- //do接受并执行
- return doReceiveAndExecute(consumer);
- }
- //do接受并执行
- private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable {
-
- Channel channel = consumer.getChannel();
- for (int i = 0; i < this.txSize; i++) {//txSize为一次事务接受的消息个数
- //读取消息,这里阻塞的,但是有一个超时时间。
- Message message = consumer.nextMessage(this.receiveTimeout);
- if (message == null) {//阻塞超时
- break;
- }
- try {
- executeListener(channel, message);//消息接收已完成,现在开始处理消息。
- }
- catch (Exception e) {}
- }
- return consumer.commitIfNecessary(isChannelLocallyTransacted());
- }
-
-
- //处理消息开始。该方法在其父类中
- protected void executeListener(Channel channel, Message messageIn) throws Exception {
- try {
- Message message = messageIn;
- if (……) {
- //批处理信息,这个不研究
- }else {
- invokeListener(channel, message);
- }
- }catch (Exception ex) {}
- }
- //在其父类中
- protected void invokeListener(Channel channel, Message message) throws Exception {
- //这里this.proxy.invokeListener最终会调用actualInvokeListener方法。
- this.proxy.invokeListener(channel, message);
- }
- //在其父类中
- protected void actualInvokeListener(Channel channel, Message message) throws Exception {
- Object listener = getMessageListener();
- if (listener instanceof ChannelAwareMessageListener) {
- doInvokeListener((ChannelAwareMessageListener) listener, channel, message);
- }
- else if (listener instanceof MessageListener) {
- //……
- doInvokeListener((MessageListener) listener, message)
- }else{
- //……
- }
-
- }
-
- protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message)
- throws Exception {
- Channel channelToUse = channel;
- try {
- listener.onMessage(message, channelToUse);
- }
- catch (Exception e) {
- throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
- }
-
- }
- }
在doInvokeListener方中最终调用了onMessage方法。
到这我知道了SimpleMessageListenerContainer内部类AsyncMessageProcessingConsumer被放入线程池中运行,是谁把AsyncMessageProcessingConsumer放入线程池中的呢?
RabbitListenerAnnotationBeanPostProcessor 听名得意:@RabbitListener注解的后置处理器。
- public class RabbitListenerAnnotationBeanPostProcessor implements BeanPostProcessor,SmartInitializingSingleton {
-
- //对接口BeanPostProcessor的实现。
- @Override
- public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
- Class<?> targetClass = AopUtils.getTargetClass(bean);
- final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
- for (ListenerMethod lm : metadata.listenerMethods) {//获取被@RabbitListener修饰的方法。
- for (RabbitListener rabbitListener : lm.annotations) {//获取方法上的@RabbitListener注解。
- //处理Amqp监听:
- //有这三个参数就以实现监听队列并调用方法。参数:rabbitListener为我们的方法上@RabbitListener注解。参数method和bean可以能过反射的方式调用我们的方法。
- processAmqpListener(rabbitListener, lm.method, bean, beanName);
- }
- }
- return bean;
- }
-
- //处理Amqp监听
- protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
- Method methodToUse = checkProxy(method, bean);
-
- //Endpoint为终端,像电脑、手机都是终端,他们都可以接受外部信息并响应,如手机来短信了就有提示。
- //这里也用了终端的概念,被@RabbitListener注解修饰方法也有终端的特点可以接受外部信息并响应。
- //MethodRabbitListenerEndpoint名子也很形象,叫方法监听终端。
- MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
- endpoint.setMethod(methodToUse);//终端接收到信息时,会调用 methodToUse
- processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
- }
- //注册员
- private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();
-
- protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
- Object adminTarget, String beanName) {
- //下面是为终端注入一些属性
- endpoint.setBean(bean);
- endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
- endpoint.setId(getEndpointId(rabbitListener));
- endpoint.setQueueNames(resolveQueues(rabbitListener));
- endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
- endpoint.setBeanFactory(this.beanFactory);
- endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
-
- ……
-
- RabbitListenerContainerFactory<?> factory = null;
- String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
- if (StringUtils.hasText(containerFactoryBeanName)) {
- Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
- try {
- factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
- }
- catch (NoSuchBeanDefinitionException ex) {
- throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
- adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
- containerFactoryBeanName + "' was found in the application context", ex);
- }
- }
- //终端准备完成后,还要将终端进行注册。
- //注册员将终端注册到注册处
- this.registrar.registerEndpoint(endpoint, factory);
- }
-
-
-
-
-
- //对SmartInitializingSingleton的实现,该方法在bean的后置处理BeanPostProcessor之后调用。
- @Override
- public void afterSingletonsInstantiated() {
- this.registrar.setBeanFactory(this.beanFactory);
-
- if (this.containerFactoryBeanName != null) {
- //this.containerFactoryBeanName为“rabbitListenerContainerFactory”
- this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
- }
-
- // Actually register all listeners
- this.registrar.afterPropertiesSet();
-
- }
-
- }
-
RabbitListenerEndpointRegistrar :rabbit 监听器注册员
- //终端注册员
- public class RabbitListenerEndpointRegistrar implements InitializingBean{
-
- private RabbitListenerEndpointRegistry endpointRegistry;//终端注册处
- //终端描述器集合
- private final List<AmqpListenerEndpointDescriptor> endpointDescriptors =
- new ArrayList<AmqpListenerEndpointDescriptor>();
-
- //注册员将终端注册到注册处
- public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
-
- //将 endpoint和 factory 封装成 终端描述器
- AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
- synchronized (this.endpointDescriptors) {
- if (this.startImmediately) { //马下注册
- //将 终端注册到 终端注册处
- this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
- resolveContainerFactory(descriptor), true);
- }
- else {//afterPropertiesSet()方法中统一注册
- //放入终端描述器集合
- this.endpointDescriptors.add(descriptor);
- }
- }
- }
-
- //InitializingBean接口的实现
- @Override
- public void afterPropertiesSet() {
- //统一注册
- registerAllEndpoints();
- }
- //统一注册
- protected void registerAllEndpoints() {
- synchronized (this.endpointDescriptors) {
- for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
- this.endpointRegistry.registerListenerContainer(
- descriptor.endpoint, resolveContainerFactory(descriptor));
- }
- }
- }
-
-
-
-
-
- }
RabbitListenerEndpointRegistry:注册处
- public class RabbitListenerEndpointRegistry implements SmartLifecycle{
-
- private final Map<String, MessageListenerContainer> listenerContainers =
- new ConcurrentHashMap<String, MessageListenerContainer>();
- //注册终端
- public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
- boolean startImmediately) {
-
- String id = endpoint.getId();
- synchronized (this.listenerContainers) {
- //创建 listenerContainer
- MessageListenerContainer container = createListenerContainer(endpoint, factory);
- this.listenerContainers.put(id, container);
- ……
-
- if (startImmediately) {
- startIfNecessary(container);
- }
- }
- }
-
- protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
- RabbitListenerContainerFactory<?> factory) {
- //调用RabbitListener容器工厂的createListenerContainer方法获取RabbitListener容器
- MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
-
- return listenerContainer;
- }
-
- //start 是对 smartLifecycle 接口的实现,由 spring 容器调用
- //上面的的代码中已经创建了listenerContainer 并放入了listenerContainers集合中,现在要将集合中的 listenerContainer放入线程池中。
- @Override
- public void start() {
- //遍历listenerContainers集合
- for (MessageListenerContainer listenerContainer : getListenerContainers()) {
- startIfNecessary(listenerContainer);
- }
- }
- //调用集合中的listenerContainer的 start 方法。
- private void startIfNecessary(MessageListenerContainer listenerContainer) {
- if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
- listenerContainer.start();
- }
- }
-
- }
分析了RabbitListenerAnnotationBeanPostProcessor:@RabbitListener注解的后置处理器 。从读取被@RabbitListener修饰的方法 => 创建 endpoint =>创建MessageListenerContainer并放入集合中=>遍历集合中的MessageListenerContainer并调用 start方法。
MessageListenerContainer中的 start 方法做了什么?
其子类 SimpleMessageListenerContainer是我们学习的重点
- public interface MessageListenerContainer extends SmartLifecycle {
-
- /**
- * Setup the message listener to use. Throws an {@link IllegalArgumentException}
- * if that message listener type is not supported.
- */
- void setupMessageListener(Object messageListener);
-
- /**
- * @return the {@link MessageConverter} that can be used to
- * convert {@link org.springframework.amqp.core.Message}, if any.
- */
- MessageConverter getMessageConverter();
-
- }
start 方法在其抽象子类 AbstractMessageListenerContainer 中实现了:
- //MessageListenerContainer 同样也继承了 SmartLifecycle接口,SmartLifecycle中的方法是由 Spring 容器调用的,这里我们手动调用了 start 方法,这意味这 start 会被调用两次。
- @Override
- public void start() {
- if (isRunning()) {//第二调用时会直接 return。
- return;
- }
-
- try {
- doStart();
- }
- catch (Exception ex) {
- }
- }
doStart 方法子类会覆盖:
在子类:SimpleMessageListenerContainer中:
- @Override
- protected void doStart() throws Exception {
- ……
- super.doStart();
- synchronized (this.consumersMonitor) {
-
- int newConsumers = initializeConsumers();
-
- Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
- for (BlockingQueueConsumer consumer : this.consumers) {
- //AsyncMessageProcessingConsumer是SimpleMessageListenerContainer的内部类
- AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
- processors.add(processor);
- //放入线程池
- getTaskExecutor().execute(processor);
-
- }
-
- }
- }
RabbitListenerAnnotationBeanPostProcessor调用SimpleMessageListenerContainer是AsyncMessageProcessingConsumer放入线程池中的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。