当前位置:   article > 正文

Eclipse-Java实现Storm的WordCount词频统计_在eclipse中使用wordcount统计文本文件中单词出现次数

在eclipse中使用wordcount统计文本文件中单词出现次数

 

目录

Spout层

Bolt层

Topology层

结果:


文章中有不明白的内容,可以查看前后文或联系博主。

相关文章:

Storm集群安装部署1——准备版

Storm集群安装部署2——Centos6.5的默认python2.6.6版本升级到python2.7.15

Storm集群安装部署3——在节点服务器上安装 Storm并启动Storm

Storm集群安装部署4——在Storm节点服务器上安装 nimBus和supervisor

Eclipse配置、创建Maven项目

Storm集群上运行第一个“Hello World文件”

Eclipse实现Storm的WordCount词频统计

 

创建Maven项目如下:

 

我会尽量把项目的注释写清楚一些,不懂的可以问博主[/微笑]

 

Spout层

WordSpout:(注:原文此处误贴成了下文“Topology层”中的 WordTopology 代码,WordSpout 本身的源码并未给出。)

  1. package com.swun.storm.topology;
  2. import com.swun.storm.Utils.ThreadUtils;
  3. import com.swun.storm.bolt.WordCountBolt;
  4. import com.swun.storm.bolt.WordReportBolt;
  5. import com.swun.storm.bolt.WordSplitBolt;
  6. import com.swun.storm.spout.WordSpout;
  7. import backtype.storm.Config;
  8. import backtype.storm.LocalCluster;
  9. import backtype.storm.topology.TopologyBuilder;
  10. import backtype.storm.tuple.Fields;
  11. public class WordTopology {
  12. //定义常量
  13. private static final String WORD_SPOUT_ID = "word-spout";
  14. private static final String SPLIT_BOLT_ID = "split-bolt";
  15. private static final String COUNT_BOLT_ID = "count-bolt";
  16. private static final String REPORT_BOLT_ID = "report-bolt";
  17. private static final String WORD_TOPOLOGY_ID = "wordcount-topology";
  18. public static void main(String[] args) {
  19. //创建spout、bolt对象;
  20. WordSpout spout = new WordSpout();
  21. WordSplitBolt splitBolt = new WordSplitBolt();
  22. WordCountBolt countBolt = new WordCountBolt();
  23. WordReportBolt reportBolt = new WordReportBolt();
  24. //创建拓扑对象
  25. TopologyBuilder builder = new TopologyBuilder();
  26. //设置spout
  27. builder.setSpout(WORD_SPOUT_ID, spout);
  28. //WordSpot -- > SplitBolt。使用随机分组
  29. builder.setBolt(SPLIT_BOLT_ID, splitBolt,5).shuffleGrouping(WORD_SPOUT_ID);
  30. //WordSplitBolt --> WordCountBolt。使用字段分组
  31. builder.setBolt(COUNT_BOLT_ID, countBolt,5).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
  32. //WordCountBolt --> WordReportBollt
  33. builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
  34. //创建配置
  35. Config config = new Config();
  36. // config.setNumWorkers(2);
  37. config.setDebug(true);
  38. //本地模式
  39. LocalCluster cluster = new LocalCluster();
  40. //提交拓扑
  41. cluster.submitTopology(WORD_TOPOLOGY_ID, config, builder.createTopology());
  42. ThreadUtils.waitForSeconds(10);
  43. cluster.killTopology(WORD_TOPOLOGY_ID);
  44. cluster.shutdown();
  45. }
  46. }

Bolt层

