当前位置:   article > 正文

Flink 内容分享(四):Fink原理、实战与性能优化(四)_flink原理、实战与性能优化

flink原理、实战与性能优化

目录

Transformations

Sink

分区策略


Transformations

Transformations算子可以将一个或者多个算子转换成一个新的数据流,使用Transformations算子组合可以处理复杂的业务处理。

Map

DataStream → DataStream

遍历数据流中的每一个元素,产生一个新的元素。

FlatMap

DataStream → DataStream

遍历数据流中的每一个元素,产生N个元素 N=0,1,2......。

Filter

DataStream → DataStream

过滤算子,根据数据流的元素计算出一个boolean类型的值,true代表保留,false代表过滤掉。

KeyBy

DataStream → KeyedStream

根据数据流中指定的字段来分区,相同指定字段值的数据一定是在同一个分区中,内部分区使用的是HashPartitioner。

指定分区字段的方式有三种:

  • 根据索引号指定。

  • 通过匿名函数来指定。

  • 通过实现KeySelector接口  指定分区字段。

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStreamSource<Long> stream = env.fromSequence(1100);
  4.         stream.map((MapFunction<Long, Tuple2<Long, Integer>>) (Long x) -> new Tuple2<>(x % 31), TypeInformation.of(new TypeHint<Tuple2<Long, Integer>>() {}))
  5.                 //根据索引号来指定分区字段:.keyBy(0)
  6.                 //通过传入匿名函数 指定分区字段:.keyBy(x=>x._1)
  7.                 //通过实现KeySelector接口  指定分区字段                
  8.                 .keyBy((KeySelector<Tuple2<Long, Integer>, Long>) (Tuple2<Long, Integer> value) -> value.f0, BasicTypeInfo.LONG_TYPE_INFO)
  9.                 .sum(1).print();
  10.         env.execute("Flink Job");
  11.     }

Reduce

适用于KeyedStream

KeyedStream:根据key分组 → DataStream

注意,reduce是基于分区后的流对象进行聚合,也就是说,DataStream类型的对象无法调用reduce方法

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
  4.                 new Tuple2<>("apple"3),
  5.                 new Tuple2<>("banana"1),
  6.                 new Tuple2<>("apple"5),
  7.                 new Tuple2<>("banana"2),
  8.                 new Tuple2<>("apple"4)
  9.         );
  10.         // 使用reduce操作,将input中的所有元素合并到一起
  11.         DataStream<Tuple2<String, Integer>> result = dataStream
  12.                 .keyBy(0)
  13.                 .reduce((ReduceFunction<Tuple2<String, Integer>>) (value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1));
  14.         result.print();
  15.         env.execute();
  16.     }

Aggregations

KeyedStream → DataStream

Aggregations代表的是一类聚合算子,上面说的reduce就属于Aggregations,以下是一些常用的:

  • sum(): 计算数字类型字段的总和。

  • min(): 计算最小值。

  • max(): 计算最大值。

  • count(): 计数元素个数。

  • avg(): 计算平均值。

另外,Flink 还支持自定义聚合函数,即使用 AggregateFunction 接口实现更复杂的聚合逻辑。

Union 真合并

DataStream → DataStream

Union of two or more data streams creating a new stream containing all the elements from all the streams

合并两个或者更多的数据流产生一个新的数据流,这个新的数据流中包含了所合并的数据流的元素

注意:需要保证数据流中元素类型一致

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<Tuple2<String, Integer>> ds1 = env.fromCollection(Arrays.asList(Tuple2.of("a",1),Tuple2.of("b",2),Tuple2.of("c",3)));
  4.         DataStream<Tuple2<String, Integer>> ds2 = env.fromCollection(Arrays.asList(Tuple2.of("d",4),Tuple2.of("e",5),Tuple2.of("f",6)));
  5.         DataStream<Tuple2<String, Integer>> ds3 = env.fromCollection(Arrays.asList(Tuple2.of("g",7),Tuple2.of("h",8)));
  6.         DataStream<Tuple2<String, Integer>> unionStream = ds1.union(ds2,ds3);
  7.         unionStream.print();
  8.         env.execute();
  9.     }

在 Flink 中,Union 操作被称为 "真合并" 是因为它将两个或多个数据流完全融合在一起,没有特定的顺序,并且不会去除重复项。这种操作方式类似于在数学概念中的集合联合(Union)操作,所以被称为 "真合并"。

