当前位置:   article > 正文

聊聊flink的HistoryServer

failed to access job archive location for path hdfs

本文主要研究一下flink的HistoryServer

HistoryServer

flink-1.7.2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java

  1. public class HistoryServer {
  2. private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class);
  3. private final Configuration config;
  4. private final String webAddress;
  5. private final int webPort;
  6. private final long webRefreshIntervalMillis;
  7. private final File webDir;
  8. private final HistoryServerArchiveFetcher archiveFetcher;
  9. @Nullable
  10. private final SSLHandlerFactory serverSSLFactory;
  11. private WebFrontendBootstrap netty;
  12. private final Object startupShutdownLock = new Object();
  13. private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
  14. private final Thread shutdownHook;
  15. public static void main(String[] args) throws Exception {
  16. ParameterTool pt = ParameterTool.fromArgs(args);
  17. String configDir = pt.getRequired("configDir");
  18. LOG.info("Loading configuration from {}", configDir);
  19. final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir);
  20. try {
  21. FileSystem.initialize(flinkConfig);
  22. } catch (IOException e) {
  23. throw new Exception("Error while setting the default filesystem scheme from configuration.", e);
  24. }
  25. // run the history server
  26. SecurityUtils.install(new SecurityConfiguration(flinkConfig));
  27. try {
  28. SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
  29. @Override
  30. public Integer call() throws Exception {
  31. HistoryServer hs = new HistoryServer(flinkConfig);
  32. hs.run();
  33. return 0;
  34. }
  35. });
  36. System.exit(0);
  37. } catch (Throwable t) {
  38. final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
  39. LOG.error("Failed to run HistoryServer.", strippedThrowable);
  40. strippedThrowable.printStackTrace();
  41. System.exit(1);
  42. }
  43. }
  44. public HistoryServer(Configuration config) throws IOException, FlinkException {
  45. this(config, new CountDownLatch(0));
  46. }
  47. public HistoryServer(Configuration config, CountDownLatch numFinishedPolls) throws IOException, FlinkException {
  48. Preconditions.checkNotNull(config);
  49. Preconditions.checkNotNull(numFinishedPolls);
  50. this.config = config;
  51. if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config)) {
  52. LOG.info("Enabling SSL for the history server.");
  53. try {
  54. this.serverSSLFactory = SSLUtils.createRestServerSSLEngineFactory(config);
  55. } catch (Exception e) {
  56. throw new IOException("Failed to initialize SSLContext for the history server.", e);
  57. }
  58. } else {
  59. this.serverSSLFactory = null;
  60. }
  61. webAddress = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS);
  62. webPort = config.getInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT);
  63. webRefreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_WEB_REFRESH_INTERVAL);
  64. String webDirectory = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR);
  65. if (webDirectory == null) {
  66. webDirectory = System.getProperty("java.io.tmpdir") + File.separator + "flink-web-history-" + UUID.randomUUID();
  67. }
  68. webDir = new File(webDirectory);
  69. String refreshDirectories = config.getString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS);
  70. if (refreshDirectories == null) {
  71. throw new FlinkException(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS + " was not configured.");
  72. }
  73. List<RefreshLocation> refreshDirs = new ArrayList<>();
  74. for (String refreshDirectory : refreshDirectories.split(",")) {
  75. try {
  76. Path refreshPath = WebMonitorUtils.validateAndNormalizeUri(new Path(refreshDirectory).toUri());
  77. FileSystem refreshFS = refreshPath.getFileSystem();
  78. refreshDirs.add(new RefreshLocation(refreshPath, refreshFS));
  79. } catch (Exception e) {
  80. // there's most likely something wrong with the path itself, so we ignore it from here on
  81. LOG.warn("Failed to create Path or FileSystem for directory '{}'. Directory will not be monitored.", refreshDirectory, e);
  82. }
  83. }
  84. if (refreshDirs.isEmpty()) {
  85. throw new FlinkException("Failed to validate any of the configured directories to monitor.");
  86. }
  87. long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
  88. archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numFinishedPolls);
  89. this.shutdownHook = ShutdownHookUtil.addShutdownHook(
  90. HistoryServer.this::stop,
  91. HistoryServer.class.getSimpleName(),
  92. LOG);
  93. }
  94. @VisibleForTesting
  95. int getWebPort() {
  96. return netty.getServerPort();
  97. }
  98. public void run() {
  99. try {
  100. start();
  101. new CountDownLatch(1).await();
  102. } catch (Exception e) {
  103. LOG.error("Failure while running HistoryServer.", e);
  104. } finally {
  105. stop();
  106. }
  107. }
  108. // ------------------------------------------------------------------------
  109. // Life-cycle
  110. // ------------------------------------------------------------------------
  111. void start() throws IOException, InterruptedException {
  112. synchronized (startupShutdownLock) {
  113. LOG.info("Starting history server.");
  114. Files.createDirectories(webDir.toPath());
  115. LOG.info("Using directory {} as local cache.", webDir);
  116. Router router = new Router();
  117. router.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));
  118. if (!webDir.exists() && !webDir.mkdirs()) {
  119. throw new IOException("Failed to create local directory " + webDir.getAbsoluteFile() + ".");
  120. }
  121. createDashboardConfigFile();
  122. archiveFetcher.start();
  123. netty = new WebFrontendBootstrap(router, LOG, webDir, serverSSLFactory, webAddress, webPort, config);
  124. }
  125. }
  126. void stop() {
  127. if (shutdownRequested.compareAndSet(false, true)) {
  128. synchronized (startupShutdownLock) {
  129. LOG.info("Stopping history server.");
  130. try {
  131. netty.shutdown();
  132. } catch (Throwable t) {
  133. LOG.warn("Error while shutting down WebFrontendBootstrap.", t);
  134. }
  135. archiveFetcher.stop();
  136. try {
  137. LOG.info("Removing web dashboard root cache directory {}", webDir);
  138. FileUtils.deleteDirectory(webDir);
  139. } catch (Throwable t) {
  140. LOG.warn("Error while deleting web root directory {}", webDir, t);
  141. }
  142. LOG.info("Stopped history server.");
  143. // Remove shutdown hook to prevent resource leaks
  144. ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
  145. }
  146. }
  147. }
  148. // ------------------------------------------------------------------------
  149. // File generation
  150. // ------------------------------------------------------------------------
  151. static FileWriter createOrGetFile(File folder, String name) throws IOException {
  152. File file = new File(folder, name + ".json");
  153. if (!file.exists()) {
  154. Files.createFile(file.toPath());
  155. }
  156. FileWriter fr = new FileWriter(file);
  157. return fr;
  158. }
  159. private void createDashboardConfigFile() throws IOException {
  160. try (FileWriter fw = createOrGetFile(webDir, "config")) {
  161. fw.write(createConfigJson(DashboardConfiguration.from(webRefreshIntervalMillis, ZonedDateTime.now())));
  162. fw.flush();
  163. } catch (IOException ioe) {
  164. LOG.error("Failed to write config file.");
  165. throw ioe;
  166. }
  167. }
  168. private static String createConfigJson(DashboardConfiguration dashboardConfiguration) throws IOException {
  169. StringWriter writer = new StringWriter();
  170. JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
  171. gen.writeStartObject();
  172. gen.writeNumberField(DashboardConfiguration.FIELD_NAME_REFRESH_INTERVAL, dashboardConfiguration.getRefreshInterval());
  173. gen.writeNumberField(DashboardConfiguration.FIELD_NAME_TIMEZONE_OFFSET, dashboardConfiguration.getTimeZoneOffset());
  174. gen.writeStringField(DashboardConfiguration.FIELD_NAME_TIMEZONE_NAME, dashboardConfiguration.getTimeZoneName());
  175. gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, dashboardConfiguration.getFlinkVersion());
  176. gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision());
  177. gen.writeEndObject();
  178. gen.close();
  179. return writer.toString();
  180. }
  181. /**
  182. * Container for the {@link Path} and {@link FileSystem} of a refresh directory.
  183. */
  184. static class RefreshLocation {
  185. private final Path path;
  186. private final FileSystem fs;
  187. private RefreshLocation(Path path, FileSystem fs) {
  188. this.path = path;
  189. this.fs = fs;
  190. }
  191. public Path getPath() {
  192. return path;
  193. }
  194. public FileSystem getFs() {
  195. return fs;
  196. }
  197. }
  198. }
  199. 复制代码
  • HistoryServer提供了finished jobs的相关查询功能;构造器从配置中读取historyserver.web.address、historyserver.web.port(默认8082)、historyserver.web.refresh-interval(默认10秒)、historyserver.web.tmpdir、historyserver.archive.fs.dir、historyserver.archive.fs.refresh-interval(默认10秒),然后创建了HistoryServerArchiveFetcher
  • 其run方法主要是调用start方法,该方法主要是启动HistoryServerArchiveFetcher,然后创建WebFrontendBootstrap
  • 构造器使用ShutdownHookUtil.addShutdownHook注册了ShutdownHook,在shutdown时执行stop方法,stop方法主要是调用WebFrontendBootstrap的shutdown方法以及HistoryServerArchiveFetcher的stop方法,然后清理webDir,移除shutdownHook

