赞
踩
CachingConnectionFactory(缓存工厂类)
缓存模式:
/**
 * Cache modes selectable for the caching connection factory.
 *
 * <p>Declaration order is kept as {@code CHANNEL}, {@code CONNECTION} so that
 * ordinals stay stable.
 */
public enum CacheMode {

    /** Channel caching mode. */
    CHANNEL,

    /** Connection caching mode. */
    CONNECTION

}
spring:
  rabbitmq:
    cache:
      connection:
        # 缓存模式: connection / channel
        mode: connection
        size: 10
      channel:
        checkout-timeout: 10000
        size: 100
创建连接:
@Override public final Connection createConnection() throws AmqpException { if (this.stopped) { throw new AmqpApplicationContextClosedException( "The ApplicationContext is closed and the ConnectionFactory can no longer create connections."); } //上锁,防止线程不安全 synchronized (this.connectionMonitor) { if (this.cacheMode == CacheMode.CHANNEL) { if (this.connection.target == null) { this.connection.target = super.createBareConnection(); // invoke the listener *after* this.connection is assigned if (!this.checkoutPermits.containsKey(this.connection)) { this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize)); } this.connection.closeNotified.set(false); getConnectionListener().onCreate(this.connection); } //返回ChannelCachingConnectionProxy return this.connection; } else if (this.cacheMode == CacheMode.CONNECTION) { return connectionFromCache(); } } return null; // NOSONAR - never reach here - exceptions }
创建channel:
// createConnection()方法中获取到的connection private Channel getChannel(ChannelCachingConnectionProxy connection, boolean transactional) { // 信号量 Semaphore permits = null; if (this.channelCheckoutTimeout > 0) { permits = obtainPermits(connection); } // 获取通道集合 LinkedList<ChannelProxy> channelList = determineChannelList(connection, transactional); ChannelProxy channel = null; if (connection.isOpen()) { //查询可以用的channel channel = findOpenChannel(channelList, channel); if (channel != null && logger.isTraceEnabled()) { logger.trace("Found cached Rabbit Channel: " + channel.toString()); } } if (channel == null) { try { channel = getCachedChannelProxy(connection, channelList, transactional); } catch (RuntimeException e) { if (permits != null) { permits.release(); if (logger.isDebugEnabled()) { logger.debug("Could not get channel; released permit for " + connection + ", remaining:" + permits.availablePermits()); } } throw e; } } return channel; }
//获取许可证 private Semaphore obtainPermits(ChannelCachingConnectionProxy connection) { Semaphore permits; permits = this.checkoutPermits.get(connection); if (permits != null) { try { if (!permits.tryAcquire(this.channelCheckoutTimeout, TimeUnit.MILLISECONDS)) { throw new AmqpTimeoutException("No available channels"); } if (logger.isDebugEnabled()) { logger.debug( "Acquired permit for " + connection + ", remaining:" + permits.availablePermits()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new AmqpTimeoutException("Interrupted while acquiring a channel", e); } } else { throw new IllegalStateException("No permits map entry for " + connection); } return permits; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。