WordSplitBolt:

  1. package com.swun.storm.bolt;
  2. import java.util.Map;
  3. import com.sun.org.apache.bcel.internal.classfile.Field;
  4. import backtype.storm.task.OutputCollector;
  5. import backtype.storm.task.TopologyContext;
  6. import backtype.storm.topology.IRichBolt;
  7. import backtype.storm.topology.OutputFieldsDeclarer;
  8. import backtype.storm.tuple.Fields;
  9. import backtype.storm.tuple.Tuple;
  10. import backtype.storm.tuple.Values;
  11. public class WordSplitBolt implements IRichBolt{
  12. /**
  13. *
  14. */
  15. private static final long serialVersionUID = 7509915842346596564L;
  16. private OutputCollector collector;
  17. /**
  18. * 初始化collector
  19. */
  20. @Override
  21. public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  22. this.collector = collector;
  23. }
  24. /**
  25. * 分割句子
  26. */
  27. @Override
  28. public void execute(Tuple input) {
  29. //接收字段名为sentence的数据
  30. String sentence = input.getStringByField("sentencs");
  31. //分割数据,放到字符串数组中
  32. String[] words = sentence.split(" ");
  33. //遍历一遍数组,将分割后的数据上传
  34. for(String word : words) collector.emit(new Values(word));
  35. }
  36. @Override
  37. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  38. //声明发送数据的字段名为word
  39. declarer.declare(new Fields("word"));
  40. }
  41. @Override
  42. public void cleanup() {
  43. }
  44. @Override
  45. public Map<String, Object> getComponentConfiguration() {
  46. return null;
  47. }
  48. }

 

WordCountBolt:

  1. package com.swun.storm.bolt;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import backtype.storm.task.OutputCollector;
  5. import backtype.storm.task.TopologyContext;
  6. import backtype.storm.topology.IRichBolt;
  7. import backtype.storm.topology.OutputFieldsDeclarer;
  8. import backtype.storm.tuple.Fields;
  9. import backtype.storm.tuple.Tuple;
  10. import backtype.storm.tuple.Values;
  11. public class WordCountBolt implements IRichBolt{
  12. /**
  13. *
  14. */
  15. private static final long serialVersionUID = 2503069329578140148L;
  16. private OutputCollector collector;
  17. //用Map来统计分割后的词语
  18. private Map<String, Long> counts;
  19. /**
  20. * 初始化
  21. */
  22. @Override
  23. public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  24. this.collector = collector;
  25. this.counts = new HashMap<String, Long>();
  26. }
  27. /**
  28. * 统计数据的频率
  29. */
  30. @Override
  31. public void execute(Tuple input) {
  32. //接收字段名为word的数据
  33. String wordString = input.getStringByField("word");
  34. //去map中查询数据的频率
  35. Long numLong = counts.get(wordString);
  36. //如果数据从来没有出现过,就初始化数据的次数为0
  37. if(numLong == null) numLong = 0L;
  38. //词语出现的次数+1
  39. numLong++;
  40. //把结果重新放入map中,方便别的bolt进程异步更改数据
  41. counts.put(wordString, numLong);
  42. //将更新的数据放入容器中
  43. collector.emit(new Values(wordString,numLong));
  44. }
  45. @Override
  46. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  47. //声明发送数据的字段名为word,和count。这里是提交了两个字段
  48. declarer.declare(new Fields("word","count"));
  49. }
  50. @Override
  51. public void cleanup() {
  52. }
  53. @Override
  54. public Map<String, Object> getComponentConfiguration() {
  55. return null;
  56. }
  57. }

