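We first introduce the core classes on the server and client sides, then summarize the execution flow.
As a service registry and discovery center, Eureka mainly has to solve the following problems:
1. How a service instance registers with the registry.
In essence, when the service starts it calls the register method of the Eureka Server REST API to register the application instance's information.
2. How a service instance is removed from the registry.
Normally, a shutdown hook or another lifecycle method calls the deRegister method of the Eureka Server REST API. To cover the case where an instance fails abnormally and does not delete its own information in time, Eureka Server requires the client to renew its lease periodically. If no renewal arrives within a certain period, Eureka Server evicts the instance on its own.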
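The register / renew / evict cycle described above can be sketched in a few lines. This is a minimal illustration, not Eureka's registry code: the class `LeaseRegistrySketch`, its method names, and the 90-second lease used in `main` are all assumptions made for the example.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical lease table for illustration; not Eureka's registry code. */
public class LeaseRegistrySketch {
    // instanceId -> timestamp (ms) of the last registration or renewal
    private final Map<String, Long> lastRenewalMs = new ConcurrentHashMap<>();
    private final long leaseDurationMs;

    public LeaseRegistrySketch(long leaseDurationMs) {
        this.leaseDurationMs = leaseDurationMs;
    }

    /** Called once when the instance starts. */
    public void register(String instanceId, long nowMs) {
        lastRenewalMs.put(instanceId, nowMs);
    }

    /** Periodic heartbeat; returns false if the instance is unknown. */
    public boolean renew(String instanceId, long nowMs) {
        return lastRenewalMs.computeIfPresent(instanceId, (id, old) -> nowMs) != null;
    }

    /** Graceful shutdown path: the instance removes itself. */
    public boolean cancel(String instanceId) {
        return lastRenewalMs.remove(instanceId) != null;
    }

    /** Server-side sweep: drops every instance whose lease has expired. */
    public int evictExpired(long nowMs) {
        int evicted = 0;
        Iterator<Map.Entry<String, Long>> it = lastRenewalMs.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > leaseDurationMs) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    public boolean contains(String instanceId) {
        return lastRenewalMs.containsKey(instanceId);
    }

    public static void main(String[] args) {
        LeaseRegistrySketch registry = new LeaseRegistrySketch(90_000L); // assumed lease length
        registry.register("order-service-1", 0L);
        registry.register("order-service-2", 0L);
        registry.renew("order-service-1", 60_000L);   // only instance 1 keeps renewing
        System.out.println("evicted=" + registry.evictExpired(100_000L));
        System.out.println("instance 1 present: " + registry.contains("order-service-1"));
        System.out.println("instance 2 present: " + registry.contains("order-service-2"));
    }
}
```

Timestamps are passed in explicitly so the behaviour is easy to follow; a real server would read the clock and run the sweep on a timer.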
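In a cluster, Eureka uses a peer-to-peer architecture: replicas are not divided into primary and secondary, any replica can accept writes, and the replicas then update one another. After an Eureka Server receives a request from a client, it replicates the request to the other peer nodes. When doing so it sets the HEADER_REPLICATION HTTP header so that the receiving peer can tell the replicated request apart from a normal request coming from an instance. Another problem a peer-to-peer design has to solve is conflicting copies of the data; Eureka handles this with the lastDirtyTimestamp marker.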
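The role of the replication marker can be shown with a small in-memory simulation. This is only a sketch under stated assumptions: `ReplicationSketch`, `Node`, and `fullMesh` are hypothetical names, the boolean `isReplication` stands in for the HTTP header, and "keep the larger lastDirtyTimestamp" is a deliberate simplification of the conflict handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical peer-to-peer replication demo; not Eureka's implementation. */
public class ReplicationSketch {

    public static class Node {
        private final List<Node> peers = new ArrayList<>();
        // instanceId -> lastDirtyTimestamp of the copy this node holds
        private final Map<String, Long> registry = new HashMap<>();
        private int requestsHandled = 0;

        public void addPeer(Node peer) {
            peers.add(peer);
        }

        public void register(String instanceId, long lastDirtyTimestamp, boolean isReplication) {
            requestsHandled++;
            // Simplified conflict rule: the copy with the newer timestamp wins.
            registry.merge(instanceId, lastDirtyTimestamp, Long::max);
            // Only requests that came from a client are forwarded; a replicated
            // request is never forwarded again, otherwise peers would loop forever.
            if (!isReplication) {
                for (Node peer : peers) {
                    peer.register(instanceId, lastDirtyTimestamp, true);
                }
            }
        }

        public int requestsHandled() {
            return requestsHandled;
        }

        public Long timestampOf(String instanceId) {
            return registry.get(instanceId);
        }
    }

    /** Builds nodes in which every node lists all the others as peers. */
    public static Node[] fullMesh(int size) {
        Node[] nodes = new Node[size];
        for (int i = 0; i < size; i++) {
            nodes[i] = new Node();
        }
        for (Node a : nodes) {
            for (Node b : nodes) {
                if (a != b) {
                    a.addPeer(b);
                }
            }
        }
        return nodes;
    }

    public static void main(String[] args) {
        Node[] nodes = fullMesh(3);
        nodes[0].register("order-service-1", 100L, false); // client request to node 0
        for (Node n : nodes) {
            System.out.println(n.requestsHandled() + " request(s), ts=" + n.timestampOf("order-service-1"));
        }
    }
}
```

Each node handles the registration exactly once: the node that received the client call fans it out, and the receivers stop there because the request is marked as a replication.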
Having covered the server-side design, we now analyze how Spring Cloud extends Eureka.
We start from the @EnableEurekaServer annotation:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
@EnableEurekaServer uses the @Import annotation to import the EurekaServerMarkerConfiguration class:
/**
 * Responsible for adding in a marker bean to activate
 * {@link EurekaServerAutoConfiguration}
 *
 * @author Biju Kunjummen
 */
@Configuration
public class EurekaServerMarkerConfiguration {

    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }

    class Marker {
    }
}
As the Javadoc on EurekaServerMarkerConfiguration states, it is a marker class whose purpose is to activate EurekaServerAutoConfiguration.
As we know, Spring Boot scans every META-INF/spring.factories file at startup and loads all classes listed under org.springframework.boot.autoconfigure.EnableAutoConfiguration. The spring.factories file in spring-cloud-netflix-eureka-server contains:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
As expected, Spring Cloud Eureka hands its configuration over to Spring Boot. Next, look at EurekaServerAutoConfiguration:
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
// As mentioned above, EurekaServerMarkerConfiguration activates
// EurekaServerAutoConfiguration; @ConditionalOnBean implements that here.
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
// Loads the service instance configuration properties.
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    /**
     * These two packages contain the resources Jersey needs. Eureka Server
     * exposes its endpoints through the Jersey framework.
     */
    private static final String[] EUREKA_PACKAGES = new String[] {
            "com.netflix.discovery", "com.netflix.eureka" };

    // Instance information
    @Autowired
    private ApplicationInfoManager applicationInfoManager;

    // Server-side configuration
    @Autowired
    private EurekaServerConfig eurekaServerConfig;

    // Client-side configuration
    @Autowired
    private EurekaClientConfig eurekaClientConfig;

    ....

    @Configuration
    protected static class EurekaServerConfigBeanConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
            EurekaServerConfigBean server = new EurekaServerConfigBean();
            // Whether this server should register with Eureka; a
            // single-instance server sometimes does not need to.
            if (clientConfig.shouldRegisterWithEureka()) {
                // Set a sensible default if we are supposed to replicate
                server.setRegistrySyncRetries(5);
            }
            return server;
        }
    }

    // Eureka dashboard
    @Bean
    @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
    public EurekaController eurekaController() {
        return new EurekaController(this.applicationInfoManager);
    }

    ....

    // PeerAwareInstanceRegistry: used by the server to handle service
    // communication. The base behaviour is implemented by
    // AbstractInstanceRegistry (covered in the previous section);
    // PeerAwareInstanceRegistryImpl extends it with cluster communication.
    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
            ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications(); // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                serverCodecs, this.eurekaClient,
                this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }

    // Cluster nodes
    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
            ServerCodecs serverCodecs) {
        return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
    }

    // Refreshable PeerEurekaNodes (peer nodes)
    static class RefreshablePeerEurekaNodes extends PeerEurekaNodes
            implements ApplicationListener<EnvironmentChangeEvent> {

        public RefreshablePeerEurekaNodes(
                final PeerAwareInstanceRegistry registry,
                final EurekaServerConfig serverConfig,
                final EurekaClientConfig clientConfig,
                final ServerCodecs serverCodecs,
                final ApplicationInfoManager applicationInfoManager) {
            super(registry, serverConfig, clientConfig, serverCodecs, applicationInfoManager);
        }

        // Listens for EnvironmentChangeEvent and updates peerEurekaNodes
        @Override
        public void onApplicationEvent(final EnvironmentChangeEvent event) {
            if (shouldUpdate(event.getKeys())) {
                updatePeerEurekaNodes(resolvePeerUrls());
            }
        }

        /*
         * Checks whether an update is needed
         */
        protected boolean shouldUpdate(final Set<String> changedKeys) {
            assert changedKeys != null;
            // Read the configuration
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                return false;
            }
            if (changedKeys.contains("eureka.client.region")) {
                return true;
            }
            for (final String key : changedKeys) {
                // property keys are not expected to be null.
                if (key.startsWith("eureka.client.service-url.") ||
                        key.startsWith("eureka.client.availability-zones.")) {
                    return true;
                }
            }
            return false;
        }
    }

    // Holds the Eureka Server instance's information: the
    // PeerAwareInstanceRegistry and the PeerEurekaNodes
    @Bean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
            PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                registry, peerEurekaNodes, this.applicationInfoManager);
    }

    // Eureka Server bootstrap class
    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
            EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager,
                this.eurekaClientConfig, this.eurekaServerConfig, registry,
                serverContext);
    }

    // Creates the jerseyApplication and hands it the Eureka resource
    // classes found under EUREKA_PACKAGES
    @Bean
    public javax.ws.rs.core.Application jerseyApplication(Environment environment,
            ResourceLoader resourceLoader) {

        ClassPathScanningCandidateComponentProvider provider =
                new ClassPathScanningCandidateComponentProvider(false, environment);

        // Filter to include only classes that have a particular annotation.
        //
        provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
        provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));

        // Find classes in Eureka packages (or subpackages)
        //
        Set<Class<?>> classes = new HashSet<>();
        for (String basePackage : EUREKA_PACKAGES) {
            Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
            for (BeanDefinition bd : beans) {
                Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
                        resourceLoader.getClassLoader());
                classes.add(cls);
            }
        }

        // Construct the Jersey ResourceConfig
        //
        Map<String, Object> propsAndFeatures = new HashMap<>();
        propsAndFeatures.put(
                // Skip static content used by the webapp
                ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
                EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");

        DefaultResourceConfig rc = new DefaultResourceConfig(classes);
        rc.setPropertiesAndFeatures(propsAndFeatures);

        return rc;
    }
}
The code above shows that EurekaServerAutoConfiguration configures EurekaServerConfig, PeerAwareInstanceRegistry, PeerEurekaNodes, EurekaServerContext, EurekaServerBootstrap, and jerseyApplication. The core classes are introduced below.
EurekaServerContext:
@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
    private static final Logger logger = LoggerFactory.getLogger(DefaultEurekaServerContext.class);

    private final EurekaServerConfig serverConfig;
    private final ServerCodecs serverCodecs;
    private final PeerAwareInstanceRegistry registry;
    private final PeerEurekaNodes peerEurekaNodes;
    private final ApplicationInfoManager applicationInfoManager;

    @Inject
    public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
                                      ServerCodecs serverCodecs,
                                      PeerAwareInstanceRegistry registry,
                                      PeerEurekaNodes peerEurekaNodes,
                                      ApplicationInfoManager applicationInfoManager) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.registry = registry;
        this.peerEurekaNodes = peerEurekaNodes;
        this.applicationInfoManager = applicationInfoManager;
    }

    // A method annotated with @PostConstruct runs once, after the constructor
    // and dependency injection have completed and before the object is put
    // into service (comparable to a Servlet's init() method).
    @PostConstruct
    @Override
    public void initialize() {
        logger.info("Initializing ...");
        peerEurekaNodes.start();
        try {
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
    }

    // A method annotated with @PreDestroy runs once, when the container is
    // about to discard the object (comparable to a Servlet's destroy() method).
    @PreDestroy
    @Override
    public void shutdown() {
        logger.info("Shutting down ...");
        registry.shutdown();
        peerEurekaNodes.shutdown();
        logger.info("Shut down");
    }

    @Override
    public EurekaServerConfig getServerConfig() {
        return serverConfig;
    }

    @Override
    public PeerEurekaNodes getPeerEurekaNodes() {
        return peerEurekaNodes;
    }

    @Override
    public ServerCodecs getServerCodecs() {
        return serverCodecs;
    }

    @Override
    public PeerAwareInstanceRegistry getRegistry() {
        return registry;
    }

    @Override
    public ApplicationInfoManager getApplicationInfoManager() {
        return applicationInfoManager;
    }
}
The initialize() method calls the start method of PeerEurekaNodes, which updates the peer node URL information.
PeerEurekaNodes:
public class PeerEurekaNodes {

    protected final PeerAwareInstanceRegistry registry;
    protected final EurekaServerConfig serverConfig;
    protected final EurekaClientConfig clientConfig;
    protected final ServerCodecs serverCodecs;
    private final ApplicationInfoManager applicationInfoManager;

    // Holds the peer node information
    private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
    private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();

    // Executor for the scheduled task that updates PeerEurekaNodes
    private ScheduledExecutorService taskExecutor;

    // Starts the scheduled task that updates the peer node information
    public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }
                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL: {}", node.getServiceUrl());
        }
    }

    // Stops the scheduled task and clears the node information
    public void shutdown() {
        taskExecutor.shutdown();
        List<PeerEurekaNode> toRemove = this.peerEurekaNodes;

        this.peerEurekaNodes = Collections.emptyList();
        this.peerEurekaNodeUrls = Collections.emptySet();

        for (PeerEurekaNode node : toRemove) {
            node.shutDown();
        }
    }

    /** The remaining methods are fairly simple. */
}
EurekaServerBootstrap:
public class EurekaServerBootstrap {

    protected EurekaServerConfig eurekaServerConfig;
    protected ApplicationInfoManager applicationInfoManager;
    protected EurekaClientConfig eurekaClientConfig;
    protected PeerAwareInstanceRegistry registry;
    protected volatile EurekaServerContext serverContext;
    protected volatile AwsBinder awsBinder;

    // Initializes the Eureka environment and sets up the EurekaServerContextHolder
    public void contextInitialized(ServletContext context) {
        try {
            initEurekaEnvironment();
            initEurekaServerContext();

            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        } catch (Throwable e) {
            log.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

    // Destroys the Eureka server context and the Eureka environment
    public void contextDestroyed(ServletContext context) {
        try {
            log.info("Shutting down Eureka Server..");
            context.removeAttribute(EurekaServerContext.class.getName());

            destroyEurekaServerContext();
            destroyEurekaEnvironment();
        } catch (Throwable e) {
            log.error("Error shutting down eureka", e);
        }
        log.info("Eureka Service is now shutdown...");
    }
}
Eureka Server exposes its REST endpoints through the Jersey framework. The resources package of eureka-core contains the ApplicationResource and InstanceResource classes, where the Eureka Server API endpoints can be seen.
@Configuration
public class EurekaServerInitializerConfiguration
        implements ServletContextAware, SmartLifecycle, Ordered {

    .....

    @Override
    public void start() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Start the Eureka Server
                    eurekaServerBootstrap.contextInitialized(
                            EurekaServerInitializerConfiguration.this.servletContext);
                    log.info("Started Eureka Server");
                    // Publish event notifications
                    publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                    EurekaServerInitializerConfiguration.this.running = true;
                    publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
                } catch (Exception ex) {
                    // Help!
                    log.error("Could not initialize Eureka servlet context", ex);
                }
            }
        }).start();
    }

    .....
}
EurekaServerInitializerConfiguration implements the SmartLifecycle interface, so its start() method is invoked from finishRefresh() during the ApplicationContext's refresh(). start() runs eurekaServerBootstrap's contextInitialized method, which starts Eureka Server; from that point Eureka Server can serve requests normally.
Likewise, we locate eureka-client in the Maven repository and find the configuration classes listed in its spring.factories file:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
We start with EurekaClientAutoConfiguration:
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
        CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {
        "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
        "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
        "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {

    ....

    // Eureka client configuration
    @Bean
    @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
    public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
        EurekaClientConfigBean client = new EurekaClientConfigBean();
        if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {
            // We don't register during bootstrap by default, but there will be another
            // chance later.
            client.setRegisterWithEureka(false);
        }
        return client;
    }

    // Eureka instance configuration
    @Bean
    @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
    public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
            ManagementMetadataProvider managementMetadataProvider) {
        String hostname = getProperty("eureka.instance.hostname");
        boolean preferIpAddress = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
        String ipAddress = getProperty("eureka.instance.ip-address");
        boolean isSecurePortEnabled = Boolean.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));

        String serverContextPath = env.getProperty("server.context-path", "/");
        int serverPort = Integer.valueOf(env.getProperty("server.port", env.getProperty("port", "8080")));

        Integer managementPort = env.getProperty("management.server.port", Integer.class);// nullable. should be wrapped into optional
        String managementContextPath = env.getProperty("management.server.servlet.context-path");// nullable. should be wrapped into optional
        Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", Integer.class);//nullable
        EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);

        instance.setNonSecurePort(serverPort);
        instance.setInstanceId(getDefaultInstanceId(env));
        instance.setPreferIpAddress(preferIpAddress);
        instance.setSecurePortEnabled(isSecurePortEnabled);
        if (StringUtils.hasText(ipAddress)) {
            instance.setIpAddress(ipAddress);
        }

        if (isSecurePortEnabled) {
            instance.setSecurePort(serverPort);
        }

        if (StringUtils.hasText(hostname)) {
            instance.setHostname(hostname);
        }
        String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
        String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");

        if (StringUtils.hasText(statusPageUrlPath)) {
            instance.setStatusPageUrlPath(statusPageUrlPath);
        }
        if (StringUtils.hasText(healthCheckUrlPath)) {
            instance.setHealthCheckUrlPath(healthCheckUrlPath);
        }

        ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort,
                serverContextPath, managementContextPath, managementPort);

        if (metadata != null) {
            instance.setStatusPageUrl(metadata.getStatusPageUrl());
            instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
            if (instance.isSecurePortEnabled()) {
                instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
            }
            Map<String, String> metadataMap = instance.getMetadataMap();
            if (metadataMap.get("management.port") == null) {
                metadataMap.put("management.port", String.valueOf(metadata.getManagementPort()));
            }
        } else {
            //without the metadata the status and health check URLs will not be set
            //and the status page and health check url paths will not include the
            //context path so set them here
            if (StringUtils.hasText(managementContextPath)) {
                instance.setHealthCheckUrlPath(managementContextPath + instance.getHealthCheckUrlPath());
                instance.setStatusPageUrlPath(managementContextPath + instance.getStatusPageUrlPath());
            }
        }

        setupJmxPort(instance, jmxPort);
        return instance;
    }

    // Client-side discovery class, used to look up services
    @Bean
    public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
        return new EurekaDiscoveryClient(config, client);
    }

    // Registers and deregisters, operating on an EurekaRegistration
    @Bean
    public EurekaServiceRegistry eurekaServiceRegistry() {
        return new EurekaServiceRegistry();
    }

    // Refreshable configuration; this is the variant in effect in most cases
    @Configuration
    @ConditionalOnRefreshScope
    protected static class RefreshableEurekaClientConfiguration {

        @Autowired
        private ApplicationContext context;

        @Autowired
        private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;

        // Client class for communicating with the server (discovery,
        // registration, renewal). @Lazy makes it lazily initialized.
        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config,
                EurekaInstanceConfig instance) {
            manager.getInfo(); // force initialization
            return new CloudEurekaClient(manager, config, this.optionalArgs,
                    this.context);
        }

        // Application information
        @Bean
        @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
            InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
            return new ApplicationInfoManager(config, instanceInfo);
        }

        // Represents one application service instance in the discovery system
        @Bean
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @ConditionalOnBean(AutoServiceRegistrationProperties.class)
        @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
        public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
                CloudEurekaInstanceConfig instanceConfig,
                ApplicationInfoManager applicationInfoManager,
                @Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
            return EurekaRegistration.builder(instanceConfig)
                    .with(applicationInfoManager)
                    .with(eurekaClient)
                    .with(healthCheckHandler)
                    .build();
        }
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context,
            EurekaServiceRegistry registry, EurekaRegistration registration) {
        return new EurekaAutoServiceRegistration(context, registry, registration);
    }
}
The code above auto-configures most of the beans involved. The comments outline what each one does; the details are left for the reader to explore.
On the client side, the most important auto-configuration class is EurekaClientAutoConfiguration. As noted in the walkthrough above, the refreshable configuration is generally the one in effect, and in it EurekaClient is lazily initialized:
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
manager.getInfo(); // force initialization
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
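CloudEurekaClient extends DiscoveryClient. As covered in the previous section, DiscoveryClient, when initialized, sets up the instance information, registers with the server, and starts the scheduled lease-renewal and cache-refresh tasks. Because the bean above is declared lazy, it is not initialized immediately when Spring configures the beans. CloudEurekaClient is initialized from EurekaAutoServiceRegistration, whose role is evident from its name (one of the strengths of Spring's naming).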
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered {

    .....

    @Override
    public void start() {
        // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
        if (this.port.get() != 0) {
            if (this.registration.getNonSecurePort() == 0) {
                this.registration.setNonSecurePort(this.port.get());
            }

            if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
                this.registration.setSecurePort(this.port.get());
            }
        }

        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get() && this.registration.getNonSecurePort() > 0) {

            this.serviceRegistry.register(this.registration);

            this.context.publishEvent(
                    new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
            this.running.set(true);
        }
    }

    ....
}
Here is the familiar SmartLifecycle interface and its start method again. As noted earlier, a class implementing SmartLifecycle has its start() method invoked from finishRefresh() during the ApplicationContext's refresh(). The call to focus on is this.serviceRegistry.register(this.registration):
public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {

    ......

    @Override
    public void register(EurekaRegistration reg) {
        // Start the EurekaClient
        maybeInitializeClient(reg);

        if (log.isInfoEnabled()) {
            log.info("Registering application " + reg.getInstanceConfig().getAppname()
                    + " with eureka with status "
                    + reg.getInstanceConfig().getInitialStatus());
        }

        reg.getApplicationInfoManager()
                .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

        reg.getHealthCheckHandler().ifAvailable(healthCheckHandler ->
                reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
    }

    private void maybeInitializeClient(EurekaRegistration reg) {
        // Force initialization.
        reg.getApplicationInfoManager().getInfo();
        reg.getEurekaClient().getApplications();
    }

    ......
}
This is where DiscoveryClient is initialized: it sets up the instance information, registers with the server, and starts the scheduled lease-renewal and cache-refresh tasks. That completes the client side.
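The lazy-initialization behaviour described here (nothing is created until the first caller touches the client) can be illustrated without Spring. The following is a rough analogy only: `LazyClientSketch` and its nested `Lazy` holder are hypothetical, and Spring's @Lazy is implemented differently; the point is just that the factory runs on first access, which mirrors the first call to reg.getEurekaClient().getApplications().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for a lazily created bean; not how Spring implements @Lazy. */
public class LazyClientSketch {

    /** Holds a factory and invokes it at most once, on the first get(). */
    public static final class Lazy<T> implements Supplier<T> {
        private final Supplier<T> factory;
        private T instance;

        public Lazy(Supplier<T> factory) {
            this.factory = factory;
        }

        @Override
        public synchronized T get() {
            if (instance == null) {
                instance = factory.get(); // the expensive construction happens here
            }
            return instance;
        }

        public synchronized boolean isInitialized() {
            return instance != null;
        }
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        // Declaring the holder does not run the factory.
        Lazy<String> client = new Lazy<>(() -> "client#" + created.incrementAndGet());
        System.out.println("initialized after declaration: " + client.isInitialized());

        // The first access triggers construction; later accesses reuse the instance.
        client.get();
        client.get();
        System.out.println("initialized after first use: " + client.isInitialized());
        System.out.println("factory invocations: " + created.get());
    }
}
```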
On the server side, the most important auto-configuration class is EurekaServerAutoConfiguration. The key pieces are EurekaServerContext, EurekaServerInitializerConfiguration, EurekaServerBootstrap, and javax.ws.rs.core.Application (jerseyApplication); Eureka Server uses jerseyApplication to expose its service endpoints for communication.
EurekaServerContext:
@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {

    ....

    // @PostConstruct should look familiar: the annotated method is invoked
    // when the bean is initialized.
    @PostConstruct
    @Override
    public void initialize() {
        logger.info("Initializing ...");
        peerEurekaNodes.start();
        try {
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
    }

    ....
}
DefaultEurekaServerContext, the default implementation of EurekaServerContext, calls peerEurekaNodes.start() and registry.init(peerEurekaNodes) when it is initialized.
PeerEurekaNodes:
@Singleton
public class PeerEurekaNodes {

    .....

    public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }
                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL: {}", node.getServiceUrl());
        }
    }

    ....
}
PeerEurekaNodes#start() first updates the peer node information once, then starts a scheduled task that keeps updating it.
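The "update once, then keep updating on a timer" pattern used by start() can be reproduced with the JDK alone. The sketch below is an illustration under assumptions: `PeerRefreshSketch`, its thread name, and the 10 ms interval are made up for the example. It also shows why the task body is wrapped in try/catch: an exception that escapes a task scheduled with scheduleWithFixedDelay suppresses all subsequent executions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical refresh scheduler modelled on the shape of start(); not Eureka code. */
public class PeerRefreshSketch {

    /** Runs refresh once on the caller's thread, then repeats it on a daemon thread. */
    public static ScheduledExecutorService start(Runnable refresh, long intervalMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "peer-refresh-sketch");
            thread.setDaemon(true); // must not keep the JVM alive on shutdown
            return thread;
        });
        refresh.run(); // initial, synchronous update
        executor.scheduleWithFixedDelay(() -> {
            try {
                refresh.run();
            } catch (Throwable e) {
                // Swallow and log: an escaping exception would cancel the schedule.
                System.err.println("refresh failed: " + e.getMessage());
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        return executor;
    }

    /** Counts refresh runs until 'wanted' runs were seen or the timeout elapsed. */
    public static int runsObserved(int wanted, long intervalMs, long timeoutMs) {
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(wanted);
        ScheduledExecutorService executor = start(() -> {
            int n = runs.incrementAndGet();
            latch.countDown();
            if (n == 2) {
                throw new IllegalStateException("simulated failure on run 2");
            }
        }, intervalMs);
        try {
            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        // Run 2 throws, yet later runs still happen because the task catches it.
        System.out.println("runs observed: " + runsObserved(4, 10L, 5_000L));
    }
}
```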
Now look at the updatePeerEurekaNodes() method:
// Resolve the peer URLs from the configuration
protected List<String> resolvePeerUrls() {
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
    List<String> replicaUrls = EndpointUtils
            .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

    int idx = 0;
    while (idx < replicaUrls.size()) {
        if (isThisMyUrl(replicaUrls.get(idx))) {
            replicaUrls.remove(idx);
        } else {
            idx++;
        }
    }
    return replicaUrls;
}

protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }

    // Work out which peers must be removed and which URLs must be added
    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    toShutdown.removeAll(newPeerUrls);
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    toAdd.removeAll(peerEurekaNodeUrls);

    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
        return;
    }

    // Remove peers no long available
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

    if (!toShutdown.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", toShutdown);
        int i = 0;
        while (i < newNodeList.size()) {
            PeerEurekaNode eurekaNode = newNodeList.get(i);
            if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                newNodeList.remove(i);
                eurekaNode.shutDown();
            } else {
                i++;
            }
        }
    }

    // Add new peers
    if (!toAdd.isEmpty()) {
        logger.info("Adding new peer nodes {}", toAdd);
        for (String peerUrl : toAdd) {
            newNodeList.add(createPeerEurekaNode(peerUrl));
        }
    }

    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
When Eureka initializes, there are no URLs to remove and every resolved URL has to be added, so createPeerEurekaNode() is called for each of them:
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
String targetHost = hostFromUrl(peerEurekaNodeUrl);
if (targetHost == null) {
targetHost = "host";
}
return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}
This creates an HttpReplicationClient, which is used to send requests to the peer, and wraps it in a PeerEurekaNode. That completes the setup of the peer nodes.
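The add/remove computation at the top of updatePeerEurekaNodes() is a pair of set differences. The sketch below isolates just that step; `PeerDiffSketch` and the peer URLs are hypothetical and chosen only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper isolating the set arithmetic of the peer update. */
public class PeerDiffSketch {

    /** URLs that are currently known but no longer resolved: these peers get shut down. */
    public static Set<String> toShutdown(Set<String> currentUrls, List<String> resolvedUrls) {
        Set<String> result = new HashSet<>(currentUrls);
        result.removeAll(resolvedUrls);
        return result;
    }

    /** URLs that are resolved but not yet known: a new peer node is created for each. */
    public static Set<String> toAdd(Set<String> currentUrls, List<String> resolvedUrls) {
        Set<String> result = new HashSet<>(resolvedUrls);
        result.removeAll(currentUrls);
        return result;
    }

    public static void main(String[] args) {
        // Made-up URLs for illustration only.
        List<String> resolved = Arrays.asList("http://peer2:8761/eureka/", "http://peer3:8761/eureka/");

        // At startup nothing is known yet, so everything is added and nothing removed.
        Set<String> none = Collections.emptySet();
        System.out.println("startup add=" + toAdd(none, resolved) + " remove=" + toShutdown(none, resolved));

        // Later, peer1 has disappeared from the configuration and peer3 is new.
        Set<String> current = new HashSet<>(Arrays.asList("http://peer1:8761/eureka/", "http://peer2:8761/eureka/"));
        System.out.println("refresh add=" + toAdd(current, resolved) + " remove=" + toShutdown(current, resolved));
    }
}
```

When both results are empty the peer list is unchanged, which is the early-return case in the original method.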
EurekaServerInitializerConfiguration:
@Configuration
public class EurekaServerInitializerConfiguration
        implements ServletContextAware, SmartLifecycle, Ordered {

    .....

    @Override
    public void start() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //TODO: is this class even needed now?
                    eurekaServerBootstrap.contextInitialized(
                            EurekaServerInitializerConfiguration.this.servletContext);
                    log.info("Started Eureka Server");

                    publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                    EurekaServerInitializerConfiguration.this.running = true;
                    publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
                } catch (Exception ex) {
                    // Help!
                    log.error("Could not initialize Eureka servlet context", ex);
                }
            }
        }).start();
    }

    .....
}
The familiar SmartLifecycle interface again. Going straight to the start method: it calls EurekaServerBootstrap#contextInitialized, sets running to true, and publishes the Eureka events.
EurekaServerBootstrap:
public class EurekaServerBootstrap {

    .....

    public void contextInitialized(ServletContext context) {
        try {
            // Initialize the Eureka environment
            initEurekaEnvironment();
            // Initialize the Eureka server context
            initEurekaServerContext();
            // Store the server context as a ServletContext attribute
            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        } catch (Throwable e) {
            log.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

    .....
}
At this point Eureka Server has started normally (using jerseyApplication to expose its service endpoints).