当前位置:   article > 正文

(三)elasticsearch 源码之启动流程分析_elasticsearch启动流程源码分析

elasticsearch启动流程源码分析

https://www.cnblogs.com/darcy-yuan/p/17007635.html

1.前面我们在《(一)elasticsearch 编译和启动》和 《(二)elasticsearch 源码目录 》简单了解下es(elasticsearch,下同),现在我们来看下启动代码

下面是启动流程图,我们按照流程图的顺序依次描述

2.启动流程

  1. org.elasticsearch.bootstrap.Elasticsearch
  2. public static void main(final String[] args) throws Exception {
  3. overrideDnsCachePolicyProperties();
  4. /*
  5. * We want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
  6. * presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy). This
  7. * forces such policies to take effect immediately.
  8. */
  9. System.setSecurityManager(new SecurityManager() {
  10. @Override
  11. public void checkPermission(Permission perm) {
  12. // grant all permissions so that we can later set the security manager to the one that we want
  13. }
  14. });
  15. LogConfigurator.registerErrorListener();
  16. final Elasticsearch elasticsearch = new Elasticsearch();
  17. int status = main(args, elasticsearch, Terminal.DEFAULT);
  18. if (status != ExitCodes.OK) {
  19. exit(status);
  20. }
  21. }

后续执行 Elasticsearch.execute -> Elasticsearch.init -> Bootstrap.init

  1. org.elasticsearch.bootstrap.Bootstrap
  2. static void init(
  3. final boolean foreground,
  4. final Path pidFile,
  5. final boolean quiet,
  6. final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
  7. // force the class initializer for BootstrapInfo to run before
  8. // the security manager is installed
  9. BootstrapInfo.init();
  10. INSTANCE = new Bootstrap();
  11. // 安全配置文件
  12. final SecureSettings keystore = loadSecureSettings(initialEnv);
  13. final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());
  14. LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
  15. try {
  16. LogConfigurator.configure(environment);
  17. } catch (IOException e) {
  18. throw new BootstrapException(e);
  19. }
  20. if (JavaVersion.current().compareTo(JavaVersion.parse("11")) < 0) {
  21. final String message = String.format(
  22. Locale.ROOT,
  23. "future versions of Elasticsearch will require Java 11; " +
  24. "your Java version from [%s] does not meet this requirement",
  25. System.getProperty("java.home"));
  26. new DeprecationLogger(LogManager.getLogger(Bootstrap.class)).deprecated(message);
  27. }
  28. // 处理pidFile
  29. if (environment.pidFile() != null) {
  30. try {
  31. PidFile.create(environment.pidFile(), true);
  32. } catch (IOException e) {
  33. throw new BootstrapException(e);
  34. }
  35. }
  36. // 如果是后台启动,则不打印日志
  37. final boolean closeStandardStreams = (foreground == false) || quiet;
  38. try {
  39. if (closeStandardStreams) {
  40. final Logger rootLogger = LogManager.getRootLogger();
  41. final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
  42. if (maybeConsoleAppender != null) {
  43. Loggers.removeAppender(rootLogger, maybeConsoleAppender);
  44. }
  45. closeSystOut();
  46. }
  47. // fail if somebody replaced the lucene jars
  48. checkLucene();
  49. // 通用异常捕获
  50. // install the default uncaught exception handler; must be done before security is
  51. // initialized as we do not want to grant the runtime permission
  52. // setDefaultUncaughtExceptionHandler
  53. Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());
  54. INSTANCE.setup(true, environment);
  55. try {
  56. // any secure settings must be read during node construction
  57. IOUtils.close(keystore);
  58. } catch (IOException e) {
  59. throw new BootstrapException(e);
  60. }
  61. INSTANCE.start();
  62. if (closeStandardStreams) {
  63. closeSysError();
  64. }
  65. }

