当前位置:   article > 正文

Flink 入门案例介绍_idea flink项目创建

idea flink项目创建

一、工程搭建

  • 在 IDEA 中创建一个 Maven 工程:FlinkTutorial

  • 在 pom 文件中引入依赖:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <!-- 2.12 是scala版本 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

二、批处理 WordCount 案例

package com.app.wc

// 批处理 WordCount
public class WordCount {
    public static void main(String[] args) throws Exception {
    	// 1.创建 flink 执行环境
    	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    	
    	// 2.读取文件数据
    	// DataSource 是 Operator 的子类,Operator 是 DataSet 的子类
    	// Flink 的批处理是基于 DataSet 类型的 API 来处理
    	DataSource<String> inputData = env.readTextFile("datas/word.txt");
    	
    	// 3.执行数据处理(按空格分词并转换成 (word, 1) 这样的二元组格式),分组聚合
    	DataSet<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap())  //需要传入FlatMapFunction接口的实现类
    			 .groupBy(0)  //可以传入KeySelector实现类或位置索引或字段名
    			 .sum(1);  // 传入进行聚合计算的位置索引
    	
    	// 4.输出
    	result.print();
    	
    }
    
    // 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法
    // Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2
    public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
    	@override
    	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
    		// 按空格分词
    		String[] words = value.split(" ");
    		
    		// 遍历数组并转换为二元组输出
    		for(String word : words) {
    			out.collect(new Tuple2(word, 1));
    		}
    	}
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

三、有界流处理 WordCount 案例

package com.app.wc

// 流处理WordCount
public class StreamWordCount {
	public static void main(String[] args) throws Exception {
		// 1.创建flink流处理执行环境对象
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		// env.setParallelism(8); // 设置并发度
		
		// 2.读取文件
		StreamDataSource<String> inputData = env.readTextFile("datas/word.txt");
		
		// 3.处理数据(分词,转换结构),并分组聚合
		DataStream<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap()).keyBy(0).sum(1);
		
		// 4.输出
		result.print();
		
		// 5.执行任务(流处理是事件触发的)
		env.execute();
		
	}
	
	 // 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法
    // Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2
    public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
    	@override
    	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
    		// 按空格分词
    		String[] words = value.split(" ");
    		
    		// 遍历数组并转换为二元组输出
    		for(String word : words) {
    			out.collect(new Tuple2(word, 1));
    		}
    	}
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

四、无界流处理 WordCount 案例

方便生产环境部署

package com.app.wc

public class StreamWordCount2 {
	public static void main(String[] args) throws Exception {
		// 1.创建flink流处理执行环境对象
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		// env.setParallelism(8); // 设置并发度
		
		// 2.监听 7777 端口服务(nc -lk 7777)
		// 2.1 使用 ParameterTool 类从启动参数中获取配置项
		ParameterTool tool = ParameterTool.formArgs(args);
		String hostname = tool.get("hostname");
		int port = tool.getInt("port");
		
		// 2.2 获取数据流
		DataStream<String> inputData = env.socketTextFile(hostname, port);
		
		// 3.处理数据(分词,转换结构),并分组聚合
		DataStream<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap()).keyBy(0).sum(1);
		
		// 4.输出
		result.print();
		
		// 5.执行任务(流处理是事件触发的)
		env.execute();
		
	}
	
	 // 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法
    // Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2
    public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
    	@override
    	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
    		// 按空格分词
    		String[] words = value.split(" ");
    		
    		// 遍历数组并转换为二元组输出
    		for(String word : words) {
    			out.collect(new Tuple2(word, 1));
    		}
    	}
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/947055
推荐阅读
相关标签
  

闽ICP备14008679号