请注意,与其他一些数据处理框架中的 Union 操作相比,例如 Spark 中的 Union 会根据某些条件去除重复的元素,Flink 的 Union 行为更接近于数学上的集合联合理论。

Connect 假合并

DataStream,DataStream → ConnectedStreams

合并两个数据流并且保留两个数据流的数据类型,能够共享两个流的状态

  1. public static void main(String[] args) throws Exception {
  2.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<String> ds1 = env.socketTextStream("localhost"8888);
  4.         DataStream<String> ds2 = env.socketTextStream("localhost"9999);
  5.         DataStream<Tuple2<String, Integer>> wcStream1 = ds1
  6.                 .flatMap(new Tokenizer())
  7.                 .keyBy(value -> value.f0)
  8.                 .sum(1);
  9.         DataStream<Tuple2<String, Integer>> wcStream2 = ds2
  10.                 .flatMap(new Tokenizer())
  11.                 .keyBy(value -> value.f0)
  12.                 .sum(1);
  13.         ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectedStreams = wcStream1.connect(wcStream2);
  14.     }

union不同,connect只能连接两个流,并且这两个流的类型可以不同。connect后的两个流会被看作是两个不同的流,可以使用CoMap或者CoFlatMap函数分别处理这两个流。

CoMap, CoFlatMap

ConnectedStreams → DataStream

CoMap, CoFlatMap并不是具体算子名字,而是一类操作的名称

  • 凡是基于ConnectedStreams数据流做map遍历,这类操作叫做CoMap。

  • 凡是基于ConnectedStreams数据流做flatMap遍历,这类操作叫做CoFlatMap。

CoMap实现:

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         // 创建两个不同的数据流
  4.         DataStream<Integer> nums = env.fromElements(12345);
  5.         DataStream<String> text = env.fromElements("a""b""c");
  6.         // 连接两个数据流
  7.         ConnectedStreams<Integer, String> connected = nums.connect(text);
  8.         // 使用 CoMap 处理连接的流
  9.         DataStream<String> result = connected.map(new CoMapFunction<Integer, String, String>() {
  10.             @Override
  11.             public String map1(Integer value) {
  12.                 return String.valueOf(value*2);
  13.             }
  14.             @Override
  15.             public String map2(String value) {
  16.                 return "hello " + value;
  17.             }
  18.         });
  19.         result.print();
  20.         env.execute("CoMap example");
  21.     }

CoFlatMap实现方式:

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<Integer> nums = env.fromElements(12345);
  4.         DataStream<String> text = env.fromElements("a""b""c");
  5.         ConnectedStreams<Integer, String> connected = nums.connect(text);
  6.         DataStream<String> result = connected.flatMap(new CoFlatMapFunction<Integer, String, String>() {
  7.             @Override
  8.             public void flatMap1(Integer value, Collector<String> out) {
  9.                 out.collect(String.valueOf(value*2));
  10.                 out.collect(String.valueOf(value*3));
  11.             }
  12.             @Override
  13.             public void flatMap2(String value, Collector<String> out) {
  14.                 out.collect("hello " + value);
  15.                 out.collect("hi " + value);
  16.             }
  17.         });
  18.         result.print();
  19.         env.execute("CoFlatMap example");
  20.     }

Split/Select

DataStream → SplitStream

根据条件将一个流分成多个流,示例代码如下:

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStreamSource<Long> data = env.generateSequence(010);
  4.         SplitStream<Long> split = data.split((OutputSelector<Long>) value -> {
  5.             List<String> output = new ArrayList<>();
  6.             if (value % 2 == 0) {
  7.                 output.add("even");
  8.             } else {
  9.                 output.add("odd");
  10.             }
  11.             return output;
  12.         });
  13.         split.select("odd").print();
  14.         env.execute("Flink SplitStream Example");
  15.     }

select()用于从SplitStream中选择一个或者多个数据流。

split.select("odd").print();

SideOutput

注意:在Flink 1.12 及之后的版本中,SplitStream 已经被弃用并移除,一般推荐使用 Side Outputs(侧输出流)来替代 Split和Select