这里我们可以关注下  INSTANCE.setup(true, environment);

  1. org.elasticsearch.bootstrap.Bootstrap
  2. private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
  3. Settings settings = environment.settings();
  4. try {
  5. spawner.spawnNativeControllers(environment);
  6. } catch (IOException e) {
  7. throw new BootstrapException(e);
  8. }
  9. // 检查一些mlock设定
  10. initializeNatives(
  11. environment.tmpFile(),
  12. BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
  13. BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
  14. BootstrapSettings.CTRLHANDLER_SETTING.get(settings));
  15. // 探针
  16. // initialize probes before the security manager is installed
  17. initializeProbes();
  18. if (addShutdownHook) {
  19. Runtime.getRuntime().addShutdownHook(new Thread() {
  20. @Override
  21. public void run() {
  22. try {
  23. IOUtils.close(node, spawner);
  24. LoggerContext context = (LoggerContext) LogManager.getContext(false);
  25. Configurator.shutdown(context);
  26. if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) {
  27. throw new IllegalStateException("Node didn't stop within 10 seconds. " +
  28. "Any outstanding requests or tasks might get killed.");
  29. }
  30. } catch (IOException ex) {
  31. throw new ElasticsearchException("failed to stop node", ex);
  32. } catch (InterruptedException e) {
  33. LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
  34. Thread.currentThread().interrupt();
  35. }
  36. }
  37. });
  38. }
  39. try {
  40. // 检查类加载的一些问题
  41. // look for jar hell
  42. final Logger logger = LogManager.getLogger(JarHell.class);
  43. JarHell.checkJarHell(logger::debug);
  44. } catch (IOException | URISyntaxException e) {
  45. throw new BootstrapException(e);
  46. }
  47. // Log ifconfig output before SecurityManager is installed
  48. IfConfig.logIfNecessary();
  49. // 安全处理
  50. // install SM after natives, shutdown hooks, etc.
  51. try {
  52. Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
  53. } catch (IOException | NoSuchAlgorithmException e) {
  54. throw new BootstrapException(e);
  55. }
  56. node = new Node(environment) {
  57. @Override
  58. protected void validateNodeBeforeAcceptingRequests(
  59. final BootstrapContext context,
  60. final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
  61. BootstrapChecks.check(context, boundTransportAddress, checks);
  62. }
  63. };
  64. }

