当前位置:   article > 正文

Flink常用算子Transformation(转换)_实时 flink kafka map 算子 hdfs offset

实时 flink kafka map 算子 hdfs offset

在之前的《Flink DataStream API》一文中,我们列举了一些Flink自带且常用的transformation算子,例如map、flatMap等。在Flink的编程体系中,我们获取到数据源之后,需要经过一系列的处理即transformation操作,再将最终结果输出到目的Sink(ES、mysql或者hdfs),使数据落地。因此,除了正确的继承重写RichSourceFunction<>和RichSinkFunction<>之外,最终要的就是实时处理这部分,下面的图介绍了Flink代码执行流程以及各模块的组成部分。

在Storm中,我们常常用Bolt的层级关系来表示各个数据的流向关系,组成一个拓扑。在Flink中,Transformation算子就是将一个或多个DataStream转换为新的DataStream,可以将多个转换组合成复杂的数据流拓扑。如下图所示,DataStream会由不同的Transformation操作,转换、过滤、聚合成其他不同的流,从而完成我们的业务要求。

那么以《Flink从kafka中读数据存入Mysql Sink》一文中的业务场景作为基础,在Flink读取Kafka的数据之后,进行不同的算子操作来分别详细介绍一下各个Transformation算子的用法。Flink消费的数据格式依然是JSON格式:{"city":"合肥","loginTime":"2019-04-17 19:04:32","os":"Mac OS","phoneName":"vivo"}

1、map

map:输入一个元素,输出一个元素,可以用来做一些清洗工作。

  1. /**
  2. * create by xiax.xpu on @Date 2019/4/11 20:47
  3. */
  4. public class FlinkSubmitter {
  5. public static void main(String[] args) throws Exception{
  6. //获取运行时环境
  7. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8. //checkpoint配置
  9. //为了能够使用支持容错的kafka Consumer,开启checkpoint机制,支持容错,保存某个状态信息
  10. env.enableCheckpointing(5000);
  11. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  12. env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  13. env.getCheckpointConfig().setCheckpointTimeout(60000);
  14. env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  15. //kafka配置文件
  16. Properties props = new Properties();
  17. props.put("bootstrap.servers", "192.168.83.129:9092");
  18. props.setProperty("group.id","con1");
  19. props.put("zookeeper.connect","192.168.83.129:2181");
  20. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化
  21. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //value 反序列化
  22. System.out.println("ready to print");
  23. FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
  24. "kafka_flink_mysql",
  25. new SimpleStringSchema(),
  26. props);
  27. consumer.setStartFromGroupOffsets();//默认消费策略
  28. SingleOutputStreamOperator<Entity> StreamRecord = env.addSource(consumer)
  29. .map(string -> JSON.parseObject(string, Entity.class))
  30. .setParallelism(1);
  31. //融合一些transformation算子进来
  32. //map:输入一个元素,输出一个元素,可以用来做一些清洗工作
  33. SingleOutputStreamOperator<Entity> result = StreamRecord.map(new MapFunction<Entity, Entity>() {
  34. @Override
  35. public Entity map(Entity value) throws Exception {
  36. Entity entity1 = new Entity();
  37. entity1.city = value.city+".XPU.Xiax";
  38. entity1.phoneName = value.phoneName.toUpperCase();
  39. entity1.loginTime = value.loginTime;
  40. entity1.os = value.os;
  41. return entity1;
  42. }
  43. });
  44. result.print().setParallelism(1);
  45. env.execute("new one");
  46. }
  47. }

本例中我们将获取的JSON字符串转换到Entity object之后,使用map算子让所有的phoneName编程大写,city后面添加XPU.Xiax后缀。

2、flatMap

flatMap:打平操作,我们可以理解为将输入的元素压平,从而对输出结果的数量不做要求,可以为0、1或者多个都OK。它和Map相似,但引入flatMap的原因是因为一般java方法的返回值结果都是一个,因此引入flatMap来区别这个。

  1. //flatMap, 输入一个元素,返回0个、1个或者多个元素
  2. SingleOutputStreamOperator<Entity> result = StreamRecord
  3. .flatMap(new FlatMapFunction<Entity, Entity>() {
  4. @Override
  5. public void flatMap(Entity entity, Collector<Entity> out) throws Exception {
  6. if (entity.city.equals("北京")) {
  7. out.collect(entity);
  8. }
  9. }
  10. });