WordReportBolt:

  1. package com.swun.storm.bolt;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import backtype.storm.task.OutputCollector;
  5. import backtype.storm.task.TopologyContext;
  6. import backtype.storm.topology.IRichBolt;
  7. import backtype.storm.topology.OutputFieldsDeclarer;
  8. import backtype.storm.tuple.Tuple;
  9. public class WordReportBolt implements IRichBolt{
  10. /**
  11. *
  12. */
  13. private static final long serialVersionUID = -778078375282102637L;
  14. private OutputCollector collector;
  15. //保存最终的结果,缓存效果
  16. private Map<String, Long> countsNum;
  17. /**
  18. * 初始化
  19. */
  20. @Override
  21. public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  22. this.collector = collector;
  23. this.countsNum = new HashMap<String, Long>();
  24. }
  25. @Override
  26. public void execute(Tuple input) {
  27. //接收字段名为word和count的数据。
  28. String word = input.getStringByField("word");
  29. Long count = input.getLongByField("count");
  30. //将数据统一放进countNum这个map中
  31. countsNum.put(word,count);
  32. //这里为什么要连个map来统计词频呢?
  33. //CountBolt中的counts是用来试试统计每次接收词语后的数据,每个bolt有多个进程,counts是共享数据,是实时变化的
  34. //countNum是用来接汇总的。汇总不同词语的总数
  35. }
  36. @Override
  37. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  38. }
  39. /**
  40. * 在bolt停止的时候执行该方法
  41. */
  42. @Override
  43. public void cleanup() {
  44. //打印结果
  45. System.out.println("-----------------FINAL COUNTS-----------------");
  46. for(String key : countsNum.keySet()){
  47. System.out.println(key + ":" + countsNum.get(key));
  48. }
  49. System.out.println("----------------------------------------------");
  50. }
  51. @Override
  52. public Map<String, Object> getComponentConfiguration() {
  53. return null;
  54. }
  55. }

 

Topology层

WordTopology:

  1. package com.swun.storm.topology;
  2. import com.swun.storm.Utils.ThreadUtils;
  3. import com.swun.storm.bolt.WordCountBolt;
  4. import com.swun.storm.bolt.WordReportBolt;
  5. import com.swun.storm.bolt.WordSplitBolt;
  6. import com.swun.storm.spout.WordSpout;
  7. import backtype.storm.Config;
  8. import backtype.storm.LocalCluster;
  9. import backtype.storm.topology.TopologyBuilder;
  10. import backtype.storm.tuple.Fields;
  11. public class WordTopology {
  12. //定义常量
  13. private static final String WORD_SPOUT_ID = "word-spout";
  14. private static final String SPLIT_BOLT_ID = "split-bolt";
  15. private static final String COUNT_BOLT_ID = "count-bolt";
  16. private static final String REPORT_BOLT_ID = "report-bolt";
  17. private static final String WORD_TOPOLOGY_ID = "wordcount-topology";
  18. public static void main(String[] args) {
  19. //创建spout、bolt对象;
  20. WordSpout spout = new WordSpout();
  21. WordSplitBolt splitBolt = new WordSplitBolt();
  22. WordCountBolt countBolt = new WordCountBolt();
  23. WordReportBolt reportBolt = new WordReportBolt();
  24. //创建拓扑对象
  25. TopologyBuilder builder = new TopologyBuilder();
  26. //设置spout
  27. builder.setSpout(WORD_SPOUT_ID, spout);
  28. //WordSpot -- > SplitBolt。使用随机分组
  29. builder.setBolt(SPLIT_BOLT_ID, splitBolt,5).shuffleGrouping(WORD_SPOUT_ID);
  30. //WordSplitBolt --> WordCountBolt。使用字段分组
  31. builder.setBolt(COUNT_BOLT_ID, countBolt,5).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
  32. //WordCountBolt --> WordReportBollt
  33. builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
  34. //创建配置
  35. Config config = new Config();
  36. // config.setNumWorkers(2);
  37. config.setDebug(true);
  38. //本地模式
  39. LocalCluster cluster = new LocalCluster();
  40. //提交拓扑
  41. cluster.submitTopology(WORD_TOPOLOGY_ID, config, builder.createTopology());
  42. ThreadUtils.waitForSeconds(10);
  43. cluster.killTopology(WORD_TOPOLOGY_ID);
  44. cluster.shutdown();
  45. }
  46. }

结果:

 

 

今天的作业终于做完了T~T

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/337608
推荐阅读
相关标签
  

闽ICP备14008679号