
Call Log Analysis: callLog (Part 1) -- Mobile Phone Call Log Analysis
I. Project Overview
----------------------------------------------
1. Hadoop + HBase + Flume + ZooKeeper store carrier-grade volumes of call log data
   with random access and real-time reads and writes. The rowkey is hashed to defuse
   HBase hotspotting, and coprocessors address the system's throughput and query load
   while avoiding the notification storms and dead recursion that intermediate writes
   can trigger, giving a hands-on view of big-data technology in enterprise use.
2. Overall architecture:
   Hadoop architecture and HA configuration.
   HBase architecture and HA configuration.
   Flume real-time collection.
   Front-end web layer built on SSM and its interaction with HBase on the back end.
   Periodic task scheduling with Hive + Oozie.
   Windowed real-time monitoring of sensitive keywords with Spark Streaming.
3. Design and implementation of the callLogs table in HBase: analysis of call-record
   contents and common query scenarios, and the design and implementation of the
   rowkey, focusing on how salting works, how it solves the hotspot problem, and
   practical rowkey design principles and tricks.
4. Coprocessor principles and hands-on use: the design of callee call records, using a
   coprocessor to swap caller and callee in each calllog entry and write the mirrored
   row synchronously, so that the data stored in the calllogs table supports
   consistent, transparent queries in both directions (a sketch of such a coprocessor
   follows this list).
5. HA cluster configuration for Hadoop and HBase: QJM-based NameNode HA,
   ResourceManager HA, how ZooKeeper works plus its configuration and hands-on drills,
   caveats when integrating HBase with Hadoop HA, and client API details.
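A preview of item 4 above -- a minimal sketch, not the course's implementation,
assuming the HBase 1.2.x coprocessor API and the rowkey layout
"hash,caller,time,flag,callee,duration" designed later in this article. The flag
check is what prevents the dead recursion mentioned above: the mirrored row carries
flag "1", so its own postPut call returns immediately.

package calllog.hbase.coprocessor;

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

public class CalleeWriteObserver extends BaseRegionObserver {
    private static final int PARTITIONS = 100;
    private static final byte[] F1 = Bytes.toBytes("f1");

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
                        Put put, WALEdit edit, Durability durability) throws IOException {
        // rowkey layout: hash,caller,time,flag,callee,duration
        String[] parts = Bytes.toString(put.getRow()).split(",");
        // only mirror caller rows (flag "0"); mirrored rows carry flag "1",
        // which stops the recursion
        if (parts.length != 6 || !"0".equals(parts[3])) {
            return;
        }
        String caller = parts[1], time = parts[2], callee = parts[4], dur = parts[5];
        // re-salt on the callee so the mirrored row lands in the callee's bucket
        // (same formula as HbaseDao.getRegionNumber later in this article)
        String last4 = callee.substring(callee.length() - 4);
        String yyyyMM = time.substring(0, 4) + time.substring(5, 7);
        int hash = (Integer.parseInt(last4) ^ Integer.parseInt(yyyyMM)) % PARTITIONS;
        String calleeRow = String.format("%02d,%s,%s,1,%s,%s", hash, callee, time, caller, dur);
        Put calleePut = new Put(Bytes.toBytes(calleeRow));
        calleePut.addColumn(F1, Bytes.toBytes("caller"), Bytes.toBytes(caller));
        calleePut.addColumn(F1, Bytes.toBytes("callee"), Bytes.toBytes(callee));
        calleePut.addColumn(F1, Bytes.toBytes("callTime"), Bytes.toBytes(time));
        calleePut.addColumn(F1, Bytes.toBytes("callDuration"), Bytes.toBytes(dur));
        Table mirror = e.getEnvironment().getTable(TableName.valueOf("call:calllogs"));
        try {
            mirror.put(calleePut);
        } finally {
            mirror.close();
        }
    }
}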
II. Create a New Project
------------------------------------------------
1. Create a new project -- CallLogSystem
III. Create the Simulated Log Generator Module CallLogGenModel
-------------------------------------------------
1. Create the module -- CallLogGenModel, with Maven support
2. Create the class calllog.gen.main.App
3. Write the App class
---------------------------------------
package calllog.gen.main;

import java.io.FileWriter;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;

public class App {
    // phone book: number -> subscriber name
    public static Map<String, String> callers = new HashMap<String, String>();
    // phone numbers
    public static List<String> phoneNumbers = new ArrayList<String>();

    static {
        callers.put("15811111111", "史让");
        callers.put("18022222222", "赵嗄");
        callers.put("15133333333", "张锕");
        callers.put("13269364444", "王以");
        callers.put("15032295555", "张噢");
        callers.put("17731086666", "张类");
        callers.put("15338597777", "李平");
        callers.put("15733218888", "杜跑");
        callers.put("15614209999", "任阳");
        callers.put("15778421111", "梁鹏");
        callers.put("18641241111", "郭彤");
        callers.put("15732641111", "刘飞");
        callers.put("13341101111", "段星");
        callers.put("13560191111", "唐华");
        callers.put("18301581111", "杨谋");
        callers.put("13520401111", "温英");
        callers.put("18332561111", "朱宽");
        callers.put("18620191111", "刘宗");
        phoneNumbers.addAll(callers.keySet());
    }

    public static void main(String[] args) {
        if (args == null || args.length == 0) {
            System.out.println("no args");
            System.exit(-1);
        }
        genCallLog(args[0]);
    }

    /**
     * Generate call log records, one every 200 ms, appended to logFilePath.
     */
    private static void genCallLog(String logFilePath) {
        try {
            // file writer in append mode
            FileWriter fw = new FileWriter(logFilePath, true);
            Random random = new Random();
            while (true) {
                // caller
                String caller = phoneNumbers.get(random.nextInt(callers.size()));
                String callerName = callers.get(caller);
                // callee (must differ from the caller)
                String callee = phoneNumbers.get(random.nextInt(callers.size()));
                while (callee.equals(caller)) {
                    callee = phoneNumbers.get(random.nextInt(callers.size()));
                }
                String calleeName = callers.get(callee);
                // call duration in seconds (< 10 min), zero-padded to three digits
                int duration = random.nextInt(60 * 10) + 1;
                DecimalFormat df = new DecimalFormat();
                df.applyPattern("000");
                String dur = df.format(duration);
                // call time (timeStr)
                int year = 2018;
                int month = random.nextInt(12);   // Calendar months are 0-based
                int day = random.nextInt(29) + 1;
                int hour = random.nextInt(24);
                int min = random.nextInt(60);
                int sec = random.nextInt(60);
                Calendar calendar = Calendar.getInstance();
                calendar.set(year, month, day, hour, min, sec);
                Date date = calendar.getTime();
                // if the generated time lies in the future, draw a new one
                Date now = new Date();
                if (date.compareTo(now) > 0) {
                    continue;
                }
                SimpleDateFormat dfs = new SimpleDateFormat();
                dfs.applyPattern("yyyy/MM/dd HH:mm:ss");
                String timeStr = dfs.format(date);
                // call log record
                //String callLog = caller + "," + callerName + "," + callee + "," + calleeName + "," + timeStr + "," + dur;
                String callLog = caller + "," + callee + "," + timeStr + "," + dur;
                fw.write(callLog + "\r\n");
                fw.flush();
                Thread.sleep(200);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
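For reference, each loop iteration appends one CSV record of the form
caller,callee,time,duration; a representative line (values illustrative, not taken
from the original article) looks like:

15811111111,13269364444,2018/03/15 08:22:11,375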
4. Build the jar and ship it to Linux to run on s100/s200
   a. Test the command locally on Windows
      cmd> java -cp CallLogGenModel-1.0-SNAPSHOT.jar calllog.gen.main.App d:\\calllog\\calllog.log
   b. Create the directory on Ubuntu
      $> mkdir /home/ubuntu/calllog
   c. Run the generator on Ubuntu
      $> java -cp /share/calllog/CallLogGenModel-1.0-SNAPSHOT.jar calllog.gen.main.App /home/ubuntu/calllog/calllog.log
   d. Write a convenience script ~/calllog/calllog.sh
      #!/bin/bash
      java -cp /share/calllog/CallLogGenModel-1.0-SNAPSHOT.jar calllog.gen.main.App /home/ubuntu/calllog/calllog.log
   e. Make calllog.sh executable
      $calllog> chmod 777 calllog.sh
   f. Run the calllog.sh script
      $calllog> ./calllog.sh
IV. Start Flume on s100 and s200 to Collect calllog.log in Real Time [s100 s200]
---------------------------------------------------------
1. Write the Flume configuration file [flume/conf/calllog.conf]
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
# -c +0 : collect from the beginning of the file; -F : keep following new data, otherwise the process stops
a1.sources.r1.command = tail -F -c +0 /home/ubuntu/calllog/calllog.log

a1.channels.c1.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = calllog
a1.sinks.k1.kafka.bootstrap.servers = s200:9092,s300:9092,s400:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Start Flume on s100 and s200 to begin collecting logs
   $s100> flume-ng agent -f /soft/flume/conf/calllog.conf -n a1 &
   $s200> flume-ng agent -f /soft/flume/conf/calllog.conf -n a1 &
V. Start the Kafka Cluster
--------------------------------------------------
1. Start the ZooKeeper cluster [s100 s200 s300]
   $> zkServer.sh start
   $> xcall.sh jps
2. Start the Kafka cluster [s200 s300 s400]
   $> /soft/kafka/bin/kafka-server-start.sh -daemon /soft/kafka/config/server.properties
   $> netstat -ano | grep 9092
3. Create the Kafka topic
   $> kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 4 --topic calllog
   $> kafka-topics.sh --list --zookeeper s100:2181
4. Start a Kafka console consumer on s300, subscribed to the calllog topic collected by Flume, for testing.
   $s300> kafka-console-consumer.sh --zookeeper s100:2181 --topic calllog
5. Start the log generator app on s100 and s200 and watch the console output on s300
   $s100> ~/calllog/calllog.sh
   $s200> ~/calllog/calllog.sh
VI. Write the Real Kafka Consumer for HBase -- Pull Messages from Kafka and Store Them in HBase
---------------------------------------------------------------------
1. Start the Hadoop cluster [s100 s500 / s200 s300 s400], fully distributed + HA
   a. $s100> start-all.sh
   b. $s100> xcall.sh jps
      6656 Jps
      6353 ResourceManager
      6261 DFSZKFailoverController
      3317 QuorumPeerMain
      5818 NameNode
      ----xcall : jps from s200 ----
      6224 DataNode
      6721 NodeManager
      7025 Jps
      6465 JournalNode
      3847 QuorumPeerMain
      4335 Kafka
      ----xcall : jps from s300 ----
      6088 NodeManager
      6409 Jps
      4330 Kafka
      5595 DataNode
      5836 JournalNode
      3612 QuorumPeerMain
      ----xcall : jps from s400 ----
      4242 Kafka
      5241 DataNode
      5738 NodeManager
      5482 JournalNode
      6059 Jps
      ----xcall : jps from s500 ----
      5317 Jps
      5064 DFSZKFailoverController
      4826 NameNode
   c. Check the web UI
      http://s100:50070
2. Start the HBase cluster [s100 s500 / s200 s300 s400]
   a. Start the cluster from s100
      $s100> start-hbase.sh
   b. Start the backup master on s500
      $s500> hbase-daemon.sh start master
   c. Check the web UI
      http://s100:16010
3. Create the HBase namespace and table
   a. Enter the HBase shell
      $s100> hbase shell
   b. Create the namespace and the table (a quick verification follows below)
      hbase> create_namespace 'call'
      hbase> create 'call:calllogs','f1'
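   c. (Optional verification, not in the original steps) confirm that the namespace
      and table exist before wiring up the consumer:
      hbase> list_namespace
      hbase> scan 'call:calllogs', {LIMIT => 5}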
4. Implementation -- create a Kafka consumer subscribed to the calllog topic
   a. Create the module CalllogCustomerModel with Maven support
   b. Add the Maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>groupId</groupId>
    <artifactId>CalllogCustomerModel</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.4</version>
        </dependency>
    </dependencies>
</project>
c. Create the package
   calllog.kafka.hbase.customer
d. Write the properties file [resources/kafka.properties]
   zookeeper.connect=s100:2181,s200:2181,s300:2181
   group.id=calllog
   zookeeper.session.timeout.ms=500
   zookeeper.sync.time.ms=250
   auto.commit.interval.ms=1000
   # consume from the beginning
   auto.offset.reset=smallest
   # topic
   topic=calllog
   # table name
   table.name=call:calllogs
   # number of partitions (salt buckets)
   partition.number=100
   # caller flag
   caller.flag=0
   # zero-padding pattern for the hash region number
   hashcode.pattern=00
e. Copy the hbase-site.xml configuration file into the resources directory
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- fully distributed mode -->
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <!-- where HBase data lives on HDFS -->
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://mycluster/hbase</value>
    </property>
    <!-- ZooKeeper quorum addresses -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>192.168.43.131:2181,192.168.43.132:2181,192.168.43.133:2181</value>
    </property>
    <!-- local directory for ZooKeeper data -->
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/home/ubuntu/zookeeper</value>
    </property>
</configuration>
f. Copy hdfs-site.xml into the resources directory
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>
    <property>
        <name>dfs.hosts</name>
        <value>/soft/hadoop/etc/dfs-hosts-include.conf</value>
    </property>
    <property>
        <name>dfs.hosts.exclude</name>
        <value>/soft/hadoop/etc/dfs-hosts-exclude.conf</value>
    </property>
    <!-- set ha -->
    <property>
        <name>dfs.nameservices</name>
        <value>mycluster</value>
    </property>
    <!-- the ids of the two NameNodes under mycluster -->
    <!-- note: currently only two NameNodes are allowed -->
    <property>
        <name>dfs.ha.namenodes.mycluster</name>
        <value>nn1,nn2</value>
    </property>
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn1</name>
        <value>s100:8020</value>
    </property>
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn2</name>
        <value>s500:8020</value>
    </property>
    <property>
        <name>dfs.namenode.http-address.mycluster.nn1</name>
        <value>s100:50070</value>
    </property>
    <property>
        <name>dfs.namenode.http-address.mycluster.nn2</name>
        <value>s500:50070</value>
    </property>
    <property>
        <name>dfs.namenode.shared.edits.dir</name>
        <value>qjournal://s200:8485;s300:8485;s400:8485/mycluster</value>
    </property>
    <property>
        <name>dfs.client.failover.proxy.provider.mycluster</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <property>
        <name>dfs.ha.fencing.methods</name>
        <value>
            sshfence
            shell(/bin/true)
        </value>
    </property>
    <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/home/ubuntu/.ssh/id_rsa</value>
    </property>
    <property>
        <name>dfs.journalnode.edits.dir</name>
        <value>/home/ubuntu/hadoop/journal</value>
    </property>
    <!-- set ha over -->
</configuration>
g. Write the utility class PropertiesUtil -- loads the properties file from the classpath
---------------------------------------------------
package calllog.kafka.hbase.customer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtil {
    public static Properties props;

    static {
        try {
            // load kafka.properties from the classpath
            InputStream is = ClassLoader.getSystemResourceAsStream("kafka.properties");
            props = new Properties();
            props.load(is);
            is.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Get a property value by key.
     */
    public static String getProp(String key) {
        return props.getProperty(key);
    }
}
h. Write the HbaseDao class -- the HBase data access object; all HBase access goes through the DAO
   1) Rowkey design: encode the most commonly queried fields, all at fixed width:
      regionNo[00-99] , id1[own number] , time , flag[0/1 caller/callee] , id2[other party] , duration
      regionNo[00-99] = hash(id1[last four digits], time[yyyyMM]) % 100[number of regions]
      (in the code below, the hash is the XOR of the two numeric values)
   2) Code
--------------------------------------
package calllog.kafka.hbase.customer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.text.DecimalFormat;

/**
 * HBase data access object.
 */
public class HbaseDao {
    private Table table = null;
    private DecimalFormat df = new DecimalFormat();
    // number of rowkey partitions (salt buckets)
    private int partitions;
    // 0 -- caller, 1 -- callee
    private String flag;

    public HbaseDao() {
        try {
            // load the configuration
            Configuration conf = HBaseConfiguration.create();
            // create a connection via the factory
            Connection conn = ConnectionFactory.createConnection(conf);
            // get the table
            TableName tbName = TableName.valueOf(PropertiesUtil.getProp("table.name"));
            table = conn.getTable(tbName);
            df.applyPattern(PropertiesUtil.getProp("hashcode.pattern"));
            partitions = Integer.parseInt(PropertiesUtil.getProp("partition.number"));
            flag = PropertiesUtil.getProp("caller.flag");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Put one log record into HBase.
     */
    public void put(String log) {
        if (log == null || log.equals("")) {
            return;
        }
        try {
            // the rowkey to build
            String rowKey = "";
            // parse the log record
            String[] strs = log.split(",");
            if (strs != null && strs.length == 4) {
                String caller = strs[0];
                String callee = strs[1];
                String time = strs[2];
                String duration = strs[3];
                // compute the region number
                String hash = getRegionNumber(caller, time);
                // note the argument order: time before flag
                rowKey = getRowkey(hash, caller, time, flag, callee, duration);
                // put the record
                Put p = new Put(Bytes.toBytes(rowKey));
                p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller));
                p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee));
                p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callTime"), Bytes.toBytes(time));
                p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callDuration"), Bytes.toBytes(duration));
                table.put(p);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Compute the region number (the salt prefix of the rowkey).
     */
    public String getRegionNumber(String caller, String calltime) {
        // last four digits of the phone number
        String last4Code = caller.substring(caller.length() - 4);
        // year and month of the call; calltime is "yyyy/MM/dd HH:mm:ss", so skip
        // the slash to get "yyyyMM" (a plain substring(0, 6) would include the
        // '/' and fail to parse as an integer)
        String month = calltime.substring(0, 4) + calltime.substring(5, 7);
        int hash = (Integer.parseInt(last4Code) ^ Integer.parseInt(month)) % partitions;
        return df.format(hash);
    }

    /**
     * Assemble the rowkey.
     */
    public String getRowkey(String hash, String caller, String time, String flag, String callee, String dur) {
        return hash + "," + caller + "," + time + "," + flag + "," + callee + "," + dur;
    }
}
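A worked example (computed here, not present in the original text): for the record
15811111111,13269364444,2018/03/15 08:22:11,375, the last four digits of the caller
are 1111 and the month key is 201803, so the salt is (1111 ^ 201803) % 100 =
200732 % 100 = 32, and put() writes the rowkey
32,15811111111,2018/03/15 08:22:11,0,13269364444,375.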
i. Write the main class -- HbaseCustomer
---------------------------------------------
package calllog.kafka.hbase.customer;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * HBase consumer: pulls log messages from Kafka and stores them in HBase.
 */
public class HbaseCustomer {
    public static void main(String[] args) {
        // HBase DAO
        HbaseDao dao = new HbaseDao();
        // create the consumer configuration and the consumer
        ConsumerConfig config = new ConsumerConfig(PropertiesUtil.props);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
        // bind the topic with one stream
        String topic = PropertiesUtil.getProp("topic");
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put(topic, new Integer(1));
        // start consuming
        Map<String, List<KafkaStream<byte[], byte[]>>> kafkaMsg = consumer.createMessageStreams(map);
        List<KafkaStream<byte[], byte[]>> msgList = kafkaMsg.get(topic);
        for (KafkaStream<byte[], byte[]> msg : msgList) {
            ConsumerIterator<byte[], byte[]> mm = msg.iterator();
            while (mm.hasNext()) {
                MessageAndMetadata<byte[], byte[]> next = mm.next();
                // extract the message
                String kafkaHbaseMsg = new String(next.message());
                // write it to HBase
                dao.put(kafkaHbaseMsg);
            }
        }
    }
}
j. Build a jar with its dependencies in IDEA
   File --> Project Structure --> Artifacts --> ...
k. Run it on Windows to test
   cmd> java -cp CalllogCustomerModel.jar calllog.kafka.hbase.customer.HbaseCustomer
l. Put the jar into the shared folder, run it on Ubuntu, and check that the program executes correctly
   $> java -cp CalllogCustomerModel.jar calllog.kafka.hbase.customer.HbaseCustomer
VII. Write the Web Application -- Pull Data from HBase and Visualize It
--------------------------------------------------------
1. Import the SSM project from the previous lesson
   File --> Project Structure --> Modules --> + --> ssm.iml --> Maven '+' add pom.xml
2. Do the usual web setup:
   a. Settings --> Application Server --> add a Tomcat server
   b. File --> Project Structure --> Artifacts --> + add the ssmweb module --> + add the supporting jars and external configuration files on the right
   c. Run --> Edit Configurations --> add a local Tomcat app --> Deployment + --> add your own web module (ssm) --> deploy with hot reload
3. Run it and open http://localhost:8080/user/findall?pn=1 to test the program
4. Add the Calllog class to the domain package
----------------------------------------------
package com.it18zhang.ssm.domain;

/**
 * Domain class for a call log record -- a standard JavaBean.
 */
public class Calllog {
    private String caller;
    private String callee;
    private String callTime;
    private String callDuration;

    public String getCaller() { return caller; }
    public void setCaller(String caller) { this.caller = caller; }
    public String getCallee() { return callee; }
    public void setCallee(String callee) { this.callee = callee; }
    public String getCallTime() { return callTime; }
    public void setCallTime(String callTime) { this.callTime = callTime; }
    public String getCallDuration() { return callDuration; }
    public void setCallDuration(String callDuration) { this.callDuration = callDuration; }
}
5. Add the calllog service interface CalllogService
------------------------------------------------------------
package com.it18zhang.ssm.service;

import com.it18zhang.ssm.domain.Calllog;
import java.util.List;

/**
 * Service interface for Calllog -- defines the contract for talking to the back end.
 */
public interface CalllogService {
    // query all calllog records
    public List<Calllog> findAll();
}
6. Add CalllogServiceImpl, the implementation of CalllogService that interacts with HBase
-------------------------------------------------------------------------
a. Prepare the necessary configuration files: copy [hbase-site.xml / hdfs-site.xml] into the resources directory
b. Add the Maven dependencies
   <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
       <version>0.10.0.1</version>
   </dependency>
   <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-client</artifactId>
       <version>1.2.4</version>
   </dependency>
c. Write the class CalllogServiceImpl
------------------------------------------
package com.it18zhang.ssm.service.impl;

import com.it18zhang.ssm.domain.Calllog;
import com.it18zhang.ssm.service.CalllogService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.*;

/**
 * Implementation of CalllogService.
 */
@Service("calllogService")
public class CalllogServiceImpl implements CalllogService {
    private Table table;

    public CalllogServiceImpl() {
        try {
            // load the configuration
            Configuration conf = HBaseConfiguration.create();
            // create a connection via the factory
            Connection conn = ConnectionFactory.createConnection(conf);
            // get the table
            TableName tbName = TableName.valueOf("call:calllogs");
            table = conn.getTable(tbName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Query all calllog records via a full table scan.
     */
    public List<Calllog> findAll() {
        List<Calllog> list = new ArrayList<Calllog>();
        try {
            // scan the whole table
            Scan scan = new Scan();
            ResultScanner rs = table.getScanner(scan);
            Iterator<Result> it = rs.iterator();
            byte[] family = Bytes.toBytes("f1");
            byte[] callerf = Bytes.toBytes("caller");
            byte[] calleef = Bytes.toBytes("callee");
            byte[] callTimef = Bytes.toBytes("callTime");
            byte[] callDurationf = Bytes.toBytes("callDuration");
            Calllog calllog = null;
            while (it.hasNext()) {
                Result next = it.next();
                String caller = Bytes.toString(next.getValue(family, callerf));
                String callee = Bytes.toString(next.getValue(family, calleef));
                String callTime = Bytes.toString(next.getValue(family, callTimef));
                String callDuration = Bytes.toString(next.getValue(family, callDurationf));
                calllog = new Calllog();
                calllog.setCaller(caller);
                calllog.setCallee(callee);
                calllog.setCallTime(callTime);
                calllog.setCallDuration(callDuration);
                list.add(calllog);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return list;
    }
}
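findAll() does a full table scan, which is fine for a demo but expensive at carrier
scale. Because the salt is derived from a number's last four digits and the call
month, one subscriber's records for a given month all live in a single bucket, so a
targeted query only needs a prefix range scan. A minimal sketch under those
assumptions -- the method below is hypothetical, not part of the course code, and
would sit alongside findAll() in CalllogServiceImpl:

/**
 * Hypothetical range scan over the salted rowkeys; assumes the layout
 * "hash,caller,time,flag,callee,duration" used by HbaseDao, and yyyyMM
 * like "201803".
 */
public List<Calllog> findByCallerMonth(String caller, String yyyyMM) throws IOException {
    List<Calllog> list = new ArrayList<Calllog>();
    String last4 = caller.substring(caller.length() - 4);
    // same salt formula as HbaseDao.getRegionNumber
    int hash = (Integer.parseInt(last4) ^ Integer.parseInt(yyyyMM)) % 100;
    // prefix looks like "32,15811111111,2018/03"
    String prefix = String.format("%02d,%s,%s/%s", hash, caller,
            yyyyMM.substring(0, 4), yyyyMM.substring(4));
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes(prefix));
    scan.setStopRow(Bytes.toBytes(prefix + "~")); // '~' sorts after '/' and all digits
    byte[] f1 = Bytes.toBytes("f1");
    for (Result r : table.getScanner(scan)) {
        Calllog c = new Calllog();
        c.setCaller(Bytes.toString(r.getValue(f1, Bytes.toBytes("caller"))));
        c.setCallee(Bytes.toString(r.getValue(f1, Bytes.toBytes("callee"))));
        c.setCallTime(Bytes.toString(r.getValue(f1, Bytes.toBytes("callTime"))));
        c.setCallDuration(Bytes.toString(r.getValue(f1, Bytes.toBytes("callDuration"))));
        list.add(c);
    }
    return list;
}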
7. Add CalllogController -- renders the data in the web UI
-------------------------------------------------------
package com.it18zhang.ssm.web.controller;

import com.it18zhang.ssm.domain.Calllog;
import com.it18zhang.ssm.service.CalllogService;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;

import javax.annotation.Resource;
import java.util.List;

@Controller
public class CalllogController {
    @Resource(name = "calllogService")
    private CalllogService cs;

    @RequestMapping("calllog/findAll")
    public String findAll(Model model) {
        List<Calllog> list = cs.findAll();
        model.addAttribute("calllogs", list);
        return "calllog/calllogList";
    }
}
8. Add the JSP page calllog/calllogList.jsp
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<html>
<head>
    <title>Call Records</title>
    <link rel="stylesheet" type="text/css" href="../css/my.css">
</head>
<body>
<table id="t1" border="1px" class="t-1">
    <tr>
        <td>Caller</td>
        <td>Callee</td>
        <td>Call Time</td>
        <td>Duration</td>
    </tr>
    <c:forEach items="${calllogs}" var="u">
        <tr>
            <td><c:out value="${u.caller}"/></td>
            <td><c:out value="${u.callee}"/></td>
            <td><c:out value="${u.callTime}"/></td>
            <td><c:out value="${u.callDuration}"/></td>
        </tr>
    </c:forEach>
    <tr>
        <td colspan="5" style="text-align: right"></td>
    </tr>
</table>
</body>
</html>
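With the artifact deployed to Tomcat and the generator, Flume, Kafka, and the HBase
consumer all running, opening http://localhost:8080/calllog/findAll (the path mapped
by CalllogController above; host and port assumed from the test URL in step 3) should
render the table of call records.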

 
