赞
踩
目录
文章中的所有内容不明白的可以查看前后文或者call博主;
相关文章:
Storm集群安装部署2——Centos6.5的默认python2.6.6版本升级到python2.7.15
Storm集群安装部署3——在节点服务器上安装 Storm并启动Storm
Storm集群安装部署4——在Storm节点服务器上安装 nimBus和supervisor
创建Maven项目如下:
我会尽量把项目的注释写清楚一些,不懂的就问博主[/微笑]
WordSpout:(注意:下面贴出的代码实际上是 WordTopology,WordSpout 的源码在本文中缺失)
- package com.swun.storm.topology;
-
- import com.swun.storm.Utils.ThreadUtils;
- import com.swun.storm.bolt.WordCountBolt;
- import com.swun.storm.bolt.WordReportBolt;
- import com.swun.storm.bolt.WordSplitBolt;
- import com.swun.storm.spout.WordSpout;
-
- import backtype.storm.Config;
- import backtype.storm.LocalCluster;
- import backtype.storm.topology.TopologyBuilder;
- import backtype.storm.tuple.Fields;
-
- public class WordTopology {
-
- //定义常量
- private static final String WORD_SPOUT_ID = "word-spout";
- private static final String SPLIT_BOLT_ID = "split-bolt";
- private static final String COUNT_BOLT_ID = "count-bolt";
- private static final String REPORT_BOLT_ID = "report-bolt";
- private static final String WORD_TOPOLOGY_ID = "wordcount-topology";
-
-
- public static void main(String[] args) {
- //创建spout、bolt对象;
- WordSpout spout = new WordSpout();
- WordSplitBolt splitBolt = new WordSplitBolt();
- WordCountBolt countBolt = new WordCountBolt();
- WordReportBolt reportBolt = new WordReportBolt();
- //创建拓扑对象
-
- TopologyBuilder builder = new TopologyBuilder();
- //设置spout
- builder.setSpout(WORD_SPOUT_ID, spout);
- //WordSpot -- > SplitBolt。使用随机分组
- builder.setBolt(SPLIT_BOLT_ID, splitBolt,5).shuffleGrouping(WORD_SPOUT_ID);
- //WordSplitBolt --> WordCountBolt。使用字段分组
- builder.setBolt(COUNT_BOLT_ID, countBolt,5).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
- //WordCountBolt --> WordReportBollt
- builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
-
- //创建配置
- Config config = new Config();
- // config.setNumWorkers(2);
- config.setDebug(true);
- //本地模式
-
- LocalCluster cluster = new LocalCluster();
- //提交拓扑
- cluster.submitTopology(WORD_TOPOLOGY_ID, config, builder.createTopology());
- ThreadUtils.waitForSeconds(10);
- cluster.killTopology(WORD_TOPOLOGY_ID);
- cluster.shutdown();
-
- }
-
- }

WordSplitBolt:
- package com.swun.storm.bolt;
-
- import java.util.Map;
-
- import com.sun.org.apache.bcel.internal.classfile.Field;
-
- import backtype.storm.task.OutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.IRichBolt;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.tuple.Fields;
- import backtype.storm.tuple.Tuple;
- import backtype.storm.tuple.Values;
-
- public class WordSplitBolt implements IRichBolt{
-
-
-
- /**
- *
- */
- private static final long serialVersionUID = 7509915842346596564L;
-
- private OutputCollector collector;
-
-
- /**
- * 初始化collector
- */
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
-
- /**
- * 分割句子
- */
- @Override
- public void execute(Tuple input) {
-
- //接收字段名为sentence的数据
- String sentence = input.getStringByField("sentencs");
-
- //分割数据,放到字符串数组中
- String[] words = sentence.split(" ");
-
- //遍历一遍数组,将分割后的数据上传
- for(String word : words) collector.emit(new Values(word));
-
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- //声明发送数据的字段名为word
- declarer.declare(new Fields("word"));
-
- }
-
- @Override
- public void cleanup() {
-
- }
-
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- }