示例代码如下:

  1. private static final OutputTag<String> evenOutput = new OutputTag<String>("even"){};
  2. private static final OutputTag<String> oddOutput = new OutputTag<String>("odd"){};
  3. public static void main(String[] args) throws Exception {
  4.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.         DataStream<String> input = env.fromElements("1""2""3""4""5");
  6.         SingleOutputStreamOperator<String> processed = input.process(new ProcessFunction<String, String>() {
  7.             @Override
  8.             public void processElement(String value, Context ctx, Collector<String> out){
  9.                 int i = Integer.parseInt(value);
  10.                 if (i % 2 == 0) {
  11.                     ctx.output(evenOutput, value);
  12.                 } else {
  13.                     ctx.output(oddOutput, value);
  14.                 }
  15.             }
  16.         });
  17.         DataStream<String> evenStream = processed.getSideOutput(evenOutput);
  18.         DataStream<String> oddStream = processed.getSideOutput(oddOutput);
  19.         evenStream.print("even");
  20.         oddStream.print("odd");
  21.         env.execute("Side Output Example");
  22.     }

Iterate

DataStream → IterativeStream → DataStream

Iterate算子提供了对数据流迭代的支持

一个数据集通过迭代运算符被划分为两部分:“反馈”部分(feedback)和“输出”部分(output)。反馈部分被反馈到迭代头(iteration head),从而形成下一次迭代。输出部分则构成该迭代的结果:

  1. public static void main(String[] args) throws Exception {
  2.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<Long> input = env.fromElements(10L);
  4.         // 定义迭代流,最大迭代10次
  5.         IterativeStream<Long> iteration = input.iterate(10000L);
  6.         // 定义迭代逻辑
  7.         DataStream<Long> minusOne = iteration.map((MapFunction<Long, Long>) value -> value - 1);
  8.         // 定义反馈流(满足条件继续迭代)和输出流(不满足条件的结果)
  9.         DataStream<Long> stillGreaterThanZero = minusOne.filter(value -> value > 0).setParallelism(1);;
  10.         DataStream<Long> lessThanZero = minusOne.filter(value -> value <= 0);
  11.         // 关闭迭代,定义反馈流
  12.         iteration.closeWith(stillGreaterThanZero);
  13.         // 打印结果
  14.         lessThanZero.print();
  15.         env.execute("Iterative Stream Example");
  16.     }

普通函数 & 富函数

Apache Flink 中有两种类型的函数: 「普通函数(Regular Functions)」和 「富函数(Rich Functions)」。主要区别在于富函数相比普通函数提供了更多生命周期方法和上下文信息。

  • 普通函数:这些函数只需要覆盖一个或几个特定方法,如 MapFunction 需要实现 map() 方法。它们没有生命周期方法,也不能访问执行环境的上下文。

  • 富函数:除了覆盖特定函数外,富函数还提供了对 Flink API 更多的控制和操作,包括:

    • 生命周期管理:可以覆盖 open() 和 close() 方法以便在函数启动前和关闭后做一些设置或清理工作。

    • 获取运行时上下文信息:例如,通过 getRuntimeContext() 方法获取并行任务的信息,如当前子任务的索引等。

    • 状态管理和容错:可以定义和使用托管状态(Managed State),这在构建容错系统时非常重要。

简而言之,如果你需要在函数中使用 Flink 的高级功能,如状态管理或访问运行时上下文,则需要使用富函数。如果不需要这些功能,使用普通函数即可。

普通函数类富函数类
MapFunctionRichMapFunction
FlatMapFunctionRichFlatMapFunction
FilterFunctionRichFilterFunction
............

普通函数:

  1. public static void main(String[] args) throws Exception {
  2.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         List<String> words = Arrays.asList("hello""world""flink""hello""world");
  4.         env.fromCollection(words)
  5.                 .map(new MapFunction<String, Tuple2<String, Integer>>() {
  6.                     @Override
  7.                     public Tuple2<String, Integer> map(String value) {
  8.                         return new Tuple2<>(value, 1);
  9.                     }
  10.                 })
  11.                 .keyBy(0)
  12.                 .sum(1)
  13.                 .print();
  14.         env.execute("Word Count Example");
  15.     }