最后一句 node = new Node(environment) 初始化了节点,里面做了许多工作

  1. org.elasticsearch.node.Node
  2. protected Node(
  3. final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
  4. ...
  5. // 打印jvm信息
  6. final JvmInfo jvmInfo = JvmInfo.jvmInfo();
  7. logger.info(
  8. "version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
  9. Build.CURRENT.getQualifiedVersion(),
  10. jvmInfo.pid(),
  11. Build.CURRENT.flavor().displayName(),
  12. Build.CURRENT.type().displayName(),
  13. Build.CURRENT.hash(),
  14. Build.CURRENT.date(),
  15. Constants.OS_NAME,
  16. Constants.OS_VERSION,
  17. Constants.OS_ARCH,
  18. Constants.JVM_VENDOR,
  19. Constants.JVM_NAME,
  20. Constants.JAVA_VERSION,
  21. Constants.JVM_VERSION);
  22. ...
  23. // 初始化各类服务,以及他们相关的依赖
  24. this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(),
  25. environment.pluginsFile(), classpathPlugins);
  26. final Settings settings = pluginsService.updatedSettings();
  27. final Set<DiscoveryNodeRole> possibleRoles = Stream.concat(
  28. DiscoveryNodeRole.BUILT_IN_ROLES.stream(),
  29. pluginsService.filterPlugins(Plugin.class)
  30. .stream()
  31. .map(Plugin::getRoles)
  32. .flatMap(Set::stream))
  33. .collect(Collectors.toSet());
  34. DiscoveryNode.setPossibleRoles(possibleRoles);
  35. localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
  36. ...
  37. // guice注入
  38. modules.add(b -> {
  39. b.bind(Node.class).toInstance(this);
  40. b.bind(NodeService.class).toInstance(nodeService);
  41. b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
  42. b.bind(PluginsService.class).toInstance(pluginsService);
  43. b.bind(Client.class).toInstance(client);
  44. b.bind(NodeClient.class).toInstance(client);
  45. b.bind(Environment.class).toInstance(this.environment);
  46. b.bind(ThreadPool.class).toInstance(threadPool);

es 使用 guice注入框架,guice是个非常轻量级的依赖注入框架,既然各个组件都已经注入好了,我们现在可以启动了。

INSTANCE.start -> Bootstrap.start

  1. org.elasticsearch.bootstrap.Bootstrap
  2. private void start() throws NodeValidationException {
  3. node.start();
  4. keepAliveThread.start();
  5. }

 node.start中启动各个组件。es中的各个组件继承了 AbstractLifecycleComponent。start方法会调用组件的doStart方法。

  1. org.elasticsearch.node.Node
  2. public Node start() throws NodeValidationException {
  3. if (!lifecycle.moveToStarted()) {
  4. return this;
  5. }
  6. logger.info("starting ...");
  7. pluginLifecycleComponents.forEach(LifecycleComponent::start);
  8. injector.getInstance(MappingUpdatedAction.class).setClient(client);
  9. injector.getInstance(IndicesService.class).start();
  10. injector.getInstance(IndicesClusterStateService.class).start();
  11. injector.getInstance(SnapshotsService.class).start();
  12. injector.getInstance(SnapshotShardsService.class).start();
  13. injector.getInstance(SearchService.class).start();
  14. nodeService.getMonitorService().start();
  15. final ClusterService clusterService = injector.getInstance(ClusterService.class);
  16. final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
  17. nodeConnectionsService.start();
  18. clusterService.setNodeConnectionsService(nodeConnectionsService);
  19. ...

具体的我们看两个比较重要的服务  transportService.start();

  1. org.elasticsearch.transport.TransportService
  2. @Override
  3. protected void doStart() {
  4. transport.setMessageListener(this);
  5. connectionManager.addListener(this);
  6. // 建立网络连接
  7. transport.start();
  8. if (transport.boundAddress() != null && logger.isInfoEnabled()) {
  9. logger.info("{}", transport.boundAddress());
  10. for (Map.Entry<String, BoundTransportAddress> entry : transport.profileBoundAddresses().entrySet()) {
  11. logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());
  12. }
  13. }
  14. localNode = localNodeFactory.apply(transport.boundAddress());
  15. if (connectToRemoteCluster) {
  16. // here we start to connect to the remote clusters
  17. remoteClusterService.initializeRemoteClusters();
  18. }
  19. }

启动transport的实现类是 SecurityNetty4HttpServerTransport

 另一个比较重要的服务,discovery.start(),具体实现类是 Coordinator

  1. org.elasticsearch.cluster.coordination.Coordinator
  2. @Override
  3. protected void doStart() {
  4. synchronized (mutex) {
  5. CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
  6. coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy));
  7. peerFinder.setCurrentTerm(getCurrentTerm());
  8. configuredHostsResolver.start();
  9. final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
  10. if (lastAcceptedState.metaData().clusterUUIDCommitted()) {
  11. logger.info("cluster UUID [{}]", lastAcceptedState.metaData().clusterUUID());
  12. }
  13. final VotingConfiguration votingConfiguration = lastAcceptedState.getLastCommittedConfiguration();
  14. if (singleNodeDiscovery &&
  15. votingConfiguration.isEmpty() == false &&
  16. votingConfiguration.hasQuorum(Collections.singleton(getLocalNode().getId())) == false) {
  17. throw new IllegalStateException("cannot start with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to [" +
  18. DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] when local node " + getLocalNode() +
  19. " does not have quorum in voting configuration " + votingConfiguration);
  20. }
  21. ...
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/651522
推荐阅读
相关标签
  

闽ICP备14008679号