这里我们将所有city是北京的结果集聚合输出,注意这里并不是过滤,有些人可能会困惑这不是起了过滤filter的作用吗,其实不然,只是这里的用法刚好相似而已。简单分析一下,new FlatMapFunction<Entity, Entity>,接收的输入是Entity实体,发出的也是Entity实体类,看到这就可以与Map对应上了。

3、filter

filter:过滤筛选,将所有符合判断条件的结果集输出

  1. //filter 判断条件输出
  2. SingleOutputStreamOperator<Entity> result = StreamRecord
  3. .filter(new FilterFunction<Entity>() {
  4. @Override
  5. public boolean filter(Entity entity) throws Exception {
  6. if (entity.phoneName.equals("HUAWEI")) {
  7. return true;
  8. }
  9. return false;
  10. }
  11. });

这里我们将所有phoneName是HUAWEI的值过滤,在直接输出。

4、keyBy

keyBy:在逻辑上将Stream根据指定的Key进行分区,是根据key的Hash值进行分区的。

  1. //keyBy 从逻辑上对逻辑分区
  2. KeyedStream<Entity, String> result = StreamRecord
  3. .keyBy(new KeySelector<Entity, String>() {
  4. @Override
  5. public String getKey(Entity entity) throws Exception {
  6. return entity.os;
  7. }
  8. });

这里只是对DataStream进行分区而已,按照os进行分区,然而这输出的效果其实没什么变化


由于下面这些操作,在之前模拟生成的数据,去做转换操作不太适合。因此每个操作附上其他demo

5、reduce

reduce:属于归并操作,它能将3的keyedStream转换成DataStream,Reduce 返回单个的结果值,并且 reduce 操作每处理每一天新数据时总是创建一个新值。常用聚合操作例如min、max等都可使用reduce方法实现。这里通过实现一个Socket的wordCount简单例子,来帮助了解flatMap/keyBy/reduce/window等操作的过程。

  1. package com.bigdata.flink.Stream;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.common.functions.ReduceFunction;
  4. import org.apache.flink.api.java.utils.ParameterTool;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.windowing.time.Time;
  9. import org.apache.flink.util.Collector;
  10. /**
  11. * 滑动窗口的计算
  12. *
  13. * 通过socket模拟产生单词数据 flink对其进行统计计数
  14. * 实现时间窗口:
  15. * 每隔1秒统计前两秒的数据
  16. */
  17. public class SocketWindowWordCount {
  18. public static void main(String[] args) throws Exception{
  19. //定义端口号,通过cli接收
  20. int port;
  21. try{
  22. ParameterTool parameterTool = ParameterTool.fromArgs(args);
  23. port = parameterTool.getInt("port");
  24. }catch(Exception e){
  25. System.err.println("No port Set, use default port---java");
  26. port = 9000;
  27. }
  28. //获取运行时环境,必须要
  29. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  30. //绑定Source,通过master的nc -l 900 产生单词
  31. String hostname = "192.168.83.129";
  32. String delimiter = "\n";
  33. //连接socket 绑定数据源
  34. DataStreamSource<String> socketWord = env.socketTextStream(hostname, port, delimiter);
  35. DataStream<WordWithCount> windowcounts = socketWord.flatMap(new FlatMapFunction<String, WordWithCount>() {
  36. public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
  37. String[] splits = value.split("\\s");
  38. for (String word : splits) {
  39. out.collect(new WordWithCount(word, 1));
  40. }
  41. }
  42. }).keyBy("word")
  43. //.sum("count");//这里求聚合 可以用reduce和sum两种方式
  44. .reduce(new ReduceFunction<WordWithCount>() {
  45. public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
  46. return new WordWithCount(a.word, a.count + b.count);
  47. }
  48. });
  49. windowcounts.print().setParallelism(1);
  50. env.execute("socketWindow");
  51. }
  52. public static class WordWithCount{
  53. public String word;
  54. public int count;
  55. //无参的构造函数
  56. public WordWithCount(){
  57. }
  58. //有参的构造函数
  59. public WordWithCount(String word, int count){
  60. this.count = count;
  61. this.word = word;
  62. }
  63. @Override
  64. public String toString() {
  65. return "WordWithCount{" +
  66. "word='" + word + '\'' +
  67. ", count=" + count +
  68. '}';
  69. }
  70. }
  71. }

 