富函数:

  1. public static void main(String[] args) throws Exception {
  2.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         List<String> words = Arrays.asList("hello""world""flink""hello""world");
  4.         env.fromCollection(words)
  5.                 .map(new RichMapFunction<String, Tuple2<String, Integer>>() {
  6.                     @Override
  7.                     public void open(Configuration parameters) throws Exception {
  8.                         super.open(parameters);
  9.                         // 可以在这里设置相关的配置或者资源,如数据库连接等
  10.                     }
  11.                     @Override
  12.                     public Tuple2<String, Integer> map(String value) throws Exception {
  13.                         return new Tuple2<>(value, 1);
  14.                     }
  15.                     @Override
  16.                     public void close() throws Exception {
  17.                         super.close();
  18.                         // 可以在这里完成资源的清理工作
  19.                     }
  20.                 })
  21.                 .keyBy(0)
  22.                 .sum(1)
  23.                 .print();
  24.         env.execute("Word Count Example");
  25.     }

ProcessFunction(处理函数)

ProcessFunction属于低层次的API,在类继承关系上属于富函数。

我们前面讲的mapfilterflatMap等算子都是基于这层封装出来的。

越低层次的API,功能越强大,用户能够获取的信息越多,比如可以拿到元素状态信息、事件时间、设置定时器等

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<String> dataStream = env.socketTextStream("localhost"9999)
  4.                 .map(new MapFunction<String, Tuple2<String, Integer>>() {
  5.                     @Override
  6.                     public Tuple2<String, Integer> map(String value) {
  7.                         return new Tuple2<>(value, 1);
  8.                     }
  9.                 })
  10.                 .keyBy(0)
  11.                 .process(new AlertFunction());
  12.         dataStream.print();
  13.         env.execute("Process Function Example");
  14.     }
  15.     public static class AlertFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, String> {
  16.         private transient ValueState<Integer> countState;
  17.         @Override
  18.         public void open(Configuration config) {
  19.             ValueStateDescriptor<Integer> descriptor =
  20.                     new ValueStateDescriptor<>(
  21.                             "countState"// state name
  22.                             TypeInformation.of(new TypeHint<Integer>() {}), // type information
  23.                             0); // default value
  24.             countState = getRuntimeContext().getState(descriptor);
  25.         }
  26.         @Override
  27.         public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
  28.             Integer currentCount = countState.value();
  29.             currentCount += 1;
  30.             countState.update(currentCount);
  31.             if (currentCount >= 3) {
  32.                 out.collect("Warning! The key '" + value.f0 + "' has been seen " + currentCount + " times.");
  33.             }
  34.         }
  35.     }

这里,我们创建一个名为AlertFunction的处理函数类,并继承KeyedProcessFunction。其中,ValueState用于保存状态信息,每个键会有其自己的状态实例。当计数达到或超过三次时,该系统将发出警告。这个例子主要展示了处理函数与其他运算符相比的两个优点:访问键控状态和生命周期管理方法(例如open())。

注意:上述示例假设你已经在本地的9999端口上设置了一个socket服务器,用于流式传输文本数据。如果没有,你需要替换这部分以适应你的输入源。

Sink

在Flink中,"Sink"是数据流计算的最后一步。它代表了一个输出端点,在那里计算结果被发送或存储。换句话说,Sink是数据流处理过程中的结束节点,负责将处理后的数据输出到外部系统,如数据库、文件、消息队列等。

Flink内置了大量Sink,可以将Flink处理后的数据输出到HDFS、kafka、Redis、ES、MySQL等。

Redis Sink

Flink处理的数据可以存储到Redis中,以便实时查询。

首先,需要导入Flink和Redis的连接器依赖:

  1. <!-- Flink Redis connector -->
  2.         <dependency>
  3.             <groupId>org.apache.bahir</groupId>
  4.             <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
  5.             <version>1.1.0</version>
  6.         </dependency>

下面的代码展示了"Word Count"(词频统计)操作,并将结果存储到Redis数据库中:

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<String> text = env.fromElements(
  4.                 "Hello World",
  5.                 "Hello Flink",
  6.                 "Hello Java");
  7.         DataStream<Tuple2<String, Integer>> counts =
  8.                 text.flatMap(new Tokenizer())
  9.                         .keyBy(value -> value.f0)
  10.                         .sum(1);
  11.         FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build();
  12.         counts.addSink(new RedisSink<>(conf, new RedisExampleMapper()));
  13.         env.execute("Word Count Example");
  14.     }
  15.     public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
  16.         @Override
  17.         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  18.             String[] words = value.toLowerCase().split("\\W+");
  19.             for (String word : words) {
  20.                 if (word.length() > 0) {
  21.                     out.collect(new Tuple2<>(word, 1));
  22.                 }
  23.             }
  24.         }
  25.     }
  26.     public static final class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> {
  27.         @Override
  28.         public RedisCommandDescription getCommandDescription() {
  29.             return new RedisCommandDescription(RedisCommand.HSET);
  30.         }
  31.         @Override
  32.         public String getKeyFromData(Tuple2<String, Integer> data) {
  33.             return data.f0;
  34.         }
  35.         @Override
  36.         public String getValueFromData(Tuple2<String, Integer> data) {
  37.             return data.f1.toString();
  38.         }
  39.     }

