当前位置:   article > 正文

仅用java+IntelliJ IDEA+Maven搭建Flink项目_idea flink java 例子

idea flink java 例子

1、背景

只在有java+intellij(自带maven)的环境快速搭建flink项目

 2、运行代码(亲测有效)

一、打开项目中的pom.xml文件,将其中依赖部分的"<scope>provided</scope>"注释(或删除)掉。

二、在IntelliJ IDEA中打开BatchJob源文件,编辑代码如下:

  1. package com.xueai8
  2. import org.apache.flink.api.scala._
  3. object BatchJob {
  4. def main(args: Array[String]) {
  5. // 设置批执行环境
  6. val env = ExecutionEnvironment.getExecutionEnvironment
  7. // 得到输入数据
  8. val text = env.fromElements("good good study", "day day up")
  9. // 对数据进行转换
  10. val counts = text.flatMap { _.toLowerCase.split("\\W+") }
  11. .map { (_, 1) }
  12. .groupBy(0)
  13. .sum(1)
  14. // 执行并输出结果
  15. counts.print()
  16. }
  17. }

三、在文件内任何空白处,单击右键,在弹出菜单中选择"run BatchJob",执行该程序,在下方的运行窗口可以看到如下输出结果:

  1. (up,1)
  2. (day,2)
  3. (good,2)
  4. (study,1)

四、运行streaming项目

在本机提前执行    nc -lk 1111,输入aa,abc,然后执行streaming任务

  1. import org.apache.flink.streaming.api.TimeCharacteristic
  2. import org.apache.flink.streaming.api.scala._
  3. import org.apache.flink.streaming.api.windowing.time.Time
  4. object StreamingJob {
  5. def main(args: Array[String]) {
  6. // set up the streaming execution environment
  7. val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  8. environment.setParallelism(1)
  9. environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  10. val log = environment.socketTextStream("127.0.0.1",1111)
  11. val value = log.map(value => {
  12. (value, 1)
  13. }).keyBy(0)
  14. .timeWindow(Time.seconds(10))
  15. .reduce((a,b)=>{(a._1,a._2+b._2)})
  16. .print()
  17. environment.execute()
  18. }
  19. }

3、步骤(亲测有效)

在IntelliJ IDEA中创建Flink项目

第一步:启动IntelliJ IDEA,创建一个新的项目,如下图所示:

第二步:选择Maven项目,并选择"Create from archetype":

第三步:自己添加Flink的archetype

添加flink-quickstart-scala的archetype如下图:repository是可选的

第四步:选择对应的archetype,比如,这里我选择flink-quickstart-scala,如下所示:

第五步:指定项目的groupId、artifactId名称。这里我分别取以下名称:

  • groupId:com.xuehua
  • artifactId:FlinkScala

第六步:接下来,指定项目的Maven配置,默认就好。

第七步:指定项目的名称和项目文件所在位置。这里保持默认即可。单击【Finish】按钮,开始创建项目:

第八步:Maven会自动构建项目,最后的项目结构如下所示:

可以看出,flink-quickstart-scala快速地构建了一个基本的Flink项目框架,并生成创建了两个模板程序文件:用于流处理的StreamingJob和用于批处理的BatchJob。

注:同样的步骤,选择flink-quickstart-java,创建一个基于Java API的Flink项目框架。请自行尝试。

第九步:

注释掉maven文件中的<plugin>信息和provided信息,添加scala插件

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/557263
推荐阅读
相关标签
  

闽ICP备14008679号