赞
踩
/**
 * Skeleton spout: the Storm lifecycle callbacks are declared here and
 * filled in by the snippets that follow in the article.
 */
public static class DataSourceSpout extends BaseRichSpout {

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // no initialization yet
    }

    @Override
    public void nextTuple() {
        // no data emitted yet
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output fields declared yet
    }
}
// Collector handed to us by Storm; cached so nextTuple() can emit tuples later.
private SpoutOutputCollector collector;
// Called once when the spout task starts on a worker; we only stash the collector.
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
在这里我们使用 commons-io,可以在 pom 中添加依赖:
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
发送数据方法:
collector.emit()
参数:可以传入一个 List&lt;Object&gt;,通常直接使用 new Values(...) 构造。
@Override public void nextTuple() { //使用FileUtils.listFiles方法获得一个可以迭代的files.参数: //1.new File(路径) //2.new String[]{后缀名} //3.是否递归 Collection<File> files = FileUtils.listFiles(new File("/home/hadoop/shell"), new String[]{"txt"}, false); for (File file : files) { try { //使用FileUtils.readLines将读进来的文件转成一行一行的字符串数组 List<String> lines = FileUtils.readLines(file); for (String line : lines) { this.collector.emit(new Values(line)); } FileUtils.moveFile(file,new File(file.getAbsolutePath()+System.currentTimeMillis())); } catch (IOException e) { e.printStackTrace(); } } }
通过declarer.declare()方法声明,参数:
new Fields(),对应于从nextTuple发送过来的数据
// Declares the single output field "line"; the name must match what
// downstream bolts read via input.getStringByField("line").
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
public static class SplitBolt extends BaseRichBolt{ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } @Override public void execute(Tuple input) { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
// Collector handed to us by Storm; cached so execute() can emit tuples downstream.
private OutputCollector collector;
// Called once when the bolt task starts on a worker; we only stash the collector.
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
通过 input.getStringByField(名称) 接收数据(按字段类型选择对应的 get&lt;类型&gt;ByField 方法),名称对应上面声明的输出字段名
/**
 * Reads the "line" field of each incoming tuple, splits it on tab
 * characters, and emits one single-field tuple per word.
 */
@Override
public void execute(Tuple input) {
    final String line = input.getStringByField("line");
    for (String word : line.split("\t")) {
        this.collector.emit(new Values(word));
    }
}
通过declarer.declare()方法声明,参数:
new Fields(),对应于从execute发送过来的数据
// Declares the single output field "word"; the name must match what
// downstream bolts read via input.getStringByField("word").
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
public static class SplitBolt extends BaseRichBolt{ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } @Override public void execute(Tuple input) { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { }
需要注意的是,我们不需要再次对数据进行发送,所以无需定义SpoutOutputCollector
Map<String,Integer> map = new HashMap<>(); @Override public void execute(Tuple input) { String word = input.getStringByField("word"); Integer i = map.get(word); if (i == null){ i = 0; } i++; map.put(word,i); System.out.println("*******************"); for (Map.Entry<String, Integer> e : map.entrySet()) { System.out.println(e.getKey()+" : "+e.getValue()); } }
我们这里使用本地模式进行测试
官网链接
首先需要创建一个LocalCluster()
然后需要创建一个 TopologyBuilder()
然后通过builder.setSpout和builder.setBolt方法分别设置Spout和Bolt
在setBolt时需要使用.shuffleGrouping()方法指定上一个作业名称
通过cluster.submitTopology提交作业
storm中任何作业都是用TopologyBuilder进行提交的
Topology需要指定Spout和Bolt的执行顺序
/**
 * Wires spout -> split bolt -> count bolt and runs the topology
 * in-process (local mode) for testing.
 */
public static void main(String[] args) {
    final TopologyBuilder builder = new TopologyBuilder();
    // shuffleGrouping(name) subscribes a bolt to the upstream component by its id.
    builder.setSpout("LocalWCStormTop", new DataSourceSpout());
    builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("LocalWCStormTop");
    builder.setBolt("CountWords", new CountWords()).shuffleGrouping("SplitBolt");
    // submitTopology(appName, config, topology)
    new LocalCluster().submitTopology("LocalWCStormTop", new Config(), builder.createTopology());
}
运行成功
先丢个官网链接
这边我们只需要稍微修改main里的代码
/**
 * Cluster-mode entry point: identical wiring to the local version, but the
 * topology is submitted to a real Storm cluster via StormSubmitter.
 */
public static void main(String[] args) throws Exception {
    final TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("LocalWCStormTop", new DataSourceSpout());
    builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("LocalWCStormTop");
    builder.setBolt("CountWords", new CountWords()).shuffleGrouping("SplitBolt");
    // Topology name derived from the class name, matching the jar entry point.
    final String name = LocalWCStormTop2.class.getSimpleName();
    // submitTopology(appName, config, topology)
    StormSubmitter.submitTopology(name, new Config(), builder.createTopology());
}
然后打包,准备在storm集群上运行
怎么运行先丢个官网链接
我们使用jar包运行方式,官网描述:
storm jar topology-jar-path class args
修改后:
./bin/storm jar storm-1.0.jar com.zwb.LocalWCStormTop2
发布成功
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。