当前位置:   article > 正文

聊聊elasticsearch的SeedHostsResolver

elasticsearch [o.e.d.seedhostsresolver ] [master] failed to resolve host

本文主要研究一下elasticsearch的SeedHostsResolver

ConfiguredHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

  1. public interface ConfiguredHostsResolver {
  2. /**
  3. * Attempt to resolve the configured unicast hosts list to a list of transport addresses.
  4. *
  5. * @param consumer Consumer for the resolved list. May not be called if an error occurs or if another resolution attempt is in
  6. * progress.
  7. */
  8. void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer);
  9. }
  • ConfiguredHostsResolver接口定义了resolveConfiguredHosts方法用于解析配置的transport address列表

SeedHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java

  1. public class SeedHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver {
  2. public static final Setting<Integer> LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING =
  3. Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Setting.Property.NodeScope,
  4. Setting.Property.Deprecated);
  5. public static final Setting<TimeValue> LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT =
  6. Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5),
  7. Setting.Property.NodeScope, Setting.Property.Deprecated);
  8. public static final Setting<Integer> DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING =
  9. Setting.intSetting("discovery.seed_resolver.max_concurrent_resolvers", 10, 0, Setting.Property.NodeScope);
  10. public static final Setting<TimeValue> DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING =
  11. Setting.positiveTimeSetting("discovery.seed_resolver.timeout", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope);
  12. private static final Logger logger = LogManager.getLogger(SeedHostsResolver.class);
  13. private final Settings settings;
  14. private final AtomicBoolean resolveInProgress = new AtomicBoolean();
  15. private final TransportService transportService;
  16. private final SeedHostsProvider hostsProvider;
  17. private final SetOnce<ExecutorService> executorService = new SetOnce<>();
  18. private final TimeValue resolveTimeout;
  19. private final String nodeName;
  20. private final int concurrentConnects;
  21. public SeedHostsResolver(String nodeName, Settings settings, TransportService transportService,
  22. SeedHostsProvider seedProvider) {
  23. this.settings = settings;
  24. this.nodeName = nodeName;
  25. this.transportService = transportService;
  26. this.hostsProvider = seedProvider;
  27. resolveTimeout = getResolveTimeout(settings);
  28. concurrentConnects = getMaxConcurrentResolvers(settings);
  29. }
  30. public static int getMaxConcurrentResolvers(Settings settings) {
  31. if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.exists(settings)) {
  32. if (DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.exists(settings)) {
  33. throw new IllegalArgumentException("it is forbidden to set both ["
  34. + DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.getKey() + "] and ["
  35. + LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.getKey() + "]");
  36. }
  37. return LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
  38. }
  39. return DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.get(settings);
  40. }
  41. public static TimeValue getResolveTimeout(Settings settings) {
  42. if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.exists(settings)) {
  43. if (DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.exists(settings)) {
  44. throw new IllegalArgumentException("it is forbidden to set both ["
  45. + DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.getKey() + "] and ["
  46. + LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.getKey() + "]");
  47. }
  48. return LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
  49. }
  50. return DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.get(settings);
  51. }
  52. /**
  53. * Resolves a list of hosts to a list of transport addresses. Each host is resolved into a transport address (or a collection of
  54. * addresses if the number of ports is greater than one). Host lookups are done in parallel using specified executor service up
  55. * to the specified resolve timeout.
  56. *
  57. * @param executorService the executor service used to parallelize hostname lookups
  58. * @param logger logger used for logging messages regarding hostname lookups
  59. * @param hosts the hosts to resolve
  60. * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
  61. * @param transportService the transport service
  62. * @param resolveTimeout the timeout before returning from hostname lookups
  63. * @return a list of resolved transport addresses
  64. */
  65. public static List<TransportAddress> resolveHostsLists(
  66. final ExecutorService executorService,
  67. final Logger logger,
  68. final List<String> hosts,
  69. final int limitPortCounts,
  70. final TransportService transportService,
  71. final TimeValue resolveTimeout) {
  72. Objects.requireNonNull(executorService);
  73. Objects.requireNonNull(logger);
  74. Objects.requireNonNull(hosts);
  75. Objects.requireNonNull(transportService);
  76. Objects.requireNonNull(resolveTimeout);
  77. if (resolveTimeout.nanos() < 0) {
  78. throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
  79. }
  80. // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
  81. final List<Callable<TransportAddress[]>> callables =
  82. hosts
  83. .stream()
  84. .map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts))
  85. .collect(Collectors.toList());
  86. final List<Future<TransportAddress[]>> futures;
  87. try {
  88. futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
  89. } catch (InterruptedException e) {
  90. Thread.currentThread().interrupt();
  91. return Collections.emptyList();
  92. }
  93. final List<TransportAddress> transportAddresses = new ArrayList<>();
  94. final Set<TransportAddress> localAddresses = new HashSet<>();
  95. localAddresses.add(transportService.boundAddress().publishAddress());
  96. localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses()));
  97. // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
  98. // hostname with the corresponding task by iterating together
  99. final Iterator<String> it = hosts.iterator();
  100. for (final Future<TransportAddress[]> future : futures) {
  101. final String hostname = it.next();
  102. if (!future.isCancelled()) {
  103. assert future.isDone();
  104. try {
  105. final TransportAddress[] addresses = future.get();
  106. logger.trace("resolved host [{}] to {}", hostname, addresses);
  107. for (int addressId = 0; addressId < addresses.length; addressId++) {
  108. final TransportAddress address = addresses[addressId];
  109. // no point in pinging ourselves
  110. if (localAddresses.contains(address) == false) {
  111. transportAddresses.add(address);
  112. }
  113. }
  114. } catch (final ExecutionException e) {
  115. assert e.getCause() != null;
  116. final String message = "failed to resolve host [" + hostname + "]";
  117. logger.warn(message, e.getCause());
  118. } catch (InterruptedException e) {
  119. Thread.currentThread().interrupt();
  120. // ignore
  121. }
  122. } else {
  123. logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
  124. }
  125. }
  126. return Collections.unmodifiableList(transportAddresses);
  127. }
  128. @Override
  129. protected void doStart() {
  130. logger.debug("using max_concurrent_resolvers [{}], resolver timeout [{}]", concurrentConnects, resolveTimeout);
  131. final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_configured_hosts_resolver]");
  132. executorService.set(EsExecutors.newScaling(nodeName + "/" + "unicast_configured_hosts_resolver",
  133. 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory, transportService.getThreadPool().getThreadContext()));
  134. }
  135. @Override
  136. protected void doStop() {
  137. ThreadPool.terminate(executorService.get(), 10, TimeUnit.SECONDS);
  138. }
  139. @Override
  140. protected void doClose() {
  141. }
  142. @Override
  143. public void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer) {
  144. if (lifecycle.started() == false) {
  145. logger.debug("resolveConfiguredHosts: lifecycle is {}, not proceeding", lifecycle);
  146. return;
  147. }
  148. if (resolveInProgress.compareAndSet(false, true)) {
  149. transportService.getThreadPool().generic().execute(new AbstractRunnable() {
  150. @Override
  151. public void onFailure(Exception e) {
  152. logger.debug("failure when resolving unicast hosts list", e);
  153. }
  154. @Override
  155. protected void doRun() {
  156. if (lifecycle.started() == false) {
  157. logger.debug("resolveConfiguredHosts.doRun: lifecycle is {}, not proceeding", lifecycle);
  158. return;
  159. }
  160. List<TransportAddress> providedAddresses
  161. = hostsProvider.getSeedAddresses((hosts, limitPortCounts)
  162. -> resolveHostsLists(executorService.get(), logger, hosts, limitPortCounts,
  163. transportService, resolveTimeout));
  164. consumer.accept(providedAddresses);
  165. }
  166. @Override
  167. public void onAfter() {
  168. resolveInProgress.set(false);
  169. }
  170. @Override
  171. public String toString() {
  172. return "SeedHostsResolver resolving unicast hosts list";
  173. }
  174. });
  175. }
  176. }
  177. }
  • SeedHostsResolver继承了AbstractLifecycleComponent,同时实现了ConfiguredHostsResolver接口;它提供了getMaxConcurrentResolvers、getResolveTimeout、resolveHostsLists(使用线程池并发执行transportService.addressesFromString)这几个静态方法
  • doStart方法使用EsExecutors.newScaling创建了EsThreadPoolExecutor;doStop方法则使用ThreadPool.terminate来终止线程池
  • resolveConfiguredHosts方法首先将resolveInProgress从false设置为true,之后通过transportService.getThreadPool()执行hostsProvider.getSeedAddresses,执行完成则设置resolveInProgress为false

小结

  • ConfiguredHostsResolver接口定义了resolveConfiguredHosts方法用于解析配置的transport address列表
  • SeedHostsResolver继承了AbstractLifecycleComponent,同时实现了ConfiguredHostsResolver接口;它提供了getMaxConcurrentResolvers、getResolveTimeout、resolveHostsLists(使用线程池并发执行transportService.addressesFromString)这几个静态方法
  • doStart方法使用EsExecutors.newScaling创建了EsThreadPoolExecutor;doStop方法则使用ThreadPool.terminate来终止线程池;resolveConfiguredHosts方法首先将resolveInProgress从false设置为true,之后通过transportService.getThreadPool()执行hostsProvider.getSeedAddresses,执行完成则设置resolveInProgress为false

doc

转载于:https://my.oschina.net/go4it/blog/3043028

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/68410
推荐阅读
相关标签
  

闽ICP备14008679号