当前位置:   article > 正文

Eureka源码解析(三)—服务续约(心跳)_eureka 心跳包里有哪些东西

eureka 心跳包里有哪些东西

Eureka服务续约(心跳)

  • EurekaClient在注册到EurekaServer端之后,会通过启动时初始化的定时任务定时向EurekaServer端进行服务续约(心跳)。本篇文章主要解析EurekaClient端向EurekaServer端发起服务续约(心跳)请求以及EurekaServer端接收请求后的具体操作,分为EurekaClient端发送请求和EurekaServer端接收请求来解析。注:我理解的服务续约和心跳检测完全就是同一个操作,都是定时向EurekaServer端发送当前节点的信息(其实看了代码之后发现确实是同一个操作,服务续约即心跳检测)本文基于https://github.com/Netflix/eureka上的master分支。最近在github上fork了一下eureka项目,更详细的注释可以去我的git上看:https://github.com/qiuyangli/eureka

EurekaClient端发送请求

  • 首先回顾一下EurekaClient端启动时候初始化的这个定时任务:
    1. // 10秒
    2. int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
    3. // 心跳(续约)频率,默认30秒
    4. int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
    5. logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    6. // Heartbeat timer
    7. // 实际是一个心跳(续约)定时任务,后面会有详细解析
    8. scheduler.schedule(
    9. new TimedSupervisorTask(
    10. "heartbeat",
    11. scheduler,
    12. heartbeatExecutor,
    13. renewalIntervalInSecs,
    14. TimeUnit.SECONDS,
    15. expBackOffBound,
    16. // new一个续约线程,最终调用到renew()方法
    17. new HeartbeatThread()
    18. ),
    19. renewalIntervalInSecs, TimeUnit.SECONDS);
  • 发现new了一个名叫HeartbeatThread的线程,赶紧看一下run()方法
    1. public void run() {
    2. if (renew()) {
    3. // 最后成功心跳(续约)时间
    4. lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
    5. }
    6. }
  • run发现在run()里调用了一个名字为renew()的方法,到了这里更加确信了服务续约和心跳检测是一个东西。。。这个renew()方法便是发起续约(心跳)请求的方法。看一下具体代码:
    1. // 心跳(续约)
    2. boolean renew() {
    3. EurekaHttpResponse<InstanceInfo> httpResponse;
    4. try {
    5. // 使用Jersey构建Rest调用EurekaServer端
    6. // AbstractJerseyEurekaHttpClient-sendHeartBeat(appName, id, info, overriddenStatus)方法
    7. // 通过sendHeartBeat方法调用eureka-core中的方法
    8. // 具体为调用com.netflix.eureka.resources包下的InstanceResource类的renewLease方法进行续约
    9. httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
    10. logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
    11. // 租约不存在的时候,进行注册操作
    12. if (httpResponse.getStatusCode() == 404) {
    13. REREGISTER_COUNTER.increment();
    14. logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
    15. long timestamp = instanceInfo.setIsDirtyWithTime();
    16. // 发起注册操作
    17. boolean success = register();
    18. if (success) {
    19. instanceInfo.unsetIsDirty(timestamp);
    20. }
    21. return success;
    22. }
    23. return httpResponse.getStatusCode() == 200;
    24. } catch (Throwable e) {
    25. logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
    26. return false;
    27. }
    28. }

EurekaServer端接收请求

  • EurekaServer端接收续约(心跳)请求的方法是eureka-core包里的InstanceResource-renewLease方法,这个方法同EurekaServer接收注册信息的方法一样,都用isReplication这个字段来表明当前请求是接收EurekaClient端发来的续约(心跳)请求还是其他EurekaServer端发来的同步信息的请求。debug发现续约时isReplication还是为null
    具体代码如下:

    1. // 接收EurekaClient端发送的续租(心跳)请求
    2. // 也有可能是接收其他EurekaServer端同步数据的请求
    3. @PUT
    4. public Response renewLease(// 是否是Replication模式
    5. @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
    6. @QueryParam("overriddenstatus") String overriddenStatus,// 实例的覆盖状态
    7. @QueryParam("status") String status,// 实例状态
    8. // 实例信息在EurekClient端上次被修改的时间
    9. @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    10. boolean isFromReplicaNode = "true".equals(isReplication);
    11. // 续租(心跳)
    12. boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    13. // Not found in the registry, immediately ask for a register
    14. // 续租失败,返回404,EurekaClient端收到404后会发起注册请求
    15. if (!isSuccess) {
    16. logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
    17. return Response.status(Status.NOT_FOUND).build();
    18. }
    19. // Check if we need to sync based on dirty time stamp, the client
    20. // instance might have changed some value
    21. Response response = null;
    22. if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
    23.   // 验证传入的lastDirtyTimestamp和EurekaServer端保存的lastDirtyTimestamp是否相同
    24. response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
    25. // Store the overridden status since the validation found out the node that replicates wins
    26. if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
    27. && (overriddenStatus != null)
    28. && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
    29. && isFromReplicaNode) {
    30. registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
    31. }
    32. } else {
    33. // 续约成功,返回200
    34. response = Response.ok().build();
    35. }
    36. logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
    37. return response;
    38. }
  • 在boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);这里调用了PeerAwareInstanceRegistryImpl的renew(appName, id, isReplication)方法,代码如下:

    1. public boolean renew(final String appName, final String id, final boolean isReplication) {
    2. // 调用父类里的renew(appName, id, isReplication)方法续约
    3. if (super.renew(appName, id, isReplication)) {
    4. // 如果是续约请求则向其他EurekaServer节点同步续约信息
    5. // 如果是同步信息请求则直接返回
    6. replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
    7. return true;
    8. }
    9. return false;
    10. }
  • AbstractInstanceRegistry的renew(appName, id, isReplication)方法代码如下:

    1. public boolean renew(String appName, String id, boolean isReplication) {
    2. RENEW.increment(isReplication);
    3. // 根据实例名称取出实例信息集合
    4. Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    5. Lease<InstanceInfo> leaseToRenew = null;
    6. if (gMap != null) {
    7. // 根据实例id取出具体实例租约信息
    8. leaseToRenew = gMap.get(id);
    9. }
    10. // 租约不存在
    11. if (leaseToRenew == null) {
    12. RENEW_NOT_FOUND.increment(isReplication);
    13. logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
    14. return false;
    15. } else {
    16. InstanceInfo instanceInfo = leaseToRenew.getHolder();
    17. if (instanceInfo != null) {
    18. // touchASGCache(instanceInfo.getASGName());
    19. // 获得实例的覆盖状态
    20. InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
    21. instanceInfo, leaseToRenew, isReplication);
    22. // 实例覆盖状态为UNKNOWN,续租失败
    23. if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
    24. logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
    25. + "; re-register required", instanceInfo.getId());
    26. RENEW_NOT_FOUND.increment(isReplication);
    27. return false;
    28. }
    29. // 实例状态与覆盖状态不一致
    30. if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
    31. logger.info(
    32. "The instance status {} is different from overridden instance status {} for instance {}. "
    33. + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
    34. instanceInfo.getOverriddenStatus().name(),
    35. instanceInfo.getId());
    36. // 强行把实例的覆盖状态设为实例状态
    37. // 即status = overriddenInstanceStatus
    38. instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
    39. }
    40. }
    41. renewsLastMin.increment();
    42. // 续租(设置lastUpdateTimestamp(租约最后更新时间))
    43. leaseToRenew.renew();
    44. return true;
    45. }
    46. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/818283
推荐阅读
相关标签
  

闽ICP备14008679号