当前位置:   article > 正文

RabbitMQ如何创建缓存_the applicationcontext is closed and the connectio

the applicationcontext is closed and the connectionfactory can no longer cre
  1. CachingConnectionFactory(缓存工厂类)

    • 缓存模式:

      	public enum CacheMode {	
      		CHANNEL,CONNECTION;
      	}
      spring:
        rabbitmq:
          cache:
            connection:
            // 缓存模式: connection channel
              mode: connection
              size: 10
            channel:
              checkout-timeout: 10000
              size: 100
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
  • 创建连接:

    @Override
    	public final Connection createConnection() throws AmqpException {
    		if (this.stopped) {
    			throw new AmqpApplicationContextClosedException(
    					"The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");
    		}
    		//上锁,防止线程不安全
    		synchronized (this.connectionMonitor) {
    			if (this.cacheMode == CacheMode.CHANNEL) {
    				if (this.connection.target == null) {
    					this.connection.target = super.createBareConnection();
    					// invoke the listener *after* this.connection is assigned
    					if (!this.checkoutPermits.containsKey(this.connection)) {
    						this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
    					}
    					this.connection.closeNotified.set(false);
    					getConnectionListener().onCreate(this.connection);
    				}
    				//返回ChannelCachingConnectionProxy
    				return this.connection;
    			}
    			else if (this.cacheMode == CacheMode.CONNECTION) {
    				return connectionFromCache();
    			}
    		}
    		return null; // NOSONAR - never reach here - exceptions
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 创建channel:

      // createConnection()方法中获取到的connection	
      private Channel getChannel(ChannelCachingConnectionProxy connection, boolean transactional) {
          // 信号量
      		Semaphore permits = null;
      		if (this.channelCheckoutTimeout > 0) {
      			permits = obtainPermits(connection);
      		}
          // 获取通道集合
      		LinkedList<ChannelProxy> channelList = determineChannelList(connection, transactional);
      		ChannelProxy channel = null;
      		if (connection.isOpen()) {
                  //查询可以用的channel
      			channel = findOpenChannel(channelList, channel);
      			if (channel != null && logger.isTraceEnabled()) {
      				logger.trace("Found cached Rabbit Channel: " + channel.toString());
      			}
      		}
      		if (channel == null) {
      			try {
      				channel = getCachedChannelProxy(connection, channelList, transactional);
      			}
      			catch (RuntimeException e) {
      				if (permits != null) {
      					permits.release();
      					if (logger.isDebugEnabled()) {
      						logger.debug("Could not get channel; released permit for " + connection + ", remaining:"
      								+ permits.availablePermits());
      					}
      				}
      				throw e;
      			}
      		}
      		return channel;
      	}
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      //获取许可证
      	private Semaphore obtainPermits(ChannelCachingConnectionProxy connection) {
      		Semaphore permits;
      		permits = this.checkoutPermits.get(connection);
      		if (permits != null) {
      			try {
      				if (!permits.tryAcquire(this.channelCheckoutTimeout, TimeUnit.MILLISECONDS)) {
      					throw new AmqpTimeoutException("No available channels");
      				}
      				if (logger.isDebugEnabled()) {
      					logger.debug(
      							"Acquired permit for " + connection + ", remaining:" + permits.availablePermits());
      				}
      			}
      			catch (InterruptedException e) {
      				Thread.currentThread().interrupt();
      				throw new AmqpTimeoutException("Interrupted while acquiring a channel", e);
      			}
      		}
      		else {
      			throw new IllegalStateException("No permits map entry for " + connection);
      		}
      		return permits;
      	}
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/559801
推荐阅读
相关标签
  

闽ICP备14008679号