赞
踩
特性:接收一个数据,经过处理之后,就返回一个数据
那么我们现在要实现一个功能,就是从给一个文件中读取数据,返回每一行的字符串长度。
我们要读取的文件内容如下
代码贴在这里(为了让打击不看迷糊,导包什么的我就省略了)
public class TransformTest1_Base { public static void main(String[] args) throws Exception { // 1. 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 将并行度设为1 env.setParallelism(1); // 3. 读取文件夹 DataStreamSource<String> inputDataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor"); // 4. 将文件夹每一行的数据都返回它的长度 // 在这里我们用匿名内部类的方式创建了一个MapFunction对象 SingleOutputStreamOperator<Integer> dataStream = inputDataStream.map(new MapFunction<String, Integer>() { // 5. 重写map方法,参数s是接收到的一个数据,我们只需要返回它的长度就行了。 @Override public Integer map(String s) throws Exception { return s.length(); } }); // 6. 打印输出 dataStream.print(); // 7. 启动执行环境 env.execute(); } }
显示
map的使用范围就是需要对的那个数据进行处理,并且每次返回一个数据的时候,map就比较方便了。
我们发现,它需要传入一个FlatMapFunction的一个对象
我们继续点进去,看看FlatMapFunction的源码,可以发现,FlatMapFunction<T,R>也是一个接口,并且接口里面的方法的返回值是一个Collector,也就是多个值的集合。
我们还是读取那个文件,这次我们要做的处理是,将文件的每一行数据按照逗号隔开,给出代码:
public class TransformTest2_Base { public static void main(String[] args) throws Exception { // 1. 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 设置并行度 env.setParallelism(1); // 3. 读取文件夹 DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor"); // 4. 用匿名内部类的方式重写FlatMapFuncction,将每行字符按","隔开 SingleOutputStreamOperator<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String s, Collector<String> collector) throws Exception { // 5. 分割一行字符,获得对应的字符串数组 String[] split = s.split(","); for (String slt : split) { // 6. 将这些数据返回 collector.collect(slt); } } }); // 7. 打印输出处理后的数据 flatMapStream.print(); // 8. 启动执行环境 env.execute(); } }
可以看到执行的结果
听这个名字就知道是个过滤器,用来过滤数据。
我们看看filer的源码,继承子FilterFunction,可以看到,这次泛型就只有一个值了,因为filter只允许返回的数据<=原来的数据,所以只做过滤,并不能改变数据蕾西,没必要设置返回的类型
我们继续点进去,看看FilterFunction的源码
果不其然,也是一个接口,而里面的filter方法只有一个参数,并且返回的是一个boolean类型,若返回true则var1原样返回,若返回false,则var1会被过滤掉。
我们还是读取以上文件,这一次我们返回以"sensor_1"开头的字符串,其余的一律不返回,给出代码
public class TransformTest3_Base { public static void main(String[] args) throws Exception { // 1. 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 设置并行度 env.setParallelism(1); // 3. 读取文件 DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor"); // 4. 用匿名内部类的方式重写FilterFunction SingleOutputStreamOperator<String> filterDataStream = dataStream.filter(new FilterFunction<String>() { @Override public boolean filter(String s) throws Exception { // 5. 若s以"sensor_1"开头,则返回true return s.startsWith("\"sensor_1\""); } }); // 6. 打印处理后的数据 filterDataStream.print(); // 7. 启动执行环境 env.execute(); } }
DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。
这些算子可以针对 KeyedStream 的每一个支流做聚合。
⚫ sum():对每个支流求和
⚫ min():对每个支流求最小值
⚫ max():对每个支流求最大值
⚫ minBy()
⚫ maxBy()
我们来看看max()的源码
这也是传一个属性名,也就是求对应的属性名的最大值。
public class TransformTest1_RollingAggreation { public static void main(String[] args) throws Exception { // 1. 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 设置并行度 env.setParallelism(1); // 3. 读取文件 DataStreamSource<String> stringDataStreamSource = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor"); // 4. 用map将每行数据变成一个对象 SingleOutputStreamOperator<SensorReading> map = stringDataStreamSource.map(new MapFunction<String, SensorReading>() { @Override public SensorReading map(String s) throws Exception { String[] split = s.split(","); return new SensorReading(split[0], new Long(split[1]), new Double(split[2])); } }); // 5. 分组操作,以id属性分组 KeyedStream<SensorReading, Tuple> keyedstream = map.keyBy("id"); // 6. 聚合操作,求每个分组的温度最大值 SingleOutputStreamOperator<SensorReading> resultStream = keyedstream.max("temperature"); // 7. 打印输出 resultStream.print(); // 8. 启动执行环境 env.execute(); } }
运行结果
诶,这有人就要问了,不是求每一个分组的温度最大值么?为什么sensor_1的这个分组所有的数据都有?
答:flink是一个流处理分布式框架,这是一条数据流,每来一个数据就得处理一次,所以输出的都是当前状态下的最大值。
在实际生产中,不可能让我们完成这么简单的操作就行了,所以我们需要更复杂的操作,而reduce就是满足这个条件,它可以让我们自定义聚合的方式。
我们这一次要实现一个实时的温度最大值,也就是返回的数据中的时间戳是当前的。
public class TransformTest1_Reduce { public static void main(String[] args) throws Exception { // 1. 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 设置并行度 env.setParallelism(1); // 3. 读取文件 DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor"); // 4. 通过map将每行数据转换为一个对象 SingleOutputStreamOperator<SensorReading> map = dataStream.map(new MapFunction<String, SensorReading>() { @Override public SensorReading map(String s) throws Exception { String[] split = s.split(","); return new SensorReading(split[0], new Long(split[1]), new Double(split[2])); } }); // 5. 按对象的id分组 KeyedStream<SensorReading, Tuple> keyStream = map.keyBy("id"); // 6. reduce自定义聚合 SingleOutputStreamOperator<SensorReading> reduce = keyStream.reduce(new ReduceFunction<SensorReading>() { @Override public SensorReading reduce(SensorReading sensorReading, SensorReading t1) throws Exception { // 7. 获取当前时间为止接收到的最大温度 return new SensorReading(sensorReading.getId(), System.currentTimeMillis(), Math.max(sensorReading.getTemperature(),t1.getTemperature())); } }); // 8. 打印输出 reduce.print(); // 9. 启动运行环境 env.execute(); } }
这一次的输出我们就得你好好研究一下了。
从这块可以发现,我们获取的都是当前的时间戳,而且时间戳也在改变,这一点很好理解,但是下面这个数据就很诡异了。
public class TransformTest4_MultipleStreams { public static void main(String[] args) throws Exception { // 1. 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 设置并行度 env.setParallelism(1); // 3. 读取文件 DataStreamSource<String> dataStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor"); // 4. 通过map将每行数据转换为一个对象 SingleOutputStreamOperator<SensorReading> map = dataStream.map(new MapFunction<String, SensorReading>() { @Override public SensorReading map(String s) throws Exception { String[] split = s.split(","); return new SensorReading(split[0], new Long(split[1]), new Double(split[2])); } }); // 5. 按条件贴标签 SplitStream<SensorReading> split = map.split(new OutputSelector<SensorReading>() { @Override public Iterable<String> select(SensorReading value) { return value.getTemperature() > 30 ? Collections.singletonList("high") : Collections.singletonList("low"); } }); // 6. 按标签选择,生成不同的数据流 DataStream<SensorReading> high = split.select("high"); DataStream<SensorReading> low = split.select("low"); DataStream<SensorReading> all = split.select("high", "low"); high.print("high"); low.print("low"); all.print("all"); env.execute(); } }
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数
据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理。
类似于一国两制,看似两条流合并在了一起,其实内部依旧是按照自己的约定运行,类型并没有改变。
public class TransformTest5_MultipleStreams { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. 读取文件 DataStreamSource<String> dataStreamSource = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor"); // 2. 将每行数据变成一个对象 SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() { @Override public SensorReading map(String s) throws Exception { String[] split = s.split(","); return new SensorReading(split[0], new Long(split[1]), new Double(split[2])); } }); // 3. 将数据打上标签 SplitStream<SensorReading> split = map.split(new OutputSelector<SensorReading>() { @Override public Iterable<String> select(SensorReading value) { return value.getTemperature() > 30 ? Collections.singletonList("high") : Collections.singletonList("low"); } }); // 4. 按照高温和低温的标签分成两条流 DataStream<SensorReading> high = split.select("high"); DataStream<SensorReading> low = split.select("low"); // 5. 将high流的数据转换为二元组 SingleOutputStreamOperator<Tuple2<String, Double>> tuple2SingleOutputStreamOperator = high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() { @Override public Tuple2<String, Double> map(SensorReading sensorReading) throws Exception { return new Tuple2<>(sensorReading.getId(), sensorReading.getTemperature()); } }); // 6. 将tuple2SingleOutputStreamOperator和low连接 ConnectedStreams<Tuple2<String, Double>, SensorReading> connect = tuple2SingleOutputStreamOperator.connect(low); // 7. 调用map传参CoMapFunction将两条流合并成一条流objectSingleOutputStreamOperator SingleOutputStreamOperator<Object> objectSingleOutputStreamOperator = connect.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() { // 这是处理high流的方法 @Override public Object map1(Tuple2<String, Double> value) throws Exception { return new Tuple3<>(value.getField(0), value.getField(1), "temp is too high"); } // 这是处理low流的方法 @Override public Object map2(SensorReading value) throws Exception { return new Tuple2<>(value.getTemperature(), "normal"); } }); objectSingleOutputStreamOperator.print(); env.execute(); } }
之前我们只能合并两条流,那我们要合并多条流呢?这里我们就需要用到union方法。
若我们给出以下代码:
high.union(low,all);
那么high,low,all三条流都会合并在一起。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。