赞
踩
本文主要练习了Flink框架中的转换方法:map,flatMap,filter
<!-- Build-wide properties; the version properties are referenced from the dependency declarations. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Version applied to the org.apache.flink dependencies via ${flink.version}. -->
<flink.version>1.9.0</flink.version>
<java.version>1.8</java.version>
<!-- Scala binary version used as the artifactId suffix of flink-streaming-java. -->
<scala.binary.version>2.11</scala.binary.version>
<!-- Compiler source/target levels both follow ${java.version}. -->
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
    <!-- Apache Flink dependencies -->
    <!--
      No <scope> is declared on these dependencies, so Maven's default scope applies
      and they are on the classpath when the example is run directly (e.g. from an IDE).
      NOTE(review): if this job is packaged for a Flink cluster, the Flink artifacts are
      normally marked <scope>provided</scope> so they are not bundled into the JAR - confirm
      against the intended deployment.
    -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Lombok: generates the constructor, accessors and toString of the example bean. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
    </dependency>
    <!-- Logging: SLF4J binding for log4j 1.x plus log4j itself. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>
package org.feng.transform;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.Serializable;

/**
 * Created by Feng on 2019/12/6 14:00
 * CurrentProject's name is flink
 *
 * <p>Flink transformations turn one or more DataStreams into a new DataStream as needed:
 * <ol>
 *   <li>DataStream transformations: operations that transform the data flowing through a stream;</li>
 *   <li>Physical partitioning: low-level API that lets the user define how data is partitioned;</li>
 *   <li>Task chaining and resource groups: fine-grained control over task chains and resource groups.</li>
 * </ol>
 *
 * <p>This exercise covers {@code map()}, {@code flatMap()} and {@code filter()}.
 *
 * @author Feng
 */
public class Transformations1 {

    /**
     * Builds three small pipelines (map, flatMap, filter + map), prints each result
     * stream, and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Start from a stream of plain strings.
        DataStream<String> mapDataStream = env.fromElements("how", "old", "are", "you", "?");

        // map(): applies one transformation to every element of the stream.
        mapDataStream.map(word -> "[" + word + "]").print();

        // flatMap(): like map(), but a single input element may yield several output elements.
        // Here every word of an English sentence is wrapped in its own Words object.
        final String space = " ";
        String word1 = "Nice to meet you";
        String word2 = "Nice to meet you too";

        // The lambda loses the Collector's generic parameter to type erasure, so the
        // output type is declared explicitly with returns(...).
        DataStream<Words> flatMapDataStream = env.fromElements(word1, word2)
                .flatMap((FlatMapFunction<String, Words>) (sentence, collector) -> {
                    for (String w : sentence.split(space)) {
                        collector.collect(new Words(w));
                    }
                })
                .returns(Words.class);
        flatMapDataStream.print();

        // filter(): keep only the Words whose 'word' property starts with the string "t",
        // then use map() to prepend a label.
        flatMapDataStream
                .filter(words -> words.getWord().startsWith("t"))
                .map(words -> "filter():" + words)
                .print();

        env.execute();
    }

    /**
     * Simple entity bean used to practise stream transformations.
     *
     * <p>Declared {@code public static} with a public no-argument constructor and
     * getter/setter access to its field so that it satisfies Flink's POJO type rules
     * rather than being handled as a generic type.
     */
    @ToString
    @AllArgsConstructor
    @Getter
    @Setter
    public static class Words implements Serializable {

        private static final long serialVersionUID = -7202455108400607936L;

        private String word;

        /** No-argument constructor required by Flink's POJO type rules. */
        public Words() {
        }
    }
}
2> [how] 2> [?] 4> [are] 3> [old] 1> [you] 2> Transformations1.Words(word=Nice) 3> Transformations1.Words(word=Nice) 2> Transformations1.Words(word=to) 2> filter():Transformations1.Words(word=to) 2> Transformations1.Words(word=meet) 2> Transformations1.Words(word=you) 3> Transformations1.Words(word=to) 3> filter():Transformations1.Words(word=to) 3> Transformations1.Words(word=meet) 3> Transformations1.Words(word=you) 3> Transformations1.Words(word=too) 3> filter():Transformations1.Words(word=too) Process finished with exit code 0
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。