赞
踩
https://www.cnblogs.com/darcy-yuan/p/17007635.html
1. 前面我们在《(一)elasticsearch 编译和启动》和《(二)elasticsearch 源码目录》中简单了解了 es(elasticsearch,下同),现在我们来看下启动代码。
下面是启动流程图,我们按照流程图的顺序依次描述
2.启动流程
- org.elasticsearch.bootstrap.Elasticsearch
-
- public static void main(final String[] args) throws Exception { // process entry point; the steps below run in exactly this order
- overrideDnsCachePolicyProperties();
- /*
- * We want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
- * presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy). This
- * forces such policies to take effect immediately.
- */
- System.setSecurityManager(new SecurityManager() {
-
- @Override
- public void checkPermission(Permission perm) {
- // grant all permissions so that we can later set the security manager to the one that we want
- }
-
- });
- LogConfigurator.registerErrorListener();
- final Elasticsearch elasticsearch = new Elasticsearch();
- int status = main(args, elasticsearch, Terminal.DEFAULT); // hand off to the overloaded main, which returns an exit status
- if (status != ExitCodes.OK) { // only a non-OK status forces an explicit exit
- exit(status);
- }
- }
后续执行 Elasticsearch.execute -> Elasticsearch.init -> Bootstrap.init
- org.elasticsearch.bootstrap.Bootstrap
-
- static void init(
- final boolean foreground,
- final Path pidFile,
- final boolean quiet,
- final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
- // force the class initializer for BootstrapInfo to run before
- // the security manager is installed
- BootstrapInfo.init();
-
- INSTANCE = new Bootstrap();
-
- // 安全配置文件
- final SecureSettings keystore = loadSecureSettings(initialEnv);
- final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());
-
- LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
- try {
- LogConfigurator.configure(environment);
- } catch (IOException e) {
- throw new BootstrapException(e);
- }
- if (JavaVersion.current().compareTo(JavaVersion.parse("11")) < 0) {
- final String message = String.format(
- Locale.ROOT,
- "future versions of Elasticsearch will require Java 11; " +
- "your Java version from [%s] does not meet this requirement",
- System.getProperty("java.home"));
- new DeprecationLogger(LogManager.getLogger(Bootstrap.class)).deprecated(message);
- }
- // 处理pidFile
- if (environment.pidFile() != null) {
- try {
- PidFile.create(environment.pidFile(), true);
- } catch (IOException e) {
- throw new BootstrapException(e);
- }
- }
-
- // 如果是后台启动(foreground == false)或指定了 quiet,则移除控制台日志输出(ConsoleAppender)并关闭标准输出
- final boolean closeStandardStreams = (foreground == false) || quiet;
- try {
- if (closeStandardStreams) {
- final Logger rootLogger = LogManager.getRootLogger();
- final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
- if (maybeConsoleAppender != null) {
- Loggers.removeAppender(rootLogger, maybeConsoleAppender);
- }
- closeSystOut();
- }
-
- // fail if somebody replaced the lucene jars
- checkLucene();
-
- // 通用异常捕获
- // install the default uncaught exception handler; must be done before security is
- // initialized as we do not want to grant the runtime permission
- // setDefaultUncaughtExceptionHandler
- Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());
-
- INSTANCE.setup(true, environment);
-
- try {
- // any secure settings must be read during node construction
- IOUtils.close(keystore);
- } catch (IOException e) {
- throw new BootstrapException(e);
- }
-
- INSTANCE.start();
-
- if (closeStandardStreams) {
- closeSysError();
- }
- }
这里我们可以关注下 INSTANCE.setup(true, environment);
- org.elasticsearch.bootstrap.Bootstrap
-
- private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException { // prepares natives, probes, shutdown hook and security, then constructs the node
- Settings settings = environment.settings();
-
- try {
- spawner.spawnNativeControllers(environment);
- } catch (IOException e) {
- throw new BootstrapException(e);
- }
-
- // check native-level settings: memory lock (mlock), system call filter and ctrl handler
- initializeNatives(
- environment.tmpFile(),
- BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
- BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
- BootstrapSettings.CTRLHANDLER_SETTING.get(settings));
-
- // probes
- // initialize probes before the security manager is installed
- initializeProbes();
-
- if (addShutdownHook) {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- try {
- IOUtils.close(node, spawner); // close the node and the native-controller spawner first
- LoggerContext context = (LoggerContext) LogManager.getContext(false);
- Configurator.shutdown(context);
- if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) { // wait up to 10 seconds for the node to finish closing
- throw new IllegalStateException("Node didn't stop within 10 seconds. " +
- "Any outstanding requests or tasks might get killed.");
- }
- } catch (IOException ex) {
- throw new ElasticsearchException("failed to stop node", ex);
- } catch (InterruptedException e) {
- LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
- Thread.currentThread().interrupt(); // restore the interrupt flag for whoever inspects this thread next
- }
- }
- });
- }
-
- try {
- // check for class-loading problems
- // look for jar hell
- final Logger logger = LogManager.getLogger(JarHell.class);
- JarHell.checkJarHell(logger::debug);
- } catch (IOException | URISyntaxException e) {
- throw new BootstrapException(e);
- }
-
- // Log ifconfig output before SecurityManager is installed
- IfConfig.logIfNecessary();
-
- // security setup
- // install SM after natives, shutdown hooks, etc.
- try {
- Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
- } catch (IOException | NoSuchAlgorithmException e) {
- throw new BootstrapException(e);
- }
-
- node = new Node(environment) { // anonymous subclass: routes pre-request validation to BootstrapChecks
- @Override
- protected void validateNodeBeforeAcceptingRequests(
- final BootstrapContext context,
- final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
- BootstrapChecks.check(context, boundTransportAddress, checks);
- }
- };
- }
最后一句 node = new Node(environment) { ... } 创建了节点(这里是一个重写了 validateNodeBeforeAcceptingRequests 的匿名子类),Node 的构造函数里面做了许多工作:
- org.elasticsearch.node.Node
-
- protected Node(
- final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
- ...
- // 打印jvm信息
- final JvmInfo jvmInfo = JvmInfo.jvmInfo();
- logger.info(
- "version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
- Build.CURRENT.getQualifiedVersion(),
- jvmInfo.pid(),
- Build.CURRENT.flavor().displayName(),
- Build.CURRENT.type().displayName(),
- Build.CURRENT.hash(),
- Build.CURRENT.date(),
- Constants.OS_NAME,
- Constants.OS_VERSION,
- Constants.OS_ARCH,
- Constants.JVM_VENDOR,
- Constants.JVM_NAME,
- Constants.JAVA_VERSION,
- Constants.JVM_VERSION);
- ...
- // 初始化各类服务,以及他们相关的依赖
- this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(),
- environment.pluginsFile(), classpathPlugins);
- final Settings settings = pluginsService.updatedSettings();
- final Set<DiscoveryNodeRole> possibleRoles = Stream.concat(
- DiscoveryNodeRole.BUILT_IN_ROLES.stream(),
- pluginsService.filterPlugins(Plugin.class)
- .stream()
- .map(Plugin::getRoles)
- .flatMap(Set::stream))
- .collect(Collectors.toSet());
- DiscoveryNode.setPossibleRoles(possibleRoles);
- localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
- ...
- // guice注入
- modules.add(b -> {
- b.bind(Node.class).toInstance(this);
- b.bind(NodeService.class).toInstance(nodeService);
- b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
- b.bind(PluginsService.class).toInstance(pluginsService);
- b.bind(Client.class).toInstance(client);
- b.bind(NodeClient.class).toInstance(client);
- b.bind(Environment.class).toInstance(this.environment);
- b.bind(ThreadPool.class).toInstance(threadPool);
es 使用 Guice 作为依赖注入框架,Guice 是一个非常轻量级的依赖注入框架。既然各个组件都已经绑定(bind)好了,我们现在可以启动了。
INSTANCE.start -> Bootstrap.start
- org.elasticsearch.bootstrap.Bootstrap
-
- private void start() throws NodeValidationException { // starts the node assigned in setup(), then the keep-alive thread
- node.start();
- keepAliveThread.start(); // NOTE(review): keepAliveThread is created outside this excerpt; presumably it keeps the JVM from exiting — confirm
- }
node.start中启动各个组件。es中的各个组件继承了 AbstractLifecycleComponent。start方法会调用组件的doStart方法。
- org.elasticsearch.node.Node
-
- public Node start() throws NodeValidationException {
- if (!lifecycle.moveToStarted()) {
- return this;
- }
-
- logger.info("starting ...");
- pluginLifecycleComponents.forEach(LifecycleComponent::start);
-
- injector.getInstance(MappingUpdatedAction.class).setClient(client);
- injector.getInstance(IndicesService.class).start();
- injector.getInstance(IndicesClusterStateService.class).start();
- injector.getInstance(SnapshotsService.class).start();
- injector.getInstance(SnapshotShardsService.class).start();
- injector.getInstance(SearchService.class).start();
- nodeService.getMonitorService().start();
-
- final ClusterService clusterService = injector.getInstance(ClusterService.class);
-
- final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
- nodeConnectionsService.start();
- clusterService.setNodeConnectionsService(nodeConnectionsService);
- ...
具体地,我们看两个比较重要的服务。第一个是 transportService.start(),它会调用下面的 doStart():
- org.elasticsearch.transport.TransportService
-
- @Override
- protected void doStart() { // wires listeners, starts the transport, then derives the local node from the bound address
- transport.setMessageListener(this);
- connectionManager.addListener(this);
- // start the underlying transport (original note: establishes the network connection); its bound address is read right after
- transport.start();
- if (transport.boundAddress() != null && logger.isInfoEnabled()) {
- logger.info("{}", transport.boundAddress());
- for (Map.Entry<String, BoundTransportAddress> entry : transport.profileBoundAddresses().entrySet()) { // also log each profile's bound address
- logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());
- }
- }
- localNode = localNodeFactory.apply(transport.boundAddress()); // build the local node from the transport's bound address
-
- if (connectToRemoteCluster) {
- // here we start to connect to the remote clusters
- remoteClusterService.initializeRemoteClusters();
- }
- }
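这里 transport.start() 启动的是 Transport 接口的实现类:默认是 Netty4Transport,启用 x-pack security 时是 SecurityNetty4ServerTransport(注意 SecurityNetty4HttpServerTransport 是 HTTP 层 HttpServerTransport 的实现,并不是这里的 transport)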
另一个比较重要的服务,discovery.start(),具体实现类是 Coordinator
- org.elasticsearch.cluster.coordination.Coordinator
-
- @Override
- protected void doStart() {
- synchronized (mutex) {
- CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
- coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy));
- peerFinder.setCurrentTerm(getCurrentTerm());
- configuredHostsResolver.start();
- final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
- if (lastAcceptedState.metaData().clusterUUIDCommitted()) {
- logger.info("cluster UUID [{}]", lastAcceptedState.metaData().clusterUUID());
- }
- final VotingConfiguration votingConfiguration = lastAcceptedState.getLastCommittedConfiguration();
- if (singleNodeDiscovery &&
- votingConfiguration.isEmpty() == false &&
- votingConfiguration.hasQuorum(Collections.singleton(getLocalNode().getId())) == false) {
- throw new IllegalStateException("cannot start with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to [" +
- DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] when local node " + getLocalNode() +
- " does not have quorum in voting configuration " + votingConfiguration);
- }
- ...
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。