当前位置:   article > 正文

Storm案例:WordCount_头歌storm 的 wordcount 案例

头歌storm 的 wordcount 案例

Storm案例:WordCount

B.WordCount

1. 首先定义一个类继承BaseRichSpout,需要实现其方法,在这个类你需要产生数据并发送出去

public static class DataSourceSpout extends BaseRichSpout {
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        }

        @Override
        public void nextTuple() {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2.实现第一个open初始化方法,因为涉及发送,所以要定义一个collector

		private SpoutOutputCollector collector;
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }
  • 1
  • 2
  • 3
  • 4
  • 5

3.实现第二个nextTuple方法,获取数据

在这里我们使用一个commons-io,可以再pom中添加依赖

</dependency>
  <groupId>commons-io</groupId>
  <artifactId>commons-io</artifactId>
  <version>2.4</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

发送数据方法:
collector.emit()
参数:1.list<object>
2.new values

		@Override
        public void nextTuple() {
        	//使用FileUtils.listFiles方法获得一个可以迭代的files.参数:
        	//1.new File(路径)
        	//2.new String[]{后缀名}
        	//3.是否递归
            Collection<File> files = FileUtils.listFiles(new File("/home/hadoop/shell"), new String[]{"txt"}, false);
            for (File file : files) {
                try { 
                	//使用FileUtils.readLines将读进来的文件转成一行一行的字符串数组
                    List<String> lines = FileUtils.readLines(file);
                    for (String line : lines) {
                        this.collector.emit(new Values(line));
                    }

                    FileUtils.moveFile(file,new File(file.getAbsolutePath()+System.currentTimeMillis()));
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

4.实现declareOutputFields方法:声明输出字段

通过declarer.declare()方法声明,参数:
new Fields(),对应于从nextTuple发送过来的数据

@Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
  • 1
  • 2
  • 3
  • 4

5.定义一个类继承BaseRichBolt,需要实现其方法,在这个类中需要对数据进行接收并进行split操作

public static class SplitBolt extends BaseRichBolt{

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        @Override
        public void execute(Tuple input) {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

6.实现第一个prepare初始化方法,因为涉及发送,所以要定义一个collector

		private OutputCollector collector;
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
  • 1
  • 2
  • 3
  • 4
  • 5

7.实现execute,对数据进行接收和split操作

通过input.getIntegerByField(名称);进行接收数据,名称对应上面声明输出字段名

		@Override
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            String[] words = line.split("\t");
            for (String word : words) {
                this.collector.emit(new Values(word));
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

8.实现declareOutputFields方法:声明输出字段

通过declarer.declare()方法声明,参数:
new Fields(),对应于从nextTuple发送过来的数据

@Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
  • 1
  • 2
  • 3
  • 4

9.定义一个类继承BaseRichBolt,需要实现其方法,在这个类中需要对数据进行接收并进行Count操作

public static class SplitBolt extends BaseRichBolt{

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        @Override
        public void execute(Tuple input) {
        }
        
		@Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

需要注意的是,我们不需要再次对数据进行发送,所以无需定义SpoutOutputCollector

10.实现execute,对数据进行接收和Count操作

		Map<String,Integer> map = new HashMap<>();
        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Integer i = map.get(word);
            if (i == null){
                i = 0;
            }
            i++;

            map.put(word,i);
            System.out.println("*******************");
            for (Map.Entry<String, Integer> e : map.entrySet()) {
                System.out.println(e.getKey()+" : "+e.getValue());
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

11.编写main方法进行测试

我们这里使用本地模式进行测试

官网链接
首先需要创建一个LocalCluster()
然后需要创建一个 TopologyBuilder()
然后通过builder.setSpout和builder.setBolt方法分别设置Spout和Bolt
在setBolt时需要使用.shuffleGrouping()方法指定上一个作业名称
通过cluster.submitTopology提交作业

storm中任何作业都是用TopologyBuilder进行提交的
Topology需要制定Spout和Bolt的执行顺序

public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("LocalWCStormTop",new DataSourceSpout());
        builder.setBolt("SplitBolt",new SplitBolt()).shuffleGrouping("LocalWCStormTop");
        builder.setBolt("CountWords",new CountWords()).shuffleGrouping("SplitBolt");

        LocalCluster cluster = new LocalCluster();
        //提交作业,参数:
        //1.AppName
        //2.new Config()
        //builder.createTopology()   创建
        cluster.submitTopology("LocalWCStormTop",new Config(),builder.createTopology());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

12.运行

在这里插入图片描述
运行成功

13.集群运行

先丢个官网链接
这边我们只需要稍微修改main里的代码

public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("LocalWCStormTop",new DataSourceSpout());
        builder.setBolt("SplitBolt",new SplitBolt()).shuffleGrouping("LocalWCStormTop");
        builder.setBolt("CountWords",new CountWords()).shuffleGrouping("SplitBolt");

        String name = LocalWCStormTop2.class.getSimpleName();
        //需要三个参数:
        //1.AppName
        //new一个config就行了
        //使用builder.createTopology()
        StormSubmitter.submitTopology(name,new Config(),builder.createTopology());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

然后打包,准备在storm集群上运行
怎么运行先丢个官网链接
我们使用jar包运行方式,官网描述:

storm jar topology-jar-path class args
  • 1

修改后:

./bin/storm jar storm-1.0.jar com.zwb.LocalWCStormTop2
  • 1

发布成功

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/701079
推荐阅读
相关标签
  

闽ICP备14008679号