赞
踩
以下内容均整理自网络
参考这个:https://blog.csdn.net/yongge1981/article/details/79260011
任务: 实时统计移动用户在每个小区的掉话率。
项目整体架构如下:
启动Zookeeper集群 zkServer.sh start 启动Hbase(完全分布式需要先启动Hadoop集群) 启动Kafka集群(是通过发送数据到kafka) bin/kafka-server-start.sh config/server.properties 启动storm ##node1上启动Nimbus $ storm nimbus >> ./logs/nimbus.out 2>&1 & ##后台启动,记得在storm主目录 ##启动界面 $ storm ui >> ./logs/ui.out 2>&1 & 节点node2和node3启动supervisor,按照配置,每启动一个supervisor就有了4个slots $ storm supervisor >> ./logs/supervisor.out 2>&1 &
start-hbase.sh
hbase shell
create 'cell_monitor_table','cf'
A、cmccstormjk02项目
a) 修改cmcc.constant. Constants中对应各配置项。如:HBASE_ZOOKEEPER_LIST、KAFKA_ZOOKEEPER_LIST、BROKER_LIST、ZOOKEEPERS
B、cmcc02_hbase项目
a) 修改cmcc.hbase.dao.impl.HBaseDAOImp中Hbase的Zookeeper集群配置。
在windows上运行kafka.productor. CellProducer,随机生成模拟数据,并将数据写入到Kafka中。
期间出现的错误:无法发送数据到对应主题。原因是这个java文件没有建立主机IP与别名的映射,所以无法访问。可以通过修改hosts文件,添加映射解决(C:\Windows\System32\drivers\etc)。
可查看Kafka中topic列表:
bin/kafka-topics.sh --zookeeper hadoop01:2181, hadoop02:2181, hadoop03:2181 –list
查看数据:
bin/kafka-console-consumer.sh --zookeeper hadoop01:2181, hadoop02:2181, hadoop03:2181 --topic mylog_cmcc
如果程序正确,则在hadoop虚拟机上可以查看到对应的消费数据:
在windoes上运行topo. KafkaOneCellMonintorTopology,从Kafka中读取数据,Storm分析之后,结果写入Hbase中存放。
此时,若已经产生“掉话”情况,分析结果在Hbase当中可以查看到:
将cmcc02_hbase添加到Tomcat中,启动Tomcat服务,浏览器通过访问以下地址查看:
http://localhost:8080/cmcc/onecellmonitor.jsp
生产数据并发送到kafka的主题下:
/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package kafka.productor; import java.util.Properties; import java.util.Random; import backtype.storm.utils.Utils; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import tools.DateFmt; /*** * 模拟发送数据到kafka中 * * @author hadoop * */ public class CellProducer extends Thread { //extends Thread 按照线程执行,这里实际就一条线程,在调用类的对象在执行start()方法时会自动运行run方法。 //多线程简单例子: https://www.cnblogs.com/yjd_hycf_space/p/7526608.html // bin/kafka-topics.sh --create --zookeeper localhost:2181 // --replication-factor 3 --partitions 5 --topic cmcccdr private final kafka.javaapi.producer.Producer<Integer, String> producer; private final String topic; private final Properties props = new Properties(); //final修饰的关键字必须在成员变量中进行初始化,或直接初始化,且只能赋值一次 // 通过使用它来定义变量,这样不会在后面被修改了 //构造函数读取配置信息,并和kafka连接起来 public CellProducer(String topic) { props.put("serializer.class", "kafka.serializer.StringEncoder");// 字符串消息 props.put("metadata.broker.list", KafkaProperties.broker_list); //读取集群节点信息 //这个就和kafka连接了? producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props)); this.topic = topic; } //run方法发送数据到kafka主题下 public void run() { Random random = new Random(); //小区编号 String[] cell_num = { "29448-37062", "29448-51331", "29448-51331", "29448-51333", "29448-51343" }; // 正常0; 掉话1(信号断断续续); 断话2(完全断开) String[] drop_num = { "0", "1", "2" }; int i = 0; while (true) { i++; String testStr = String.format("%06d", random.nextInt(10) + 1); //1-10 10位数据 // messageStr: 2494 29448-000003 2016-01-05 10:25:17 1 // String messageStr = i + "\t" + ("29448-" + testStr) + "\t" + DateFmt.getCountDate(null, DateFmt.date_long) + "\t" + drop_num[random.nextInt(drop_num.length)]; System.out.println("product:" + messageStr); //将当前数据推送到kafka对应主题上的数据 //kafka生产数据并发送:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example producer.send(new KeyedMessage<Integer, String>(topic, messageStr));//不知道为什么命名是整形,值是String都行 Utils.sleep(1000); //推送一条休眠1000秒 // if(i == 500) { // break; // } //通过学习这个函数了解到,一些常用的参数可以新建一个类,通过赋值成员变量来实现。如KafkaProperties.broker_list //一些常见的方法也可以单独新建一个类,这样通过类名.方法实现(不实例化,通过静态成员函数实现)。如DateFmt.getCountDate() } } public static void main(String[] args) { // topic设置 CellProducer producerThread = new CellProducer(KafkaProperties.Cell_Topic); // 启动线程生成数据,会自动运行 run方法 producerThread.start(); } }
将kafka和storm连接并处理数据:
package topo; import java.util.ArrayList; import java.util.List; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import bolt.CellDaoltBolt; import bolt.CellFilterBolt; import cmcc.constant.Constants; import kafka.productor.KafkaProperties; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; public class KafkaOneCellMonintorTopology { /** * @param args */ public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); //通过这个包.类获取主机的名字 ZkHosts zkHosts = new ZkHosts(Constants.KAFKA_ZOOKEEPER_LIST); SpoutConfig spoutConfig = new SpoutConfig(zkHosts, "mylog_cmcc", //消费数据主题 "/MyKafka", // 偏移量offset的根目录 "MyTrack"); // 对应一个应用 List<String> zkServers = new ArrayList<String>(); System.out.println(zkHosts.brokerZkStr); for (String host : zkHosts.brokerZkStr.split(",")) { zkServers.add(host.split(":")[0]); } spoutConfig.zkServers = zkServers; spoutConfig.zkPort = 2181; // 是否从头开始消费 spoutConfig.forceFromStart = false; spoutConfig.socketTimeoutMs = 60 * 1000; // String spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); //spout去主题下接收数据 //spout整合kafka,可见官网配置详解 builder.setSpout("spout", new KafkaSpout(spoutConfig), 3); //bolt对数据进行处理://这个过滤类的作用是去掉通话编号,对时间进行格式化 builder.setBolt("cellBolt", new CellFilterBolt(), 3).shuffleGrouping("spout"); //这个bolt主要是按照小区统计电话挂话率 //"date", "cell_num", "drop_num" 时间,小区编号,掉话状态 builder.setBolt("CellDaoltBolt", new CellDaoltBolt(), 5) .fieldsGrouping("cellBolt", new Fields("cell_num")); Config conf = new Config(); conf.setDebug(false); conf.setNumWorkers(5); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } } else { System.out.println("Local running"); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
上一个程序中的数据预处理(去掉记录编号和格式化时间):
package bolt; import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import tools.DateFmt; public class CellFilterBolt implements IBasicBolt { /** * */ private static final long serialVersionUID = 1L; //这个过滤类的作用是去掉通话编号,对时间进行格式化 @Override public void execute(Tuple input, BasicOutputCollector collector) { String logString = input.getString(0); try { if (input != null) { //如果不为空,处理 String arr[] = logString.split("\\t");//按照制表符分割 // messageStr格式:消息编号\t小区编号\t时间\t状态 // 例: 2494 29448-000003 2016-01-05 10:25:17 1 // DateFmt.date_short是yyyy-MM-dd,把2016-01-05 10:25:17格式化2016-01-05 // 发出的数据格式: 时间, 小区编号, 掉话状态 //因为统计掉话率,记录编号不需要 collector.emit(new Values(DateFmt.getCountDate(arr[2], DateFmt.date_short), arr[1], arr[3])); } } catch (Exception e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date", "cell_num", "drop_num")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void prepare(Map map, TopologyContext arg1) { // TODO Auto-generated method stub } }
按照小区统计电话挂话率:
package bolt; import java.util.Calendar; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import cmcc.hbase.dao.HBaseDAO; import cmcc.hbase.dao.impl.HBaseDAOImp; import tools.DateFmt; public class CellDaoltBolt implements IBasicBolt { private static final long serialVersionUID = 1L; //序列化ID HBaseDAO dao = null; //这个表示马上就到处理完毕存储在HBASE了 long beginTime = System.currentTimeMillis(); //获取当前系统时间,毫秒(时间戳) long endTime = 0; // 通话总数 Map<String, Long> cellCountMap = new HashMap<String, Long>(); // 掉话数 Map<String, Long> cellDropCountMap = new HashMap<String, Long>(); String todayStr = null; // excute才是每来一条数据处理一次,可以写一个案列来测试一下 @Override public void execute(Tuple input, BasicOutputCollector collector) { // input为2016-01-05,29448-000003,1 if (input != null) { //时间 小区号 掉话状态 按行得到通话记录 String dateStr = input.getString(0); String cellNum = input.getString(1); String dropNum = input.getString(2); // 判断是否是当天,不是当天 就清除map 避免内存过大 // 基站数目 大概5-10万(北京市) // http://bbs.c114.net/thread-793707-1-1.html //获取当前时间,因为null todayStr = DateFmt.getCountDate(null, DateFmt.date_short); // 跨天的处理,大于当天的数据来了,就清空两个map // 思考: 如果程序崩溃了,map清零了,如果不出问题,一直做同一个cellid的累加 // 这个逻辑不好,应该换成一个线程定期的清除map数据,而不是这里判断 //todayStr.compareTo(dateStr) < 0 表示未来的数据,清除 if (todayStr != dateStr && todayStr.compareTo(dateStr) < 0) { cellCountMap.clear(); cellDropCountMap.clear(); } // 当前cellid的通话数统计 Long cellAll = cellCountMap.get(cellNum);//拿出次数,方便叠加 if (cellAll == null) { cellAll = 0L; } //小区数+出现总次数 cellCountMap.put(cellNum, ++cellAll); // 掉话数统计,大于0就是掉话, 做法 和上面一样 Long cellDropAll = cellDropCountMap.get(cellNum); int t = Integer.parseInt(dropNum); //挂话状态,解析成整数,0,1,2 if (t > 0) { if (cellDropAll == null) { cellDropAll = 0L; } //小区数+非正常挂话总次数 cellDropCountMap.put(cellNum, ++cellDropAll); } // 1.定时写库.为了防止写库过于频繁 这里间隔一段时间写一次 // 2.也可以检测map里面数据size 写数据到 hbase // 3.自己可以设计一些思路 ,当然 采用redis 也不错 // 4.采用tick定时存储也是一个思路 endTime = System.currentTimeMillis(); //是运行到这里的此时时刻 // flume+kafka 集成 // 当前掉话数 // 1.每小时掉话数目 // 2.每小时 通话数据 // 3.每小时 掉话率 // 4.昨天的历史轨迹 // 5.同比去年今天的轨迹(如果有数据) // hbase 按列存储的数据() // 10万 // rowkey cellnum+ day //beginTime 最开始的值是第一次运行这个类的系统时间。 if (endTime - beginTime >= 5000) { // 5s 写一次库,防止频繁写数据库 if (cellCountMap.size() > 0 && cellDropCountMap.size() > 0) { // x轴,相对于小时的偏移量,格式为 时:分,数值 数值是时间的偏移 String arr[] = this.getAxsi(); // 当前日期 String today = DateFmt.getCountDate(null, DateFmt.date_short); // 当前分钟 String today_minute = DateFmt.getCountDate(null, DateFmt.date_minute); // cellCountMap为通话数据的map Set<String> keys = cellCountMap.keySet();//所有小区编号 for (Iterator iterator = keys.iterator(); iterator.hasNext();) { String key_cellnum = (String) iterator.next(); //该时刻的小区通话总数,掉话数 打印测试 System.out.println("key_cellnum: " + key_cellnum + "***" + arr[0] + "---" + arr[1] + "---" + cellCountMap.get(key_cellnum) + "----" + cellDropCountMap.get(key_cellnum)); //写入HBase数据,样例: {time_title:"10:45",xAxis:10.759722222222223,call_num:140,call_drop_num:91} //按照HBASE的格式将数据存起来插入HBASE(见insert方法实HBASEDAOImp现),可以学习下HBASE的安装,java基本使用。 // HBASE: 表名 主键ID 数据的列族 内容 //tableName,rowKey, family, quailifer[], value[] dao.insert("cell_monitor_table", key_cellnum + "_" + today, "cf", new String[] { today_minute }, new String[] { "{" + "time_title:\"" + arr[0] + "\",xAxis:" + arr[1] + ",call_num:" + cellCountMap.get(key_cellnum) + ",call_drop_num:" + cellDropCountMap.get(key_cellnum) + "}" } ); } } // 需要重置初始时间,这样才能实现每5秒记录一次 beginTime = System.currentTimeMillis(); } } } @Override public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub dao = new HBaseDAOImp(); Calendar calendar = Calendar.getInstance(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } // 获取X坐标,就是当前时间的坐标,小时是单位 public String[] getAxsi() { // 取当前时间 Calendar c = Calendar.getInstance(); int hour = c.get(Calendar.HOUR_OF_DAY); int minute = c.get(Calendar.MINUTE); int sec = c.get(Calendar.SECOND); // 总秒数 int curSecNum = hour * 3600 + minute * 60 + sec; // (12*3600+30*60+0)/3600=12.5 Double xValue = (double) curSecNum / 3600; // 时:分,数值 数值是时间的偏移 // 时间:时分显示 + 时间的偏移,按照小时为密度偏移,这样才能好做x轴。因为一般都是按照多少小时叠加。 String[] end = { hour + ":" + minute, xValue.toString() }; return end; } @Override public void cleanup() { } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。