赞
踩
在 IDEA 中创建一个 Maven 工程:FlinkTutorial
在 pom 文件中引入依赖:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<!-- 2.12 是scala版本 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
package com.app.wc // 批处理 WordCount public class WordCount { public static void main(String[] args) throws Exception { // 1.创建 flink 执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 2.读取文件数据 // DataSource 是 Operator 的子类,Operator 是 DataSet 的子类 // Flink 的批处理是基于 DataSet 类型的 API 来处理 DataSource<String> inputData = env.readTextFile("datas/word.txt"); // 3.执行数据处理(按空格分词并转换成 (word, 1) 这样的二元组格式),分组聚合 DataSet<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap()) //需要传入FlatMapFunction接口的实现类 .groupBy(0) //可以传入KeySelector实现类或位置索引或字段名 .sum(1); // 传入进行聚合计算的位置索引 // 4.输出 result.print(); } // 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法 // Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2 public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> { @override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { // 按空格分词 String[] words = value.split(" "); // 遍历数组并转换为二元组输出 for(String word : words) { out.collect(new Tuple2(word, 1)); } } } }
package com.app.wc // 流处理WordCount public class StreamWordCount { public static void main(String[] args) throws Exception { // 1.创建flink流处理执行环境对象 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(8); // 设置并发度 // 2.读取文件 StreamDataSource<String> inputData = env.readTextFile("datas/word.txt"); // 3.处理数据(分词,转换结构),并分组聚合 DataStream<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap()).keyBy(0).sum(1); // 4.输出 result.print(); // 5.执行任务(流处理是事件触发的) env.execute(); } // 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法 // Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2 public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> { @override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { // 按空格分词 String[] words = value.split(" "); // 遍历数组并转换为二元组输出 for(String word : words) { out.collect(new Tuple2(word, 1)); } } } }
方便生产环境部署
package com.app.wc public class StreamWordCount2 { public static void main(String[] args) throws Exception { // 1.创建flink流处理执行环境对象 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(8); // 设置并发度 // 2.监听 7777 端口服务(nc -lk 7777) // 2.1 使用 ParameterTool 类从启动参数中获取配置项 ParameterTool tool = ParameterTool.formArgs(args); String hostname = tool.get("hostname"); int port = tool.getInt("port"); // 2.2 获取数据流 DataStream<String> inputData = env.socketTextFile(hostname, port); // 3.处理数据(分词,转换结构),并分组聚合 DataStream<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap()).keyBy(0).sum(1); // 4.输出 result.print(); // 5.执行任务(流处理是事件触发的) env.execute(); } // 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法 // Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2 public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> { @override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { // 按空格分词 String[] words = value.split(" "); // 遍历数组并转换为二元组输出 for(String word : words) { out.collect(new Tuple2(word, 1)); } } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。