赞
踩
1.EurekaServer内定时更新集群内其他Server节点
public class PeerEurekaNodes { /** * Eureka-Server 集群节点数组 */ private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList(); /** * Eureka-Server 服务地址数组 */ private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet(); /** * 启动 Eureka-Server 集群节点集合(复制) */ public void start() { ...... // 初始化 集群节点信息 updatePeerEurekaNodes(resolvePeerUrls()); // 初始化 初始化固定周期更新集群节点信息的任务 Runnable peersUpdateTask = new Runnable() { @Override public void run() { try { updatePeerEurekaNodes(resolvePeerUrls()); } catch (Throwable e) { logger.error("Cannot update the replica Nodes", e); } } }; // 每隔10分钟更新集群节点 taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), serverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); ...... } /** * Resolve peer URLs. 获取Server集群的所有serviceUrl,不包括自身 * * @return peer URLs with node's own URL filtered out */ protected List<String> resolvePeerUrls() { // 获得 Eureka-Server 集群服务地址数组 InstanceInfo myInfo = applicationInfoManager.getInfo(); String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo); // 获取相同Region下的所有serviceUrl List<String> replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo)); // 移除自己(避免向自己同步) int idx = 0; while (idx < replicaUrls.size()) { if (isThisMyUrl(replicaUrls.get(idx))) { replicaUrls.remove(idx); } else { idx++; } } return replicaUrls; } }
EurekaServer在初始化时会根据配置的Server集群url地址,来实例化集群内其他Server节点的交互实例,用于集群间的数据同步。
EurekaServer在获取Server URL时用的还是EurekaClient,也就是环境里eureka.client开头的配置,客户端配置一模一样。
其实EurekaServer集成了EurekaClient,Client的配置都可以用到Server上,只不过很多可以略去,比如是否需要向Server发送心跳registerWithEureka,可以设置为false等。
Client里的serviceUrl的含义是可以向哪些Server节点注册和拉取服务信息;而Server里的serviceUrl的含义是集群里有哪些Server节点,当自身节点有服务操作是需要向哪些节点同步。
Client正常情况下只合Server集群中的一个交互,而Server在有服务操作时会同步至所有其他的节点。
应用的配置信息是可能发生变化的,所以Client和Server才需要定时的刷新集群节点信息,关闭那些不再连接的Server节点,初始化新增的节点。
2.每隔一分钟统计最近一分钟内所有Client的续约次数,也就是接收到的心跳次数,以此来作为是否触发服务信息回收的依据之一
public class MeasuredRate { /** * 间隔, 默认60S */ private final long sampleInterval; public synchronized void start() { if (!isActive) { // 每隔一分钟执行一次定时任务, 更新最新的总的续约次数, 这样就能计算一分钟内续约的次数, 以此来判断续约次数是否低于阈值 timer.schedule(new TimerTask() { @Override public void run() { try { // Zero out the current bucket. 将 currentBucket 赋值给lastBucket, 然后重置 currentBucket lastBucket.set(currentBucket.getAndSet(0)); } catch (Throwable e) { logger.error("Cannot reset the Measured Rate", e); } } }, sampleInterval, sampleInterval); isActive = true; } } }
EurekaServer每隔1分钟执行一次服务信息回收,回收那些超过90S没有发送心跳,也就是续约的服务信息,当然前提是EurekaServer开启租约过期功能,且未触发自我保护临界值。
所谓自我保护,就是指最近一分钟内的续约总数 > 预估的续约总数 * 0.85。 近似的来讲,也就是一分钟内有超过85%的应用信息发送了心跳。如果这个条件未满足,那么不会执行服务回收操作。
比如当Server节点的网络不稳定,丢失了部分心跳信息,如果超过了15,那么就不会触发自我保护,停止服务信息的回收,而这也是我们希望服务发现组件应该具备的功能,强调可用性。
从这点上看其和Zookeeper的服务发现机制有很大不同。
3.EurekaServer每隔一分钟执行一次服务信息的回收
/** * 租约过期任务 */ /* visible for testing */ class EvictionTask extends TimerTask { /** * 上一次执行清理任务的时间 */ private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L); @Override public void run() { try { // 获取 补偿时间毫秒数, 计算这次执行距离上次执行的时间差,与60S的距离 long compensationTimeMs = getCompensationTimeMs(); logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs); // 清理过期租约逻辑 evict(compensationTimeMs); } catch (Throwable e) { logger.error("Could not run the evict task", e); } } /** * compute a compensation time defined as the actual time this task was executed since the prev iteration, * vs the configured amount of time for execution. This is useful for cases where changes in time (due to * clock skew or gc for example) causes the actual eviction task to execute later than the desired time * according to the configured cycle. */ long getCompensationTimeMs() { long currNanos = getCurrentTimeNano(); long lastNanos = lastExecutionNanosRef.getAndSet(currNanos); if (lastNanos == 0L) { return 0L; } // 此次执行与上次执行的时间差 long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos); // 查看时间间隔是否比60S大 long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs(); // 如果未超过60S, 返回; 否则返回超过的时间差 return compensationTime <= 0L ? 0L : compensationTime; } long getCurrentTimeNano() { // for testing return System.nanoTime(); } }
EurekaServer每隔60S执行一次服务信息的回收任务,移除那些超过90S未更新租约信息的服务。
当然能够回收的前提是开启了租约回收功能,而且未触发自我保护。所谓的自我保护机制,就是最近一分钟内的实际续约次数比例超过期望总数的85%,如果未超过,那么认为是Server出现了问题,不进行服务回收。
4.定时更新续约次数的期望值和自我保护的临界值
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { /** * Schedule the task that updates <em>renewal threshold</em> periodically. * The renewal threshold would be used to determine if the renewals drop * dramatically because of network partition and to protect expiring too * many instances at a time. */ private void scheduleRenewalThresholdUpdateTask() { // 15分钟后更新续约阈值,之后每隔15分分钟执行一次 timer.schedule(new TimerTask() { @Override public void run() { updateRenewalThreshold(); } }, serverConfig.getRenewalThresholdUpdateIntervalMs(), serverConfig.getRenewalThresholdUpdateIntervalMs()); } /** * 更新续约阈值,也就是每分钟期望续约的次数,以及触发自我保护的最低续约次数 * Updates the <em>renewal threshold</em> based on the current number of * renewals. The threshold is a percentage as specified in * {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals * received per minute {@link #getNumOfRenewsInLastMin()}. */ private void updateRenewalThreshold() { try { // 计算 应用实例数 Applications apps = eurekaClient.getApplications(); int count = 0; for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { if (this.isRegisterable(instance)) { ++count; } } } // 计算 expectedNumberOfRenewsPerMin 、 numberOfRenewsPerMinThreshold 参数 synchronized (lock) { // Update threshold only if the threshold is greater than the // current expected threshold of if the self preservation is disabled. // 不会一次性的把续约次数将至85%以下,也就是只有在存活应用信息数量超过总数的85%时才能更新,这样就不会修改续约的自我保护的临界值 if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold) || (!this.isSelfPreservationModeEnabled())) { this.expectedNumberOfRenewsPerMin = count * 2; this.numberOfRenewsPerMinThreshold = (int)((count * 2) * serverConfig.getRenewalPercentThreshold()); } } logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold); } catch (Throwable e) { logger.error("Cannot update renewal threshold", e); } } }
服务自身取消,会相应的降低续约期望总数和自我保护临界值,每取消一个,数值均减2。但因为续约时间超时而被动移除的服务信息,不会相应的减少期望总值和临界值。
如果不定时的更新期望总值和临界值,那么当服务逐渐的因心跳超时而被移除时,很容易就触发保护临界值,之后就不能再移除那些心跳超时的服务信息。
但是在更新总值和临界值时,如果当前Server处于自我保护状态,那么也不能强制的改变临界值,这会强制的退出自我保护状态。所以更新总值和临界值的前提是当前Server不处于自我保护状态,也就是上一分钟的续约总数的比例超过85%。
5.服务信息增量缓存更新任务
public abstract class AbstractInstanceRegistry implements InstanceRegistry { /** * 最近租约变更记录队列 */ private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>(); protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) { ...... // 30S后每隔30S执行一次, 移除3分钟前发生的续约记录 this.deltaRetentionTimer.schedule(getDeltaRetentionTask(), serverConfig.getDeltaRetentionTimerIntervalInMs(), // 30S serverConfig.getDeltaRetentionTimerIntervalInMs()); // 30S } private TimerTask getDeltaRetentionTask() { return new TimerTask() { @Override public void run() { Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator(); while (it.hasNext()) { RecentlyChangedItem item = it.next(); // 如果某个续约任务是3分钟前发生的,那么移除它 if (item.getLastUpdateTime() < System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) { it.remove(); } else { break; } } } }; } }
EurekaClient在初始化时进行一次全量拉取,之后每隔30S执行一次增量拉取,也就是会返回recentlyChangedQueue里的记录,EurekaClient根据记录的操作类型和服务信息,相应的更新自身持有的可用服务信息。
recentlyChangedQueue 是一个有序队列,当Client向Server执行操作时,比如注册,状态变更,取消等(续约不会记录),那么会记录操作的时间,类型和相应的服务信息。
通过增量信息来保持同步,能够极大的减少Server和Client之间的数据的传输,降低IO消耗。
6.每隔30S执行一次,更新只读响应缓存
public class ResponseCacheImpl implements ResponseCache { ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { ...... // 仅使用只读缓存, 因此每隔30S执行更新缓存任务 if (shouldUseReadOnlyResponseCache) { timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); } } /** * 缓存更新任务, 每隔30S执行一次 * * @return */ private TimerTask getCacheUpdateTask() { return new TimerTask() { @Override public void run() { logger.debug("Updating the client cache from response cache"); for (Key key : readOnlyCacheMap.keySet()) { // 循环 readOnlyCacheMap 的缓存键 if (logger.isDebugEnabled()) { Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()}; logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args); } try { CurrentRequestVersion.set(key.getVersion()); Value cacheValue = readWriteCacheMap.get(key); Value currentCacheValue = readOnlyCacheMap.get(key); if (cacheValue != currentCacheValue) { // 不一致时,进行替换 readOnlyCacheMap.put(key, cacheValue); } } catch (Throwable th) { logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th); } } } }; } }
EurekaServer会缓存数据信息,根据Key的不同值缓存相应的结果,当Client获取信息时,优先用只读缓存的数据返回,如果只读缓存不存在,那么从读写缓存处获取,然后存入只读缓存,最后返回结果。
读写缓存借助guava的CacheBuilder来实现缓存淘汰,在写入180S后失效,这样当只读缓存定期更新时,如果发现读写缓存的值和只读缓存的不一致时,进行替换。
当Client进行相应操作,比如注册,状态变更,取消等操作时,会时对应的缓存立即失效,保证Client获取到的是有效的服务信息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。