pom file
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.antg</groupId>
  <artifactId>worldcount</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderful scala app</description>
  <inceptionYear>2018</inceptionYear>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.11</scala.version>
    <scala.compile.at.version>2.11</scala.compile.at.version>
    <flink.version>1.13.1</flink.version>
    <jdk.version>1.8</jdk.version>
  </properties>

  <dependencies>
    <!-- Flink dependencies - start -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.compile.at.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- Flink dependencies - end -->

    <!-- Logging dependencies -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.6.6</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Scala and Java are usually developed together in one project, so multiple source
           directories are needed. The build-helper-maven-plugin lets us register extra
           source folders (and resource folders as well). -->
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <!-- Add as many source folders as needed by listing more source nodes here -->
                <source>${basedir}/src/main/java</source>
                <source>${basedir}/src/main/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
          <encoding>${encoding}</encoding>
        </configuration>
      </plugin>
      <!-- Build an all-in-one (fat) jar -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>flink.KafkaDemo1</mainClass>
                </transformer> -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>reference.conf</resource>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Data file: input.txt
a b a c a
d a b a
c c d
e f
a
Java code
package com.antg;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkWordCount4DataSet {
    public static void main(String[] args) throws Exception {
        // Create the Flink batch (DataSet) execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Path of the input file on the local file system (or taken from the first program argument)
        String filePath = "";
        if (args == null || args.length == 0) {
            filePath = "C:\\Users\\Administrator\\Desktop\\input.txt";
        } else {
            filePath = args[0];
        }

        // Read the input file into a DataSet of lines
        DataSet<String> inputLineDataSet = env.readTextFile(filePath);

        // Split each line on whitespace, turn every word into a (word, 1) tuple,
        // then group by the word and sum the counts
        DataSet<Tuple2<String, Integer>> resultSet = inputLineDataSet
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Split on whitespace
                        String[] wordArray = line.split("\\s");
                        // Emit every word as a (word, 1) tuple
                        for (String word : wordArray) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                .groupBy(0) // group the (word, 1) tuples by the word in position 0
                .sum(1);    // sum the counts in position 1

        // Print the (word, freq) result pairs.
        // Note: print() triggers env.execute() internally, so no explicit execute() call is needed here.
        resultSet.print();

        // Note: a sink such as writeAsText requires an explicit env.execute() call:
        // resultSet.writeAsText("d:\\temp\\output2", WriteMode.OVERWRITE)
        //         .setParallelism(2);
        // env.execute();
    }
}
Note:
Result:
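With the input.txt above, the job prints the following counts (derived from the sample data; the order of the lines may differ between runs):

(a,6)
(b,2)
(c,3)
(d,2)
(e,1)
(f,1)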
This way of running is recommended: it supports all of Flink's interaction modes and is quite flexible, and the jar uploaded to the server does not need to bundle the Flink dependencies (they are marked provided), which greatly reduces the jar size.
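One side note (an assumption, not something the original post covers): because the Flink dependencies are marked provided, they are missing from the classpath when the job is launched straight from the IDE. A common workaround is a Maven profile that re-adds them with compile scope for local runs only; the profile id below is arbitrary:

<!-- Sketch: activate this profile only for local IDE runs -->
<profiles>
  <profile>
    <id>add-dependencies-for-ide</id>
    <dependencies>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.compile.at.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
      </dependency>
    </dependencies>
  </profile>
</profiles>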
Setting up the environment:
Download the Flink 1.13.1 binary package from https://flink.apache.org/zh/downloads.html
Simply extract it: tar -zxvf <path-to-archive>
Make the Hadoop classpath visible to Flink
Option 1: set the Hadoop environment variable in a profile file so it is always in effect
Option 2: in every terminal where Flink commands are run, first execute export HADOOP_CLASSPATH=`hadoop classpath` (a sketch of both options follows)
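A minimal sketch of the two options, assuming a Bash shell (the file ~/.bashrc is just one common place; /etc/profile works the same way but needs root):

# Option 1: persist the setting so every new shell picks it up
echo 'export HADOOP_CLASSPATH=`hadoop classpath`' >> ~/.bashrc
source ~/.bashrc

# Option 2: set it only for the current terminal session, before calling ./bin/flink
export HADOOP_CLASSPATH=`hadoop classpath`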
Application mode
./bin/flink run-application -t yarn-application -c com.antg.FlinkWordCount4DataSet ../../flink/original-worldcount-1.0-SNAPSHOT.jar hdfs:///user/fujunhua/data/input.txt
The result stays on the cluster, so it cannot be viewed locally.
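If the output is needed anyway, one option (not covered in the original post, and assuming YARN log aggregation is enabled) is to fetch the container logs, which contain what print() wrote to stdout:

# <applicationId> is the id printed when the job was submitted, e.g. application_16xx..._0001
yarn logs -applicationId <applicationId>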
Per-job mode
./bin/flink run -t yarn-per-job -c com.antg.FlinkWordCount4DataSet ../../flink/original-worldcount-1.0-SNAPSHOT.jar hdfs:///user/fujunhua/data/input.txt
In per-job mode the main method runs on the client, so the result can be seen on the client.
Attached mode
First, the session has to be started in advance
./bin/yarn-session.sh
Run the job (the client must not exit while the job is running)
./bin/flink run -c com.antg.FlinkWordCount4DataSet ../../flink/original-worldcount-1.0-SNAPSHOT.jar hdfs:///user/fujunhua/data/input.txt
Detached mode
Start the session
./bin/yarn-session.sh -d
Run the job (the client may exit while the job is running)
The run command and its effect are the same as in attached mode; a sketch of how to shut the session down afterwards follows.
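Because a detached session keeps running on YARN after the client exits, it has to be stopped explicitly when it is no longer needed. A minimal sketch (an assumption, not part of the original post; <applicationId> is the id YARN assigned to the session):

# Stop the session application and everything still running inside it
yarn application -kill <applicationId>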
Code
package com.antg;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkWordCount4DataStream {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the text stream from a socket
        String host = "localhost";
        int port = 9999;
        DataStreamSource<String> inputLineDataStream = env.socketTextStream(host, port);

        // Split each line on whitespace, emit (word, 1) tuples, key by the word and sum the counts.
        // The stream is unbounded (the data is never complete), so keyBy is used instead of the batch groupBy.
        DataStream<Tuple2<String, Integer>> resultStream = inputLineDataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Split on whitespace
                        String[] wordArray = line.split("\\s");
                        // Emit every word as a (word, 1) tuple
                        for (String word : wordArray) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                .keyBy(0)  // key the (word, 1) tuples by the word in position 0
                .sum(1);   // sum the counts in position 1

        // Print the running (word, freq) results
        resultStream.print();

        // Start the streaming job; for DataStream programs env.execute() is required
        env.execute();
    }
}
Start netcat listening on port 9999 (before the program, so the socket source can connect), then start the program and send data by typing into the netcat window
nc64.exe -lp 9999
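For example (a sketch of the expected behavior, not captured output), typing a b a into the netcat window makes the job emit an updated count every time a key arrives; with parallelism greater than 1 each line is prefixed by the printing subtask's index, e.g. 3> (a,1):

(a,1)
(b,1)
(a,2)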
The processing itself is the same as in the batch version; in practice, though, the data usually does not come from a socket but from a message broker such as Kafka. A hedged sketch of swapping in a Kafka source follows.
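A minimal sketch of replacing the socket source with a Kafka source, assuming the flink-connector-kafka_2.11 dependency (same ${flink.version}) has been added to the pom; the topic name, bootstrap servers and group id below are illustrative only:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Inside main(), in place of env.socketTextStream(host, port):
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // assumption: a local Kafka broker
props.setProperty("group.id", "wordcount-demo");          // assumption: arbitrary consumer group id

// SimpleStringSchema decodes every Kafka record as a plain String line
FlinkKafkaConsumer<String> kafkaSource =
        new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);

// The rest of the word-count pipeline (flatMap / keyBy / sum / print) stays unchanged
DataStream<String> inputLineDataStream = env.addSource(kafkaSource);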
pom file
When developing a Scala project it is common to also pull in the corresponding Java dependencies, which makes later mixed Java/Scala development easier.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.antg</groupId>
  <artifactId>worldcount</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderful scala app</description>
  <inceptionYear>2018</inceptionYear>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.11</scala.version>
    <scala.compile.version>2.11</scala.compile.version>
    <flink.version>1.13.1</flink.version>
    <jdk.version>1.8</jdk.version>
  </properties>

  <dependencies>
    <!-- Flink dependencies - start -->
    <!-- Dependencies for developing Flink jobs in Java - start -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.compile.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- Dependencies for developing Flink jobs in Java - end -->
    <!-- Dependencies for developing Flink jobs in Scala - start -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.compile.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.compile.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- Dependencies for developing Flink jobs in Scala - end -->
    <!-- Flink dependencies - end -->

    <!-- Logging dependencies -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.6.6</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Scala and Java are usually developed together in one project, so multiple source
           directories are needed. The build-helper-maven-plugin lets us register extra
           source folders (and resource folders as well). -->
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <!-- Add as many source folders as needed by listing more source nodes here -->
                <source>${basedir}/src/main/java</source>
                <source>${basedir}/src/main/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
          <encoding>${encoding}</encoding>
        </configuration>
      </plugin>
      <!-- Build an all-in-one (fat) jar -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>flink.KafkaDemo1</mainClass>
                </transformer> -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>reference.conf</resource>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Code
package com.antg

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment

object FlinkWordCount4DataSet4Scala {
  def main(args: Array[String]): Unit = {
    // Get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Data source option 1: build the DataSet from in-memory strings
    // val source = env.fromElements("a b a c a", "a c d")

    // Data source option 2: read a file from the local file system (or the first program argument)
    var filePath = ""
    if (args == null || args.length == 0) {
      filePath = "C:\\Users\\Administrator\\Desktop\\input.txt"
    } else {
      filePath = args(0)
    }
    val source = env.readTextFile(filePath)

    // Transformations: split on whitespace, map to (word, 1), group by the word and sum the counts
    val ds = source.flatMap(x => x.split("\\s+")).map((_, 1)).groupBy(0).sum(1)

    // Print to the console
    ds.print()

    // This is a batch job and print() already calls execute() internally,
    // so calling env.execute() again would raise an execution-context error.
    // env.execute("Flink Batch Word Count By Scala")
  }
}
Result
Same as the Java version.
The other ways of running it are also the same as for the Java version, so they are not repeated here.
The dependencies were already introduced in the batch version, so they are not repeated here either.
Code
package com.antg

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object FlinkWordCount4DataStream4Scala {
  def main(args: Array[String]): Unit = {
    // Get the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Data source: read a text stream from a socket
    val source = env.socketTextStream("localhost", 9999, '\n')

    // Transformations: split on whitespace, map to (word, 1), key by the word and sum the counts
    val dataStream = source.flatMap(_.split("\\s+")).map((_, 1)).keyBy(0).sum(1)

    // Print to the console
    dataStream.print()

    // Start the streaming job
    env.execute("FlinkWordCount4DataStream4Scala")
  }
}
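One side note (an observation about newer Flink versions, not something from the original post): keying a DataStream by tuple index, keyBy(0), is deprecated; the key-selector form below is the preferred, type-safe equivalent and produces the same result:

// Equivalent pipeline using a key selector instead of the deprecated positional keyBy(0)
val dataStream = source
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .keyBy(_._1)
  .sum(1)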