当前位置:   article > 正文

SpringBoot如何与RabbitMQ建立连接_springboot连接rabbitmq

springboot连接rabbitmq

一. 引言

本文主要描述SpringBoot如何与RabbitMQ建立连接,建立多少个连接,以及如何接收消息

二. 相关概念

  • ConnectionFactory接口,是客户端与RabbitMQ服务器的tcp socket连接工厂,负责根据服务器地址创建Connection
  • Connection是客户端与RabbitMQ服务器的socket连接,它封装了socket协议相关的逻辑(比如接收和发送消息)
  • Channel是RabbitMQ客户端与服务器交互的最重要的一个接口,大部分的业务操作是在Channel接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息、接收消息等。Channel之间是完全隔离的

Channel与Connection的区别?

  • 职责不同。Channel负责封装业务,Connection负责封装Socket连接
  • 多线程和性能因素。建立和销毁一个Connection的开销比较大,而Channel是一个建立在Connection内部的逻辑连接,建立和销毁的成本比较低。

注意:在Connection底层,当有多个Channel发送消息时,也是排队发送的。

三. 连接代码实现

  • 在https://blog.csdn.net/qq_43216019/article/details/128824328?spm=1001.2014.3001.5501中介绍了如何实现消息接收功能,其中RabbitListenerAnnotationBeanPostProcessor中类中,没扫描到一个消息接收方法,就定义为如下实例:

    RabbitListenerEndpoint(接口)
    SimpleRabbitListenerEndpoint(实现:表示RabbitListenerConfigurer接口所实现类的endPoint
    MethodRabbitListenerEndpoint(实现:表示@RabbitListener+@RabbitHandler注解方法对应的endpoint))
    
    • 1
    • 2
    • 3
  • 同时在每个RabbitListenerAnnotationBeanPostProcessor类中,对于每一个endpoint,最终都会回调RabbitListenerEndpointRegistery的doStart方法,伪代码:

    public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory, boolean startImmediately) {
            synchronized(this.listenerContainers) {
          		//创建SimpleMessageListenerContainer实例
                MessageListenerContainer container = this.createListenerContainer(endpoint, factory);
                this.listenerContainers.put(id, container);
                if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                    Object containerGroup;
                    if (this.applicationContext.containsBean(endpoint.getGroup())) {
                        containerGroup = (List)this.applicationContext.getBean(endpoint.getGroup(), List.class);
                    } else {
                        containerGroup = new ArrayList();
                        this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                    }
    
                    ((List)containerGroup).add(container);
                }
    
                if (this.contextRefreshed) {
                    container.lazyLoad();
                }
    						//启动调用SimpleMessageListenerContainer的start方法
                if (startImmediately) {
                    this.startIfNecessary(container);
                }
    
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
  • 在SimpleMessagelistenerContainer类中,start方法最终会调用的dostart方法,伪代码:

    protected void doStart() {
            Assert.state(!this.consumerBatchEnabled || this.getMessageListener() instanceof BatchMessageListener || this.getMessageListener() instanceof ChannelAwareBatchMessageListener, "When setting 'consumerBatchEnabled' to true, the listener must support batching");
            this.checkListenerContainerAware();
            super.doStart();
            synchronized(this.consumersMonitor) {
                if (this.consumers != null) {
                    throw new IllegalStateException("A stopped container should not have consumers");
                } else {
                //根据@RabbitListener注解中定义的concurrency数量初始化consumer的个数
                对于每个consumer生成一个BlockingQueueConsumer实例
                    int newConsumers = this.initializeConsumers();
                    if (this.consumers == null) {
                        this.logger.info("Consumers were initialized and then cleared (presumably the container was stopped concurrently)");
                    } else if (newConsumers <= 0) {
                        if (this.logger.isInfoEnabled()) {
                            this.logger.info("Consumers are already running");
                        }
    
                    } else {
                        Set<SimpleMessageListenerContainer.AsyncMessageProcessingConsumer> processors = new HashSet();
                        Iterator var4 = this.consumers.iterator();
    
                        while(var4.hasNext()) {
                        //对于每个BlockingQueueConsumer,生成AsyncMessageProcessingConsumer,并启动一个线程启动这个consumer,实际上调用了BlockingQueueConsumer实例的start方法
                            BlockingQueueConsumer consumer = (BlockingQueueConsumer)var4.next();
                            SimpleMessageListenerContainer.AsyncMessageProcessingConsumer processor = new SimpleMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
                            processors.add(processor);
                            this.getTaskExecutor().execute(processor);
                            if (this.getApplicationEventPublisher() != null) {
                                this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
                            }
                        }
    
                        this.waitForConsumersToStart(processors);
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
  • BlockingQueueConsumer的start方法启动consumer

    public void start() throws AmqpException {
            
            try {
                this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory, this.transactional);
                this.channel = this.resourceHolder.getChannel();
                ClosingRecoveryListener.addRecoveryListenerIfNecessary(this.channel);
            } catch (AmqpAuthenticationException var2) {
                throw new FatalListenerStartupException("Authentication failure", var2);
            }
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

​ 注意:每个监听线程都会调用以上方法创建connection和channel,那么connection和channel的数量岂不是一致?

​ CachingConnectionFactory bean的createConnection负责创建connection:

public final Connection createConnection() throws AmqpException {
    ......
        Object var1 = this.connectionMonitor;
        synchronized(this.connectionMonitor) {
            if (this.cacheMode == CachingConnectionFactory.CacheMode.CHANNEL) {
                //当第一个线程创建connection时,this.connection.target为空,所以调用父类方法创建真实的connection。
                //当第二个以上的线程创建connection时,直接返回之前创建好的connection。
                if (this.connection.target == null) {
                    this.connection.target = super.createBareConnection();
                    ......
                }
                return this.connection;
            } 
            ......
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

所以系统即使有多个线程,也仅仅创建了一个connection,而每个线程创建了自己独立的channel。

后续继续学习记录AMQP协议相关知识

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/617118
推荐阅读
相关标签
  

闽ICP备14008679号