当前位置:   article > 正文

大数据HelloWorld-Flink实现WordCount_hadoop使用flank编写wordcount

hadoop使用flank编写wordcount

MR,Spark,Flink自开篇第一个程序都是Word Count。那么今天Flink开始目标就是在本地调试出Word Count。所有的语言开篇章都是Hello Word,数据处理引擎也有Hello Word。那就是Word Count。

单机安装Flink

开始Flink之前先在本机尝试安装一下Flink,当然FLink正常情况下是部署的部署方式。作者比较穷,机器配置太低开不了几个虚拟机。所以只能先演示个单机的安装
。Apache Flink需要在Java1.8 +以上的环境中运行。
所以,先确保自己的JDK版本是1.8包含以上的

。Flink单机部署非常简单,只需安装下载安装即可。如果需要与Hadoop版本结合,那么下载相应我的地方直接下载了Scala2.11的相关版本。  
点击进入Apache页面进行下载,大小约有283MB。

将下载下来的压缩包进行解压即可。

打开命令行直接执行  /bin/start-cluster.bat  进行启动。 

 

浏览器打开  http:// localhost:8081

至此在Windows10环境下即完成Flink的启动。

编写WordCount

因为最终也会转换为JAVA字节码文件,所以Flink程序可以由Java,Scala两种语言都可以进行开发。也可以同时开发。某些Java写部分代码,Scala写其他部分代码。可以参考<Apache Flink利用Maven对Scala与Java进行混编>。

Flink官方提供快速生成工程的两种工具:SBT与Maven。由于作者比较熟悉Maven,(或者说没用过SBT)。所以直接使用Maven快速创建一个工程。

Java版本

  1. mvn archetype:generate \
  2. -DarchetypeGroupId=org.apache.flink \
  3. -DarchetypeArtifactId=flink-quickstart-java \
  4. -DarchetypeVersion=1.8.0

Scala版本

  1. mvn archetype:generate \
  2. -DarchetypeGroupId=org.apache.flink \
  3. -DarchetypeArtifactId=flink-quickstart-scala \
  4. -DarchetypeVersion=1.8.0

按照提示输入相关信息,即可生成最终的项目。

  1. ├── pom.xml
  2. └── src
  3. └── main
  4. ├── resources
  5. │ └── log4j.properties
  6. └── scala/java
  7. └── org
  8. └── myorg
  9. └── quickstart
  10. ├── BatchJob.scala
  11. └── StreamingJob.scala

把工程引入到IDEA中是否
使用Scala的话,那么需要安装Scala的插件。搜索安装同时需要把Scala语言包进行安装。不知道如何操作可以联系我微信公号<指尖数虫>。

  1. package jar;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.common.functions.ReduceFunction;
  4. import org.apache.flink.api.java.ExecutionEnvironment;
  5. import org.apache.flink.api.java.operators.DataSource;
  6. import org.apache.flink.api.java.tuple.Tuple2;
  7. import org.apache.flink.util.Collector;
  8. public class BatchJob {
  9. public static void main(String[] args) throws Exception {
  10. // set up the batch execution environment
  11. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  12. //读取目录下的文件
  13. DataSource<String> data = env.readTextFile("/opt/Server_Packets/log/ServerLog_1_runtime.log");
  14. //把文件中的内容按照空格进行拆分为 word,1 1 是为了能够在下面进行计算.
  15. data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
  16. @Override
  17. public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
  18. for (String word : s.split(" ")){
  19. collector.collect(new Tuple2<>(word,1));
  20. }
  21. }
  22. })
  23. // 按照元组中的第1位进行分组
  24. .groupBy(0)
  25. // 分组的元组的计算方式为 value +value 也就是刚才的 同样的词 把 1+1
  26. .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
  27. @Override
  28. public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t2, Tuple2<String, Integer> t1) throws Exception {
  29. return new Tuple2<>(t1.f0,t1.f1+ t2.f1);
  30. }
  31. })
  32. //输出结果
  33. .print();
  34. }
  35. }

更多有趣和专业的大数据相关文章,微信搜索  : 指尖数虫  或扫码关注

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/891344
推荐阅读
相关标签
  

闽ICP备14008679号