赞
踩
Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。
当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。
其实分布式缓存就相当于spark的广播,把一个变量广播到所有的executor上,也可以看做是Flink的广播流,只不过这里广播的是一个文件.
Step 1 : 注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试
Step 2 : 使用文件
注意:
在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据:
java分布式缓存代码如下:
- /**
- * <p/> DataSet 分布式缓存 </li>
- * <li>@author: li.pan</li>
- * <li>Date: 2019/12/29 16:10 下午</li>
- * <li>Version: V1.0</li>
- * <li>Description: </li>
- */
- public class JavaDataSetDistributedCacheApp {
-
- public static void main(String[] args) throws Exception {
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- String filePath = "file:///Users/lipan/workspace/flink_demo/flink-local-train/src/main/resources/sink/java/cache.txt";
-
- // step1: 注册一个本地/HDFS文件
- env.registerCachedFile(filePath, "lp-java-dc");
-
- DataSource<String> data = env.fromElements("hadoop","spark","flink","pyspark","storm");
-
- data.map(new RichMapFunction<String, String>() {
-
- List<String> list = new ArrayList<String>();
- // step2:在open方法中获取到分布式缓存的内容即可
- @Override
- public void open(Configuration parameters) throws Exception {
- File file = getRuntimeContext().getDistributedCache().getFile("lp-java-dc");
- List<String> lines = FileUtils.readLines(file);
- for(String line : lines) {
- list.add(line);
- System.out.println("line = [" + line + "]");
- }
- }
-
- @Override
- public String map(String value) throws Exception {
- return value;
- }
- }).print();
-
- }
- }
-
scala分布式缓存代码如下:
- /**
- * <p/>
- * <li>title: DataSet 分布式缓存</li>
- * <li>@author: li.pan</li>
- * <li>Date: 2019/11/23 2:15 下午</li>
- * <li>Version: V1.0</li>
- * <li>Description:
- * step1: 注册一个本地/HDFS文件
- * step2:在open方法中获取到分布式缓存的内容即可
- * </li>
- */
-
- object DistributedCacheApp {
-
- def main(args: Array[String]): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val filePath = "file:///Users/lipan/workspace/flink_demo/flink-local-train/src/main/resources/sink/scala/cache.txt"
-
- // step1: 注册一个本地/HDFS文件
- env.registerCachedFile(filePath, "pk-scala-dc")
-
- import org.apache.flink.api.scala._
- val data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm")
-
- data.map(new RichMapFunction[String, String] {
-
- // step2:在open方法中获取到分布式缓存的内容即可
- override def open(parameters: Configuration): Unit = {
- val dcFile = getRuntimeContext.getDistributedCache().getFile("pk-scala-dc")
-
- val lines = FileUtils.readLines(dcFile) // java
-
- /**
- * 此时会出现一个异常:java集合和scala集合不兼容的问题
- */
- import scala.collection.JavaConverters._
- for (ele <- lines.asScala) {
- println(ele)
- }
- }
-
- override def map(value: String): String = value
- }).print()
- }
-
- }
Accumulators(累加器)是非常简单的,通过一个add操作累加最终的结果,在job执行后可以获取最终结果。
最简单的累加器是counter(计数器):你可以通过Accumulator.add(V value)这个方法进行递增。在任务的最后,flink会把所有的结果进行合并,然后把最终结果发送到client端。累加器在调试或者你想更快了解你的数据的时候是非常有用的。
Flink现在有一下内置累加器,每个累加器都实现了Accumulator接口。 IntCounter, LongCounter 和 DoubleCounter:下面是一个使用计数器的例子。例如:分布式的单词统计程序。
Step 1 : 需要在你想要使用的地方创建一个自定义的transformation算子,在算子中创建一个累加器对象。
val counter = new LongCounter()
Step 2 : 注册这个累加器对象,通常在rich函数的open方法里面。在这里你也可以定义一个名字
getRuntimeContext.addAccumulator("ele-counts-scala", counter)
Step 3 : 操作中使用这个累加器
counter.add(1)
Step 4 : 最终的总结果将会存储在从execute()中返回的JobExecutionResult对象中。(这个操作需要等待任务执行完成)
val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")
Java累加器代码如下:
- /**
- * <p/>
- * <li>title: flink 计数器</li>
- * <li>@author: li.pan</li>
- * <li>Date: 2019/12/29 2:59 下午</li>
- * <li>Version: V1.0</li>
- * <li>Description:
- * Java实现通过一个add操作累加最终的结果,在job执行后可以获取最终结果
- * </li>
- */
- public class JavaCounterApp {
-
- public static void main(String[] args) throws Exception {
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSource<String> data = env.fromElements("hadoop","spark","flink","pyspark","storm");
-
- DataSet<String> info = data.map(new RichMapFunction<String, String>() {
-
- LongCounter counter = new LongCounter();
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- getRuntimeContext().addAccumulator("ele-counts-java", counter);
- }
-
- @Override
- public String map(String value) throws Exception {
- counter.add(1);
- return value;
- }
- });
-
- String filePath = "file:///Users/lipan/workspace/flink_demo/flink-local-train/src/main/resources/sink/java/";
- info.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE).setParallelism(2);
- JobExecutionResult jobResult = env.execute("CounterApp");
-
- // step3: 获取计数器
- long num = jobResult.getAccumulatorResult("ele-counts-java");
-
- System.out.println("num = [" + num + "]");
- }
- }
Scala累加器代码如下:
- /**
- * <p/>
- * <li>Description: flink 计数器</li>
- * <li>@author: panli@0226@sina.com</li>
- * <li>Date: 2019-04-14 21:53</li>
- * Scala实现通过一个add操作累加最终的结果,在job执行后可以获取最终结果
- */
- object CountApp {
-
- def main(args: Array[String]): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm")
-
-
- val info = data.map(new RichMapFunction[String, String]() {
- // step1:定义计数器
- val counter = new LongCounter()
-
- override def open(parameters: Configuration): Unit = {
- // step2: 注册计数器
- getRuntimeContext.addAccumulator("ele-counts-scala", counter)
- }
-
- override def map(in: String): String = {
- counter.add(1)
- in
- }
- })
-
- val filePath = "file:///Users/lipan/workspace/flink_demo/flink-local-train/src/main/resources/sink/scala/"
- info.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(2)
- val jobResult = env.execute("CounterApp")
-
- // step3: 获取计数器
- val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")
-
- println("num: " + num)
- }
-
- }
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。