In the registry implementation class PeerAwareInstanceRegistryImpl, you will see calls to the replicateToPeers method:
    @Override
    public boolean cancel(final String appName, final String id, final boolean isReplication) {
        if (super.cancel(appName, id, isReplication)) {
            replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
            ...
            return true;
        }
        return false;
    }

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
Readers familiar with Eureka probably know that replicateToPeers is the method Eureka servers use to synchronize Eureka client information with one another. Its code looks like this:
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
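Two calls in this code are key: peerEurekaNodes.getPeerEurekaNodes() and peerEurekaNodes.isThisMyUrl(node.getServiceUrl()).
peerEurekaNodes.isThisMyUrl() checks whether the node is the current node itself. If it is, the node is skipped; otherwise the replication message is sent to that Eureka server node.
peerEurekaNodes.getPeerEurekaNodes() returns the list of Eureka server nodes. Each Eureka server keeps track of the Eureka servers in the cluster that it is directly related to. The list is built from the URLs configured under eureka.client.serviceUrl.{zone}, which by default is eureka.client.serviceUrl.defaultZone. Next, let's look at the logic that maintains peerEurekaNodes.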
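To make the two guards concrete, here is a minimal sketch of the same idea in plain Java. It is not Eureka's code: the class ReplicationSketch, the sent list, and the example URLs are all made up for illustration, and the real implementation sends HTTP requests instead of recording strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a peer-aware registry; not part of Eureka's API.
class ReplicationSketch {
    private final String selfUrl;
    private final List<String> peerUrls;
    // Records what would be sent; the real code performs HTTP calls here.
    final List<String> sent = new ArrayList<>();

    ReplicationSketch(String selfUrl, List<String> peerUrls) {
        this.selfUrl = selfUrl;
        this.peerUrls = peerUrls;
    }

    // Apply the change locally (omitted), then forward it to the peers.
    void register(String instanceId, boolean isReplication) {
        replicateToPeers("Register", instanceId, isReplication);
    }

    private void replicateToPeers(String action, String instanceId, boolean isReplication) {
        // Guard 1: a request that is already a replication is never forwarded,
        // otherwise the servers would keep bouncing it between each other.
        if (peerUrls.isEmpty() || isReplication) {
            return;
        }
        for (String peer : peerUrls) {
            // Guard 2: never replicate to ourselves.
            if (peer.equals(selfUrl)) {
                continue;
            }
            sent.add(action + ":" + instanceId + "->" + peer);
        }
    }

    public static void main(String[] args) {
        ReplicationSketch node = new ReplicationSketch(
                "http://a:8761/eureka/",
                Arrays.asList("http://a:8761/eureka/", "http://b:8761/eureka/"));
        node.register("app-1", false); // client request: forwarded to peer b only
        node.register("app-2", true);  // replicated request: not forwarded again
        System.out.println(node.sent);
    }
}
```

The isReplication flag is what keeps a change from circulating forever: only the server that received the original client request fans it out.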
Managing peerEurekaNodes
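The logic is actually quite simple, and the execution sequence can be followed in the code itself. Here is part of the PeerEurekaNodes code, which I think is rather cleverly designed: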
    public void start() {
        ...
        try {
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }
                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        ...
    }

    protected List<String> resolvePeerUrls() {
        InstanceInfo myInfo = applicationInfoManager.getInfo();
        String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
        List<String> replicaUrls = EndpointUtils
                .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

        int idx = 0;
        while (idx < replicaUrls.size()) {
            if (isThisMyUrl(replicaUrls.get(idx))) {
                replicaUrls.remove(idx);
            } else {
                idx++;
            }
        }
        return replicaUrls;
    }

    protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
        if (newPeerUrls.isEmpty()) {
            logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
            return;
        }

        Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
        toShutdown.removeAll(newPeerUrls);
        Set<String> toAdd = new HashSet<>(newPeerUrls);
        toAdd.removeAll(peerEurekaNodeUrls);

        if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
            return;
        }

        // Remove peers no longer available
        List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

        if (!toShutdown.isEmpty()) {
            logger.info("Removing no longer available peer nodes {}", toShutdown);
            int i = 0;
            while (i < newNodeList.size()) {
                PeerEurekaNode eurekaNode = newNodeList.get(i);
                if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                    newNodeList.remove(i);
                    eurekaNode.shutDown();
                } else {
                    i++;
                }
            }
        }

        // Add new peers
        if (!toAdd.isEmpty()) {
            logger.info("Adding new peer nodes {}", toAdd);
            for (String peerUrl : toAdd) {
                newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }

        this.peerEurekaNodes = newNodeList;
        this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }
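1. Before creating the scheduled task that runs updatePeerEurekaNodes, start() calls updatePeerEurekaNodes() once to refresh the peer Eureka node list. It has to run first because the scheduled task's default period is 10 minutes, and the initial delay equals that period, so waiting for the first scheduled run would be far too slow.
2. When resolvePeerUrls() collects the peer node URLs, it calls the internal check isThisMyUrl() to determine whether a peer URL belongs to the current node. If it does, that URL is removed from the list.
3. updatePeerEurekaNodes() compares the previous peer URLs with the current ones by set difference: previous minus current gives the peers to shut down, and current minus previous gives the peers to add. If both sets are empty, nothing changes; otherwise only the affected entries are touched, which keeps the creation of PeerEurekaNode objects to a minimum.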
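The three points above can be sketched together in a small, self-contained class. This is only an illustration under my own assumptions: PeerListSketch, its counters, and the Supplier standing in for the configuration lookup are hypothetical names, and plain URL strings replace real PeerEurekaNode objects.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of the peer-list maintenance; not Eureka's actual class.
class PeerListSketch {
    private final String selfUrl;
    private Set<String> currentUrls = new HashSet<>();
    int created = 0;  // how many peer entries were newly created
    int shutDown = 0; // how many peer entries were shut down

    PeerListSketch(String selfUrl) {
        this.selfUrl = selfUrl;
    }

    // Point 2: drop this node's own URL from the configured list.
    List<String> withoutSelf(List<String> configured) {
        List<String> result = new ArrayList<>(configured);
        result.removeIf(url -> url.equals(selfUrl));
        return result;
    }

    // Point 3: use two set differences so only changed peers are touched.
    synchronized void update(List<String> newUrls) {
        if (newUrls.isEmpty()) {
            return; // keep the old list, as the original code does
        }
        Set<String> toShutdown = new HashSet<>(currentUrls);
        toShutdown.removeAll(newUrls);   // previous minus current
        Set<String> toAdd = new HashSet<>(newUrls);
        toAdd.removeAll(currentUrls);    // current minus previous
        if (toShutdown.isEmpty() && toAdd.isEmpty()) {
            return; // no change, nothing is created or shut down
        }
        shutDown += toShutdown.size();
        created += toAdd.size();
        currentUrls = new HashSet<>(newUrls);
    }

    // Point 1: refresh once immediately, then again after every interval.
    ScheduledExecutorService start(Supplier<List<String>> source, long intervalMs) {
        update(withoutSelf(source.get()));
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(
                () -> update(withoutSelf(source.get())),
                intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        return executor;
    }

    synchronized Set<String> peers() {
        return new HashSet<>(currentUrls);
    }
}
```

Calling start() with a supplier that returns the configured URLs fills the list right away, and the returned executor should be shut down when the node stops. Because the initial delay equals the interval, skipping the first direct call would leave the list empty for a whole period.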