Kafka Sink

处理结果写入到kafka topic中,Flink也是支持的,需要添加连接器依赖,跟读取kafka数据用的连接器依赖相同,之前添加过就不需要再添加了。

  1. <dependency>
  2.     <groupId>org.apache.flink</groupId>
  3.     <artifactId>flink-connector-kafka_2.12</artifactId>
  4.     <version>1.13.6</version>
  5. </dependency>

还是用上面词频统计的例子:

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<String> text = env.fromElements(
  4.                 "Hello World",
  5.                 "Hello Flink",
  6.                 "Hello Java");
  7.         DataStream<Tuple2<String, Integer>> counts =
  8.                 text.flatMap(new Tokenizer())
  9.                         .keyBy(value -> value.f0)
  10.                         .sum(1);
  11.         // Define Kafka properties
  12.         Properties properties = new Properties();
  13.         properties.setProperty("bootstrap.servers""localhost:9092");
  14.         // Write the data stream to Kafka
  15.         counts.map(new MapFunction<Tuple2<String,Integer>, String>() {
  16.                     @Override
  17.                     public String map(Tuple2<String,Integer> value) throws Exception {
  18.                         return value.f0 + "," + value.f1.toString();
  19.                     }
  20.                 })
  21.                 .addSink(new FlinkKafkaProducer<>("my-topic"new SimpleStringSchema(), properties));
  22.         env.execute("Word Count Example");
  23.     }

MySQL Sink

Flink处理结果写入到MySQL中,这并不是Flink默认支持的,需要添加MySQL的驱动依赖:

  1. <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
  2. <dependency>
  3.     <groupId>mysql</groupId>
  4.     <artifactId>mysql-connector-java</artifactId>
  5.     <version>8.0.28</version>
  6. </dependency>

因为不是内嵌支持的,所以需要基于SinkFunction自定义Sink。

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<String> text = env.fromElements(
  4.                 "Hello World",
  5.                 "Hello Flink",
  6.                 "Hello Java");
  7.         DataStream<Tuple2<String, Integer>> counts =
  8.                 text.flatMap(new Tokenizer())
  9.                         .keyBy(value -> value.f0)
  10.                         .sum(1);
  11.         // Transform the Tuple2<String, Integer> to a format acceptable by MySQL
  12.         DataStream<String> mysqlData = counts.map(new MapFunction<Tuple2<String, Integer>, String>() {
  13.             @Override
  14.             public String map(Tuple2<String, Integer> value) throws Exception {
  15.                 return "'" + value.f0 + "'," + value.f1.toString();
  16.             }
  17.         });
  18.         // Write the data stream to MySQL
  19.         mysqlData.addSink(new MySqlSink());
  20.         env.execute("Word Count Example");
  21.     }
  22.     public static class MySqlSink implements SinkFunction<String> {
  23.         private Connection connection;
  24.         private PreparedStatement preparedStatement;
  25.         @Override
  26.         public void invoke(String value, Context context) throws Exception {
  27.             if(connection == null) {
  28.                 connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test""username""password");
  29.                 preparedStatement = connection.prepareStatement("INSERT INTO my_table(word, count) VALUES("+ value +")");
  30.             }
  31.             preparedStatement.executeUpdate();
  32.         }
  33.     }
  34. }

HBase Sink

