赞
踩
由于做毕设之前学过大数据,但是一直没有做过一整套的实时数据分析系统,有点遗憾。所以毕业设计就自主选了这一套系统,算是对之前知识进行一次整合运行,也挑战一下自己。
该系统主要对用户行为日志(此项目使用的数据源是数据集,可以根据自己需求,在数据采集时监控网站用户数据存放目录或者用爬虫实时爬取数据的存放目录)进行实时分析可视化。
先放最终系统成果,才有耐心看下面的内容!!!
登录系统后,通过日志采集模块来采集目标日志数据,将采集到的数据发送给日志传输模块,数据存放于kafka对应的topic中;数据处理模块创建与kafka的连接,消费对应topic中的数据,对数据进行预处理之后再进行处理分析,处理所得的结果数据存放进对应各topic中,以便于数据可视化,同时也将结果数据存入Redis数据库,便于后期其他功能分析使用。最后通过可视化模块,后台使用Flask作为Web框架,前端使用H5+Echarts,将结果数据进行可视化。系统流程图如图所示:
系统相关技术和组件:
Hadoop、Spark、Flume、Kafka、Zookeeper、Flask、SocketIO、Echarts、Scala、Python。项目架构如图所示:
此项目由于计算机硬件配置较低,所以采用Hadoop伪分布式集群(部署在虚拟机的linux系统上用于存放源数据和程序检查点)和单机Spark集群(部署在本地windows上)
1.Hadoop2.9.2
伪分布式搭建参考(此项目) https://blog.csdn.net/xujingran/article/details/83898140
全分布式搭建参考 https://blog.csdn.net/u011254180/article/details/77922331
2.Flume1.9.0
搭建参考 https://blog.csdn.net/caodaoxi/article/details/8885645
Flume作为kafka的sink的配置文件:
3.Kafka2.4.0
伪分布式搭建参考(此) https://blog.csdn.net/weixin_42207486/article/details/80635246
全分布式搭建参考 https://blog.csdn.net/qq_39211575/article/details/103677016
5.Spark2.4.4
Windows单机搭建参考(此) https://blog.csdn.net/Python_Big_love/article/details/81878142
6.Zookeeper3.5.6
伪分布式搭建参考(此项目)https://blog.csdn.net/MISSRIVEN/article/details/81394595
全分布式搭建参考 https://blog.csdn.net/sjhuangx/article/details/81155501
7.flask(系统Web框架)
安装参考 https://blog.csdn.net/cckavin/article/details/90766924
注意!!!
在本地(windows)Spark集群中编写SparkStreaming程序的时候,引入maven配置信息中(此项目依赖如下),scala、kafka、Spark-Streaming-kafka的版本都需要一致,高版本低版本都不行。
本系统使用scala版本为2.11、spark版本为2.4.4(此版本也有scala2.12编写版)、kafka版本为2.4.0(spark-streaming-kafka0.8最高支持kafka2.3.0以下版本,所以此项目使用0.10版本)
此外spark-streaming-kafka0.8和0.10在连接kafka时有差别,网上搜的两种连接分别为Receiver DStream和Direct DStream,但是0.10版本取消了Receiver DStream,所以只能用后面一种,而且创建实时数据流代码网上示例很多都过时了,需要用以下官网最新连接代码。(读取kafka数据会报序列化错误,需要注册序列化方式,以下代码中已加入Kryo序列化方式)
//构建conf ssc 对象 val conf = new SparkConf(). setAppName("Kafka_director"). setMaster("local[2]"). set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.registerKryoClasses(Array( classOf[Array[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]]] )) val sc:SparkContext=new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(3)) //设置数据检查点 ssc.checkpoint("hdfs://192.168.222.132:9000/checkpoint") //kafka 需要Zookeeper 需要消费者组 val topics = Set("demo") val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "192.168.222.132:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "g1", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val data = KafkaUtils.createDirectStream( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
topic | 用处 |
---|---|
demo | 接受源数据 |
ordernumall | 总订单数 |
ordernumgender | 男女购物人数 |
behavior | pv+buy+cart+fav |
visitnum | 总访问量 |
ordernumage | 各年龄段购物人数 |
ordernumbrandtop | 各品牌销量 |
ordernumcattop | 各商品类别销量 |
ordernumregion | 各地区订单量 |
创建kafka连接,用于消费目标topic中的数据,创建kafka生产者发送结果数据到对应的topic。设立检查点。
//构建conf ssc 对象
val conf = new SparkConf().
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。