赞
踩
eureka用于服务治理,实现各个微服务实例的自动化注册与发现,不同可用区域的服务注册中心通过异步的方式相互复制各自的状态,eureka客户端向注册中心注册服务并周期性地发送心跳更新服务租约,同时也能从服务端查询所有其他服务的信息。
当我们启动一个客户端时做了2件事
1.首先看代码如何从配置文件中读取服务地址:
在EndpointUtils类中实现了该功能
public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) { Map<String, List<String>> orderedUrls = new LinkedHashMap(); // region和zone是一对多的关系,如果获取不到使用default String region = getRegion(clientConfig); //从配置文件中读取region String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); //从配置文件中读取可用zones(多个) if (availZones == null || availZones.length == 0) { availZones = new String[]{"default"}; //如果没有配置使用default } logger.debug("The availability zone for the given region {} are {}", region, availZones); int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones); //根据是否在同一zone和是否可用获取最终zone(区域亲和特性) String zone = availZones[myZoneOffset]; List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone); //获取serviceUrls if (serviceUrls != null) { orderedUrls.put(zone, serviceUrls); } ... } //获取zone private static int getZoneOffset(String myZone, boolean preferSameZone, String[] availZones) { for(int i = 0; i < availZones.length; ++i) { if (myZone != null && availZones[i].equalsIgnoreCase(myZone.trim()) == preferSameZone) { return i; } } logger.warn("DISCOVERY: Could not pick a zone based on preferred zone settings. My zone - {}, preferSameZone - {}. Defaulting to {}", new Object[]{myZone, preferSameZone, availZones[0]}); return 0; }
2.实现服务注册等功能
@EnableDiscoveryClient主要用来开启DiscoveryClient实例,该类用于与eureka server相互协作,主要实现4个功能
private void initScheduledTasks() { int renewalIntervalInSecs; int expBackOffBound; if (this.clientConfig.shouldFetchRegistry()) { renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds(); expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); this.cacheRefreshTask = new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()); this.scheduler.schedule(this.cacheRefreshTask, (long)renewalIntervalInSecs, TimeUnit.SECONDS); } if (this.clientConfig.shouldRegisterWithEureka()) { renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs); this.heartbeatTask = new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()); this.scheduler.schedule(this.heartbeatTask, (long)renewalIntervalInSecs, TimeUnit.SECONDS); //心跳机制 this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); //进行服务注册 instanceInfo对象存储服务注册所需要的信息 this.statusChangeListener = new StatusChangeListener() { public String getId() { return "statusChangeListener"; } public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) { DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent); } else { DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent); } DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate(); } }; if (this.clientConfig.shouldOnDemandUpdateStatusChange()) { this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener); } this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } } //InstanceInfoReplicator实现了Runnable接口重写了run方法进行服务注册 public void run() { boolean var6 = false; ScheduledFuture next; label53: { try { var6 = true; this.discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = this.instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { this.discoveryClient.register(); //服务注册 this.instanceInfo.unsetIsDirty(dirtyTimestamp); var6 = false; } else { var6 = false; } break label53; } catch (Throwable var7) { logger.warn("There was a problem with the instance info replicator", var7); var6 = false; } finally { if (var6) { ScheduledFuture next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS); this.scheduledPeriodicRef.set(next); } } next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS); this.scheduledPeriodicRef.set(next); return; } next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS); this.scheduledPeriodicRef.set(next); }
最终进行服务的注册,获取,续约都是发起rest请求
3.注册中心处理请求
以 服务注册 请求为例,在ApplicationResource类中有addInstance方法
@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); if (this.isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (this.isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (this.isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (this.isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!this.appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + this.appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } else { //前面进行一堆校验 DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier)dataCenterInfo).getId(); if (this.isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo; String effectiveId = amazonInfo.get(MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } this.registry.register(info, "true".equals(isReplication)); //注册中心进行注册服务 return Response.status(204).build(); } }
在注册函数中现调用publishEvent方法,将该注册事件传播出去,注册信息instanceInfo对象中元数据信息存储在ConcurrentHashMap对象中,双层map,第一层key为appName属性,第二层key为instanceId属性。
服务注册类配置(eureka.client.*)
regisryFetchIntervalSeconds: 获取注册信息间隔时间
registerWithEureka : 是否将自身注册到服务端(默认为true,服务端需配置成false)
fetchRegistry : 是否从服务端获取注册信息(默认为true,服务端需配置成false)
服务实例类配置(eureka.instance.*)
leaseRenewalIntervalInSeconds : 客户端向服务端发送心跳时间间隔 (默认30)
leaseExpirationDurationInSeconds : 超时剔除该服务 (默认90)
参考资料:《springcloud微服务实战》
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。