需要导入HBase的依赖:

  1.         <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
  2.         <dependency>
  3.             <groupId>org.apache.hbase</groupId>
  4.             <artifactId>hbase-client</artifactId>
  5.             <version>2.5.2</version>
  6.         </dependency>
  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<String> text = env.fromElements(
  4.                 "Hello World",
  5.                 "Hello Flink",
  6.                 "Hello Java");
  7.         DataStream<Tuple2<String, Integer>> counts =
  8.                 text.flatMap(new Tokenizer())
  9.                         .keyBy(value -> value.f0)
  10.                         .sum(1);
  11.         counts.addSink(new HBaseSink());
  12.         env.execute("Word Count Example");
  13.     }
  14.     public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
  15.         @Override
  16.         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  17.             String[] words = value.toLowerCase().split("\\W+");
  18.             for (String word : words) {
  19.                 if (word.length() > 0) {
  20.                     out.collect(new Tuple2<>(word, 1));
  21.                 }
  22.             }
  23.         }
  24.     }
  25.     public static class HBaseSink extends RichSinkFunction<Tuple2<String, Integer>> {
  26.         private org.apache.hadoop.conf.Configuration config;
  27.         private org.apache.hadoop.hbase.client.Connection connection;
  28.         private Table table;
  29.         @Override
  30.         public void invoke(Tuple2<String, Integer> value, Context context) throws IOException {
  31.             Put put = new Put(Bytes.toBytes(value.f0));
  32.             put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(value.f1));
  33.             table.put(put);
  34.         }
  35.         @Override
  36.         public void open(Configuration parameters) throws Exception {
  37.             config = HBaseConfiguration.create();
  38.             config.set("hbase.zookeeper.quorum""localhost");
  39.             config.set("hbase.zookeeper.property.clientPort""2181");
  40.             connection = ConnectionFactory.createConnection(config);
  41.             table = connection.getTable(TableName.valueOf("my-table"));
  42.         }
  43.         @Override
  44.         public void close() throws Exception {
  45.             table.close();
  46.             connection.close();
  47.         }
  48.     }

HBaseSink类是RichSinkFunction的实现,用于将结果写入HBase数据库。在invoke方法中,它将接收到的每个二元组(单词和计数)写入HBase。在open方法中,它创建了与HBase的连接,并指定了要写入的表。在close方法中,它关闭了与HBase的连接和表。

分区策略

在 Apache Flink 中,分区(Partitioning)是将数据流按照一定的规则划分成多个子数据流或分片,以便在不同的并行任务或算子中并行处理数据。分区是实现并行计算和数据流处理的基础机制。Flink 的分区决定了数据在作业中的流动方式,以及在并行任务之间如何分配和处理数据。

在 Flink 中,数据流可以看作是一个有向图,图中的节点代表算子(Operators),边代表数据流(Data Streams)。数据从源算子流向下游算子,这些算子可能并行地处理输入数据,而分区就是决定数据如何从一个算子传递到另一个算子的机制。

下面介绍Flink中常用的几种分区策略。

shuffle

场景:增大分区、提高并行度,解决数据倾斜。

DataStream → DataStream

分区元素随机均匀分发到下游分区,网络开销比较大

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<Long> stream = env.fromSequence(110).setParallelism(1);
  4.         System.out.println(stream.getParallelism());
  5.         stream.shuffle().print();
  6.         env.execute();
  7.     }

输出结果:上游数据比较随意地分发到下游

  1. 1> 7
  2. 7> 1
  3. 2> 8
  4. 4> 5
  5. 8> 3
  6. 1> 9
  7. 8> 4
  8. 8> 10
  9. 6> 2
  10. 6> 6

rebalance

场景:增大分区、提高并行度,解决数据倾斜

DataStream → DataStream

轮询分区元素,均匀的将元素分发到下游分区,下游每个分区的数据比较均匀,在发生数据倾斜时非常有用,网络开销比较大

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<Long> stream = env.fromSequence(110).setParallelism(1);
  4.         System.out.println(stream.getParallelism());
  5.         stream.rebalance().print();
  6.         env.execute();
  7.     }

输出:上游数据比较均匀的分发到下游

  1. 2> 2
  2. 1> 1
  3. 8> 8
  4. 5> 5
  5. 7> 7
  6. 4> 4
  7. 3> 3
  8. 6> 6
  9. 1> 9
  10. 2> 10

rescale

场景:减少分区,防止发生大量的网络传输,不会发生全量的重分区

DataStream → DataStream

通过轮询分区元素,将一个元素集合从上游分区发送给下游分区,发送单位是集合,而不是一个个元素

