
Flink in Depth (02): Obtaining the Flink Execution Environment (Part 1)

```java
// Obtain the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```

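This line returns a usable execution environment, which is the context in which a Flink program runs. It records the relevant configuration, such as the parallelism, and provides a set of methods, for example methods for reading input streams and the execute method that runs the whole program. In a distributed stream processing program, operations such as flatMap and keyBy can be understood as declarations: they tell the program which operators it uses (this passage is adapted from https://www.cnblogs.com/bethunebtj/p/9168274.html). Next, let's step into the code and see how the execution environment is obtained.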
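To make the idea of "operators as declarations" concrete, here is a minimal toy sketch. ToyEnvironment and all of its methods are hypothetical and are not Flink's API; the sketch only assumes the behavior described above: the environment holds configuration (here just a parallelism value), source and operator calls merely record steps in a plan, and the input is processed only when execute() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical toy environment, NOT Flink's API: operator calls only record
// steps in a plan, and nothing is processed until execute() is called.
public class ToyEnvironment {
    private final int parallelism;                                // configuration held by the context (stored only, unused by this toy)
    private final List<String> source = new ArrayList<>();        // registered input
    private final List<String> planNames = new ArrayList<>();     // names of declared operators
    private final List<Function<String, String>> plan = new ArrayList<>();

    public ToyEnvironment(int parallelism) {
        this.parallelism = parallelism;
    }

    public int getParallelism() {
        return parallelism;
    }

    // Source method: registers input elements but does not process them.
    public ToyEnvironment fromElements(String... elements) {
        for (String e : elements) {
            source.add(e);
        }
        return this;
    }

    // Operator method: a declaration that appends one step to the plan.
    public ToyEnvironment map(String name, Function<String, String> fn) {
        planNames.add(name);
        plan.add(fn);
        return this;
    }

    public List<String> planNames() {
        return planNames;
    }

    // Only here is the declared plan applied to the input.
    public List<String> execute() {
        List<String> out = new ArrayList<>();
        for (String s : source) {
            String v = s;
            for (Function<String, String> step : plan) {
                v = step.apply(v);
            }
            out.add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment(4)
                .fromElements("a", "b")
                .map("upper", String::toUpperCase)
                .map("suffix", s -> s + "!");
        System.out.println(env.planNames()); // [upper, suffix]  (declared, not yet run)
        System.out.println(env.execute());   // [A!, B!]
    }
}
```

The point of the design is that building the pipeline and running it are separate phases, which is what lets a real engine inspect and optimize the whole plan before execution.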

Code walkthrough

Let's start with the code:

```java
/**
 * Creates an execution environment that represents the context in which the
 * program is currently executed. If the program is invoked standalone, this
 * method returns a local execution environment, as returned by
 * {@link #createLocalEnvironment()}.
 *
 * @return The execution environment of the context in which the program is
 * executed.
 */
public static StreamExecutionEnvironment getExecutionEnvironment() {
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
        .map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
        .orElseGet(StreamExecutionEnvironment::createStreamExecutionEnvironment);
}
```

threadLocalContextEnvironmentFactory is defined as follows:

```java
/** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
private static final ThreadLocal<StreamExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory =
    new ThreadLocal<>();
```

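As you can see, this is a ThreadLocal<T>, which stores a separate value for each thread. It relies on ThreadLocalMap: every Thread object maintains its own ThreadLocalMap in a field named threadLocals, and that thread's values are kept in this map, keyed by the ThreadLocal instance. ThreadLocalMap stores its entries directly in an array and resolves hash collisions by open addressing (linear probing), whereas HashMap uses separate chaining (each bucket holds a chain of nodes), so the two work differently. If you are interested, have a look at the source; we won't analyze it in detail here.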
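A small JDK-only sketch of the per-thread behavior described above. The field name FACTORY_NAME is hypothetical and simply stands in for threadLocalContextEnvironmentFactory: a value set in the main thread is not visible from another thread, so a factory stored in the ThreadLocal only affects code running on the thread that set it.

```java
// Each thread has its own ThreadLocalMap (Thread.threadLocals), so a value set
// through FACTORY_NAME in one thread is invisible to other threads.
public class ThreadLocalDemo {
    static final ThreadLocal<String> FACTORY_NAME = new ThreadLocal<>();

    // Starts a new thread and returns what that thread sees for FACTORY_NAME.
    static String readFromOtherThread() {
        String[] seen = new String[1];
        Thread t = new Thread(() -> seen[0] = FACTORY_NAME.get());
        t.start();
        try {
            t.join(); // join() also makes the write to seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        FACTORY_NAME.set("cluster-factory");        // stored only for the main thread
        System.out.println(FACTORY_NAME.get());     // cluster-factory
        System.out.println(readFromOtherThread());  // null
        FACTORY_NAME.remove();                      // clean up (matters for pooled threads)
        System.out.println(FACTORY_NAME.get());     // null
    }
}
```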

contextEnvironmentFactory is defined as follows:

```java
/**
 * The environment of the context (local by default, cluster if invoked through command line).
 */
private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null;
```

 

The resolveFactory function:

```java
/**
 * Resolves the given factories. The thread local factory has preference over the static factory.
 * If none is set, the method returns {@link Optional#empty()}.
 *
 * @param threadLocalFactory containing the thread local factory
 * @param staticFactory containing the global factory
 * @param <T> type of factory
 * @return Optional containing the resolved factory if it exists, otherwise it's empty
 */
public static <T> Optional<T> resolveFactory(ThreadLocal<T> threadLocalFactory, @Nullable T staticFactory) {
    // Get the factory stored in the current thread's ThreadLocal
    final T localFactory = threadLocalFactory.get();
    // If the current thread has no factory, fall back to staticFactory
    final T factory = localFactory == null ? staticFactory : localFactory;
    // Wrap factory in an Optional; ofNullable returns Optional.empty() when factory is null (it does not throw)
    return Optional.ofNullable(factory);
}
```
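To check the precedence rules, here is a standalone copy of the resolveFactory logic that uses plain strings as "factories". The class name ResolveFactoryDemo is hypothetical, and the @Nullable annotation is dropped so that only the JDK is needed. It covers the three cases: no factory at all gives an empty Optional, only a static factory gives that factory, and a thread-local factory wins over the static one.

```java
import java.util.Optional;

// Standalone copy of the resolveFactory logic (no Flink dependency) used to
// check the precedence: thread-local factory > static factory > empty.
public class ResolveFactoryDemo {
    static <T> Optional<T> resolveFactory(ThreadLocal<T> threadLocalFactory, T staticFactory) {
        final T localFactory = threadLocalFactory.get();
        final T factory = localFactory == null ? staticFactory : localFactory;
        return Optional.ofNullable(factory); // empty, not an exception, when factory is null
    }

    public static void main(String[] args) {
        ThreadLocal<String> local = new ThreadLocal<>();
        System.out.println(resolveFactory(local, null));      // Optional.empty
        System.out.println(resolveFactory(local, "static"));  // Optional[static]
        local.set("thread-local");
        System.out.println(resolveFactory(local, "static"));  // Optional[thread-local]
        local.remove();
    }
}
```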

The map function (this is java.util.Optional.map from the JDK):

```java
/**
 * If a value is present, apply the provided mapping function to it,
 * and if the result is non-null, return an {@code Optional} describing the
 * result. Otherwise return an empty {@code Optional}.
 *
 * @apiNote This method supports post-processing on optional values, without
 * the need to explicitly check for a return status. For example, the
 * following code traverses a stream of file names, selects one that has
 * not yet been processed, and then opens that file, returning an
 * {@code Optional<FileInputStream>}:
 *
 * <pre>{@code
 *     Optional<FileInputStream> fis =
 *         names.stream().filter(name -> !isProcessedYet(name))
 *                       .findFirst()
 *                       .map(name -> new FileInputStream(name));
 * }</pre>
 *
 * Here, {@code findFirst} returns an {@code Optional<String>}, and then
 * {@code map} returns an {@code Optional<FileInputStream>} for the desired
 * file if one exists.
 *
 * @param <U> The type of the result of the mapping function
 * @param mapper a mapping function to apply to the value, if present
 * @return an {@code Optional} describing the result of applying a mapping
 * function to the value of this {@code Optional}, if a value is present,
 * otherwise an empty {@code Optional}
 * @throws NullPointerException if the mapping function is null
 */
public<U> Optional<U> map(Function<? super T, ? extends U> mapper) {
    // Throw a NullPointerException if mapper is null
    Objects.requireNonNull(mapper);
    if (!isPresent())
        // This Optional is empty (value == null), so return the empty Optional
        return empty();
    else {
        // Otherwise apply mapper to the value and wrap the result; here mapper is
        // createExecutionEnvironment, so the result is a StreamExecutionEnvironment
        return Optional.ofNullable(mapper.apply(value));
    }
}
```
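The three outcomes of Optional.map that matter here can be seen in a short JDK-only example. createEnvName is a hypothetical helper that plays the role of createExecutionEnvironment.

```java
import java.util.Optional;

// Outcomes of Optional.map: an empty Optional stays empty (mapper not called),
// a present value is mapped and wrapped, and a null mapper result becomes empty.
public class OptionalMapDemo {
    // Hypothetical stand-in for factory.createExecutionEnvironment()
    static Optional<String> createEnvName(Optional<String> factory) {
        return factory.map(f -> f + "-env");
    }

    public static void main(String[] args) {
        System.out.println(createEnvName(Optional.empty()));            // Optional.empty
        System.out.println(createEnvName(Optional.of("factory")));      // Optional[factory-env]
        System.out.println(Optional.of("factory").map(f -> (String) null)); // Optional.empty
    }
}
```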

The orElseGet function (also from java.util.Optional):

```java
/**
 * Return the value if present, otherwise invoke {@code other} and return
 * the result of that invocation.
 *
 * @param other a {@code Supplier} whose result is returned if no value
 * is present
 * @return the value if present otherwise the result of {@code other.get()}
 * @throws NullPointerException if value is not present and {@code other} is
 * null
 */
public T orElseGet(Supplier<? extends T> other) {
    // Return value if it is non-null; otherwise call other.get()
    return value != null ? value : other.get();
}
```
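One detail worth noting is that orElseGet takes a Supplier, so the fallback is evaluated only when the Optional is empty. The sketch below counts how often the fallback runs and contrasts orElseGet with orElse, whose argument is evaluated eagerly; createDefault is a hypothetical stand-in for createStreamExecutionEnvironment.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// orElseGet calls its Supplier only when the Optional is empty;
// orElse evaluates its argument before the call, whether it is needed or not.
public class OrElseGetDemo {
    static final AtomicInteger fallbackCalls = new AtomicInteger();

    // Hypothetical stand-in for an expensive default-environment constructor.
    static String createDefault() {
        fallbackCalls.incrementAndGet();
        return "default-env";
    }

    public static void main(String[] args) {
        String a = Optional.of("factory-env").orElseGet(OrElseGetDemo::createDefault);
        System.out.println(a + " " + fallbackCalls.get()); // factory-env 0

        String b = Optional.<String>empty().orElseGet(OrElseGetDemo::createDefault);
        System.out.println(b + " " + fallbackCalls.get()); // default-env 1

        String c = Optional.of("factory-env").orElse(createDefault());
        System.out.println(c + " " + fallbackCalls.get()); // factory-env 2 (argument evaluated anyway)
    }
}
```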

To summarize, Flink obtains the execution environment in three steps:

1. First, look up an object implementing the StreamExecutionEnvironmentFactory interface in the current thread's ThreadLocal; if there is none, fall back to the static contextEnvironmentFactory field. The result is wrapped in an Optional, which is empty if neither factory is set (resolveFactory).

2. Then call map on that Optional. If step 1 found a StreamExecutionEnvironmentFactory, its createExecutionEnvironment method is called to create a StreamExecutionEnvironment, and map returns an Optional holding it. If step 1 produced an empty Optional, map returns an empty Optional as well (map).

3. If no StreamExecutionEnvironment was obtained above, the static method createStreamExecutionEnvironment of StreamExecutionEnvironment is called to create one (orElseGet).
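The three steps can be modeled end to end with a toy sketch. EnvFactory, Env and createDefault are hypothetical stand-ins for StreamExecutionEnvironmentFactory, StreamExecutionEnvironment and createStreamExecutionEnvironment; only the lookup order (thread-local factory, then static factory, then fallback) follows the Flink code above.

```java
import java.util.Optional;

// Toy model of getExecutionEnvironment(). All types are hypothetical stand-ins,
// not Flink classes; only the lookup order mirrors the code above.
public class EnvLookupSketch {
    interface EnvFactory { Env createExecutionEnvironment(); } // ~ StreamExecutionEnvironmentFactory

    static final class Env {                                   // ~ StreamExecutionEnvironment
        final String kind;
        Env(String kind) { this.kind = kind; }
    }

    static final ThreadLocal<EnvFactory> threadLocalFactory = new ThreadLocal<>();
    static EnvFactory contextFactory = null;                   // static factory, null by default

    // Fallback used in step 3 (~ createStreamExecutionEnvironment()).
    static Env createDefault() { return new Env("local"); }

    static Env getExecutionEnvironment() {
        EnvFactory local = threadLocalFactory.get();
        EnvFactory factory = local == null ? contextFactory : local;  // step 1: resolve
        return Optional.ofNullable(factory)
                .map(EnvFactory::createExecutionEnvironment)          // step 2: create via factory
                .orElseGet(EnvLookupSketch::createDefault);           // step 3: fallback
    }

    public static void main(String[] args) {
        System.out.println(getExecutionEnvironment().kind);  // local
        contextFactory = () -> new Env("context");
        System.out.println(getExecutionEnvironment().kind);  // context
        threadLocalFactory.set(() -> new Env("thread-local"));
        System.out.println(getExecutionEnvironment().kind);  // thread-local
        threadLocalFactory.remove();
    }
}
```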

The createStreamExecutionEnvironment function:

```java
private static StreamExecutionEnvironment createStreamExecutionEnvironment() {
    // because the streaming project depends on "flink-clients" (and not the other way around)
    // we currently need to intercept the data set environment and create a dependent stream env.
    // this should be fixed once we rework the project dependencies
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof ContextEnvironment) {
        return new StreamContextEnvironment((ContextEnvironment) env);
    } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
        return new StreamPlanEnvironment(env);
    } else {
        return createLocalEnvironment();
    }
}
```
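The branching itself can be mirrored with a toy class hierarchy. The classes below are hypothetical stand-ins for Flink's batch-side environments, and chooseStreamEnv only returns the name of what the real code would construct; it illustrates that the type of the batch ExecutionEnvironment decides which streaming environment is built, with anything unmatched falling back to a local environment.

```java
// Toy mirror of the instanceof dispatch in createStreamExecutionEnvironment().
// The classes are hypothetical stand-ins; only the branching follows the code above.
public class EnvDispatchSketch {
    static class BatchEnv {}                          // ~ ExecutionEnvironment
    static class ContextEnv extends BatchEnv {}       // ~ ContextEnvironment
    static class OptimizerPlanEnv extends BatchEnv {} // ~ OptimizerPlanEnvironment
    static class PreviewPlanEnv extends BatchEnv {}   // ~ PreviewPlanEnvironment

    // Returns the name of the streaming environment the real code would create.
    static String chooseStreamEnv(BatchEnv env) {
        if (env instanceof ContextEnv) {
            return "StreamContextEnvironment";
        } else if (env instanceof OptimizerPlanEnv || env instanceof PreviewPlanEnv) {
            return "StreamPlanEnvironment";
        } else {
            return "createLocalEnvironment()";
        }
    }

    public static void main(String[] args) {
        System.out.println(chooseStreamEnv(new ContextEnv()));     // StreamContextEnvironment
        System.out.println(chooseStreamEnv(new PreviewPlanEnv())); // StreamPlanEnvironment
        System.out.println(chooseStreamEnv(new BatchEnv()));       // createLocalEnvironment()
    }
}
```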

We will continue with createStreamExecutionEnvironment in the next article and look at what it does inside.
