当前位置:   article > 正文

Flink的Transformations(1)_flink map java

flink map java

环境描述

本文主要练习了Flink框架中的转换方法:map,flatMap,filter

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<flink.version>1.9.0</flink.version>
		<java.version>1.8</java.version>
		<scala.binary.version>2.11</scala.binary.version>
		<maven.compiler.source>${java.version}</maven.compiler.source>
		<maven.compiler.target>${java.version}</maven.compiler.target>
	</properties>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
<dependencies>
		<!-- Apache Flink dependencies -->
		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.16.18</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.7</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>
	</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

java 代码

package org.feng.transform;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.Serializable;

/**
 * Created by Feng on 2019/12/6 14:00
 * CurrentProject's name is flink
 * Flink的Transformations操作主要用于将一个和多个DataStream按需要转换成新的DataStream。
 * 1. DataStream Transformations:进行数据流相关转换操作;
 * 2. Physical partitioning:物理分区。Flink提供的底层API,允许用户定义数据的分区规则;
 * 3. Task chaining and resource groups:任务链和资源组。允许用户进行任务链和资源组的细粒度的控制。
 *
 * 练习:map() , flatMap , filter()
 * @author Feng
 */
public class Transformations1 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // 先构建一个字符串类型的数据流
        DataStream<String> mapDataStream =
                env.fromElements("how", "old", "are", "you", "?");

        // map() 对一个DataStream中的每个元素都执行特定的转换操作
        mapDataStream.map(word -> "[" + word + "]").print();

        // flatMap() 和map()类似,但是可以将一个输入的元素映射为多个元素输出
        // 比如:将一句英文中的所有单词都封装为一个Words对象
        final String space = " ";
        String word1 = "Nice to meet you";
        String word2 = "Nice to meet you too";

        // 映射为多个对象
        DataStream<Words> flapMapDataStream =
                env.fromElements(word1, word2).flatMap(
                        (FlatMapFunction<String, Words>) (word, collector) -> {
                            for (String w : word.split(space)) {
                                collector.collect(new Words(w));
                            }
                }).returns(Words.class);

        flapMapDataStream.print();


        // filter() 过滤信息
        // 这里过滤出Words对象数据流中,属性word以t字符串开头的元素
        // 再使用map拼接上字符串
        flapMapDataStream
                .filter(words -> words.getWord().startsWith("t"))
                .map(words -> "filter():" + words)
                .print();

        env.execute();
    }

    /**
     * 自定义一个实体bean。用来练习数据流的转换操作。
     */
    @ToString
    @AllArgsConstructor
    @Getter
    @Setter
    private static class Words implements Serializable {

        private static final long serialVersionUID = -7202455108400607936L;

        private String word;
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80

执行结果

2> [how]
2> [?]
4> [are]
3> [old]
1> [you]
2> Transformations1.Words(word=Nice)
3> Transformations1.Words(word=Nice)
2> Transformations1.Words(word=to)
2> filter():Transformations1.Words(word=to)
2> Transformations1.Words(word=meet)
2> Transformations1.Words(word=you)
3> Transformations1.Words(word=to)
3> filter():Transformations1.Words(word=to)
3> Transformations1.Words(word=meet)
3> Transformations1.Words(word=you)
3> Transformations1.Words(word=too)
3> filter():Transformations1.Words(word=too)

Process finished with exit code 0

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/972443
推荐阅读
相关标签
  

闽ICP备14008679号