当前位置:   article > 正文

Flink学习系列之七 Flink序列化以及Flink Broadcast & Accumulators & Counters &Distributed Cache_flink source accumulator

flink source accumulator

1 Flink 的序列化

  • Flink自带了针对诸如int,long,String等标准类型的序列化器
  • 针对Flink无法实现序列化的数据类型,我们可以交给Avro和Kryo
    •  使用方法:ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    • 使用avro序列化:env.getConfig().enableForceAvro();
    • 使用kryo序列化:env.getConfig().enableForceKryo();
    • 使用自定义序列化:env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
    • https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/custom_serializers.html

2 Flink Broadcast 

2.1DataStreaming 中的Broadcast

    把元素广播给所有的分区,数据会被重复处理 类似于storm中的allGrouping dataStream.broadcast().这里我先介绍下这个broadcast分区。在之前的文章中说道Flink DataSet的hash分区以及range分区。他们都是把数据源分给不同的线程执行。而broadcast分区则是共享同一份数据。每一个线程的数据都是相同的,并且每一个map操作的并不是线程复制的副本,而是共享的变量。

因此下面我先演示下brocast的分区,看下他的分区规则。

2.2 Broadcast分区演示

  1. public class BrocastHandler {
  2. public static void main(String[] args) throws Exception{
  3. //设置并行度为4
  4. StreamExecutionEnvironment e = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(4);
  5. DataStreamSource<Long> streamSource = e.addSource(new SelfdDefinitionSerialSourceFunction());
  6. //broadcast分区
  7. streamSource.broadcast().map(new MapFunction<Long, Long>() {
  8. @Override
  9. public Long map(Long value) throws Exception {
  10. System.out.println("接受到的数据:" + value);
  11. return value;
  12. }
  13. }).timeWindowAll(Time.seconds(2)).sum(0).print().setParallelism(1);
  14. e.execute(BrocastHandler.class.getSimpleName());
  15. }
  16. }
  1. public class SelfdDefinitionSerialSourceFunction implements SourceFunction<Long> {
  2. private boolean ISRUNNING=true;
  3. private Long COUNT=0L;
  4. public void run(SourceContext<Long> sourceContext) throws Exception {
  5. while (ISRUNNING){
  6. sourceContext.collect(COUNT);
  7. COUNT++;
  8. Thread.sleep(1000);
  9. }
  10. }
  11. public void cancel() {
  12. ISRUNNING=false;
  13. }
  14. }

结果如下:

  1. 接受到的数据:0
  2. 接受到的数据:0
  3. 接受到的数据:0
  4. 接受到的数据:0
  5. 0
  6. 接受到的数据:1
  7. 接受到的数据:1
  8. 接受到的数据:1
  9. 接受到的数据:1
  10. 接受到的数据:2
  11. 接受到的数据:2
  12. 接受到的数据:2
  13. 接受到的数据:2

2.3 Broadcast变量

  • 广播变量允许编程人员在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给tasks
  • 广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的
  • 一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。
  • 用法
  • 1:初始化数据 DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
  • 2:广播数据 .withBroadcastSet(toBroadcast, "broadcastSetName");
  • 3:获取数据 Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName"); 注意:   
    • 3.1:广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束
    • 3.2:广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。

2.4 Broadcast变量代码演示

需求:Flink从数据源获取到用户的姓名,最终需要把用户的姓名和年龄打印出来。

  1. import org.apache.flink.api.common.functions.MapFunction;
  2. import org.apache.flink.api.common.functions.RichMapFunction;
  3. import org.apache.flink.api.java.DataSet;
  4. import org.apache.flink.api.java.ExecutionEnvironment;
  5. import org.apache.flink.api.java.operators.DataSource;
  6. import org.apache.flink.api.java.tuple.Tuple2;
  7. import org.apache.flink.configuration.Configuration;
  8. import java.util.ArrayList;
  9. import java.util.HashMap;
  10. import java.util.List;
  11. import java.util.Map;
  12. /**
  13. * brocast广播变量
  14. *
  15. * @ProjectName: FlinkLearning
  16. * @Package: com.caozg.batch
  17. * @ClassName: BrocastVarHandler
  18. * @Author: GoSaint
  19. * @CreateDate: 19-11-2 下午9:12
  20. * @UpdateDate: 19-11-2 下午9:12
  21. * @Version: 1.0
  22. */
  23. public class BrocastVarHandler {
  24. private static final String BROCASTVALNAME = "employee";
  25. public static void main(String[] args) throws Exception {
  26. ExecutionEnvironment e = ExecutionEnvironment.getExecutionEnvironment();
  27. /**
  28. * Tuple2中存储了用户的姓名和年龄
  29. */
  30. List<Tuple2<String, Integer>> data = new ArrayList<>();
  31. data.add(new Tuple2<>("zs", 18));
  32. data.add(new Tuple2<>("ls", 21));
  33. data.add(new Tuple2<>("wu", 26));
  34. DataSource<Tuple2<String, Integer>> source = e.fromCollection(data);
  35. DataSet<Map<String, Integer>> toBrocastSource = source.map(new MapFunction<Tuple2<String, Integer>, Map<String, Integer>>() {
  36. @Override
  37. public Map<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
  38. Map<String, Integer> maps = new HashMap<>();
  39. maps.put(value.f0, value.f1);
  40. return maps;
  41. }
  42. });
  43. DataSource<String> dataSource = e.fromElements("zs", "ls", "wu");
  44. dataSource.map(new RichMapFunction<String, String>() {
  45. List<Map<String, Integer>> targetList = new ArrayList<>();
  46. Map<String, Integer> targetMap = new HashMap<>();
  47. /**
  48. * open方法只会执行一次,因此可以在这个方法里面做初始化操作
  49. * @param parameters
  50. * @throws Exception
  51. */
  52. @Override
  53. public void open(Configuration parameters) throws Exception {
  54. super.open(parameters);
  55. this.targetList = getRuntimeContext().getBroadcastVariable(BROCASTVALNAME);
  56. for (Map<String, Integer> m : targetList) {
  57. this.targetMap.putAll(m);
  58. }
  59. }
  60. @Override
  61. public String map(String value) throws Exception {
  62. Integer age = targetMap.get(value);
  63. return value+" , "+age;
  64. }
  65. }).withBroadcastSet(toBrocastSource, BROCASTVALNAME).print();
  66. }
  67. }

