当前位置:   article > 正文

Flink分布式缓存与累加器_flink map获取累加值

flink map获取累加值

分布式缓存

Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。

当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。

其实分布式缓存就相当于spark的广播,把一个变量广播到所有的executor上,也可以看做是Flink的广播流,只不过这里广播的是一个文件.

示例

Step 1 : 注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试

Step 2 : 使用文件

注意:

在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据:

java分布式缓存代码如下:

  1. /**
  2. * <p/> DataSet 分布式缓存 </li>
  3. * <li>@author: li.pan</li>
  4. * <li>Date: 2019/12/29 16:10 下午</li>
  5. * <li>Version: V1.0</li>
  6. * <li>Description: </li>
  7. */
  8. public class JavaDataSetDistributedCacheApp {
  9.    public static void main(String[] args) throws Exception {
  10.        ExecutionEnvironment env =  ExecutionEnvironment.getExecutionEnvironment();
  11.        String filePath = "file:///Users/lipan/workspace/flink_demo/flink-local-train/src/main/resources/sink/java/cache.txt";
  12.        // step1: 注册一个本地/HDFS文件
  13.        env.registerCachedFile(filePath, "lp-java-dc");
  14.        DataSource<String> data = env.fromElements("hadoop","spark","flink","pyspark","storm");
  15.        data.map(new RichMapFunction<String, String>() {
  16.            List<String> list = new ArrayList<String>();
  17.            // step2:在open方法中获取到分布式缓存的内容即可
  18.            @Override
  19.            public void open(Configuration parameters) throws Exception {
  20.                File file = getRuntimeContext().getDistributedCache().getFile("lp-java-dc");
  21.                List<String> lines = FileUtils.readLines(file);
  22.                for(String line : lines) {
  23.                    list.add(line);
  24.                    System.out.println("line = [" + line + "]");
  25.               }
  26.           }
  27.            @Override
  28.            public String map(String value) throws Exception {
  29.                return value;
  30.           }
  31.       }).print();
  32.   }
  33. }

scala分布式缓存代码如下:

  1. /**
  2. * <p/>
  3. * <li>title: DataSet 分布式缓存</li>
  4. * <li>@author: li.pan</li>
  5. * <li>Date: 2019/11/23 2:15 下午</li>
  6. * <li>Version: V1.0</li>
  7. * <li>Description:
  8. * step1: 注册一个本地/HDFS文件
  9. * step2:在open方法中获取到分布式缓存的内容即可
  10. * </li>
  11. */
  12. object DistributedCacheApp {
  13. def main(args: Array[String]): Unit = {
  14. val env = ExecutionEnvironment.getExecutionEnvironment
  15. val filePath = "file:///Users/lipan/workspace/flink_demo/flink-local-train/src/main/resources/sink/scala/cache.txt"
  16. // step1: 注册一个本地/HDFS文件
  17. env.registerCachedFile(filePath, "pk-scala-dc")
  18. import org.apache.flink.api.scala._
  19. val data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm")
  20. data.map(new RichMapFunction[String, String] {
  21. // step2:在open方法中获取到分布式缓存的内容即可
  22. override def open(parameters: Configuration): Unit = {
  23. val dcFile = getRuntimeContext.getDistributedCache().getFile("pk-scala-dc")
  24. val lines = FileUtils.readLines(dcFile) // java
  25. /**
  26. * 此时会出现一个异常:java集合和scala集合不兼容的问题
  27. */
  28. import scala.collection.JavaConverters._
  29. for (ele <- lines.asScala) {
  30. println(ele)
  31. }
  32. }
  33. override def map(value: String): String = value
  34. }).print()
  35. }
  36. }

累加器(Accumulators)

Accumulators(累加器)是非常简单的,通过一个add操作累加最终的结果,在job执行后可以获取最终结果。

最简单的累加器是counter(计数器):你可以通过Accumulator.add(V value)这个方法进行递增。在任务的最后,flink会把所有的结果进行合并,然后把最终结果发送到client端。累加器在调试或者你想更快了解你的数据的时候是非常有用的。

Flink现在有一下内置累加器,每个累加器都实现了Accumulator接口。 IntCounter, LongCounter 和 DoubleCounter:下面是一个使用计数器的例子。例如:分布式的单词统计程序。

示例:

Step 1 : 需要在你想要使用的地方创建一个自定义的transformation算子,在算子中创建一个累加器对象。

val counter = new LongCounter()

Step 2 : 注册这个累加器对象,通常在rich函数的open方法里面。在这里你也可以定义一个名字

 getRuntimeContext.addAccumulator("ele-counts-scala", counter)

Step 3 : 操作中使用这个累加器

counter.add(1)

Step 4 : 最终的总结果将会存储在从execute()中返回的JobExecutionResult对象中。(这个操作需要等待任务执行完成)

val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")

Java累加器代码如下:

  1. /**
  2. * <p/>
  3. * <li>title: flink 计数器</li>
  4. * <li>@author: li.pan</li>
  5. * <li>Date: 2019/12/29 2:59 下午</li>
  6. * <li>Version: V1.0</li>
  7. * <li>Description:
  8. * Java实现通过一个add操作累加最终的结果,在job执行后可以获取最终结果
  9. * </li>
  10. */
  11. public class JavaCounterApp {
  12. public static void main(String[] args) throws Exception {
  13. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  14. DataSource<String> data = env.fromElements("hadoop","spark","flink","pyspark","storm");
  15. DataSet<String> info = data.map(new RichMapFunction<String, String>() {
  16. LongCounter counter = new LongCounter();
  17. @Override
  18. public void open(Configuration parameters) throws Exception {
  19. super.open(parameters);
  20. getRuntimeContext().addAccumulator("ele-counts-java", counter);
  21. }
  22. @Override
  23. public String map(String value) throws Exception {
  24. counter.add(1);
  25. return value;
  26. }
  27. });
  28. String filePath = "file:///Users/lipan/workspace/flink_demo/flink-local-train/src/main/resources/sink/java/";
  29. info.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE).setParallelism(2);
  30. JobExecutionResult jobResult = env.execute("CounterApp");
  31. // step3: 获取计数器
  32. long num = jobResult.getAccumulatorResult("ele-counts-java");
  33. System.out.println("num = [" + num + "]");
  34. }
  35. }

Scala累加器代码如下:

  1. /**
  2. * <p/>

  3. * <li>Description: flink 计数器</li>
  4. * <li>@author: panli@0226@sina.com</li>

  5. * <li>Date: 2019-04-14 21:53</li>

  6. * Scala实现通过一个add操作累加最终的结果,在job执行后可以获取最终结果
  7. */
  8. object CountApp {
  9. def main(args: Array[String]): Unit = {
  10. val env = ExecutionEnvironment.getExecutionEnvironment
  11. val data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm")
  12. val info = data.map(new RichMapFunction[String, String]() {
  13. // step1:定义计数器
  14. val counter = new LongCounter()
  15. override def open(parameters: Configuration): Unit = {
  16. // step2: 注册计数器
  17. getRuntimeContext.addAccumulator("ele-counts-scala", counter)
  18. }
  19. override def map(in: String): String = {
  20. counter.add(1)
  21. in
  22. }
  23. })
  24. val filePath = "file:///Users/lipan/workspace/flink_demo/flink-local-train/src/main/resources/sink/scala/"
  25. info.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(2)
  26. val jobResult = env.execute("CounterApp")
  27. // step3: 获取计数器
  28. val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")
  29. println("num: " + num)
  30. }
  31. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/696212
推荐阅读
相关标签
  

闽ICP备14008679号