
Big Data Framework: A Real-Time Telecom-Fraud Query Pipeline

Framework diagram:

Flume: a distributed, reliable, highly available system for collecting, aggregating, and transporting large volumes of log data. It supports pluggable data sources for collection, can apply simple in-flight processing, and can write to a variety of sinks (plain text, HDFS, HBase, and so on).
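As an illustration of the collection step, a minimal Flume agent could tail the CDR log into the cdr_log Kafka topic consumed later by the Storm spout. This is only a sketch: the log path, broker address, and channel sizing are assumptions, and it relies on the Kafka sink that ships with Flume 1.6+.

    # minimal sketch of a Flume agent feeding Kafka (hypothetical paths and hosts)
    agent.sources = r1
    agent.channels = c1
    agent.sinks = k1

    # tail the raw call-record file
    agent.sources.r1.type = exec
    agent.sources.r1.command = tail -F /var/log/cdr/cdr.log
    agent.sources.r1.channels = c1

    # buffer events in memory between source and sink
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 10000

    # publish each line to the cdr_log topic
    agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.k1.topic = cdr_log
    agent.sinks.k1.brokerList = 192.168.115.130:9092
    agent.sinks.k1.channel = c1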

Kafka: a high-throughput distributed publish/subscribe messaging system that can handle all the activity-stream data of a consumer-scale website. Such activity (page views, searches, and other user actions) is a key ingredient of many social features on the modern web, and the throughput it demands is usually met through log processing and log aggregation.
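The pipeline consumes the cdr_log topic and (in the Kafka bolt shown later) writes a call_log topic, so both must exist before the topology starts. A sketch of the creation commands for the 0.8.x-era broker this code targets; the partition and replication counts are assumptions (three partitions match the random keys generated in SaveCallLogToKafkaBolt):

    kafka-topics.sh --create --zookeeper localhost:2181 \
        --topic cdr_log --partitions 3 --replication-factor 2
    kafka-topics.sh --create --zookeeper localhost:2181 \
        --topic call_log --partitions 3 --replication-factor 2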

Storm: stream computing (the real-time processing layer of this pipeline).

MySQL: the database linking to the web tier (at present, everything client-facing interacts through the database).

Hadoop: the distributed big-data framework.

Hive: the data warehouse.

Spark: an in-memory distributed big-data framework.

Redis: a key-value database.

Maven resolves the dependencies automatically, so there is no need to import JAR packages by hand; a sketch of the dependency block follows.
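The exact versions are assumptions chosen to match the backtype.storm and old-style Kafka APIs used throughout this post (Storm 0.9.x, Kafka 0.8.x):

    <!-- sketch of the pom.xml dependencies; versions are assumptions -->
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.6</version>
            <scope>provided</scope> <!-- supplied by the cluster at runtime -->
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>commons-dbutils</groupId>
            <artifactId>commons-dbutils</artifactId>
            <version>1.6</version>
        </dependency>
    </dependencies>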

For setup, see this post: configuring the Maven environment in Eclipse.

Development is done in Eclipse.

The classes built for the phone-fraud application:

1. Database connection: JDBC

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.commons.dbutils.QueryRunner;
    import org.apache.commons.dbutils.handlers.ArrayListHandler;

    public final class MyDbUtils { // final: not meant to be subclassed
        private static String className = "com.mysql.jdbc.Driver";
        private static String url = "jdbc:mysql://192.168.115.130:3306/test?useUnicode=true&characterEncoding=utf-8";
        private static String user = "root";
        private static String password = "root";
        private static QueryRunner queryRunner = new QueryRunner();
        public static final String INSERT_LOG = "INSERT INTO LOG(topdomain,usetime,time) VALUES(?,?,?)";

        // private constructor: no instances
        private MyDbUtils() {
        }

        static { // register the JDBC driver as soon as the class is loaded
            try {
                Class.forName(className);
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }

        // run a query and collect the first column of every row
        public static List<String> executeQuerySql(String sql) {
            List<String> result = new ArrayList<String>();
            try (Connection connection = getConnection()) { // close the connection even on error
                List<Object[]> rows = queryRunner.query(connection, sql, new ArrayListHandler());
                for (Object[] row : rows) {
                    result.add(row[0].toString());
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return result;
        }

        // run an INSERT/UPDATE/DELETE with bound parameters
        public static void update(String sql, Object... params) {
            try (Connection connection = getConnection()) {
                queryRunner.update(connection, sql, params);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        // open a fresh connection (a connection pool is preferable in production)
        public static Connection getConnection() throws SQLException {
            return DriverManager.getConnection(url, user, password);
        }
    }
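For reference, a minimal usage sketch of MyDbUtils; the table comes from the INSERT_LOG constant above, and the values are made up:

    // hypothetical usage of MyDbUtils (values are illustrative only)
    public class MyDbUtilsDemo {
        public static void main(String[] args) {
            // dbutils binds the three placeholders in INSERT_LOG safely
            MyDbUtils.update(MyDbUtils.INSERT_LOG, "example.com", 120, 1478822400);
            // read back the first column of each row
            for (String topdomain : MyDbUtils.executeQuerySql("SELECT topdomain FROM LOG")) {
                System.out.println(topdomain);
            }
        }
    }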

2. Creating the Storm topology (roughly the counterpart of creating a JobTracker job in Hadoop)

    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.ZkHosts;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;

    public class CdrTopology {
        public static void main(String[] args) {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            String KAFKASPOUT = KafkaSpout.class.getSimpleName();
            String SPLIT_BOLT1 = SplitBolt1.class.getSimpleName();
            String SAVETOKAFKABOLT1 = SaveCallLogToKafkaBolt1.class.getSimpleName();
            String SAVETOMYSQL = SavaCallLogToMysql.class.getSimpleName();

            // ZooKeeper ensemble that Kafka registers its brokers in
            BrokerHosts hosts = new ZkHosts("localhost:2181");
            String topic = "cdr_log"; // topic carrying the raw call records
            String zkRoot = "/kafka"; // ZK node for spout offsets; created automatically in the ZK cluster Storm uses
            String id = "123";        // consumer group id
            SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
            topologyBuilder.setSpout(KAFKASPOUT, new KafkaSpout(spoutConf));
            //topologyBuilder.setSpout(KAFKASPOUT, new KafkaSpout(spoutConf), 3);
            topologyBuilder.setBolt(SPLIT_BOLT1, new SplitBolt1()).shuffleGrouping(KAFKASPOUT);
            // subscribe to SPLIT_BOLT1 (the component actually registered above), stream "calllog"
            topologyBuilder.setBolt(SAVETOKAFKABOLT1, new SaveCallLogToKafkaBolt1()).shuffleGrouping(SPLIT_BOLT1, "calllog");
            //topologyBuilder.setBolt(SAVETOMYSQL, new SavaCallLogToMysql()).shuffleGrouping(SPLIT_BOLT1, "calllog");

            StormTopology createTopology = topologyBuilder.createTopology();
            String simpleName = CdrTopology.class.getSimpleName();
            Config config = new Config();
            config.setStatsSampleRate(1D); // sample every tuple so the UI counts are exact

            if (args.length == 0) { // no arguments: run in an in-process local cluster for testing
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology(simpleName, config, createTopology);
            } else {                // any argument: submit to the real cluster
                try {
                    //config.setNumWorkers(45);
                    config.setMaxSpoutPending(1000); // cap unacked tuples in flight
                    StormSubmitter.submitTopology(simpleName, config, createTopology);
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                }
            }
        }
    }
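With no arguments, main runs the topology in-process; to deploy it for real, package the project and pass any argument so the StormSubmitter branch is taken. A sketch of the submit command, where the jar name and package are assumptions:

    storm jar storm_siyuan.jar cn.com.cintel.storm_siyuan.CdrTopology cluster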

3. Partitioning the messages:

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    /**
     * Created by jason on 2016/11/27.
     */
    public class PartitionerDemo implements Partitioner {
        private VerifiableProperties verifiableProperties;

        public PartitionerDemo(VerifiableProperties verifiableProperties) {
            this.verifiableProperties = verifiableProperties;
        }

        public int partition(Object key, int numPartitions) {
            String strKey = (String) key;
            // partition by the key's hashCode; mask off the sign bit so the
            // result is never negative (a bare % can return a negative index)
            return (strKey.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
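A quick, hypothetical check of how the partitioner spreads keys; it assumes PartitionerDemo sits in the same package, and passing null works because the constructor only stores the properties:

    // hypothetical check, not part of the original project
    public class PartitionerDemoCheck {
        public static void main(String[] args) {
            PartitionerDemo p = new PartitionerDemo(null);
            for (String key : new String[] { "0", "1", "2", "13800000000" }) {
                // prints which of 3 partitions each key lands in
                System.out.println(key + " -> partition " + p.partition(key, 3));
            }
        }
    }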

4. Building the classes for the individual database tables

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Map;
    import cn.com.cintel.storm_siyuan.utils.MyDbUtils;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;

    public class SavaCallLogToMysql extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            // note: a connection pool is recommended here rather than one connection per tuple
        }

        @Override
        public void execute(Tuple tuple) {
            String sql = "INSERT INTO calllog(time,callingnumber,callednumber,callingarea,calledarea,is_inland,domain) VALUES(?,?,?,?,?,?,?)";
            // a PreparedStatement binds the values safely instead of concatenating them into the SQL string
            try (Connection connection = MyDbUtils.getConnection();
                 PreparedStatement stmt = connection.prepareStatement(sql)) {
                stmt.setInt(1, tuple.getIntegerByField("time"));
                stmt.setString(2, tuple.getStringByField("callingnumber"));
                stmt.setString(3, tuple.getStringByField("callednumber"));
                stmt.setString(4, tuple.getStringByField("callingarea"));
                stmt.setString(5, tuple.getStringByField("calledarea"));
                stmt.setInt(6, tuple.getIntegerByField("is_land"));
                stmt.setInt(7, tuple.getIntegerByField("domain"));
                stmt.executeUpdate();
                collector.ack(tuple);   // acknowledge only after the row is persisted
            } catch (SQLException e) {
                e.printStackTrace();
                collector.fail(tuple);  // let Storm replay the tuple on failure
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt: declares no output streams
        }
    }
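For the INSERT above to succeed, a calllog table must already exist. A plausible schema inferred from the statement; the column types are assumptions:

    CREATE TABLE calllog (
        time          INT,
        callingnumber VARCHAR(20),
        callednumber  VARCHAR(20),
        callingarea   VARCHAR(50),
        calledarea    VARCHAR(50),
        is_inland     INT,  -- populated from the tuple field named is_land
        domain        INT
    );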
The second class in this step publishes each call record back out to a Kafka topic (call_log) for downstream consumers:

    import java.io.IOException;
    import java.util.Map;
    import java.util.Properties;
    import java.util.Random;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;

    public class SaveCallLogToKafkaBolt extends BaseRichBolt {
        private OutputCollector collector;
        private String topic;
        private Properties prop;
        private Producer<String, String> producer;
        private Random random;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            System.out.println("SaveCallLogToKafkaBolt start");
            this.collector = collector;
            topic = "call_log";
            prop = new Properties();
            try {
                // load the producer configuration from the classpath
                prop.load(SaveCallLogToKafkaBolt.class.getClassLoader().getResourceAsStream("producer.properties"));
            } catch (IOException e) {
                e.printStackTrace();
            }
            /* the same settings can be supplied in code instead of producer.properties:
            prop.setProperty("metadata.broker.list", "192.168.115.130:9092,192.168.115.132:9092,192.168.115.133:9092");
            prop.setProperty("partitioner.class", "cn.com.cintel.storm_siyuan.PartitionerDemo");
            prop.setProperty("producer.type", "sync");
            prop.setProperty("compression.codec", "none");
            prop.setProperty("serializer.class", "kafka.serializer.StringEncoder"); */
            producer = new Producer<String, String>(new ProducerConfig(prop));
            random = new Random();
        }

        @Override
        public void execute(Tuple tuple) {
            try {
                String msg = getKeyedMessage(tuple).toString();
                // random key in [0,3): PartitionerDemo hashes it to spread messages over the partitions
                Integer key = random.nextInt(3);
                System.out.println("SaveCallLogToKafkaBolt msg:" + msg);
                producer.send(new KeyedMessage<String, String>(topic, key.toString(), msg));
                collector.ack(tuple);
            } catch (Exception e) {
                e.printStackTrace();
                collector.fail(tuple);
            }
        }

        // re-serialize the call record as a pipe-delimited line
        private StringBuffer getKeyedMessage(Tuple tuple) {
            StringBuffer msg = new StringBuffer();
            msg.append(tuple.getIntegerByField("time"));
            msg.append("|" + tuple.getStringByField("callingnumber"));
            msg.append("|" + tuple.getStringByField("callednumber"));
            msg.append("|" + tuple.getStringByField("callingarea"));
            msg.append("|" + tuple.getStringByField("calledarea"));
            msg.append("|" + tuple.getIntegerByField("is_land"));
            msg.append("|" + tuple.getIntegerByField("domain"));
            msg.append("|" + "55");
            msg.append("|" + "66");
            msg.append("|" + ";");
            return msg;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt: declares no output streams
        }
    }
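The producer.properties file loaded in prepare() carries the same settings as the commented-out block above, for example:

    metadata.broker.list=192.168.115.130:9092,192.168.115.132:9092,192.168.115.133:9092
    partitioner.class=cn.com.cintel.storm_siyuan.PartitionerDemo
    producer.type=sync
    compression.codec=none
    serializer.class=kafka.serializer.StringEncoder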

5. Storm bolt processing (a bolt is the abstraction that processes tuples)

    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class SplitBolt1 extends BaseRichBolt {
        private OutputCollector collector;

        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple input) {
            // record layout: time|callingnumber|callednumber|callingarea|calledarea|is_land|domain|...
            try {
                String log = new String(input.getBinaryByField("bytes"));
                String[] splited = log.split("\\|", -1);
                Message msg = new Message();
                msg.setTime(Integer.parseInt(splited[0]));
                msg.setCallingnumber(splited[1]);
                msg.setCallednumber(splited[2]);
                msg.setCallingarea(splited[3]);
                msg.setCalledarea(splited[4]);
                msg.setIs_land(Integer.parseInt(splited[5]));
                msg.setDomain(Integer.parseInt(splited[6]));
                // anchor the emitted tuple to the input so downstream failures replay it
                this.collector.emit("calllog", input, new Values(msg));
                this.collector.ack(input);
            } catch (NumberFormatException e) {
                this.collector.fail(input); // malformed record: fail so the spout can replay or drop it
                e.printStackTrace();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("calllog", new Fields("calllog"));
        }
    }
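SplitBolt1 depends on a Message class that this post never shows. A minimal sketch inferred from the setters it calls; note the class must be Serializable so Storm can ship it inside a tuple:

    import java.io.Serializable;

    // inferred sketch, not from the original post
    public class Message implements Serializable {
        private int time;
        private String callingnumber;
        private String callednumber;
        private String callingarea;
        private String calledarea;
        private int is_land;
        private int domain;

        public void setTime(int time) { this.time = time; }
        public int getTime() { return time; }
        public void setCallingnumber(String v) { this.callingnumber = v; }
        public String getCallingnumber() { return callingnumber; }
        public void setCallednumber(String v) { this.callednumber = v; }
        public String getCallednumber() { return callednumber; }
        public void setCallingarea(String v) { this.callingarea = v; }
        public String getCallingarea() { return callingarea; }
        public void setCalledarea(String v) { this.calledarea = v; }
        public String getCalledarea() { return calledarea; }
        public void setIs_land(int v) { this.is_land = v; }
        public int getIs_land() { return is_land; }
        public void setDomain(int v) { this.domain = v; }
        public int getDomain() { return domain; }
    }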

For the Spark and Hive side of the pipeline, see the next post.