WordCountBolt:
- package com.swun.storm.bolt;
-
- import java.util.HashMap;
- import java.util.Map;
-
- import backtype.storm.task.OutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.IRichBolt;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.tuple.Fields;
- import backtype.storm.tuple.Tuple;
- import backtype.storm.tuple.Values;
-
- public class WordCountBolt implements IRichBolt{
-
- /**
- *
- */
- private static final long serialVersionUID = 2503069329578140148L;
- private OutputCollector collector;
- //用Map来统计分割后的词语
- private Map<String, Long> counts;
-
-
- /**
- * 初始化
- */
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- this.counts = new HashMap<String, Long>();
- }
-
- /**
- * 统计数据的频率
- */
- @Override
- public void execute(Tuple input) {
- //接收字段名为word的数据
- String wordString = input.getStringByField("word");
-
- //去map中查询数据的频率
- Long numLong = counts.get(wordString);
-
- //如果数据从来没有出现过,就初始化数据的次数为0
- if(numLong == null) numLong = 0L;
-
- //词语出现的次数+1
- numLong++;
-
- //把结果重新放入map中,方便别的bolt进程异步更改数据
- counts.put(wordString, numLong);
-
- //将更新的数据放入容器中
- collector.emit(new Values(wordString,numLong));
- }
-
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- //声明发送数据的字段名为word,和count。这里是提交了两个字段
- declarer.declare(new Fields("word","count"));
- }
-
- @Override
- public void cleanup() {
-
- }
-
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- }

WordReportBolt:
- package com.swun.storm.bolt;
-
- import java.util.HashMap;
- import java.util.Map;
-
- import backtype.storm.task.OutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.IRichBolt;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.tuple.Tuple;
-
- public class WordReportBolt implements IRichBolt{
-
- /**
- *
- */
- private static final long serialVersionUID = -778078375282102637L;
-
- private OutputCollector collector;
-
-
- //保存最终的结果,缓存效果
- private Map<String, Long> countsNum;
-
- /**
- * 初始化
- */
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- this.countsNum = new HashMap<String, Long>();
- }
-
- @Override
- public void execute(Tuple input) {
- //接收字段名为word和count的数据。
- String word = input.getStringByField("word");
- Long count = input.getLongByField("count");
-
- //将数据统一放进countNum这个map中
- countsNum.put(word,count);
-
- //这里为什么要连个map来统计词频呢?
- //CountBolt中的counts是用来试试统计每次接收词语后的数据,每个bolt有多个进程,counts是共享数据,是实时变化的
- //countNum是用来接汇总的。汇总不同词语的总数
-
- }
-
-
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-
-
- /**
- * 在bolt停止的时候执行该方法
- */
- @Override
- public void cleanup() {
-
- //打印结果
- System.out.println("-----------------FINAL COUNTS-----------------");
- for(String key : countsNum.keySet()){
- System.out.println(key + ":" + countsNum.get(key));
- }
- System.out.println("----------------------------------------------");
- }
-
-
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
- }

WordTopology:
- package com.swun.storm.topology;
-
- import com.swun.storm.Utils.ThreadUtils;
- import com.swun.storm.bolt.WordCountBolt;
- import com.swun.storm.bolt.WordReportBolt;
- import com.swun.storm.bolt.WordSplitBolt;
- import com.swun.storm.spout.WordSpout;
-
- import backtype.storm.Config;
- import backtype.storm.LocalCluster;
- import backtype.storm.topology.TopologyBuilder;
- import backtype.storm.tuple.Fields;
-
- public class WordTopology {
-
- //定义常量
- private static final String WORD_SPOUT_ID = "word-spout";
- private static final String SPLIT_BOLT_ID = "split-bolt";
- private static final String COUNT_BOLT_ID = "count-bolt";
- private static final String REPORT_BOLT_ID = "report-bolt";
- private static final String WORD_TOPOLOGY_ID = "wordcount-topology";
-
-
- public static void main(String[] args) {
- //创建spout、bolt对象;
- WordSpout spout = new WordSpout();
- WordSplitBolt splitBolt = new WordSplitBolt();
- WordCountBolt countBolt = new WordCountBolt();
- WordReportBolt reportBolt = new WordReportBolt();
- //创建拓扑对象
-
- TopologyBuilder builder = new TopologyBuilder();
- //设置spout
- builder.setSpout(WORD_SPOUT_ID, spout);
- //WordSpot -- > SplitBolt。使用随机分组
- builder.setBolt(SPLIT_BOLT_ID, splitBolt,5).shuffleGrouping(WORD_SPOUT_ID);
- //WordSplitBolt --> WordCountBolt。使用字段分组
- builder.setBolt(COUNT_BOLT_ID, countBolt,5).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
- //WordCountBolt --> WordReportBollt
- builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
-
- //创建配置
- Config config = new Config();
- // config.setNumWorkers(2);
- config.setDebug(true);
- //本地模式
-
- LocalCluster cluster = new LocalCluster();
- //提交拓扑
- cluster.submitTopology(WORD_TOPOLOGY_ID, config, builder.createTopology());
- ThreadUtils.waitForSeconds(10);
- cluster.killTopology(WORD_TOPOLOGY_ID);
- cluster.shutdown();
-
- }
-
- }

今天的作业终于做完了T~T
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。