
Flink (1): Setting Up the Development Environment (Java and Scala)

I. Objective

Set up a development environment for Flink application development.

II. Steps: Java Development Environment

[Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/projectsetup/java_api_quickstart.html]

1. Local environment

(1) Official requirements
Maven 3.0.4 (or higher) and Java 8.x
(2) Local environment
Maven 3.3.9
Java 1.8

2. Create the Java project

(1) In the project directory, run the Maven command:

  mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.7.2 \
    -DarchetypeCatalog=local

Locally:

E:\Tools\WorkspaceforMyeclipse\flink_project>mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.2  -DarchetypeCatalog=local

Note: adding the -DarchetypeCatalog=local parameter makes project creation faster, because Maven does not have to fetch the remote archetype catalog.

(2) Enter the GAV (groupId/artifactId/version) parameters

  Define value for property 'groupId': com.bd.flink
  Define value for property 'artifactId': flink-pro
  Define value for property 'version' 1.0-SNAPSHOT: : 1.0
  Define value for property 'package' com.bd.flink: :
  Confirm properties configuration:
  groupId: com.bd.flink
  artifactId: flink-pro
  version: 1.0
  package: com.bd.flink
   Y: : Y

(3) Inspect the generated project

  E:\Tools\WorkspaceforMyeclipse\flink_project>tree
  Folder PATH listing for volume Local Disk
  Volume serial number is 0003-6793
  E:.
  └─flink-pro
      └─src
          └─main
              ├─java
              │  └─com
              │      └─bd
              │          └─flink
              └─resources

3. Import into IDEA

File -> Open -> select the project's pom.xml

Check the project structure.

4. Package the project

(1) Using the Maven command

From the project root, run:

  E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro>mvn clean package

Packaging output:

  [INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ flink-pro ---
  [INFO] Building jar: E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro\target\flink-pro-1.0.jar
  [INFO]
  [INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flink-pro ---
  [INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
  [INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
  [INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
  [INFO] Replacing original artifact with shaded artifact.
  [INFO] Replacing E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro\target\flink-pro-1.0.jar with E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro\target\flink-pro-1.0-shaded.jar
  [INFO] ------------------------------------------------------------------------
  [INFO] BUILD SUCCESS
  [INFO] ------------------------------------------------------------------------
  [INFO] Total time: 11.005 s
  [INFO] Finished at: 2019-11-30T16:24:20+08:00
  [INFO] Final Memory: 25M/184M
  [INFO] ------------------------------------------------------------------------

(2) Using IDEA's Maven tool window
View -> Tool Windows -> Maven Projects -> clean + package

5. Java WordCount example

(1) Four steps

Step 1: set up the batch execution environment
Step 2: read the data
Step 3: implement the business logic (transform operations)
Step 4: execute the program

(2) Code

  package com.bd.flink._1130application;

  import org.apache.flink.api.common.functions.FlatMapFunction;
  import org.apache.flink.api.java.ExecutionEnvironment;
  import org.apache.flink.api.java.operators.DataSource;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.util.Collector;

  /**
   * Created by Administrator on 2019/11/30.
   * WordCount implemented with the Java API.
   */
  public class BatchWCJava {
      public static void main(String[] args) throws Exception {
          String input = "data\\hello.txt";
          // Step 1: set up the batch execution environment
          final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
          // Step 2: read the data
          DataSource<String> text = env.readTextFile(input);
          // Step 3: implement the business logic (transform operations)
          text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
              @Override
              public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                  String[] tokens = value.toLowerCase().split(" ");
                  for (String token : tokens) {
                      if (token.length() > 0) {
                          collector.collect(new Tuple2<String, Integer>(token, 1));
                      }
                  }
              }
          }).groupBy(0).sum(1).print();
          // Step 4: execute the program
          // execute(), count(), collect(), and print() all trigger execution,
          // so no explicit execute() call is needed here
  //        env.execute("Flink Batch Java API Skeleton");
      }
  }
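For intuition, the flatMap -> groupBy(0) -> sum(1) chain is plain per-word counting. Below is a minimal JDK-only sketch of the same tokenize/filter/count logic, with no Flink dependency; the class name WordCountSketch is illustrative and not part of the generated project. Flink performs the same computation, but partitions the groups across parallel tasks.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {
    // Same pipeline as the Flink job: tokenize + filter empty tokens (flatMap),
    // then group by word and add up the 1s (groupBy(0).sum(1)).
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split(" ")) {
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "flink hadoop storm", "flume spark streaming", "is excellent");
        // Print in the same (word,count) shape that Flink's print() uses
        count(lines).forEach((w, c) -> System.out.println("(" + w + "," + c + ")"));
    }
}
```

Note that the printed order differs from Flink's output: the sketch keeps insertion order, while Flink's grouping is hash-partitioned.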

(3) Run result

Contents of flink_project\flink-pro\data\hello.txt:

  flink hadoop storm
  flume spark streaming
  is excellent

Output:

  (is,1)
  (streaming,1)
  (excellent,1)
  (hadoop,1)
  (flink,1)
  (flume,1)
  (storm,1)
  (spark,1)

III. Steps: Scala Development Environment

[Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/projectsetup/scala_api_quickstart.html]

1. Local environment

(1) Official requirements
Maven 3.0.4 (or higher) and Java 8.x
(2) Local environment
Maven 3.3.9
Java 1.8

2. Create the Scala project

(1) In the project directory, run the Maven command:

  mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-scala \
    -DarchetypeVersion=1.7.2 \
    -DarchetypeCatalog=local

Locally:

E:\Tools\WorkspaceforMyeclipse\flink_project>mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.7.2 -DarchetypeCatalog=local

Note: adding the -DarchetypeCatalog=local parameter makes project creation faster, because Maven does not have to fetch the remote archetype catalog.

(2) Enter the GAV (groupId/artifactId/version) parameters

  Define value for property 'groupId': com.bd.flink
  Define value for property 'artifactId': flink-pro-scala
  Define value for property 'version' 1.0-SNAPSHOT: : 1.0
  Define value for property 'package' com.bd.flink: :
  Confirm properties configuration:
  groupId: com.bd.flink
  artifactId: flink-pro-scala
  version: 1.0
  package: com.bd.flink
   Y: : Y

(3) Inspect the generated project

  E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro-scala>tree
  Folder PATH listing for volume Local Disk
  Volume serial number is 0003-6793
  E:.
  └─src
      └─main
          ├─resources
          └─scala
              └─com
                  └─bd
                      └─flink

3. Import into IDEA

File -> Open -> select the project's pom.xml

Check the project structure.

4. Package the project

(1) Using the Maven command

From the project root, run:

E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro-scala>mvn clean package  

Packaging output:

  [INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ flink-pro-scala ---
  [INFO] Building jar: E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro-scala\target\flink-pro-scala-1.0.jar
  [INFO]
  [INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flink-pro-scala ---
  [INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
  [INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
  [INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
  [INFO] Replacing original artifact with shaded artifact.
  [INFO] Replacing E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro-scala\target\flink-pro-scala-1.0.jar with E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro-scala\target\flink-pro-scala-1.0-shaded.jar
  [INFO] ------------------------------------------------------------------------
  [INFO] BUILD SUCCESS
  [INFO] ------------------------------------------------------------------------
  [INFO] Total time: 24.117 s
  [INFO] Finished at: 2019-11-30T18:02:59+08:00
  [INFO] Final Memory: 16M/199M
  [INFO] ------------------------------------------------------------------------

(2) Using IDEA's Maven tool window
View -> Tool Windows -> Maven Projects

 


5. Change the Scala version

The generated pom defaults to Scala 2.11.12:

  <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <flink.version>1.7.2</flink.version>
      <scala.binary.version>2.11</scala.binary.version>
      <scala.version>2.11.12</scala.version>
  </properties>

However, the local Scala SDK is 2.11.8, so the IDE reports an error such as: Cannot resolve reference readTextFile with such signature.

Fix: change the version to 2.11.8:

  <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <flink.version>1.7.2</flink.version>
      <scala.binary.version>2.11</scala.binary.version>
      <!--<scala.version>2.11.12</scala.version>-->
      <scala.version>2.11.8</scala.version>
  </properties>
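For context, scala.binary.version matters because it is spliced into the names of the Scala-dependent Flink artifacts, so it must match the binary series of scala.version. A sketch of how the quickstart pom typically consumes these properties (illustrative fragment, not copied from the generated file):

```xml
<!-- Illustrative sketch: Scala-dependent Flink artifacts carry the
     binary-version suffix, while scala-library pins the exact patch version. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
```

This is why only scala.version needs to change here: 2.11.8 and 2.11.12 belong to the same 2.11 binary series, so the Flink artifacts stay the same.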

6. Scala WordCount example

(1) Four steps

Step 1: set up the execution environment
Step 2: read the data
Step 3: implement the business logic (transform operations)
Step 4: execute the program

(2) Code

  package com.bd.flink._1130WordCount

  import org.apache.flink.api.scala.ExecutionEnvironment

  /**
   * Created by Administrator on 2019/11/30.
   */
  object BatchWordCountScala {
    def main(args: Array[String]): Unit = {
      val input = "data\\hello.txt"
      // Step 1: set up the execution environment
      val env = ExecutionEnvironment.getExecutionEnvironment
      // Step 2: read the data
      val text = env.readTextFile(input)
      // Step 3: implement the business logic (transform operations)
      // bring in the Scala implicit conversions
      import org.apache.flink.api.scala._
      text.flatMap(_.toLowerCase.split(" "))
        .filter(_.nonEmpty)
        .map((_, 1))
        .groupBy(0)
        .sum(1).print()
      // Step 4: execute the program
      // print() itself triggers execution when running locally
    }
  }

(3) Run result

Contents of flink_project\flink-pro\data\hello.txt:

  flink hadoop storm
  flume spark streaming
  is excellent

Output:

  (is,1)
  (streaming,1)
  (excellent,1)
  (hadoop,1)
  (flink,1)
  (flume,1)
  (storm,1)
  (spark,1)

 