HistoryServerArchiveFetcher

flink-1.7.2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java

  1. class HistoryServerArchiveFetcher {
  2. private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
  3. private static final JsonFactory jacksonFactory = new JsonFactory();
  4. private static final ObjectMapper mapper = new ObjectMapper();
  5. private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
  6. new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
  7. private final JobArchiveFetcherTask fetcherTask;
  8. private final long refreshIntervalMillis;
  9. HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
  10. this.refreshIntervalMillis = refreshIntervalMillis;
  11. this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir, numFinishedPolls);
  12. if (LOG.isInfoEnabled()) {
  13. for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
  14. LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath());
  15. }
  16. }
  17. }
  18. void start() {
  19. executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
  20. }
  21. void stop() {
  22. executor.shutdown();
  23. try {
  24. if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
  25. executor.shutdownNow();
  26. }
  27. } catch (InterruptedException ignored) {
  28. executor.shutdownNow();
  29. }
  30. }
  31. /**
  32. * {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS} for
  33. * new job archives.
  34. */
  35. static class JobArchiveFetcherTask extends TimerTask {
  36. private final List<HistoryServer.RefreshLocation> refreshDirs;
  37. private final CountDownLatch numFinishedPolls;
  38. /** Cache of all available jobs identified by their id. */
  39. private final Set<String> cachedArchives;
  40. private final File webDir;
  41. private final File webJobDir;
  42. private final File webOverviewDir;
  43. private static final String JSON_FILE_ENDING = ".json";
  44. JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
  45. this.refreshDirs = checkNotNull(refreshDirs);
  46. this.numFinishedPolls = numFinishedPolls;
  47. this.cachedArchives = new HashSet<>();
  48. this.webDir = checkNotNull(webDir);
  49. this.webJobDir = new File(webDir, "jobs");
  50. webJobDir.mkdir();
  51. this.webOverviewDir = new File(webDir, "overviews");
  52. webOverviewDir.mkdir();
  53. }
  54. @Override
  55. public void run() {
  56. try {
  57. for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
  58. Path refreshDir = refreshLocation.getPath();
  59. FileSystem refreshFS = refreshLocation.getFs();
  60. // contents of /:refreshDir
  61. FileStatus[] jobArchives;
  62. try {
  63. jobArchives = refreshFS.listStatus(refreshDir);
  64. } catch (IOException e) {
  65. LOG.error("Failed to access job archive location for path {}.", refreshDir, e);
  66. continue;
  67. }
  68. if (jobArchives == null) {
  69. continue;
  70. }
  71. boolean updateOverview = false;
  72. for (FileStatus jobArchive : jobArchives) {
  73. Path jobArchivePath = jobArchive.getPath();
  74. String jobID = jobArchivePath.getName();
  75. try {
  76. JobID.fromHexString(jobID);
  77. } catch (IllegalArgumentException iae) {
  78. LOG.debug("Archive directory {} contained file with unexpected name {}. Ignoring file.",
  79. refreshDir, jobID, iae);
  80. continue;
  81. }
  82. if (cachedArchives.add(jobID)) {
  83. try {
  84. for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive.getPath())) {
  85. String path = archive.getPath();
  86. String json = archive.getJson();
  87. File target;
  88. if (path.equals(JobsOverviewHeaders.URL)) {
  89. target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
  90. } else if (path.equals("/joboverview")) { // legacy path
  91. json = convertLegacyJobOverview(json);
  92. target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
  93. } else {
  94. target = new File(webDir, path + JSON_FILE_ENDING);
  95. }
  96. java.nio.file.Path parent = target.getParentFile().toPath();
  97. try {
  98. Files.createDirectories(parent);
  99. } catch (FileAlreadyExistsException ignored) {
  100. // there may be left-over directories from the previous attempt
  101. }
  102. java.nio.file.Path targetPath = target.toPath();
  103. // We overwrite existing files since this may be another attempt at fetching this archive.
  104. // Existing files may be incomplete/corrupt.
  105. Files.deleteIfExists(targetPath);
  106. Files.createFile(target.toPath());
  107. try (FileWriter fw = new FileWriter(target)) {
  108. fw.write(json);
  109. fw.flush();
  110. }
  111. }
  112. updateOverview = true;
  113. } catch (IOException e) {
  114. LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
  115. // Make sure we attempt to fetch the archive again
  116. cachedArchives.remove(jobID);
  117. // Make sure we do not include this job in the overview
  118. try {
  119. Files.delete(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
  120. } catch (IOException ioe) {
  121. LOG.debug("Could not delete file from overview directory.", ioe);
  122. }
  123. // Clean up job files we may have created
  124. File jobDirectory = new File(webJobDir, jobID);
  125. try {
  126. FileUtils.deleteDirectory(jobDirectory);
  127. } catch (IOException ioe) {
  128. LOG.debug("Could not clean up job directory.", ioe);
  129. }
  130. }
  131. }
  132. }
  133. if (updateOverview) {
  134. updateJobOverview(webOverviewDir, webDir);
  135. }
  136. }
  137. } catch (Exception e) {
  138. LOG.error("Critical failure while fetching/processing job archives.", e);
  139. }
  140. numFinishedPolls.countDown();
  141. }
  142. }
  143. private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
  144. JsonNode root = mapper.readTree(legacyOverview);
  145. JsonNode finishedJobs = root.get("finished");
  146. JsonNode job = finishedJobs.get(0);
  147. JobID jobId = JobID.fromHexString(job.get("jid").asText());
  148. String name = job.get("name").asText();
  149. JobStatus state = JobStatus.valueOf(job.get("state").asText());
  150. long startTime = job.get("start-time").asLong();
  151. long endTime = job.get("end-time").asLong();
  152. long duration = job.get("duration").asLong();
  153. long lastMod = job.get("last-modification").asLong();
  154. JsonNode tasks = job.get("tasks");
  155. int numTasks = tasks.get("total").asInt();
  156. int pending = tasks.get("pending").asInt();
  157. int running = tasks.get("running").asInt();
  158. int finished = tasks.get("finished").asInt();
  159. int canceling = tasks.get("canceling").asInt();
  160. int canceled = tasks.get("canceled").asInt();
  161. int failed = tasks.get("failed").asInt();
  162. int[] tasksPerState = new int[ExecutionState.values().length];
  163. // pending is a mix of CREATED/SCHEDULED/DEPLOYING
  164. // to maintain the correct number of task states we have to pick one of them
  165. tasksPerState[ExecutionState.SCHEDULED.ordinal()] = pending;
  166. tasksPerState[ExecutionState.RUNNING.ordinal()] = running;
  167. tasksPerState[ExecutionState.FINISHED.ordinal()] = finished;
  168. tasksPerState[ExecutionState.CANCELING.ordinal()] = canceling;
  169. tasksPerState[ExecutionState.CANCELED.ordinal()] = canceled;
  170. tasksPerState[ExecutionState.FAILED.ordinal()] = failed;
  171. JobDetails jobDetails = new JobDetails(jobId, name, startTime, endTime, duration, state, lastMod, tasksPerState, numTasks);
  172. MultipleJobsDetails multipleJobsDetails = new MultipleJobsDetails(Collections.singleton(jobDetails));
  173. StringWriter sw = new StringWriter();
  174. mapper.writeValue(sw, multipleJobsDetails);
  175. return sw.toString();
  176. }
  177. /**
  178. * This method replicates the JSON response that would be given by the JobsOverviewHandler when
  179. * listing both running and finished jobs.
  180. *
  181. * <p>Every job archive contains a joboverview.json file containing the same structure. Since jobs are archived on
  182. * their own however the list of finished jobs only contains a single job.
  183. *
  184. * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews.
  185. */
  186. private static void updateJobOverview(File webOverviewDir, File webDir) {
  187. try (JsonGenerator gen = jacksonFactory.createGenerator(HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) {
  188. File[] overviews = new File(webOverviewDir.getPath()).listFiles();
  189. if (overviews != null) {
  190. Collection<JobDetails> allJobs = new ArrayList<>(overviews.length);
  191. for (File overview : overviews) {
  192. MultipleJobsDetails subJobs = mapper.readValue(overview, MultipleJobsDetails.class);
  193. allJobs.addAll(subJobs.getJobs());
  194. }
  195. mapper.writeValue(gen, new MultipleJobsDetails(allJobs));
  196. }
  197. } catch (IOException ioe) {
  198. LOG.error("Failed to update job overview.", ioe);
  199. }
  200. }
  201. }
  202. 复制代码
  • HistoryServerArchiveFetcher主要是以historyserver.archive.fs.refresh-interval的时间间隔从historyserver.archive.fs.dir目录拉取job archives;它内部创建了JobArchiveFetcherTask来执行这个任务
  • JobArchiveFetcherTask继承了jdk的TimerTask,其run方法就是遍历refreshDirs,然后执行FileSystem.listStatus,然后使用FsJobArchivist.getArchivedJsons获取ArchivedJson根据不同path写入到指定文件
  • 如果path是/jobs/overview,则写入webDir/overviews/jobID.json文件;如果path是/joboverview,则先调用convertLegacyJobOverview转换json,然后再写入webDir/overviews/jobID.json文件;其他的path则写入webDir/path.json文件