和其他重分区策略(如 rebalance、forward、broadcast 等)不同的是,rescale 在运行时不会改变并行度,而且它只在本地(同一个 TaskManager 内)进行数据交换,所以它比其他重分区策略更加高效

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<String> dataStream = env.fromElements("1""2""3""4""5");
  4.         // 使用MapFunction将元素转换为整数类型
  5.         DataStream<Integer> intStream = dataStream.map(new MapFunction<String, Integer>() {
  6.             @Override
  7.             public Integer map(String value) {
  8.                 return Integer.parseInt(value);
  9.             }
  10.         });
  11.         // 使用rescale()进行重分区
  12.         DataStream<Integer> rescaledStream = intStream.rescale();
  13.         rescaledStream.print();
  14.         env.execute("Rescale Example");
  15.     }

在这个例子中,我们创建了一个字符串类型的DataStream然后通过map()将每一个元素转换为整数。然后,我们对结果DataStream应用rescale()操作来重分区数据。

值得注意的是,rescale()的实际影响取决于你的并行度和集群环境,如果不同的并行实例都在同一台机器上,或者并行度只有1,那么可能不会看到rescale()的效果。而在大规模并行处理的情况下,使用rescale()操作可以提高数据处理的效率。

此外,我们不能直接在打印结果中看到rescale的影响,因为它改变的是内部数据分布和处理方式,而不是输出的结果。如果想观察rescale的作用,需要通过Flink的Web UI或者日志来查看任务执行情况,如数据流的分布、各个子任务的运行状态等信息。

broadcast

场景:需要使用映射表、并且映射表会经常发生变动的场景

DataStream → DataStream

上游中每一个元素内容广播到下游每一个分区中

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<Integer> dataStream = env.fromElements(12345);
  4.         DataStream<String> broadcastStream = env.fromElements("2""4");
  5.         MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>(
  6.                 "RulesBroadcastState",
  7.                 BasicTypeInfo.STRING_TYPE_INFO,
  8.                 BasicTypeInfo.STRING_TYPE_INFO);
  9.         BroadcastStream<String> broadcastData = broadcastStream.broadcast(descriptor);
  10.         dataStream.connect(broadcastData)
  11.                 .process(new BroadcastProcessFunction<Integer, String, String>() {
  12.                     @Override
  13.                     public void processElement(Integer value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
  14.                         if (ctx.getBroadcastState(descriptor).contains(String.valueOf(value))) {
  15.                             out.collect("Value " + value + " matches with a broadcasted rule");
  16.                         }
  17.                     }
  18.                     @Override
  19.                     public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
  20.                         ctx.getBroadcastState(descriptor).put(rule, rule);
  21.                     }
  22.                 }).print();
  23.         env.execute("Broadcast State Example");
  24.     }

上述代码首先定义了一个主流和一个要广播的流。然后,我们创建了一个MapStateDescriptor,用于存储广播数据。接着,我们将广播流转换为BroadcastStream

最后,我们使用connect()方法连接主流和广播流,并执行process()方法。在这个process()方法中,我们定义了两个处理函数:processElement()processBroadcastElement()processElement()用于处理主流中的每个元素,并检查该元素是否存在于广播状态中。如果是,则输出一个字符串,表明匹配成功。而processBroadcastElement()则用于处理广播流中的每个元素,并将其添加到广播状态中。

注意:在分布式计算环境中,每个并行实例都会接收广播流中的所有元素。因此,广播状态对于所有的并行实例都是一样的。不过,在Flink 1.13版本中,广播状态尚未在故障恢复中提供完全的保障。所以在事件出现故障时,广播状态可能会丢失数据。

global

场景:并行度降为1

DataStream → DataStream

在 Apache Flink 中,Global 分区策略意味着所有数据都被发送到下游算子的同一个分区中。这种情况下,下游算子只有一个任务处理全部数据。这是一种特殊的分区策略,只有在下游算子能够很快地处理所有数据,或者需要全局排序或全局聚合时才会使用。

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         // 创建一个从1到100的数字流
  4.         DataStream<Long> numberStream = env.fromSequence(1100);
  5.         // 对流应用 map function
  6.         DataStream<Long> result = numberStream.global()
  7.                 .map(new MapFunction<Long, Long>() {
  8.                     @Override
  9.                     public Long map(Long value) {
  10.                         System.out.println("Processing " + value);
  11.                         return value * 2;
  12.                     }
  13.                 });
  14.         result.print();
  15.         env.execute("Global Partition Example");
  16.     }

