赞
踩
刚才在org.springframework.cloud.netflix.eureka.server.InstanceRegistry 的每个方法都打了一个断点,而且现在EurekaServer已经处于Debug运行状态,那么我们就随便找一个被 @EnableEurekaClient 的微服务启动试试微服务来试试吧,直接Run。
InstanceRegistry.register(final InstanceInfo info, final boolean isReplication) 方法进断点了。
复制代码
主要是处理接收 Http 的服务请求。
public Response addInstance(InstanceInfo info, String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); } 复制代码
public void register(final InstanceInfo info, final boolean isReplication) {
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);
}
复制代码
private void handleRegistration(InstanceInfo info, int leaseDuration,
boolean isReplication) {
log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
+ ", leaseDuration " + leaseDuration + ", isReplication "
+ isReplication);
publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
isReplication));
}
复制代码
进入PeerAwareInstanceRegistryImpl 类的 register(final InstanceInfo info, final boolean isReplication) 方法;
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
复制代码
进入super.register(info, leaseDuration, isReplication),如何写入EurekaServer 的注册表的,进入AbstractInstanceRegistry.register(InstanceInfo registrant, int leaseDuration, boolean isReplication) 方法。
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock(); Map> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); if (gMap == null) { final ConcurrentHashMap> gNewMap = new ConcurrentHashMap>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } Lease existingLease = gMap.get(registrant.getId()); if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else { synchronized (lock) { if (this.expectedNumberOfRenewsPerMin > 0) { this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2; this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); } } logger.debug("No previous lease information found; it is new registration"); } Lease lease = new Lease(registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } gMap.put(registrant.getId(), lease); synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(new Pair( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } } 复制代码
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication) 的这个方法。
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info , InstanceStatus newStatus , boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } } 复制代码
private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } } 复制代码
PeerEurekaNode.register(final InstanceInfo info) 方法,一窥究竟如何同步数据。
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
复制代码
public static TaskDispatcher createBatchingTaskDispatcher(String id, int maxBufferSize, int workloadSize, int workerCount, long maxBatchingDelay, long congestionRetryDelayMs, long networkFailureRetryMs, TaskProcessor taskProcessor) { final AcceptorExecutor acceptorExecutor = new AcceptorExecutor<>( id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs ); final TaskExecutors taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor); return new TaskDispatcher() { public void process(ID id, T task, long expiryTime) { acceptorExecutor.process(id, task, expiryTime); } public void shutdown() { acceptorExecutor.shutdown(); taskExecutor.shutdown(); } }; } 复制代码
static TaskExecutors batchExecutors(final String name,
int workerCount,
final TaskProcessor processor,
final AcceptorExecutor acceptorExecutor) {
final AtomicBoolean isShutdown = new AtomicBoolean();
final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
return new TaskExecutors<>(new WorkerRunnableFactory() {
public WorkerRunnable create(int idx) {
return new BatchWorkerRunnable<>("TaskBatchingWorker-" +name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor);
}
}, workerCount, isShutdown);
}
复制代码
public void run() { try { while (!isShutdown.get()) { List> holders = getWork(); metrics.registerExpiryTimes(holders); List tasks = getTasksOf(holders); ProcessingResult result = processor.process(tasks); switch (result) { case Success: break; case Congestion: case TransientError: taskDispatcher.reprocess(holders, result); break; case PermanentError: logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName); } metrics.registerTaskResult(result, tasks.size()); } } catch (InterruptedException e) { } catch (Throwable e) { logger.warn("Discovery WorkerThread error", e); } } 复制代码
public ProcessingResult process(List tasks) { ReplicationList list = createReplicationListOf(tasks); try { EurekaHttpResponse response = replicationClient.submitBatchUpdates(list); int statusCode = response.getStatusCode(); if (!isSuccess(statusCode)) { if (statusCode == 503) { logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId); return ProcessingResult.Congestion; } else { logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size()); return ProcessingResult.PermanentError; } } else { handleBatchResponse(tasks, response.getEntity().getResponseList()); } } catch (Throwable e) { if (isNetworkConnectException(e)) { logNetworkErrorSample(null, e); return ProcessingResult.TransientError; } else { logger.error("Not re-trying this exception because it does not seem to be a network exception", e); return ProcessingResult.PermanentError; } } return ProcessingResult.Success; } 复制代码
public EurekaHttpResponse submitBatchUpdates(ReplicationList replicationList) { ClientResponse response = null; try { response = jerseyApacheClient.resource(serviceUrl) .path(PeerEurekaNode.BATCH_URL_PATH) .accept(MediaType.APPLICATION_JSON_TYPE) .type(MediaType.APPLICATION_JSON_TYPE) .post(ClientResponse.class, replicationList); if (!isSuccess(response.getStatus())) { return anEurekaHttpResponse(response.getStatus(), ReplicationListResponse.class).build(); } ReplicationListResponse batchResponse = response.getEntity(ReplicationListResponse.class); return anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build(); } finally { if (response != null) { response.close(); } } } 复制代码
public Response batchReplication(ReplicationList replicationList) { try { ReplicationListResponse batchResponse = new ReplicationListResponse(); for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) { try { batchResponse.addResponse(dispatch(instanceInfo)); } catch (Exception e) { batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null)); logger.error(instanceInfo.getAction() + " request processing failed for batch item " + instanceInfo.getAppName() + '/' + instanceInfo.getId(), e); } } return Response.ok(batchResponse).build(); } catch (Throwable e) { logger.error("Cannot execute batch Request", e); return Response.status(Status.INTERNAL_SERVER_ERROR).build(); } } 复制代码
private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) { ApplicationResource applicationResource = createApplicationResource(instanceInfo); InstanceResource resource = createInstanceResource(instanceInfo, applicationResource); String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp()); String overriddenStatus = toString(instanceInfo.getOverriddenStatus()); String instanceStatus = toString(instanceInfo.getStatus()); Builder singleResponseBuilder = new Builder(); switch (instanceInfo.getAction()) { case Register: singleResponseBuilder = handleRegister(instanceInfo, applicationResource); break; case Heartbeat: singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus); break; case Cancel: singleResponseBuilder = handleCancel(resource); break; case StatusUpdate: singleResponseBuilder = handleStatusUpdate(instanceInfo, resource); break; case DeleteStatusOverride: singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource); break; } return singleResponseBuilder.build(); } 复制代码
private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
return new Builder().setStatusCode(Status.OK.getStatusCode());
}
复制代码
Run运行discovery-eureka服务,Debug 运行 provider-user 服务,先观察日志先;
2017-10-23 19:43:07.688 INFO 1488 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0 2017-10-23 19:43:07.694 INFO 1488 --- [ main] o.s.c.n.eureka.InstanceInfoFactory : Setting initial instance status as: STARTING 2017-10-23 19:43:07.874 INFO 1488 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using JSON encoding codec LegacyJacksonJson 2017-10-23 19:43:07.874 INFO 1488 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using JSON decoding codec LegacyJacksonJson 2017-10-23 19:43:07.971 INFO 1488 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using XML encoding codec XStreamXml 2017-10-23 19:43:07.971 INFO 1488 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using XML decoding codec XStreamXml 2017-10-23 19:43:08.134 INFO 1488 --- [ main] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration 2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Disable delta property : false 2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Single vip registry refresh property : null 2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Force full registry fetch : false 2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Application is null : false 2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Registered Applications size is zero : true 2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Application version is -1: true 2017-10-23 19:43:08.345 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Getting all instance registry info from the eureka server 2017-10-23 19:43:08.630 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : The response status is 200 2017-10-23 19:43:08.631 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Starting heartbeat executor: renew interval is: 30 2017-10-23 19:43:08.634 INFO 1488 --- [ main] c.n.discovery.InstanceInfoReplicator : InstanceInfoReplicator onDemand update allowed rate per min is 4 2017-10-23 19:43:08.637 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Discovery Client initialized at timestamp 1508758988637 with initial instances count: 0 2017-10-23 19:43:08.657 INFO 1488 --- [ main] c.n.e.EurekaDiscoveryClientConfiguration : Registering application springms-provider-user with eureka with status UP 2017-10-23 19:43:08.658 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Saw local status change event StatusChangeEvent [timestamp=1508758988658, current=UP, previous=STARTING] 2017-10-23 19:43:08.659 INFO 1488 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient : DiscoveryClient_SPRINGMS-PROVIDER-USER/springms-provider-user:192.168.3.101:7900: registering service... 2017-10-23 19:43:08.768 INFO 1488 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 7900 (http) 2017-10-23 19:43:08.768 INFO 1488 --- [ main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 7900 2017-10-23 19:43:08.773 INFO 1488 --- [ main] c.s.cloud.MsProviderUserApplication : Started ProviderApplication in 882.1 seconds (JVM running for 10.398) 复制代码
public EnableEurekaClient {}
复制代码
这个注解类竟然也使用了注解 @EnableDiscoveryClient,那么我们有必要去这个注解类看看。
public EnableDiscoveryClient {}
复制代码
这个注解类有个比较特殊的注解 @Import,由此我们猜想,这里的大多数逻辑是不是都写在这个 EnableDiscoveryClientImportSelector 类呢?
public class EnableDiscoveryClientImportSelector
extends SpringFactoryImportSelector {
protected boolean isEnabled() {
return new RelaxedPropertyResolver(getEnvironment()).getProperty(
"spring.cloud.discovery.enabled", Boolean.class, Boolean.TRUE);
}
protected boolean hasDefaultFactory() {
return true;
}
}
复制代码
EnableDiscoveryClientImportSelector 类继承了 SpringFactoryImportSelector 类,但是重写了一个 isEnabled() 方法,默认值返回 true,为什么会返回true。
public String[] selectImports(AnnotationMetadata metadata) { if (!isEnabled()) { return new String[0]; } AnnotationAttributes attributes = AnnotationAttributes.fromMap( metadata.getAnnotationAttributes(this.annotationClass.getName(), true)); Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is " + metadata.getClassName() + " annotated with @" + getSimpleName() + "?"); List factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader .loadFactoryNames(this.annotationClass, this.beanClassLoader))); if (factories.isEmpty() && !hasDefaultFactory()) { throw new IllegalStateException("Annotation @" + getSimpleName() + " found, but there are no implementations. Did you forget to include a starter?"); } if (factories.size() > 1) { log.warn("More than one implementation " + "of @" + getSimpleName() + " (now relying on @Conditionals to pick one): " + factories); } return factories.toArray(new String[factories.size()]); } 复制代码
首先通过注解获取了一些属性,然后加载了一些类名称,我们进入loadFactoryNames 方法看看。
public static List loadFactoryNames(Class factoryClass, ClassLoader classLoader) { String factoryClassName = factoryClass.getName(); try { Enumeration urls = (classLoader != null ? classLoader.getResources(FACTORIES_RESOURCE_LOCATION) : ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION)); List result = new ArrayList(); while (urls.hasMoreElements()) { URL url = urls.nextElement(); Properties properties = PropertiesLoaderUtils.loadProperties(new UrlResource(url)); String factoryClassNames = properties.getProperty(factoryClassName); result.addAll(Arrays.asList(StringUtils.commaDelimitedListToStringArray(factoryClassNames))); } return result; } catch (IOException ex) { throw new IllegalArgumentException("Unable to load [" + factoryClass.getName() + "] factories from location [" + FACTORIES_RESOURCE_LOCATION + "]", ex); } } 复制代码
加载了一个配置文件,配置文件里面写了啥呢?打开SpringFactoryImportSelector该文件所在的jar包的spring.factories文件一看。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.client.CommonsClientAutoConfiguration,\
org.springframework.cloud.client.discovery.noop.NoopDiscoveryClientAutoConfiguration,\
org.springframework.cloud.client.hypermedia.CloudHypermediaAutoConfiguration,\
org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration,\
org.springframework.cloud.commons.util.UtilAutoConfiguration
org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.cloud.client.HostInfoEnvironmentPostProcessor
复制代码
都是一些 Configuration 后缀的类名,所以这些都是加载的一堆堆的配置文件类。 factories对象里面只有一个类名路径为 org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration 。
public class EurekaDiscoveryClientConfiguration implements SmartLifecycle, Ordered { public void start() { if (this.port.get() != 0 && this.instanceConfig.getNonSecurePort() == 0) { this.instanceConfig.setNonSecurePort(this.port.get()); } if (!this.running.get() && this.instanceConfig.getNonSecurePort() > 0) { maybeInitializeClient(); if (log.isInfoEnabled()) { log.info("Registering application " + this.instanceConfig.getAppname() + " with eureka with status " + this.instanceConfig.getInitialStatus()); } this.applicationInfoManager .setInstanceStatus(this.instanceConfig.getInitialStatus()); if (this.healthCheckHandler != null) { this.eurekaClient.registerHealthCheck(this.healthCheckHandler); } this.context.publishEvent( new InstanceRegisteredEvent<>(this, this.instanceConfig)); this.running.set(true); } } } 复制代码
public synchronized void setInstanceStatus(InstanceStatus status) {
InstanceStatus prev = instanceInfo.setStatus(status);
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
try {
listener.notify(new StatusChangeEvent(prev, status));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
复制代码
public void registerStatusChangeListener(StatusChangeListener listener) {
listeners.put(listener.getId(), listener);
}
复制代码
果不其然,EurekaDiscoveryClientConfiguration.start 方法被调用了,紧接着 this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus()) 也进入断点,然后在往下走,又进入的 DiscoveryClient.initScheduledTasks 方法中的 notify 回调处。
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args, Provider backupRegistryProvider) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; } try { scheduler = Executors.newScheduledThreadPool(3, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); } 复制代码
private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs); scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { public String getId() { return "statusChangeListener"; } public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } } 复制代码
public boolean onDemandUpdate() { if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { scheduler.submit(new Runnable() { public void run() { logger.debug("Executing on-demand update of local InstanceInfo"); Future latestPeriodic = scheduledPeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false); } InstanceInfoReplicator.this.run(); } }); return true; } else { logger.warn("Ignoring onDemand update due to rate limiter"); return false; } } 复制代码
public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } 复制代码
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
复制代码
public EurekaHttpResponse register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } } 复制代码
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。