1、Accumulators
An Accumulator is much like a MapReduce counter: both give you a convenient view of how data flows through tasks while a job runs. You can use accumulators inside the operator functions of a Flink job, but the final result only becomes available after the job has finished. Counter is a concrete Accumulator implementation; the commonly used counters are IntCounter, LongCounter, and DoubleCounter.
Usage:
- 1: Create the accumulator
- private IntCounter numLines = new IntCounter();
- 2: Register the accumulator
- getRuntimeContext().addAccumulator("num-lines",this.numLines);
- 3: Use the accumulator
- this.numLines.add(1);
- 4: Get the accumulator result (see the snippet after this list)
- myJobExecutionResult.getAccumulatorResult("num-lines")
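Step 4 needs the JobExecutionResult of the finished job. A minimal sketch, assuming the job defines a regular sink and is submitted with env.execute() (the job name is an arbitrary placeholder); when print() is used instead, the example below fetches the same result via env.getLastJobExecutionResult():

JobExecutionResult myJobExecutionResult = env.execute("myJob"); // returns once the batch job has finished
int numLines = myJobExecutionResult.getAccumulatorResult("num-lines"); // merged result across all subtasks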
Example: count how many records the map operator processes.
package Flink_API;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

/**
 * Count how many records the map function processed.
 */
public class BatchCounterTest {
    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> dataSource = env.fromElements("1", "2", "3", "4", "5");

        DataSet<String> map = dataSource.map(new RichMapFunction<String, String>() {
            // 1: create the accumulator
            private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                // 2: register the accumulator
                getRuntimeContext().addAccumulator("num-lines", numLines);
            }

            @Override
            public String map(String s) throws Exception {
                // 3: use the accumulator
                numLines.add(1);
                return s;
            }
        }).setParallelism(5);

        // print() is itself a sink that triggers execution in the DataSet API,
        // so fetch the result of that job instead of calling env.execute() again
        map.print();
        JobExecutionResult result = env.getLastJobExecutionResult();
        // 4: get the accumulator result
        int numLines = result.getAccumulatorResult("num-lines");
        System.out.println("num-lines: " + numLines);
    }
}
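Since the source has five elements and Flink merges the partial counters of all parallel subtasks, the final num-lines value printed here should be 5 even with setParallelism(5).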
2、Broadcast variables: the broadcast mechanism ships a data set to every TaskManager, where the processing tasks can read it.
- Steps for using a broadcast variable:
- 1. Initialize the data
- DataSet<Integer> toBroadcast = env.fromElements(1,2,3);
- 2. Broadcast the data (i.e. register it: call withBroadcastSet on whichever operator needs it)
- operator.withBroadcastSet(toBroadcast,"broadcastSetName");
- 3. Access the data (only Rich functions expose getRuntimeContext(), typically in open())
- Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
Example program: the Flink job can only read the users' names from the source, but it must print each user's name together with the matching age.
package Flink_API;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class BatchBroadcastTest {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // prepare the data to broadcast
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("wtt", 29));
        broadData.add(new Tuple2<>("lidong", 30));
        broadData.add(new Tuple2<>("hengda", 40));
        DataSource<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);
        // convert the data set into Map records: key = user name, value = user age
        DataSet<HashMap<String, Integer>> toBroadCast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                HashMap<String, Integer> hashMap = new HashMap<>();
                hashMap.put(stringIntegerTuple2.f0, stringIntegerTuple2.f1);
                return hashMap;
            }
        }).setParallelism(3); // the broadcast data is now ready

        // note: a RichMapFunction is required to access the broadcast variable
        // the source only carries the user names
        DataSource<String> nameDataSource = env.fromElements("wtt", "lidong", "hengda");

        DataSet<String> data = nameDataSource.map(new RichMapFunction<String, String>() {

            List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
            HashMap<String, Integer> allMap = new HashMap<String, Integer>();

            /**
             * 1. Like the setup method in MapReduce, open() runs only once per task.
             * 2. It is the place for initialization work.
             * 3. The broadcast variable can be fetched here.
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // fetch the broadcast data
                broadCastMap = getRuntimeContext().getBroadcastVariable("toBroadCastMapName");
                for (HashMap map : broadCastMap) {
                    allMap.putAll(map); // the merged format is {"name": age}
                }
            }

            /**
             * Every map() call just looks the name up in allMap.
             */
            @Override
            public String map(String s) throws Exception {
                Integer age = allMap.get(s);
                return s + "," + age;
            }
        }).withBroadcastSet(toBroadCast, "toBroadCastMapName"); // register the broadcast data on this operator
        data.print();
    }
}
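With the withBroadcastSet registration and the lookup in map() in place, the job should print each name joined with its broadcast age (order may vary with parallelism):

wtt,29
lidong,30
hengda,40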
3、Broadcast stream: what the broadcast variable is to batch jobs, the broadcast stream is to streaming jobs.
package Flink_API;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

// broadcast stream: keep a continuously updated blacklist available to the main stream
public class FlinkBroadcastStream {

    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // use the timestamps carried in the data itself
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // set the parallelism
        env.setParallelism(1);

        // 1. the main stream: the users' browse events
        DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
        // 2. the configuration stream: blacklisted users
        DataStream<BlackUser> blackUserDataStream = getUserBlackUserDataStream(env);

        // 1) define a MapStateDescriptor describing the format of the broadcast data
        MapStateDescriptor<String, BlackUser> descriptor = new MapStateDescriptor<String, BlackUser>("userBlackList", String.class, BlackUser.class);

        // 2) register the configuration stream as a broadcast stream
        BroadcastStream<BlackUser> broadcastStream = blackUserDataStream.broadcast(descriptor);

        // 3) connect the main stream with the broadcast stream
        DataStream<UserBrowseLog> filterDataStream = browseStream.connect(broadcastStream)
                .process(new BroadcastProcessFunction<UserBrowseLog, BlackUser, UserBrowseLog>() {
                    @Override
                    public void processElement(UserBrowseLog value, ReadOnlyContext readOnlyContext, Collector<UserBrowseLog> collector) throws Exception {
                        // look the user up in the (read-only) broadcast state
                        ReadOnlyBroadcastState<String, BlackUser> broadcastState = readOnlyContext.getBroadcastState(descriptor);
                        BlackUser blackUser = broadcastState.get(value.getUserID());
                        if (blackUser != null) {
                            System.out.println("User " + value.getUserID() + " is blacklisted; dropping this browse event");
                        } else {
                            collector.collect(value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(BlackUser value, Context context, Collector<UserBrowseLog> collector) throws Exception {
                        // update the broadcast state as blacklist entries arrive
                        BroadcastState<String, BlackUser> broadcastState = context.getBroadcastState(descriptor);
                        broadcastState.put(value.getUserID(), value);
                        System.out.println("------------------> current broadcast state: ---------------->");
                        System.out.println(broadcastState);
                    }
                });
        filterDataStream.print();
        env.execute("FlinkBroadcastStream");
    }

    private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");

        // SimpleStringSchema is a DeserializationSchema and can be passed directly (the original cast to KeyedDeserializationSchema would fail at runtime)
        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer010<String>("browse_topic", new SimpleStringSchema(), consumerProperties));

        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.println("Failed to parse JSON into UserBrowseLog: " + e.getMessage());
                }
            }
        });
        // watermark assignment is omitted here; nothing downstream uses windows
        return processData;
    }

    private static DataStream<BlackUser> getUserBlackUserDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9002");
        consumerProperties.setProperty("group.id", "blackusergroup");

        // topic name assumed: the original reused "browse_topic" here, which would feed browse events into the blacklist
        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer010<String>("blackuser_topic", new SimpleStringSchema(), consumerProperties));

        DataStream<BlackUser> processData = dataStreamSource.process(new ProcessFunction<String, BlackUser>() {
            @Override
            public void processElement(String s, Context context, Collector<BlackUser> collector) throws Exception {
                try {
                    BlackUser blackUser = com.alibaba.fastjson.JSON.parseObject(s, BlackUser.class);
                    if (blackUser != null) {
                        collector.collect(blackUser);
                    }
                } catch (Exception e) {
                    System.out.println("Failed to parse JSON into BlackUser: " + e.getMessage());
                }
            }
        });
        return processData;
    }
    // blacklist configuration POJO
    public static class BlackUser implements Serializable {
        private String userID;
        private String userName;

        public BlackUser() {
        }

        public BlackUser(String userID, String userName) {
            this.userID = userID;
            this.userName = userName;
        }

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getUserName() {
            return userName;
        }

        public void setUserName(String userName) {
            this.userName = userName;
        }
    }

    // browse event POJO
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getProductID() {
            return productID;
        }

        public void setProductID(String productID) {
            this.productID = productID;
        }

        public Integer getProductPrice() {
            return productPrice;
        }

        public void setProductPrice(Integer productPrice) {
            this.productPrice = productPrice;
        }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
}
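For reference, the fastjson calls above deserialize whatever JSON arrives on the two topics into these POJOs, so the expected message shape follows the field names; the concrete values below are made-up examples:

browse_topic:    {"userID":"user_1","eventTime":"2016-01-01 10:00:00","eventType":"browse","productID":"product_1","productPrice":10}
blackuser_topic: {"userID":"user_1","userName":"wtt"}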
4、Flink Distributed Cache
- Flink provides a distributed cache, similar to Hadoop's, so that user functions running in parallel can conveniently read a local copy of a file; the file is placed on the TaskManager nodes, which keeps tasks from fetching it repeatedly.
- The cache works as follows: the program registers a file or directory (on a local or remote file system such as HDFS or S3) through the ExecutionEnvironment and gives it a name. When the program runs, Flink automatically copies the file or directory to the local file system of every TaskManager node, exactly once. The user function can then look it up by that name and access it from the TaskManager's local file system.
Registration:
// get the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 1: register a file; an HDFS path works as well as a local file for testing
env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt");
Usage:
File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
Contents of the file registered as a.txt:
hello flink hello FLINK
Complete example:
package Flink_API;

import org.apache.commons.io.FileUtils; // provides the readLines(File) helper used below
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class DisCacheTest {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 1: register a file; an HDFS path works as well as a local file for testing
        env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text", "a.txt");
        DataSource<String> data = env.fromElements("a", "b", "c", "d");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 2: use the file
                File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                List<String> lines = FileUtils.readLines(myFile);
                for (String line : lines) {
                    this.dataList.add(line);
                    System.err.println("Cached line: " + line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                // dataList is available here for the business logic
                System.err.println("Using dataList: " + dataList + "------------" + value);
                return dataList + ":" + value;
            }
        });
        // printToErr() is a sink and triggers execution
        result.printToErr();
    }
}
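Given the cached file above, dataList holds the single line "hello flink hello FLINK", so each element should print roughly as follows (order may vary with parallelism):

[hello flink hello FLINK]:a
[hello flink hello FLINK]:b
[hello flink hello FLINK]:c
[hello flink hello FLINK]:d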