以上代码创建了一个顺序生成 1-100 的数字流,并应用了 Global Partition,然后对每个数字进行乘2的操作。实际运行此代码时,你会观察到所有的数字都由同一任务处理,打印出来的处理顺序是连续的。这就是 Global Partition 的作用:所有数据都被发送到下游算子的同一实例进行处理。

需要注意的是,此示例只是为了演示 Global Partition 的工作原理,实际上并不推荐在负载均衡很重要的应用场景中使用这种分区策略,因为它可能导致严重的性能问题。

forward

场景:一对一的数据分发,默认的分区策略,数据在各个算子之间不会重新分配。map、flatMap、filter 等都是这种分区策略

DataStream → DataStream

上游分区数据分发到下游对应分区中

partition1->partition1;partition2->partition2

注意:必须保证上下游分区数(并行度)一致,不然会有如下异常:

Forward partitioning does not allow change of parallelism. Upstream operation: Source: Socket Stream-1 parallelism: 1, downstream operation: Map-3 parallelism: 8 You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.
  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<Integer> dataStream = env.fromElements(12345678910).setParallelism(1);
  4.         DataStream<Integer> forwardStream = dataStream.forward().map(new MapFunction<Integer, Integer>() {
  5.             @Override
  6.             public Integer map(Integer value) throws Exception {
  7.                 return value * value;
  8.             }
  9.         }).setParallelism(1);
  10.         forwardStream.print();
  11.         env.execute("Flink Forward Example");
  12.     }

此代码首先创建一个从1到10的数据流。然后,它使用 Forward 策略将这个数据流送入一个 MapFunction 中,该函数将每个数字平方。然后,它打印出结果。注意:以上代码中的forward调用实际上并没有改变任何分区策略,因为forward是默认分区策略。这里添加forward调用主要是为了说明其存在和使用方法。

keyBy

场景:与业务场景匹配

DataStream → DataStream

根据上游分区元素的Hash值与下游分区数取模计算出,将当前元素分发到下游哪一个分区

MathUtils.murmurHash(keyHash)(每个元素的Hash值) % maxParallelism(下游分区数)
  1. public static void main(String[] args) throws Exception {
  2.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<Tuple2<Integer, Integer>> dataStream = env.fromElements(
  4.                 new Tuple2<>(13),
  5.                 new Tuple2<>(15),
  6.                 new Tuple2<>(24),
  7.                 new Tuple2<>(26),
  8.                 new Tuple2<>(37)
  9.         );
  10.         // 使用 keyBy 对流进行分区操作
  11.         DataStream<Tuple2<Integer, Integer>> keyedStream = dataStream
  12.                 .keyBy(0// 根据元组的第一个字段进行分区
  13.                 .sum(1);  // 对每个键对应的第二个字段求和
  14.         keyedStream.print();
  15.         env.execute("KeyBy example");
  16.     }

以上程序首先创建了一个包含五个元组的流,然后使用 keyBy 方法根据元组的第一个字段进行分区,并对每个键对应的第二个字段求和。执行结果中,每个键的值集合都被映射成了一个新的元组,其第一个字段是键,第二个字段是相应的和。

注意:在以上代码中,keyBy(0) 表示根据元组的第一个字段(索引从0开始)进行分区操作。另外,无论什么情况,都需要确保你的 Flink 集群是正常运行的,否则程序可能无法执行成功。

PartitionCustom

DataStream → DataStream

通过自定义的分区器,来决定元素是如何从上游分区分发到下游分区

  1. public static void main(String[] args) throws Exception {
  2.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStream<Integer> data = env.fromElements(1,2,3,4,5,6,7,8,9,10);
  4.         // 使用自定义分区器进行分区
  5.         data.partitionCustom(new MyPartitioner(), i -> i).print();
  6.         env.execute("Custom partition example");
  7.     }
  8.     public static class MyPartitioner implements Partitioner<Integer> {
  9.         @Override
  10.         public int partition(Integer key, int numPartitions) {
  11.             return key % numPartitions;
  12.         }
  13.     }

这个程序将创建一个数据流,其中包含从1到10的整数。然后,它使用了一个自定义的分区器MyPartitioner来对这个数据流进行分区。这个分区器根据元素的值对numPartitions取模来决定数据去到哪个分区。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/607515
推荐阅读
相关标签
  

闽ICP备14008679号