
Flink Accumulators, Broadcast Variables, Broadcast Streams, and Distributed Cache


1. Accumulators


An Accumulator serves much the same purpose as a MapReduce counter: it gives a convenient view of how data flows through a task while the job is running. You can use accumulators inside the operator functions of a Flink job, but the final result only becomes available after the job has finished. Counter is a concrete Accumulator implementation; the commonly used counters are IntCounter, LongCounter, and DoubleCounter.

Usage:

```java
// 1. Create the accumulator
private IntCounter numLines = new IntCounter();
// 2. Register the accumulator
getRuntimeContext().addAccumulator("num-lines", this.numLines);
// 3. Use the accumulator
this.numLines.add(1);
// 4. Get the accumulator's result (only available after the job has finished)
myJobExecutionResult.getAccumulatorResult("num-lines")
```

Example: count the number of records the map operator processes.

```java
package Flink_API;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

/**
 * Count how many records the map function processes.
 */
public class BatchCounterTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> dataSource = env.fromElements("1", "2", "3", "4", "5");
        DataSet<String> map = dataSource.map(new RichMapFunction<String, String>() {
            // 1. Create the accumulator
            private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                // 2. Register the accumulator
                getRuntimeContext().addAccumulator("num-lines", numLines);
            }

            @Override
            public String map(String s) throws Exception {
                // 3. Use the accumulator
                numLines.add(1);
                return s;
            }
        }).setParallelism(5);

        // In the DataSet API, print() triggers execution immediately,
        // so no separate env.execute() call is needed here.
        map.print();

        // 4. Get the accumulator result from the finished job
        JobExecutionResult jobResult = env.getLastJobExecutionResult();
        System.out.println("num-lines: " + jobResult.getAccumulatorResult("num-lines"));
    }
}
```
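If the program ends with a real data sink instead of print(), env.execute() itself returns the JobExecutionResult, which is the myJobExecutionResult pattern from step 4. A minimal sketch, replacing the tail of the example above (the output path is a made-up placeholder):

```java
// Write to a sink instead of printing, then read the counter off the execute() result.
map.writeAsText("/tmp/num-lines-out"); // placeholder path
JobExecutionResult myJobExecutionResult = env.execute("BatchCounterTest");
Integer total = myJobExecutionResult.getAccumulatorResult("num-lines");
System.out.println("num-lines = " + total);
```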

2. Broadcast variables: a broadcast variable is distributed to every TaskManager so the tasks running there can use it.

Broadcast variable usage:

```java
// 1. Initialize the data to broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
// 2. Broadcast the data (register it on whichever operator needs it)
operator.withBroadcastSet(toBroadcast, "broadcastSetName");
// 3. Access the data inside a rich function
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
```


Example: the data source only supplies user names; the job must print each user's name together with their age, which arrives via the broadcast variable.

```java
package Flink_API;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class BatchBroadcastTest {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Prepare the data to broadcast
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("wtt", 29));
        broadData.add(new Tuple2<>("lidong", 30));
        broadData.add(new Tuple2<>("hengda", 40));
        DataSource<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);

        // Convert the data set into Maps: the key is the user name, the value the age
        DataSet<HashMap<String, Integer>> toBroadCast = tupleData.map(
                new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> tuple) throws Exception {
                HashMap<String, Integer> hashMap = new HashMap<>();
                hashMap.put(tuple.f0, tuple.f1);
                return hashMap;
            }
        }).setParallelism(3); // the broadcast data is now ready

        // The data source only carries user names
        DataSource<String> nameDataSource = env.fromElements("wtt", "lidong", "hengda");

        // Note: a RichMapFunction is required to access the broadcast variable
        DataSet<String> data = nameDataSource.map(new RichMapFunction<String, String>() {
            List<HashMap<String, Integer>> broadCastMap = new ArrayList<>();
            HashMap<String, Integer> allMap = new HashMap<>();

            /**
             * 1. Like the setup method in MapReduce, open() runs only once.
             * 2. It is the place for initialization work,
             * 3. including reading the broadcast variable.
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Fetch the broadcast data
                broadCastMap = getRuntimeContext().getBroadcastVariable("toBroadCastMapName");
                for (HashMap<String, Integer> map : broadCastMap) {
                    allMap.putAll(map); // merged format: {"name": age}
                }
            }

            /**
             * Each map() call simply looks the name up in allMap.
             */
            @Override
            public String map(String name) throws Exception {
                return name + "," + allMap.get(name);
            }
        }).withBroadcastSet(toBroadCast, "toBroadCastMapName"); // register the broadcast set

        data.print();
    }
}
```
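With the withBroadcastSet registration in place, the job prints each name joined with its age: wtt,29, lidong,30, hengda,40. As a variant (a sketch, not part of the original example), RuntimeContext also offers getBroadcastVariableWithInitializer(), which runs a one-time transformation over the broadcast list and shares the merged result among all subtasks on the same TaskManager, instead of every subtask rebuilding allMap in open(). It needs one extra import, org.apache.flink.api.common.functions.BroadcastVariableInitializer:

```java
// Inside open(), instead of looping over getBroadcastVariable(...):
allMap = getRuntimeContext().getBroadcastVariableWithInitializer(
        "toBroadCastMapName",
        new BroadcastVariableInitializer<HashMap<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> initializeBroadcastVariable(
                    Iterable<HashMap<String, Integer>> data) {
                // Merge the per-record maps once; the result is shared per TaskManager
                HashMap<String, Integer> merged = new HashMap<>();
                for (HashMap<String, Integer> m : data) {
                    merged.putAll(m);
                }
                return merged;
            }
        });
```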

3. Broadcast streams: what the broadcast variable is to batch processing, the broadcast stream is to stream processing. The pattern has three steps, which the listing below follows: (1) describe the broadcast data with a MapStateDescriptor, (2) turn the configuration stream into a BroadcastStream with broadcast(), and (3) connect() it to the main stream and implement a BroadcastProcessFunction, whose processBroadcastElement() updates the broadcast state and whose processElement() reads it.

```java
package Flink_API;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

// Broadcast stream: filter a browse-event stream against a dynamically updated blacklist
public class FlinkBroadcastStream {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the timestamps carried in the data itself
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the parallelism
        env.setParallelism(1);

        // 1. The main stream: user browse events
        DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
        // 2. The configuration stream: blacklisted users
        DataStream<BlackUser> blackUserDataStream = getUserBlackUserDataStream(env);

        // (1) Define a MapStateDescriptor describing the format of the broadcast data
        MapStateDescriptor<String, BlackUser> descriptor =
                new MapStateDescriptor<>("userBlackList", String.class, BlackUser.class);
        // (2) Register the configuration stream as a broadcast stream
        BroadcastStream<BlackUser> broadcastStream = blackUserDataStream.broadcast(descriptor);
        // (3) Connect the main stream with the broadcast stream
        DataStream<UserBrowseLog> filterDataStream = browseStream.connect(broadcastStream)
                .process(new BroadcastProcessFunction<UserBrowseLog, BlackUser, UserBrowseLog>() {
                    @Override
                    public void processElement(UserBrowseLog value, ReadOnlyContext readOnlyContext,
                                               Collector<UserBrowseLog> collector) throws Exception {
                        // Look the user up in the (read-only) broadcast state
                        ReadOnlyBroadcastState<String, BlackUser> broadcastState =
                                readOnlyContext.getBroadcastState(descriptor);
                        BlackUser blackUser = broadcastState.get(value.getUserID());
                        if (blackUser != null) {
                            System.out.println("User " + value.getUserID()
                                    + " is blacklisted; dropping this browse event");
                        } else {
                            collector.collect(value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(BlackUser value, Context context,
                                                        Collector<UserBrowseLog> collector) throws Exception {
                        // Update the broadcast state in real time
                        BroadcastState<String, BlackUser> broadcastState =
                                context.getBroadcastState(descriptor);
                        broadcastState.put(value.getUserID(), value);
                        System.out.println("----> broadcast state updated for user: " + value.getUserID());
                    }
                });

        filterDataStream.print();
        env.execute("FlinkBroadcastStream");
    }

    private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<>("browse_topic", new SimpleStringSchema(), consumerProperties));
        DataStream<UserBrowseLog> processData = dataStreamSource.process(
                new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context,
                                       Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.println("Failed to parse UserBrowseLog JSON: " + e.getMessage());
                }
            }
        });
        // (watermark assignment omitted; see the sketch after this listing)
        return processData;
    }

    private static DataStream<BlackUser> getUserBlackUserDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9002");
        consumerProperties.setProperty("group.id", "blackgroup");
        // The blacklist is assumed to arrive on its own topic
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<>("black_topic", new SimpleStringSchema(), consumerProperties));
        DataStream<BlackUser> processData = dataStreamSource.process(
                new ProcessFunction<String, BlackUser>() {
            @Override
            public void processElement(String s, Context context,
                                       Collector<BlackUser> collector) throws Exception {
                try {
                    BlackUser blackUser = com.alibaba.fastjson.JSON.parseObject(s, BlackUser.class);
                    if (blackUser != null) {
                        collector.collect(blackUser);
                    }
                } catch (Exception e) {
                    System.out.println("Failed to parse BlackUser JSON: " + e.getMessage());
                }
            }
        });
        return processData;
    }

    // Blacklist configuration record
    public static class BlackUser implements Serializable {
        private String userID;
        private String userName;

        public BlackUser() {
        }

        public BlackUser(String userID, String userName) {
            this.userID = userID;
            this.userName = userName;
        }

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getUserName() {
            return userName;
        }

        public void setUserName(String userName) {
            this.userName = userName;
        }
    }

    // Browse event record
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getProductID() {
            return productID;
        }

        public void setProductID(String productID) {
            this.productID = productID;
        }

        public Integer getProductPrice() {
            return productPrice;
        }

        public void setProductPrice(Integer productPrice) {
            this.productPrice = productPrice;
        }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
}
```
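The browse source above enables event time but leaves the watermark comment unfilled. A minimal sketch of what could go there, assuming eventTime is a "yyyy-MM-dd HH:mm:ss" string (the 10-second out-of-orderness bound is an arbitrary choice):

```java
// Needs: org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
//        org.apache.flink.streaming.api.windowing.time.Time
DataStream<UserBrowseLog> withWatermarks = processData.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(UserBrowseLog element) {
                // Timestamp.valueOf expects "yyyy-MM-dd HH:mm:ss[.fff]"
                return java.sql.Timestamp.valueOf(element.getEventTime()).getTime();
            }
        });
```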

4. Flink Distributed Cache

  • Flink provides a distributed cache similar to Hadoop's: user code in parallel functions can conveniently read a registered file from the local file system of the TaskManager node, without each task fetching it again.
  • It works as follows: the program registers a file or directory (on a local or remote file system such as HDFS or S3) through the ExecutionEnvironment under a name of its choice. When the program runs, Flink automatically copies the file or directory to the local file system of every TaskManager node, exactly once. User functions can then look it up by the registered name and read it from the TaskManager's local file system.

Registering:

```java
// Get the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 1. Register a file; it can live on HDFS, or on the local file system for testing
env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text", "a.txt");
```

Using (the name must match the one used at registration, here "a.txt"):

```java
File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
```

The cached file a.txt contains:


hello flink hello FLINK

Full example:

```java
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class DisCacheTest {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 1. Register a file; it can live on HDFS, or on the local file system for testing
        env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text", "a.txt");
        DataSource<String> data = env.fromElements("a", "b", "c", "d");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 2. Use the cached file
                File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                List<String> lines = FileUtils.readLines(myFile);
                for (String line : lines) {
                    this.dataList.add(line);
                    System.err.println("Distributed cache line: " + line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                // dataList is available in every map() call
                System.err.println("Using dataList: " + dataList + " ------------ " + value);
                // business logic
                return dataList + ":" + value;
            }
        });
        // printToErr() triggers execution in the DataSet API
        result.printToErr();
    }
}
```
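The registered path does not have to be local. Two variants (placeholder paths); the three-argument overload of registerCachedFile additionally marks the cached file as executable:

```java
// Register a file that lives on HDFS (placeholder path)
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
// Register a local file and mark it executable (placeholder path)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true);
```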

 
