赞
踩
在Eureka开源框架中,使用了Jersey来构建RESTful Web服务,类似于Spring MVC中的Controller功能,不过Jersey更加专注RESTful风格的API。
Jersey参考文档:
1、Jersey是一个什么框架,价值在哪里?
2、Jersey 开发RESTful(七)Jersey快速入门
3、JAX-RS和Jersey
在Netflix开源框架中,eureka-core中的com.netflix.eureka.resources目录下,有一个ApplicationResource类,它就是基于jersey构建的处理与Application相关请求的资源类,其中主要提供了:
这是Eureka服务端为客户端提供服务注册的入口,该方法有两个参数,分别表示:
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// 省略 验证instanceinfo需要包含的必要信息的逻辑
……
//省略 处理dataCenterInfo相关内容,一般使用的是MyDataCenterInfo
……
//注册InstanceInfo 实例,这是最核心的逻辑
registry.register(info, "true".equals(isReplication));
//构建返回对象,注册成功,默认的状态码为204
return Response.status(204).build(); // 204 to be backwards compatible
}
在上述方法中,实际用来注册服务的逻辑,是通过调用registry.register()方法来完成的。
在前面的addInstance()方法中,通过调用registry.register()方法实现了服务注册的逻辑,其实就是调用了InstanceRegistry类(SpringCloud提供)的register()方法,在该方法中,首先通过handleRegistration()方法发布了EurekaInstanceRegisteredEvent广播事件,然后调用父类PeerAwareInstanceRegistryImpl的register()方法进行实例注册。
其实,在Spring Cloud集成和扩展Eureka组件时,SpringCloud提供InstanceRegistry类,主要作用就是发布对应的广播事件,包括后续的服务续约、服务剔除等。
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);
}
resolveInstanceLeaseDuration()方法用来解析续约间隔,默认是90s,如果InstanceInfo有配置,就是用InstanceInfo 配置中的间隔时间。
private int resolveInstanceLeaseDuration(final InstanceInfo info) {
//默认时间
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
//根据info进行配置时间
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
return leaseDuration;
}
handleRegistration()方法用来处理注册事件,这里主要通过publishEvent()方法发布了一个EurekaInstanceRegisteredEvent广播事件,通过监听该事件可以进行一些扩展,比如监听到了注册事件进行一些回调之类的操作。
private void handleRegistration(InstanceInfo info, int leaseDuration,
boolean isReplication) {
log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
+ ", leaseDuration " + leaseDuration + ", isReplication "
+ isReplication);
//发布广播事件
publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
isReplication));
}
在前面InstanceRegistry类的register()方法中,实际上又调用了PeerAwareInstanceRegistryImpl类的register()方法。在该方法中,主要做了两件事:第一,调用父类AbstractInstanceRegistry中的register()方法;第二,通过调用replicateToPeers()方法,复制所有的操作到集群的其他节点中。
PeerAwareInstanceRegistryImpl类针对AbstractInstanceRegistry抽象类的扩展,主要就是在处理集群节点间的数据同步等。
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//注册
super.register(info, leaseDuration, isReplication);
//注册信息同步到其他对等节点
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
该方法是PeerAwareInstanceRegistryImpl类提供的通用方法,用来同步实例的所有操作(Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride)信息到集群中的其他节点。该方法主要实现了遍历peerEurekaNodes节点,然后排除自身,再调用replicateInstanceActionsToPeers()方法实现数据的同步。
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // 如果节点为空或者已经是操作同步操作,就直接返回 if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // 排除给自己同步数据 if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } //真正进行同步信息的方法 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
replicateInstanceActionsToPeers()方法是一个根据不同操作类型,进行不同操作类型的数据同步,比如:注册的时候,主要执行的是 node.register(info)方法,其中node是需要同步注册信息的集群节点。
private void replicateInstanceActionsToPeers(Action action, String appName,String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel://取消应用 node.cancel(appName, id); break; case Heartbeat://心跳 InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register://注册 node.register(info); break; case StatusUpdate://状态修改 infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride://删除覆盖状态 infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } finally { CurrentRequestVersion.remove(); } }
在前面的replicateInstanceActionsToPeers()方法中,我们可以知道,最终是通过调用node.register(info)方法实现注册操作的数据同步的,实际上真正调用的方法就是PeerEurekaNode类的register()方法。
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
其中,关于batchingDispatcher.process()方法,主要用于处理任务InstanceReplicationTask对象,并执行其中的replicationClient.register(info)方法,详细请参考《Eureka 源码解析 —— 任务批处理》。
AbstractJerseyEurekaHttpClient类的register()方法,是基于Jersey构建的服务注册请求。
@Override public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); // 抽象方法,子类可以增加请求头 addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
在前面PeerAwareInstanceRegistryImpl类的register()方法中,有调用了父类AbstractInstanceRegistry中的register()方法,该方法是真正实现注册InstanceInfo实例信息的方法,具体实现如下:
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock(); //registry是一个ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>类型的变量,其中第一次表示的是应用名称,第二层表示的是具体的实例对象,这里首先根据应用名称获取对应的租约对象 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); //Eureka监听器,用来记录注册数量和同步数量 REGISTER.increment(isReplication); //初始化应用的租约存储对象ConcurrentHashMap<String, Lease<InstanceInfo>> if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } //获取指定实例对应的租约对象 Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // 如果存在InstanceInfo实例对象的租约 if (existingLease != null && (existingLease.getHolder() != null)) { //原来对象的租约时间 Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); //当前注册对象的租约时间 Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); //如果原来的对象租约时间比当前要注册对象的租约时间大,就是用原来的租约对象 if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else {//当原来没有租约时,就是从新注册对象,首先需要更新续约的阈值 // The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { //更新续约的阈值 this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } } logger.debug("No previous lease information found; it is new registration"); } //根据registrant创建租约对象(存在租约的对象,也会被重新创建) Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); //设置ServiceUp时间戳,主要是记录第一次的时间(因为existingLease 存在,说明之前已经开始服务,所以把原来的赋值到这里即可) if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } //把InstanceInfo的新的租约对象放到registry变量中 gMap.put(registrant.getId(), lease); //添加实例信息到recentRegisteredQueue队列中,用于统计或调试 recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); // 记录实例状态到overriddenInstanceStatusMap对象中 if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } //设置实例对象的覆盖状态 InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // 根据指定规则获取覆盖状态,并在不修改dirty时间戳情况下,修改该状态 InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // 当实例状态是Up时,修改租约对象的service up时间戳 if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } //设置操作类型 registrant.setActionType(ActionType.ADDED); //添加变化实例对象到recentlyChangedQueue队列 recentlyChangedQueue.add(new RecentlyChangedItem(lease)); //设置InstanceInfo实例对象的最后一次更新时间 registrant.setLastUpdatedTimestamp(); //处理缓存数据 invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。