WebFrontendBootstrap

flink-1.7.2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java

  1. public class WebFrontendBootstrap {
  2. private final Router router;
  3. private final Logger log;
  4. private final File uploadDir;
  5. private final ServerBootstrap bootstrap;
  6. private final Channel serverChannel;
  7. private final String restAddress;
  8. public WebFrontendBootstrap(
  9. Router router,
  10. Logger log,
  11. File directory,
  12. @Nullable SSLHandlerFactory serverSSLFactory,
  13. String configuredAddress,
  14. int configuredPort,
  15. final Configuration config) throws InterruptedException, UnknownHostException {
  16. this.router = Preconditions.checkNotNull(router);
  17. this.log = Preconditions.checkNotNull(log);
  18. this.uploadDir = directory;
  19. ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
  20. @Override
  21. protected void initChannel(SocketChannel ch) {
  22. RouterHandler handler = new RouterHandler(WebFrontendBootstrap.this.router, new HashMap<>());
  23. // SSL should be the first handler in the pipeline
  24. if (serverSSLFactory != null) {
  25. ch.pipeline().addLast("ssl", serverSSLFactory.createNettySSLHandler());
  26. }
  27. ch.pipeline()
  28. .addLast(new HttpServerCodec())
  29. .addLast(new ChunkedWriteHandler())
  30. .addLast(new HttpRequestHandler(uploadDir))
  31. .addLast(handler.getName(), handler)
  32. .addLast(new PipelineErrorHandler(WebFrontendBootstrap.this.log));
  33. }
  34. };
  35. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
  36. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
  37. this.bootstrap = new ServerBootstrap();
  38. this.bootstrap
  39. .group(bossGroup, workerGroup)
  40. .channel(NioServerSocketChannel.class)
  41. .childHandler(initializer);
  42. ChannelFuture ch;
  43. if (configuredAddress == null) {
  44. ch = this.bootstrap.bind(configuredPort);
  45. } else {
  46. ch = this.bootstrap.bind(configuredAddress, configuredPort);
  47. }
  48. this.serverChannel = ch.sync().channel();
  49. InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
  50. InetAddress inetAddress = bindAddress.getAddress();
  51. final String address;
  52. if (inetAddress.isAnyLocalAddress()) {
  53. address = config.getString(JobManagerOptions.ADDRESS, InetAddress.getLocalHost().getHostName());
  54. } else {
  55. address = inetAddress.getHostAddress();
  56. }
  57. int port = bindAddress.getPort();
  58. this.log.info("Web frontend listening at {}" + ':' + "{}", address, port);
  59. final String protocol = serverSSLFactory != null ? "https://" : "http://";
  60. this.restAddress = protocol + address + ':' + port;
  61. }
  62. public ServerBootstrap getBootstrap() {
  63. return bootstrap;
  64. }
  65. public int getServerPort() {
  66. Channel server = this.serverChannel;
  67. if (server != null) {
  68. try {
  69. return ((InetSocketAddress) server.localAddress()).getPort();
  70. }
  71. catch (Exception e) {
  72. log.error("Cannot access local server port", e);
  73. }
  74. }
  75. return -1;
  76. }
  77. public String getRestAddress() {
  78. return restAddress;
  79. }
  80. public void shutdown() {
  81. if (this.serverChannel != null) {
  82. this.serverChannel.close().awaitUninterruptibly();
  83. }
  84. if (bootstrap != null) {
  85. if (bootstrap.group() != null) {
  86. bootstrap.group().shutdownGracefully();
  87. }
  88. if (bootstrap.childGroup() != null) {
  89. bootstrap.childGroup().shutdownGracefully();
  90. }
  91. }
  92. }
  93. }
  94. 复制代码
  • WebFrontendBootstrap使用netty启动了一个http server,其pipeline有HttpServerCodec、ChunkedWriteHandler、HttpRequestHandler、RouterHandler、PipelineErrorHandler;其中这里的RouterHandler的Router有个GET的route,其使用的是HistoryServerStaticFileServerHandler,用于给HistoryServer提供静态文件服务

小结

  • HistoryServer提供了finished jobs的相关查询功能;其主要由HistoryServerArchiveFetcher以及WebFrontendBootstrap两部分组成;其run方法主要是调用start方法,该方法主要是启动HistoryServerArchiveFetcher,然后创建WebFrontendBootstrap
  • HistoryServerArchiveFetcher主要是以historyserver.archive.fs.refresh-interval的时间间隔从historyserver.archive.fs.dir目录拉取job archives;它内部创建了JobArchiveFetcherTask来执行这个任务;JobArchiveFetcherTask继承了jdk的TimerTask,其run方法就是遍历refreshDirs,然后执行FileSystem.listStatus,然后使用FsJobArchivist.getArchivedJsons获取ArchivedJson根据不同path写入到指定文件
  • WebFrontendBootstrap使用netty启动了一个http server,其pipeline有HttpServerCodec、ChunkedWriteHandler、HttpRequestHandler、RouterHandler、PipelineErrorHandler;其中这里的RouterHandler的Router有个GET的route,其使用的是HistoryServerStaticFileServerHandler,用于给HistoryServer提供静态文件服务

doc

转载于:https://juejin.im/post/5c8482ce5188257a323f52b9

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/558643
推荐阅读
相关标签
  

闽ICP备14008679号