这里只是做单词计数,至于为什么有的单词重复出现,但是请注意它后面的count值都不一样,我们直接生成了toString方法打印出的结果。

6、aggregations

aggregations:进行一些聚合操作,例如sum(),min(),max()等,这些可以用于keyedStream从而获得聚合。用法如下

KeyedStream.sum(0)或者KeyedStream.sum(“Key”)

7、unoin

union:可以将多个流合并到一个流中,以便对合并的流进行统一处理,有点类似于Storm中的将上一级的两个Bolt数据汇聚到这一级同一个Bolt中。注意,合并的流类型需要一致

  1. //1.获取执行环境配置信息
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. //2.定义加载或创建数据源(source),监听9000端口的socket消息
  4. DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
  5. DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
  6. DataStream<String> textStream9002 = env.socketTextStream("localhost", 9002, "\n");
  7. DataStream<String> mapStream9000=textStream9000.map(s->"来自9000端口:"+s);
  8. DataStream<String> mapStream9001=textStream9001.map(s->"来自9001端口:"+s);
  9. DataStream<String> mapStream9002=textStream9002.map(s->"来自9002端口:"+s);
  10. //3.union用来合并两个或者多个流的数据,统一到一个流中
  11. DataStream<String> result = mapStream9000.union(mapStream9001,mapStream9002);
  12. //4.打印输出sink
  13. result.print();
  14. //5.开始执行
  15. env.execute();

8、connect

connect:和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法。

  1. //获取Flink运行环境
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. //绑定数据源
  4. DataStreamSource<Long> text1 = env.addSource(new MyParalleSource()).setParallelism(1);
  5. DataStreamSource<Long> text2 = env.addSource(new MyParalleSource()).setParallelism(1);
  6. //为了演示connect的不同,将第二个source的值转换为string
  7. SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
  8. @Override
  9. public String map(Long value) throws Exception {
  10. return "str" + value;
  11. }
  12. });
  13. ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);
  14. SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
  15. @Override
  16. public Object map1(Long value) throws Exception {
  17. return value;
  18. }
  19. @Override
  20. public Object map2(String value) throws Exception {
  21. return value;
  22. }
  23. });
  24. //打印到控制台,并行度为1
  25. result.print().setParallelism(1);
  26. env.execute( "StreamingDemoWithMyNoParalleSource");

9、split

split:根据规则吧一个数据流切分成多个流,可能在实际场景中,源数据流中混合了多种类似的数据,多种类型的数据处理规则不一样,所以就可以根据一定的规则把一个数据流切分成多个数据流。

  1. //获取Flink运行环境
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. //绑定数据源
  4. DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(1);
  5. //对流进行切分 奇数偶数进行区分
  6. SplitStream<Long> splitString = text.split(new OutputSelector<Long>() {
  7. @Override
  8. public Iterable<String> select(Long value) {
  9. ArrayList<String> output = new ArrayList<>();
  10. if (value % 2 == 0) {
  11. output.add("even");//偶数
  12. } else {
  13. output.add("odd");//奇数
  14. }
  15. return output;
  16. }
  17. });
  18. //选择一个或者多个切分后的流
  19. DataStream<Long> evenStream = splitString.select("even");//选择偶数
  20. DataStream<Long> oddStream = splitString.select("odd");//选择奇数
  21. DataStream<Long> moreStream = splitString.select("odd","even");//选择多个流
  22. //打印到控制台,并行度为1
  23. evenStream.print().setParallelism(1);
  24. env.execute( "StreamingDemoWithMyNoParalleSource");

10、window以及windowAll

window:按时间进行聚合或者其他条件对KeyedStream进行分组,用法:inputStream.keyBy(0).window(Time.seconds(10));

windowAll: 函数允许对常规数据流进行分组。通常,这是非并行数据转换,因为它在非分区数据流上运行。用法:inputStream.keyBy(0).windowAll(Time.seconds(10));

关于时间窗口,这个我们后期会详细说一下,敬请关注。

 

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/972487
推荐阅读
相关标签
  

闽ICP备14008679号