<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.wgy</groupId>
    <artifactId>FlinkDome</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>
</project>
package com.wgy.wordcount

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // Create a batch execution environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // Read data from a file
    val inputPath = "D:\\scala_spark\\FlinkDome\\src\\main\\resources\\words"
    val inputDataSet: DataSet[String] = env.readTextFile(inputPath)

    // Transform the data: split into words, group by word, then aggregate the counts
    val resultDataSet: AggregateDataSet[(String, Int)] = inputDataSet
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // Print the result
    resultDataSet.print()
  }
}
// Result
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
(scala,2)
(flink,2)
(hello,8)
(java,2)
(word,2)
// On Linux
yum install -y nc    # install the netcat tool
nc -lk 7777          # listen on port 7777 and keep the connection open
package com.wgy.wordcount

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // Create a streaming execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Set the parallelism; by default it is the number of CPU cores
    //env.setParallelism(4)

    // Receive a socket text stream
    val inputDataStream: DataStream[String] = env.socketTextStream("hadoop101", 7777)

    // Transform the data: split into words, key by word, then aggregate the counts
    val resultDataStream: DataStream[(String, Int)] = inputDataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // Print the result
    resultDataStream.print()

    // Launch the job
    env.execute("stream word count")
  }
}
// Result. Words are partitioned by hashing (modulo the parallelism): each word is routed to a specific partition, the same word always lands in the same partition, and different words may end up in the same partition.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
3> (hello,1)
7> (flink,1)
3> (hello,2)
7> (flink,2)
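The `3>` / `7>` prefixes are the indices of the parallel subtasks that printed each record. The following is only a sketch of the hash-then-modulo idea with an assumed parallelism of 8, not Flink's actual key-group assignment; it just shows why equal keys always reach the same subtask while different keys may collide.

// Sketch only: hash-then-modulo routing with an assumed parallelism of 8.
object PartitionSketch {
  def main(args: Array[String]): Unit = {
    val parallelism = 8

    // Map a word to a subtask index: identical words always map to the same index,
    // while different words may land on the same index.
    def subtaskFor(word: String): Int =
      math.abs(word.hashCode % parallelism)

    Seq("hello", "flink", "hello", "flink").foreach { w =>
      println(s"${subtaskFor(w)}> ($w)")
    }
  }
}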