3 Accumulators & Counters

  • Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化
  • 可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。
  • Counter是一个具体的累加器(Accumulator)实现 IntCounter, LongCounter 和 DoubleCounter
  • 用法
    • 1:创建累加器 private IntCounter numLines = new IntCounter();
    • 2:注册累加器 getRuntimeContext().addAccumulator("num-lines", this.numLines);
    • 3:使用累加器 this.numLines.add(1);
    • 4:获取累加器的结果 myJobExecutionResult.getAccumulatorResult("num-lines")
  1. import org.apache.flink.api.common.JobExecutionResult;
  2. import org.apache.flink.api.common.accumulators.IntCounter;
  3. import org.apache.flink.api.common.functions.RichMapFunction;
  4. import org.apache.flink.api.java.ExecutionEnvironment;
  5. import org.apache.flink.api.java.operators.DataSource;
  6. import org.apache.flink.configuration.Configuration;
  7. /**
  8. * 累加器的演示代码
  9. * 统计map操作的次数
  10. */
  11. public class CounterHandler {
  12. private static final String COUNTERNAME = "num-lines";
  13. public static void main(String[] args) throws Exception {
  14. ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
  15. DataSource<String> dataSource = environment.fromElements("a", "b", "c", "d");
  16. dataSource.map(new RichMapFunction<String, String>() {
  17. private IntCounter numLines = new IntCounter();
  18. @Override
  19. public String map(String value) throws Exception {
  20. //使用累加器
  21. numLines.add(1);
  22. return value;
  23. }
  24. @Override
  25. public void open(Configuration parameters) throws Exception {
  26. super.open(parameters);
  27. //注册累加器
  28. getRuntimeContext().addAccumulator(COUNTERNAME, this.numLines);
  29. }
  30. //这里不能使用print sink输出
  31. }).setParallelism(4).writeAsText("/home/caozg/Desktop/data/count");
  32. JobExecutionResult job = environment.execute(CounterHandler.class.getSimpleName());
  33. int result = job.getAccumulatorResult(COUNTERNAME);
  34. System.out.println(result);//结果为4
  35. }
  36. }

4 Flink Broadcast和Accumulators的区别

Broadcast(广播变量)允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可以进行共享,但是不可以进行修改 Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作。

5 Distributed Cache(分布式缓存)

  • Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件
  • 此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它
  • 用法
    • 1:注册一个文件 env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")  
    • 2:访问数据 File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
  1. import org.apache.commons.io.FileUtils;
  2. import org.apache.flink.api.common.functions.RichMapFunction;
  3. import org.apache.flink.api.java.DataSet;
  4. import org.apache.flink.api.java.ExecutionEnvironment;
  5. import org.apache.flink.api.java.operators.DataSource;
  6. import org.apache.flink.configuration.Configuration;
  7. import java.io.File;
  8. import java.util.List;
  9. /**
  10. * 分布式缓存
  11. */
  12. public class CacheHandler {
  13. private static final String CACHEFILENAME = "cache";
  14. public static void main(String[] args) throws Exception {
  15. ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
  16. //注册缓存
  17. environment.registerCachedFile("/home/caozg/Desktop/data/a.txt", CACHEFILENAME);
  18. DataSource<String> source = environment.fromElements("a", "b", "c", "d");
  19. DataSet<String> res = source.map(new RichMapFunction<String, String>() {
  20. @Override
  21. public String map(String value) throws Exception {
  22. // TODO 这里可以获取缓存数据做业务操作
  23. return value;
  24. }
  25. @Override
  26. public void open(Configuration parameters) throws Exception {
  27. File file = getRuntimeContext().getDistributedCache().getFile(CACHEFILENAME);
  28. List<String> list = FileUtils.readLines(file);
  29. for (String s : list) {
  30. System.out.println("line:" + s);
  31. }
  32. }
  33. });
  34. res.print();
  35. }
  36. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/413107
推荐阅读
相关标签
  

闽ICP备14008679号