赞
踩
Flink支持广播。可以将数据广播到TaskManager上,数据存储到内存中。数据存储在内存中,这样可以减缓大量的
shuwle操作;比如在数据join阶段,不可避免的就是大量的shuwle操作,我们可以把其中一个dataSet广播出去,一直
加载到taskManager的内存中,可以直接在内存中拿数据,避免了大量的shuwle,导致集群性能下降;
广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。另外需要记住,不应该修
改广播变量,这样才能确保每个节点获取到的值都是一致的。
一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上
都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个task中都需要拷
贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。
可以理解广播就是一个公共的共享变量
将一个数据集广播后,不同的Task都可以在节点上获取到
每个节点 只存一份
如果不使用广播,每一个Task都会拷贝一份数据集,造成内存资源浪费
用法
在需要使用广播的操作后,使用 withBroadcastSet 创建广播
在操作中,使用getRuntimeContext.getBroadcastVariable [广播数据类型] ( 广播名 )获取广播变量
操作步骤:
示例
创建一个 学生 数据集,包含以下数据
将该数据,发布到广播。
1:初始化数据
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
2:广播数据
.withBroadcastSet(toBroadcast, "broadcastSetName");
3:获取数据
Collection<Integer> broadcastSet =
getRuntimeContext().getBroadcastVariable("broadcastSetName");
package com.ccj.pxj.heima.broad
import java.util import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration object BroadCastVal { def main(args: Array[String]): Unit = { //. 获取批处理运行环境 val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment // 2. 两个数据集 val studentData: DataSet[(Int, String)] = env.fromCollection(List((1, "张三"), (2, "李四"), (3, "王五"))) val scoreData: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86))) // 3. 使用 RichMapFunction 对 成绩 数据集进行map转换 //实现 RichMapFunction val resultDataSet: DataSet[(String, String, Int)] = scoreData.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] { var studentDataSet: List[(Int, String)] = null // open方法会在map方法之前执行 //写 open 方法中,获取广播数据 override def open(parameters: Configuration): Unit = { //入 scala.collection.JavaConverters._ 隐式转换 import scala.collection.JavaConverters._ //获取广播的变量 // 将广播数据使用 asScala 转换为Scala集合,再使用toList转换为scala List 集合 studentDataSet = getRuntimeContext.getBroadcastVariable[(Int, String)]("studentData").asScala.toList } // 4. 在数据集调用 map 方法后,调用 withBroadcastSet 将 学生 数据集创建广播 override def map(in: (Int, String, Int)): (String, String, Int) = { // map 方法中使用广播进行转换 //获取学生ID val stuId: Int = in._1 //过滤出ID相同的 val tuples: List[(Int, String)] = studentDataSet.filter((x: (Int, String)) => stuId == x._1) //成绩数据(学生ID,学科,成绩) -> (学生姓名,学科,成绩) (tuples(0)._2, in._2, in._3) // } }).withBroadcastSet(studentData, "studentData") resultDataSet.print() //env.execute() } } (张三,语文,50) (李四,数学,70) (王五,英文,86)
1.广播出去的变量存放在每个节点的内存中,直到程序结束,这个数据集不能太大
2.withBroadcastSet 需要在要使用到广播的操作后调用
3.需要手动导入 scala.collection.JavaConverters._将Java集合转换为scala集合
Accumulator 即累加器,与
MapReduce counter 的应用场景差不多,都能很好地观察task在运行期间的数据变化
可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。
Flink现在有以下内置累加器。每个累加器都实现了Accumulator接口。
IntCounter
LongCounter
DoubleCounter
操作步骤:
示例:
遍历下列数据, 打印出单词的总数
开发步骤:
1.获取批处理环境
2.加载本地集合
3. map转换
1.定义累加器
2.注册累加器
3.累加数据
4.数据写入到文件中
5.执行任务,获取任务执行结果对象(JobExecutionResult)
6.获取累加器数值
7.打印数值
代码:
1:创建累加器private IntCounter numLines = new IntCounter();
2:注册累加器getRuntimeContext().addAccumulator("num-lines", this.numLines);
3:使用累加器this.numLines.add(1);
4:获取累加器的结果myJobExecutionResult.getAccumulatorResult("num-lines")
package com.ccj.pxj.heima.broad import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.common.accumulators.IntCounter import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.core.fs.FileSystem.WriteMode /** * 1. 获取批处理环境 * 2. 加载本地集合 * 3. map转换 * 1. 定义累加器 * 2. 注册累加器 * 3. 累加数据 * 4. 数据写入到文件中 * 5. 执行任务,获取任务执行结果对象(JobExecutionResult) * 6. 获取累加器数值 * 7. 打印数值 */ object CounterDemo { def main(args: Array[String]): Unit = { //1. 获取批处理环境 val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment //2. 加载本地集合 val datas: DataSet[String] = env.fromElements("a", "b", "c", "d") // 3. map转换 val resultDataSet: DataSet[String] = datas.map(new RichMapFunction[String, String] { // 1. 创建累加器 val counter = new IntCounter override def open(parameters: Configuration): Unit = { // 2. 注册累加器 // 参数1: 累加器的名称 参数2:累加器对象 getRuntimeContext.addAccumulator("wordsCount", counter) } override def map(value: String): String = { counter.add(1) value } }) // 4. 输出到文件 resultDataSet.writeAsText("./data/a.txt",WriteMode.OVERWRITE) val jobExecutionResult: JobExecutionResult = env.execute("counterDemo") val value: Int = jobExecutionResult.getAccumulatorResult[Int]("wordsCount") println(value) } } log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 4 Process finished with exit code 0
Flink Broadcast和Accumulators的区别
Broadcast(广播变量)允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量
可以进行共享,但是不可以进行修改
Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作
Flink的分布式缓存
Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问。这个功能可以被使用来分享
外部静态的数据,例如:机器学习的逻辑回归模型等!
缓存的使用流程:
使用ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS上的文件),为缓存文件指定一个名字注册该缓
存文件。当程序执行时候,Flink会自动将复制文件或者目录到所有worker节点的本地文件系统中,函数可以根据名字
去该节点的本地文件系统中检索该文件!
注意:广播是将变量分发到各个worker节点的内存上,分布式缓存是将文件缓存到各个worker节点上
package com.ccj.pxj.heima.broad import java.io.File import org.apache.commons.io.FileUtils import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration object CachFile { def main(args: Array[String]): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val data: DataSet[String] = env.fromCollection(List("a", "b", "c", "d")) //注册文件 // 参数1:文件路径,可以是HDFS的路径,参数2:文件的名称,自定义 env.registerCachedFile("./data/d.txt","d.txt") val result: DataSet[String] = data.map(new RichMapFunction[String, String] { // 获取文件 override def open(parameters: Configuration): Unit = { val cacheFile: File = getRuntimeContext.getDistributedCache.getFile("d.txt") // 打印文件内容 val str: String = FileUtils.readFileToString(cacheFile) println(str) } override def map(value: String): String = { value } }) result.print() } }
作者:pxj
日期:2021-07-29 12:46:58
你若安好,便是晴天。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。