
Flink operators split, select, connect, union and project: example code


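A summary of several commonly used Flink DataStream operators.

Reference: Flink 1.10 official documentation

GitHub: big data Flink knowledge notes

Maven configuration used here: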

```xml
<properties>
    <flink.version>1.10.0</flink.version>
</properties>

<dependencies>
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Use this dependency if you are using the DataStream API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.xerial.snappy</groupId>
                <artifactId>snappy-java</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```

Code examples and explanations follow:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.ArrayList;
import java.util.List;

public class DataTransforms {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each call only adds operators to the dataflow graph; nothing runs yet.
        split(env);
        connect(env);
        union(env);
        project(env);
        // All four pipelines run together as one job, so their printed output interleaves.
        env.execute("flink");
    }

    // The static methods split, connect, union and project shown below go here.
}
```

* Connect [DataStream,DataStream → ConnectedStreams]

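Connect joins exactly two DataStreams, whose element types may differ, and returns a ConnectedStreams. Because both inputs are handled by the same function instance, the two connected streams can share state with each other.
Since the element types of the two inputs differ, a ConnectedStreams is usually converted back into a DataStream with CoMap or CoFlatMap (a CoMapFunction or CoFlatMapFunction) before further processing: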

```java
public static void connect(StreamExecutionEnvironment env) {
    DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(
            new Tuple2<>("a", 3),
            new Tuple2<>("b", 5));
    DataStreamSource<Integer> streamSource02 = env.fromElements(2, 3, 9);
    // Connect the two streams; each keeps its own element type
    ConnectedStreams<Tuple2<String, Integer>, Integer> connect = streamSource01.connect(streamSource02);
    connect.map(new CoMapFunction<Tuple2<String, Integer>, Integer, Integer>() {
        // Called for elements of the first stream: keep only the Integer field
        @Override
        public Integer map1(Tuple2<String, Integer> value) throws Exception {
            return value.f1;
        }

        // Called for elements of the second stream: pass them through unchanged
        @Override
        public Integer map2(Integer value) throws Exception {
            return value;
        }
    }).map(x -> x * 100).print();
}
```

Output (the order may differ between runs):

```
500
200
300
300
900
```
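The connect description mentions CoFlatMap and state shared between the two inputs. A minimal sketch of both, assuming a hypothetical setup in which the second stream carries threshold updates for the first; `ThresholdFilter` and its logic are illustrative, not from the original article. The plain field `threshold` is visible to `flatMap1` and `flatMap2` of the same parallel instance, but it is not checkpointed, so fault-tolerant shared state would need Flink's managed state instead (for example keyed state after a `keyBy` on both inputs).

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConnectCoFlatMapExample {

    /**
     * Hypothetical example. Input 1: (name, score) records. Input 2: threshold updates.
     * Emits the names whose score is at least the most recently received threshold.
     */
    public static class ThresholdFilter
            implements CoFlatMapFunction<Tuple2<String, Integer>, Integer, String> {

        // Shared by flatMap1 and flatMap2 of one parallel instance.
        // NOT checkpointed: the value is lost if the job fails and restarts.
        private int threshold = 0;

        @Override
        public void flatMap1(Tuple2<String, Integer> record, Collector<String> out) {
            // CoFlatMap may emit zero or more elements per input; here zero or one
            if (record.f1 >= threshold) {
                out.collect(record.f0);
            }
        }

        @Override
        public void flatMap2(Integer newThreshold, Collector<String> out) {
            // Control input: only updates the shared field, emits nothing
            threshold = newThreshold;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> scores =
                env.fromElements(Tuple2.of("a", 3), Tuple2.of("b", 5));
        DataStream<Integer> thresholds = env.fromElements(4);

        // broadcast() sends every threshold to all parallel instances of the co-flatMap
        scores.connect(thresholds.broadcast())
              .flatMap(new ThresholdFilter())
              .print();
        env.execute("connect-coflatmap");
    }
}
```

Flink does not guarantee the relative arrival order of the two inputs: if the threshold 4 is processed before ("a",3), only `b` is printed, otherwise both `a` and `b` are.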

* Union [DataStream* → DataStream]

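Union merges two or more DataStreams whose elements have the same type. A DataStream can also be unioned with itself, in which case every element of that stream appears twice in the result: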

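```java
public static void union(StreamExecutionEnvironment env) {
    DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(
            new Tuple2<>("a", 1),
            new Tuple2<>("a", 2));
    DataStreamSource<Tuple2<String, Integer>> streamSource02 = env.fromElements(
            new Tuple2<>("b", 1),
            new Tuple2<>("b", 2));
    // union returns a NEW stream and leaves streamSource01 unchanged,
    // so print the returned stream, not streamSource01
    streamSource01.union(streamSource02).print();
    // Union with itself (plus streamSource02): elements of streamSource01 appear twice
    streamSource01.union(streamSource01, streamSource02).print();
}
```

Output (the order varies between runs): the first `union` prints (a,1), (a,2), (b,1) and (b,2); the second prints (a,1) and (a,2) twice each, together with (b,1) and (b,2).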
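`union` only accepts streams with identical element types; with different Java types the call does not compile. One option is to map one stream into the other's type first. A minimal sketch, where `TagWithKey` and the key "c" are hypothetical choices for illustration:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionAfterMapExample {

    // Hypothetical helper: wraps a bare Integer into the (key, value) shape of the other stream
    public static class TagWithKey implements MapFunction<Integer, Tuple2<String, Integer>> {
        private final String key;

        public TagWithKey(String key) {
            this.key = key;
        }

        @Override
        public Tuple2<String, Integer> map(Integer value) {
            return Tuple2.of(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> pairs =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2));
        DataStream<Integer> numbers = env.fromElements(7, 8);

        // pairs.union(numbers) would not compile: the element types differ
        DataStream<Tuple2<String, Integer>> converted = numbers.map(new TagWithKey("c"));

        // union keeps every element of every input; it does not deduplicate
        DataStream<Tuple2<String, Integer>> all = pairs.union(converted);
        all.print();
        env.execute("union-after-map");
    }
}
```

When the two streams should keep different types, connect (described above) is the operator meant for that case.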

* Split [DataStream → SplitStream]:

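- Splits one DataStream into several streams according to a rule. The split is only logical: Split attaches one or more name tags to each element, and the result is still a single SplitStream. Note that `split` is deprecated in the DataStream API in favor of side outputs.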

* Select [SplitStream → DataStream]:

- To obtain an actual DataStream from the logically split SplitStream, use the Select operator with one or more tag names.

```java
public static void split(StreamExecutionEnvironment env) {
    DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
    // Tag every element as "even" or "odd"
    SplitStream<Integer> split = streamSource.split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> output = new ArrayList<>();
            output.add(value % 2 == 0 ? "even" : "odd");
            return output;
        }
    });
    // Select only the elements tagged "even"
    split.select("even").print();
}
```

Output (the order may differ between runs):

```
2
6
4
8
```
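Side outputs are the mechanism Flink recommends in place of Split/Select. A minimal sketch of the same even/odd split, assuming Flink 1.10's `ProcessFunction` and `OutputTag` APIs; the class name and the `isEven` helper are illustrative. Even numbers go to the main output, odd numbers to a side output tagged "odd".

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSplitExample {

    // Anonymous subclass ({}) so Flink can capture the generic element type Integer
    public static final OutputTag<Integer> ODD = new OutputTag<Integer>("odd") {};

    // Routing rule for one element; kept separate so it can be tested on its own
    public static boolean isEven(int value) {
        return value % 2 == 0;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);

        SingleOutputStreamOperator<Integer> even = source.process(
                new ProcessFunction<Integer, Integer>() {
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                        if (isEven(value)) {
                            out.collect(value);      // main output: even numbers
                        } else {
                            ctx.output(ODD, value);  // side output: odd numbers
                        }
                    }
                });

        DataStream<Integer> odd = even.getSideOutput(ODD);
        even.print();
        odd.print();
        env.execute("side-output-split");
    }
}
```

Unlike `select`, each `OutputTag` carries its own type, so a side output may have a different element type from the main output.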


* project [DataStream → DataStream]

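- project selects a subset of fields, by index, from each tuple in a stream of tuples and emits a new tuple containing only those fields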

```java
public static void project(StreamExecutionEnvironment env) {
    DataStreamSource<Tuple3<String, Integer, String>> streamSource = env.fromElements(
            new Tuple3<>("li", 22, "2018-09-23"),
            new Tuple3<>("ming", 33, "2020-09-23"));
    // Keep field 0 (name) and field 2 (date); field 1 (age) is dropped
    streamSource.project(0, 2).print();
}
```

Output (the order may differ between runs):

```
(ming,2020-09-23)
(li,2018-09-23)
```
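For comparison, a sketch that produces the same (name, date) pairs with an explicit `map`, assuming the same Tuple3 input; `NameAndDate` is a hypothetical name. `project` is shorter, while `map` also lets you transform the fields it keeps.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProjectVsMapExample {

    // Same result as project(0, 2): keep field 0 (name) and field 2 (date), drop field 1
    public static class NameAndDate
            implements MapFunction<Tuple3<String, Integer, String>, Tuple2<String, String>> {
        @Override
        public Tuple2<String, String> map(Tuple3<String, Integer, String> t) {
            return Tuple2.of(t.f0, t.f2);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple3<String, Integer, String>> people = env.fromElements(
                Tuple3.of("li", 22, "2018-09-23"),
                Tuple3.of("ming", 33, "2020-09-23"));

        // project: fields chosen by index; Flink derives the output tuple type
        DataStream<Tuple2<String, String>> projected = people.project(0, 2);
        // map: an explicit function producing the same pairs
        DataStream<Tuple2<String, String>> mapped = people.map(new NameAndDate());

        projected.print();
        mapped.print();
        env.execute("project-vs-map");
    }
}
```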


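Disclaimer: This article was contributed by a user and does not represent the position of the wpsshop blog. Copyright belongs to the original author, and the site assumes no legal liability for it. If you find infringing content, please contact us. When reposting, please cite the source: https://www.wpsshop.cn/w/weixin_40725706/article/detail/639073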