Reference: the official Flink 1.10 documentation
The current Maven configuration is:
- <properties>
- <flink.version>1.10.0</flink.version>
- </properties>
- <dependencies>
- <!-- Flink dependencies -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime-web_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <!--Use this dependency if you are using the DataStream API-->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
- <version>${flink.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
- </dependencies>
The code examples and explanations follow:
-
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.api.java.tuple.Tuple3;
- import org.apache.flink.streaming.api.collector.selector.OutputSelector;
- import org.apache.flink.streaming.api.datastream.ConnectedStreams;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SplitStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-
- import java.util.ArrayList;
- import java.util.List;
-
-
- public class DataTransfroms {
-
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- split(env);
- connect(env);
- union(env);
- project(env);
- env.execute("flink");
- }
-
- }
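The Connect operation joins two DataStreams whose element types may differ, and it returns a ConnectedStreams. The two connected streams can then share state with each other. Unlike union, connect always takes exactly two streams.
Note, however, that because the two streams carry different element types, any further computation first requires a CoMap or CoFlatMap to convert the ConnectedStreams back into a single DataStream: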
- public static void connect(StreamExecutionEnvironment env) {
- DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 3),
- new Tuple2<>("b", 5));
- DataStreamSource<Integer> streamSource02 = env.fromElements(2, 3, 9);
- // Join the two streams with connect
- ConnectedStreams<Tuple2<String, Integer>, Integer> connect = streamSource01.connect(streamSource02);
- connect.map(new CoMapFunction<Tuple2<String, Integer>, Integer, Integer>() {
- @Override
- public Integer map1(Tuple2<String, Integer> value) throws Exception {
- return value.f1;
- }
-
- @Override
- public Integer map2(Integer value) throws Exception {
- return value;
- }
- }).map(x -> x * 100).print();
- }
- 500
- 200
- 300
- 300
- 900
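To make the CoMap step concrete without starting a Flink job, here is a minimal plain-Java sketch of the same idea: two inputs of different types, one mapping function per input, and a single output type. The class `CoMapModel` and its `coMap` helper are hypothetical names made up for illustration and are not part of the Flink API. The sketch also processes the first input completely before the second, whereas Flink makes no such ordering promise between the two streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of connect + CoMap. Hypothetical helper, not Flink API.
final class CoMapModel {

    // Applies map1 to each element of the first input and map2 to each element
    // of the second, collecting both into one list of the common type R.
    // Assumption: the first input is drained before the second; Flink does not
    // guarantee any particular interleaving of the two inputs.
    static <A, B, R> List<R> coMap(List<A> first, List<B> second,
                                   Function<A, R> map1, Function<B, R> map2) {
        List<R> out = new ArrayList<>();
        for (A a : first) {
            out.add(map1.apply(a));
        }
        for (B b : second) {
            out.add(map2.apply(b));
        }
        return out;
    }

    public static void main(String[] args) {
        // Same input values as the connect example in this post.
        List<Map.Entry<String, Integer>> pairs = List.of(Map.entry("a", 3), Map.entry("b", 5));
        List<Integer> numbers = List.of(2, 3, 9);
        List<Integer> merged = coMap(pairs, numbers, pair -> pair.getValue() * 100, n -> n * 100);
        System.out.println(merged); // [300, 500, 200, 300, 900]
    }
}
```

The point of the two separate functions is that each input keeps its own type until the moment it is mapped, which is what distinguishes connect from union.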
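Union merges two or more DataStreams that have the same element type. A DataStream can also be unioned with itself, in which case every element of that DataStream appears twice in the result: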
- public static void union(StreamExecutionEnvironment env) {
- DataStreamSource<Tuple2<String, Integer>> streamSource01 = env.fromElements(new Tuple2<>("a", 1),
- new Tuple2<>("a", 2));
- DataStreamSource<Tuple2<String, Integer>> streamSource02 = env.fromElements(new Tuple2<>("b", 1),
- new Tuple2<>("b", 2));
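- // union returns a new DataStream and leaves its inputs unchanged, so print the result
- streamSource01.union(streamSource02).print();
-
- // Union the stream with itself and with streamSource02:
- // every element of streamSource01 now appears twice (output order is not guaranteed)
- streamSource01.union(streamSource01, streamSource02).print();
- }
- (a,1)
- (a,2)
- (b,1)
- (b,2)
- (a,1)
- (a,2)
- (a,1)
- (a,2)
- (b,1)
- (b,2)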
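Split divides one DataStream into several according to a given rule. Note that this is only a logical split: Split merely attaches a label to each element, and what it returns is still a single SplitStream.
To obtain actual separate DataStreams from the logically split SplitStream, use the Select operator. Note also that `split` is marked as deprecated in Flink 1.10 in favor of side outputs: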
- public static void split(StreamExecutionEnvironment env) {
- DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
- // Tag each element as "even" or "odd"
- SplitStream<Integer> split = streamSource.split(new OutputSelector<Integer>() {
- @Override
- public Iterable<String> select(Integer value) {
- List<String> output = new ArrayList<String>();
- output.add(value % 2 == 0 ? "even" : "odd");
- return output;
- }
- });
- // Select the stream of even numbers
- split.select("even").print();
- }
- 2
- 6
- 4
- 8
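The "label first, select later" idea can be modelled in a few lines of plain Java, which may help to see why Split alone produces nothing usable. In this sketch the selector function plays the role of the OutputSelector and `select` keeps only the elements that carry the requested label. `SplitSelectModel` and its `select` helper are hypothetical names for illustration, not Flink API, and the helper accepts a single label for simplicity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of split + select. Hypothetical helper, not Flink API.
final class SplitSelectModel {

    // Keeps the elements whose labels, as computed by selector, contain name.
    // The input is only labelled, never physically partitioned.
    static <T> List<T> select(List<T> input, Function<T, List<String>> selector, String name) {
        List<T> out = new ArrayList<>();
        for (T value : input) {
            if (selector.apply(value).contains(name)) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        // Same rule as the OutputSelector in this post: label by parity.
        Function<Integer, List<String>> parity = v -> List.of(v % 2 == 0 ? "even" : "odd");

        System.out.println(select(numbers, parity, "even")); // [2, 4, 6, 8]
        System.out.println(select(numbers, parity, "odd"));  // [1, 3, 5, 7]
    }
}
```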
Project selects a subset of fields from a stream of tuples:
- public static void project(StreamExecutionEnvironment env) {
- DataStreamSource<Tuple3<String, Integer, String>> streamSource = env.fromElements(
- new Tuple3<>("li", 22, "2018-09-23"),
- new Tuple3<>("ming", 33, "2020-09-23"));
- streamSource.project(0, 2).print();
- }
- (ming,2020-09-23)
- (li,2018-09-23)
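As a rough illustration of what the field indexes mean, the sketch below projects positions 0 and 2 out of a three-field row using plain Java lists in place of Flink tuples. `ProjectModel` and its `project` helper are hypothetical names for illustration, not Flink API. The indexes are zero-based and the output keeps the order in which they are listed.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of project. Hypothetical helper, not Flink API.
final class ProjectModel {

    // Returns the fields of tuple at the given zero-based positions,
    // in the order the positions are listed.
    static List<Object> project(List<?> tuple, int... fieldIndexes) {
        List<Object> out = new ArrayList<>();
        for (int index : fieldIndexes) {
            out.add(tuple.get(index));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object> li = List.of("li", 22, "2018-09-23");
        List<Object> ming = List.of("ming", 33, "2020-09-23");

        System.out.println(project(li, 0, 2));   // [li, 2018-09-23]
        System.out.println(project(ming, 0, 2)); // [ming, 2020-09-23]
    }
}
```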