
Big Data Case Study: Website Traffic Analysis Project (Part 2)


Table of Contents

I. Offline batch processing with Hive

1. Implementation steps

① Start Hadoop and Hive

② Create and use the weblog database in Hive

③ Create an external table to manage the data

④ Add the current day's partition to the master table

⑤ Create a data-cleaning table holding only the fields the business needs

⑥ Business metrics

⑦ Create the statistics table

⑧ Query the day's metrics from the cleaning table and insert them into the statistics table

⑨ Use Sqoop to export the data from HDFS into MySQL

II. Hive placeholders and script files

1. Overview

2. Running a Hive script file

3. Using Hive placeholders

4. Applying this to the project

5. Scheduling with Linux crontab

III. Building the real-time pipeline

1. Connecting Flume to Kafka

IV. Setting up the streaming development environment

1. Spark and HBase integration basics

2. Real-time stream processing


I. Offline batch processing with Hive

1. Implementation steps

① Start Hadoop and Hive

Go to Hive's bin directory and start the metastore and HiveServer2 in the background, then open the Hive CLI:

nohup hive --service metastore &

nohup hive --service hiveserver2 &

sh hive

② Create and use the weblog database in Hive

create database weblog;
use weblog;

③ Create an external table to manage the data

Create a master table that manages all of the raw fields.

Characteristics of the master table: it holds every field, it is an external table, and it is partitioned.

The raw data sits under /weblog on HDFS.

Table DDL:

create external table flux (url string,urlname string,title string,chset string,scr string,col string,lg string,je string,ec string,fv string,cn string,ref string,uagent string,stat_uv string,stat_ss string,cip string) PARTITIONED BY (reporttime string) row format delimited fields terminated by '|' location '/weblog';

 

④ Add the current day's partition to the master table

There are two ways to register the day's partition. Either repair the partition metadata:

msck repair table flux;

or add the partition explicitly:

alter table flux add partition(reporttime='2022-04-20') location '/weblog/reporttime=2022-04-20';

⑤ Create a data-cleaning table holding only the fields the business needs

Table dataclear, with '|' as the field delimiter.

Drop the unneeded fields, keep only the required ones, and split the session information (stat_ss) into separate columns.

Required fields:

reporttime, url, urlname, uvid, ssid, sscount, sstime, cip

create table dataclear(reportTime string,url string,urlname string,uvid string,ssid string,sscount string,sstime string,cip string)row format delimited fields terminated by '|';

Query the current day's fields from the master table and insert them into the cleaning table:

insert overwrite table dataclear
select reporttime,url,urlname,stat_uv,split(stat_ss,"_")[0],split(stat_ss,"_")[1],split(stat_ss,"_")[2],cip from flux where reporttime='2022-04-20';

⑥ Business metrics

1. pv

pv - page views - the total number of visit records in a day

select count(*) as pv from dataclear where reportTime = '2022-04-20';

2. uv

uv - unique visitors - the number of distinct visitors in a day, i.e. the count of distinct uvid values for the day

select count(distinct uvid) as uv from dataclear where reportTime = '2022-04-20';

3. vv

vv - unique sessions - the number of distinct sessions in a day, i.e. the count of distinct ssid values for the day

select count(distinct ssid) as vv from dataclear where reportTime = '2022-04-20';

4. br

br - bounce rate - the number of bounced sessions in a day divided by the total number of sessions

select round(br_taba.a/br_tabb.b,4)as br from (select count(*) as a from (select ssid from dataclear where reportTime='2022-04-20' group by ssid having count(ssid) = 1) as br_tab) as br_taba,(select count(distinct ssid) as b from dataclear where reportTime='2022-04-20') as br_tabb;

This query groups the day's records by session id, counts the sessions that contain only a single record (those are the bounced sessions), and divides that count by the total number of distinct sessions, rounded to 4 decimal places. For example, if 25 of 100 sessions viewed only one page, br = 0.25.

5. newip

newip - new IPs - the number of distinct IPs seen today that never appeared in the historical data

select count(distinct dataclear.cip) from dataclear where dataclear.reportTime = '2022-04-20' and cip not in (select dc2.cip from dataclear as dc2 where dc2.reportTime < '2022-04-20');

6. newcust

newcust - new visitors - the number of distinct uvid values seen today that never appeared in the historical data

select count(distinct dataclear.uvid) from dataclear where dataclear.reportTime='2022-04-20'
and uvid not in
(select dc2.uvid from dataclear as dc2 where dc2.reportTime < '2022-04-20');

7. avgtime

avgtime - average visit duration - the average session duration over all sessions of the day

Note: a session's duration = the latest visit timestamp in the session minus the earliest visit timestamp in the session (sstime is a millisecond timestamp).

select avg(atTab.usetime) as avgtime from(select max(sstime) - min(sstime) as usetime from dataclear where reportTime='2022-04-20' group by ssid) as atTab;

8. avgdeep

avgdeep - average visit depth - the average visit depth over all sessions of the day

A session's visit depth = the number of distinct URLs visited within that session.

For example, a session that visits http://demo/a.jsp, http://demo/b.jsp and http://demo/a.jsp has a depth of 2.

select round(avg(adTab.deep),4) as avgdeep from (select count(distinct urlname) as deep from dataclear where reportTime='2022-04-20' group by ssid) as adTab;

⑦ Create the statistics table

create table tongji(reportTime string,pv int,uv int,vv int, br double,newip int, newcust int, avgtime double,avgdeep double) row format delimited fields terminated by '|';

⑧ Query the day's metrics from the cleaning table and insert them into the statistics table

insert overwrite table tongji select '2022-04-20',tab1.pv,tab2.uv,tab3.vv,tab4.br,tab5.newip,tab6.newcust,tab7.avgtime,tab8.avgdeep from
(select count(*) as pv from dataclear where reportTime = '2022-04-20') as tab1,
(select count(distinct uvid) as uv from dataclear where reportTime = '2022-04-20') as tab2,
(select count(distinct ssid) as vv from dataclear where reportTime = '2022-04-20') as tab3,
(select round(br_taba.a/br_tabb.b,4) as br from (select count(*) as a from (select ssid from dataclear where reportTime='2022-04-20' group by ssid having count(ssid) = 1) as br_tab) as br_taba,
(select count(distinct ssid) as b from dataclear where reportTime='2022-04-20') as br_tabb) as tab4,
(select count(distinct dataclear.cip) as newip from dataclear where dataclear.reportTime = '2022-04-20' and cip not in (select dc2.cip from dataclear as dc2 where dc2.reportTime < '2022-04-20')) as tab5,
(select count(distinct dataclear.uvid) as newcust from dataclear where dataclear.reportTime='2022-04-20' and uvid not in (select dc2.uvid from dataclear as dc2 where dc2.reportTime < '2022-04-20')) as tab6,
(select round(avg(atTab.usetime),4) as avgtime from (select max(sstime) - min(sstime) as usetime from dataclear where reportTime='2022-04-20' group by ssid) as atTab) as tab7,
(select round(avg(deep),4) as avgdeep from (select count(distinct urlname) as deep from dataclear where reportTime='2022-04-20' group by ssid) as adTab) as tab8;
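Since HiveServer2 was started back in step ①, the statistics can also be read back programmatically instead of through the CLI. The sketch below is only an illustration, not part of the project code: it assumes the hive-jdbc driver is on the classpath, that HiveServer2 listens on its default port 10000 on hadoop01, and that no authentication is configured; adjust the URL and credentials to your cluster.

  import java.sql.DriverManager

  // Minimal sketch: read the daily statistics back through HiveServer2 (assumed at hadoop01:10000).
  object TongjiJdbcCheck {
    def main(args: Array[String]): Unit = {
      Class.forName("org.apache.hive.jdbc.HiveDriver")
      // Database name, user and password are assumptions for this environment.
      val conn = DriverManager.getConnection("jdbc:hive2://hadoop01:10000/weblog", "root", "")
      val stmt = conn.createStatement()
      val rs = stmt.executeQuery("select reporttime, pv, uv, vv, br from tongji")
      while (rs.next()) {
        println(rs.getString("reporttime") + " pv=" + rs.getInt("pv") + " uv=" + rs.getInt("uv"))
      }
      rs.close(); stmt.close(); conn.close()
    }
  }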

⑨ Use Sqoop to export the data from HDFS into MySQL

Log in to MySQL:

mysql -uroot -proot

Create and use the database:

create database weblog;
use weblog;

Create the table:

create table tongji(reporttime varchar(40),pv int,uv int,vv int,br double,newip int,newcust int,avgtime double,avgdeep double);

Export the data into MySQL with Sqoop.

Go to Sqoop's bin directory:

cd /home/software/sqoop-1.4.7/bin/

Run the export (the export directory is the Hive warehouse path of the tongji table, and the field terminator must match the '|' used when the table was created):

sh sqoop export --connect jdbc:mysql://hadoop01:3306/weblog --username root --password root --export-dir '/user/hive/warehouse/weblog.db/tongji' --table tongji -m 1 --fields-terminated-by '|'

Finally, query the tongji table in MySQL to confirm that the data arrived.

II. Hive placeholders and script files

1. Overview

The steps above require writing the HQL by hand every day to complete the offline ETL, which is clearly not practical. Hive's script-file invocation and placeholders (variable substitution) solve this.

2. Running a Hive script file

Steps:

① Write a file with the .hive suffix.

For example, create a file 01.hive whose purpose is to create a table tb1 in the weblog database:

use weblog;
create table tb1 (id int,name string);

② Go to the bin directory of the Hive installation and run:

sh hive -f 01.hive

Note: the -f argument is the path to the 01.hive file.

③ Check in Hive that the table was created.

 

3. Using Hive placeholders

Suppose we want to drop the tb1 table through a script file. We can do it like this:

① Create a file 02.hive:

use weblog;
drop table ${tb_name};

② In the bin directory, run:

sh hive -f 02.hive -d tb_name="tb1"

4. Applying this to the project

The final insert in the Hive job works on a daily log partition, so the date (for example 2022-04-20) would otherwise have to be typed by hand every day. Instead:

① Replace every date literal in the HQL with a placeholder and save the statements in a weblog.hive file. Note the single quotes around ${reportTime}, so that the substituted date is compared as a string:

use weblog;
insert overwrite table tongji select '${reportTime}',tab1.pv,tab2.uv,tab3.vv,tab4.br,tab5.newip,tab6.newcust,tab7.avgtime,tab8.avgdeep from
(select count(*) as pv from dataclear where reportTime = '${reportTime}') as tab1,
(select count(distinct uvid) as uv from dataclear where reportTime = '${reportTime}') as tab2,
(select count(distinct ssid) as vv from dataclear where reportTime = '${reportTime}') as tab3,
(select round(br_taba.a/br_tabb.b,4) as br from (select count(*) as a from (select ssid from dataclear where reportTime='${reportTime}' group by ssid having count(ssid) = 1) as br_tab) as br_taba,
(select count(distinct ssid) as b from dataclear where reportTime='${reportTime}') as br_tabb) as tab4,
(select count(distinct dataclear.cip) as newip from dataclear where dataclear.reportTime = '${reportTime}' and cip not in (select dc2.cip from dataclear as dc2 where dc2.reportTime < '${reportTime}')) as tab5,
(select count(distinct dataclear.uvid) as newcust from dataclear where dataclear.reportTime='${reportTime}' and uvid not in (select dc2.uvid from dataclear as dc2 where dc2.reportTime < '${reportTime}')) as tab6,
(select round(avg(atTab.usetime),4) as avgtime from (select max(sstime) - min(sstime) as usetime from dataclear where reportTime='${reportTime}' group by ssid) as atTab) as tab7,
(select round(avg(deep),4) as avgdeep from (select count(distinct urlname) as deep from dataclear where reportTime='${reportTime}' group by ssid) as adTab) as tab8;

② In Hive's bin directory run:

sh hive -f weblog.hive -d reportTime="2022-04-20"

If you do not want to type the date by hand, it can be produced with a Linux command:

date "+%Y-%m-%d"

So the script can also be invoked as:

sh hive -f weblog.hive -d reportTime=`date "+%Y-%m-%d"` (note: those are backticks)

or equivalently:

sh hive -f weblog.hive -d reportTime=$(date "+%Y-%m-%d")

5. Scheduling with Linux crontab

In practice this job should run automatically every day (the entry below runs at 23:45), so we create a scheduled task.

The crontab command schedules commands to run at fixed times or intervals.

Edit the schedule with crontab -e.

crontab entry format:

*        *       *      *       *       command
minute   hour    day    month   week    command

Example:

*/1 * * * * rm -rf /home/software/1.txt

deletes the file 1.txt in the given directory every minute.

For this project the entry can be written as one line. Note that % must be escaped as \% inside crontab, and that the variable name must match the ${reportTime} placeholder used in the script:

45 23 * * * /home/software/hive-3.1.2/bin/hive -f /home/software/hive-3.1.2/bin/weblog.hive -d reportTime=$(date "+\%Y-\%m-\%d")

III. Building the real-time pipeline

1. Connecting Flume to Kafka

1. Start the Zookeeper cluster.

2. Start the Kafka cluster; in Kafka's bin directory run:

sh kafka-server-start.sh ../config/server.properties

3. Create the topic.

List existing topics: sh kafka-topics.sh --list --zookeeper hadoop01:2181

Create the topic: sh kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic fluxdata

4. Configure weblog.conf under Flume's data directory (a file we create ourselves):

a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2

# avro source: receives events from the tracking web application
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=44444
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp

# HDFS sink: writes the raw log into the daily partition directory used by Hive
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://192.168.186.128:9000/weblog/reportTime=%Y-%m-%d
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=1000

# Kafka sink: forwards the same events to the fluxdata topic for the streaming job
a1.sinks.k2.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.brokerList=hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k2.topic=fluxdata

# memory channels, one per sink
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100

# wiring: the source fans out to both channels
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

Start Hadoop.

Then, from Flume's data directory, start the agent (an optional test client is sketched right after the command):

../bin/flume-ng agent -n a1 -c ./ -f ./weblog.conf -Dflume.root.logger=INFO,console
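Before moving on to step 5, you can optionally push a hand-built test event into the running agent's avro source to confirm that both sinks work. This is only a sketch, not part of the project code: it assumes the flume-ng-sdk library is available (it is not in the pom below), that the agent runs on hadoop01:44444 as configured above, and the event body is a made-up log line in the '|'-separated format.

  import org.apache.flume.api.RpcClientFactory
  import org.apache.flume.event.EventBuilder
  import java.nio.charset.StandardCharsets

  // Minimal sketch: send one test event to the Flume avro source (assumed at hadoop01:44444).
  object FlumeAvroTestClient {
    def main(args: Array[String]): Unit = {
      val client = RpcClientFactory.getDefaultInstance("hadoop01", 44444)
      try {
        // A dummy record in the same '|'-separated layout the tracking script produces (16 fields).
        val line = "http://demo/a.jsp|a.jsp|test|UTF-8|1920x1080|24|zh-CN|1|1|1.0|-|-|Mozilla|12345|1_1_1650441600000|192.168.186.1"
        client.append(EventBuilder.withBody(line, StandardCharsets.UTF_8))
      } finally {
        client.close()
      }
    }
  }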

5. Start Tomcat and open the instrumented (tracking) page.

6. Check that Kafka receives the data.

Go to Kafka's bin directory and start a console consumer:

sh kafka-console-consumer.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic fluxdata --from-beginning

Now, when the page is visited, the log lines should appear in the consumer.
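If you prefer a programmatic check over the console consumer, a plain Kafka consumer can read the same topic. This is just a sketch, assuming the kafka-clients library pulled in by the pom below and the broker list used in the Flume configuration.

  import org.apache.kafka.clients.consumer.KafkaConsumer
  import java.time.Duration
  import java.util.{Collections, Properties}
  import scala.collection.JavaConverters._

  // Minimal sketch: consume the fluxdata topic directly to verify that Flume is delivering events.
  object FluxdataConsoleCheck {
    def main(args: Array[String]): Unit = {
      val props = new Properties()
      props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
      props.put("group.id", "fluxdata-check")
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("auto.offset.reset", "earliest")
      val consumer = new KafkaConsumer[String, String](props)
      consumer.subscribe(Collections.singletonList("fluxdata"))
      try {
        while (true) {
          val records = consumer.poll(Duration.ofSeconds(1))
          records.asScala.foreach(r => println(r.value()))
        }
      } finally {
        consumer.close()
      }
    }
  }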

IV. Setting up the streaming development environment

1. Spark and HBase integration basics

Steps:

1. Start IDEA.

2. Create a Maven project from the quickstart archetype.

3. Install the Scala plugin in IDEA.

4. Add a Scala SDK to the FluxStreamingServer project.

If Spark is a 2.x release, use Scala 2.11.x (2.11.7 is stable); with Spark 3.x, use Scala 2.12.x.

5. Create a scala directory and mark it as a sources root.

6. Use the following project pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>FluxStreamingServer</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>FluxStreamingServer</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!-- spark -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>3.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>
    <!-- kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>
    <!-- HBase -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase</artifactId>
      <version>2.4.2</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.4.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
      <version>2.4.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>2.4.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-protocol</artifactId>
      <version>2.4.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-hadoop-compat</artifactId>
      <version>2.4.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-mapreduce</artifactId>
      <version>2.4.2</version>
    </dependency>
    <!-- mysql -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
    </dependency>
    <dependency>
      <groupId>com.mchange</groupId>
      <artifactId>c3p0</artifactId>
      <version>0.9.5.5</version>
    </dependency>
  </dependencies>
  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
          <configuration>
            <archive>
              <manifest>
                <addClasspath>true</addClasspath>
                <useUniqueVersions>false</useUniqueVersions>
                <classpathPrefix>lib/</classpathPrefix>
                <mainClass>cn.tedu.streaming.StreamingDriver</mainClass>
              </manifest>
            </archive>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

7. Learn the basics of writing to HBase from Spark.

Create a new object with the following code:

package cn.yang.basic

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
 * How to write data out to an HBase table from Spark
 */
object HBaseWriteDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("writeHBase")
    val sc = new SparkContext(conf)
    // Zookeeper quorum of the HBase cluster. Host names must resolve to the server IPs
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    // Zookeeper client port
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    // Target HBase table
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "tbx")
    // Create the Hadoop Job object
    val job = new Job(sc.hadoopConfiguration)
    // Output key type
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    // Output value type (import org.apache.hadoop.hbase.client.Result)
    job.setOutputValueClass(classOf[Result])
    // Output format: HBase TableOutputFormat
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val rdd = sc.parallelize(List("1 tom 23", "2 rose 18", "3 jim 25", "4 jary 30"))
    // To insert into HBase the RDD must be converted: RDD[String] -> RDD[(output key, output value)]
    val hbaseRDD = rdd.map { line =>
      val arr = line.split(" ")
      val id = arr(0)
      val name = arr(1)
      val age = arr(2)
      // Create the HBase row object and set the row key (org.apache.hadoop.hbase.client.Put)
      val row = new Put(id.getBytes())
      // args: column family, column name, column value
      row.addColumn("cf1".getBytes(), "name".getBytes(), name.getBytes())
      row.addColumn("cf1".getBytes(), "age".getBytes(), age.getBytes())
      (new ImmutableBytesWritable(), row)
    }
    // Execute the insert
    hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}

8. Start the servers: Zookeeper on all three nodes, Hadoop, and HBase.

cd /home/software/hbase-2.4.2/bin/
sh start-hbase.sh

9. Open the HBase shell on node 01 (sh hbase shell) and create the target table with a cf1 column family, for example: create 'tbx','cf1'

10. Run the code. For reference, the corresponding read and scan programs are listed below:

HBaseReadDriver (full-table read):

package cn.yang.basic

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object HBaseReadDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("read")
    val sc = new SparkContext(conf)
    // Create the HBase configuration object
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    // Table to read from
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "tbx")
    // Read the HBase table into an RDD
    val resultRDD = sc.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      // import org.apache.hadoop.hbase.client.Result
      classOf[Result])
    resultRDD.foreach { case (k, v) =>
      val name = v.getValue("cf1".getBytes(), "name".getBytes())
      val age = v.getValue("cf1".getBytes(), "age".getBytes())
      println(new String(name) + ":" + new String(age))
    }
  }
}
HBaseScanDriver (row-key range scan):

package cn.yang.basic

import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.spark.{SparkConf, SparkContext}

/**
 * How to scan HBase table data by row-key range
 */
object HBaseScanDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("read")
    val sc = new SparkContext(conf)
    // Create the HBase configuration object
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    // Table to read from
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "tbx")
    // Create the HBase scan object
    val scan = new Scan()
    // Start row key (inclusive)
    scan.setStartRow("2".getBytes())
    // Stop row key (exclusive)
    scan.setStopRow("4".getBytes())
    // Serialize the scan object into the configuration so it takes effect
    hbaseConf.set(TableInputFormat.SCAN,
      Base64.encodeBase64String(ProtobufUtil.toScan(scan).toByteArray()))
    // Read the HBase table into an RDD
    val resultRDD = sc.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      // import org.apache.hadoop.hbase.client.Result
      classOf[Result])
    resultRDD.foreach { case (k, v) =>
      val name = v.getValue("cf1".getBytes(), "name".getBytes())
      val age = v.getValue("cf1".getBytes(), "age".getBytes())
      println(new String(name) + ":" + new String(age))
    }
  }
}

2. Real-time stream processing

Steps:

① Start the three servers, then Zookeeper, Hadoop, Kafka and Flume:

cd /home/software/kafka_2.10-0.10.0.1/bin/
sh kafka-server-start.sh ../config/server.properties

From Flume's data directory, start the agent:

../bin/flume-ng agent -n a1 -c ./ -f ./weblog.conf -Dflume.root.logger=INFO,console

② Integrate Spark Streaming with Kafka and write the code.

Under the scala sources of FluxStreamingServer, create a streaming package and a new Driver object.

The full code is listed below, organized as Driver, dao (HBaseUtil, MysqlUtil) and pojo (LogBean, TongjiBean).

Driver:

package cn.yang.streaming

import cn.yang.TongjiBean
import cn.yang.dao.{HBaseUtil, MysqlUtil}
import cn.yang.pojo.LogBean
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import java.util.Calendar

object Driver {
  def main(args: Array[String]): Unit = {
    // When consuming from Kafka with Spark Streaming, at least 2 local threads are needed:
    // one runs Spark Streaming itself, the other consumes from Kafka.
    // The Kryo serializer is also configured here.
    val conf = new SparkConf().setMaster("local[2]").setAppName("kafkasource")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")
    // Create the StreamingContext and set the batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    // Topics to consume; Array allows subscribing to several topics
    val topics = Array("fluxdata")
    // Kafka parameters as a Map (key = property name, value = property value):
    // broker list, key/value deserializers (String), and the consumer group id
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "bos"
    )
    // arg1: StreamingContext  arg2: location strategy (consume all partitions of the subscribed topics)
    // arg3: subscription (topics + Kafka parameters)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))
      .map(x => x.value())
    // Output option 1:
    // foreachRDD turns all data of the current batch into one RDD
    stream.foreachRDD { rdd =>
      // RDD[String] -> Iterator[String]
      val lines = rdd.toLocalIterator
      // Walk through the iterator
      while (lines.hasNext) {
        // Take one record
        val line = lines.next()
        // Step 1: field cleaning. Needed fields: url urlname uvid ssid sscount sstime cip
        val arr = line.split("\\|")
        val url = arr(0)
        val urlname = arr(1)
        val uvid = arr(13)
        val ssid = arr(14).split("_")(0)
        val sscount = arr(14).split("_")(1)
        val sstime = arr(14).split("_")(2)
        val cip = arr(15)
        // Step 2: wrap the cleaned fields into a bean
        val logBean = LogBean(url, urlname, uvid, ssid, sscount, sstime, cip)
        // Step 3: compute the real-time metrics: pv uv vv newip newcust
        // For a single record each of the 5 metrics is either 1 or 0
        // 3-1 pv: page views. One visit counts as one pv
        val pv = 1
        // 3-2 uv: unique visitors. uv=1 or uv=0:
        // ① look the uvid of this record up in today's data of the HBase table (webtable)
        // ② if no record with this uvid is found, uv=1
        // ③ otherwise uv=0
        // The tricky part: how to query only "today's" data in HBase?
        // startTime = today's midnight timestamp, endTime = sstime
        val endTime = sstime.toLong
        val calendar = Calendar.getInstance()
        calendar.setTimeInMillis(endTime)
        calendar.set(Calendar.HOUR_OF_DAY, 0) // HOUR_OF_DAY (24h), not HOUR, to get real midnight
        calendar.set(Calendar.MINUTE, 0)
        calendar.set(Calendar.SECOND, 0)
        calendar.set(Calendar.MILLISECOND, 0)
        // Midnight of the current day
        val startTime = calendar.getTimeInMillis
        // How to check whether this uvid already appears in the HBase table?
        // Use an HBase row-key regex filter
        val uvRegex = "^\\d+_" + uvid + ".*$"
        val uvRDD = HBaseUtil.queryHBase(sc, startTime, endTime, uvRegex)
        val uv = if (uvRDD.count() == 0) 1 else 0
        // 3-3 vv: unique sessions. vv=1 or vv=0; same logic as uv,
        // except the value being checked is the ssid of the current record
        val vvRegex = "^\\d+_\\d+_" + ssid + ".*$"
        val vvResult = HBaseUtil.queryHBase(sc, startTime, endTime, vvRegex)
        val vv = if (vvResult.count() == 0) 1 else 0
        // 3-4 newip: new IP. newip=1 or newip=0:
        // look the ip of this record up in the full history (including today);
        // if it is not found, newip=1, otherwise newip=0
        val ipRegex = "^\\d+_\\d+_\\d+_" + cip + ".*$"
        val ipResult = HBaseUtil.queryHBase(sc, startTime = 0, endTime, ipRegex)
        val newip = if (ipResult.count() == 0) 1 else 0
        // 3-5 newcust: new visitors. Same logic as newip,
        // but the value checked is the uvid, reusing uvRegex
        val custResult = HBaseUtil.queryHBase(sc, startTime = 0, endTime, uvRegex)
        val newcust = if (custResult.count() == 0) 1 else 0
        // Step 4: wrap the metrics into a bean and insert it into MySQL
        val tongjiBean = TongjiBean(sstime, pv, uv, vv, newip, newcust)
        MysqlUtil.saveToMysql(tongjiBean)
        // Also store the cleaned bean in HBase for the lookups above
        HBaseUtil.saveToHBase(sc, logBean)
        println(logBean)
      }
    }
    // Output option 2:
    // stream.print()
    ssc.start()
    // Keep Spark Streaming running until the user stops it
    ssc.awaitTermination()
  }
}
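The field-cleaning step in the middle of the foreachRDD loop is the part most worth checking on its own. The sketch below is not part of the project code; it extracts the same '|'-split logic into a small function and feeds it a made-up sample line whose field positions (13, 14, 15) match the real format.

  import cn.yang.pojo.LogBean

  // Sketch: the same parsing logic as in Driver, pulled out so it can be tested in isolation.
  object LineParser {
    def parse(line: String): LogBean = {
      val arr = line.split("\\|")
      val Array(ssid, sscount, sstime) = arr(14).split("_")
      LogBean(arr(0), arr(1), arr(13), ssid, sscount, sstime, arr(15))
    }

    def main(args: Array[String]): Unit = {
      // Hypothetical sample line; only the layout matters.
      val sample = "http://demo/a.jsp|a.jsp|t|UTF-8|1920x1080|24|zh-CN|1|1|1.0|-|-|Mozilla|36871049|58940725_3_1650441600000|192.168.186.1"
      println(parse(sample)) // LogBean(http://demo/a.jsp,a.jsp,36871049,58940725,3,1650441600000,192.168.186.1)
    }
  }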

 dao-HBaseUtil

package cn.yang.dao

import cn.yang.pojo.LogBean
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{RegexStringComparator, RowFilter}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext

import scala.util.Random

object HBaseUtil {
  def queryHBase(sc: SparkContext, startTime: Long, endTime: Long, regex: String) = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    // Table to read from
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "webtable")
    val scan = new Scan()
    scan.withStartRow(startTime.toString().getBytes)
    scan.withStopRow(endTime.toString().getBytes)
    // org.apache.hadoop.hbase.filter.RowFilter
    val filter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regex))
    // Attach the filter so the range scan also applies the row-key regex
    scan.setFilter(filter)
    // Serialize the scan object into the configuration so it takes effect
    hbaseConf.set(TableInputFormat.SCAN,
      Base64.encodeBase64String(ProtobufUtil.toScan(scan).toByteArray()))
    // Execute the read and collect the rows into a result RDD
    val resultRDD = sc.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    // Return the result RDD of the range + regex query
    resultRDD
  }

  /*
   * Store the populated LogBean in the target HBase table
   */
  def saveToHBase(sc: SparkContext, logBean: LogBean) = {
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "webtable")
    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val rdd = sc.parallelize(List(logBean))
    val hbaseRDD = rdd.map { bean =>
      // Row-key design for this project: sstime_uvid_ssid_cip_randomNumber
      // The key starts with the timestamp so HBase keeps rows sorted by time,
      // which makes range queries by time period possible.
      // It also contains uvid, ssid and cip, which is what the uv/vv/newip lookups match on.
      // The trailing random number helps spread the keys.
      val rowKey = bean.sstime + "_" + bean.uvid + "_" + bean.ssid + "_" + bean.cip + "_" + Random.nextInt(100)
      // Create the HBase row object with this row key
      val row = new Put(rowKey.getBytes)
      row.addColumn("cf1".getBytes, "url".getBytes, bean.url.getBytes)
      row.addColumn("cf1".getBytes, "urlname".getBytes, bean.urlname.getBytes)
      row.addColumn("cf1".getBytes, "uvid".getBytes, bean.uvid.getBytes)
      row.addColumn("cf1".getBytes, "ssid".getBytes, bean.ssid.getBytes)
      row.addColumn("cf1".getBytes, "sscount".getBytes, bean.sscount.getBytes)
      row.addColumn("cf1".getBytes, "sstime".getBytes, bean.sstime.getBytes)
      row.addColumn("cf1".getBytes, "cip".getBytes, bean.cip.getBytes)
      (new ImmutableBytesWritable, row)
    }
    // Execute the write
    hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
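Because the correctness of uv/vv/newip hinges entirely on the row-key layout matching the regexes built in Driver, a tiny offline check is useful. The sketch below is not part of the project code: it builds a sample row key in the sstime_uvid_ssid_cip_random format with made-up values and confirms that the three patterns match it.

  // Sketch: verify that the row-key format and the query regexes used in Driver agree.
  object RowKeyRegexCheck {
    def main(args: Array[String]): Unit = {
      // Made-up values in the project's row-key layout: sstime_uvid_ssid_cip_random
      val sstime = "1650441600000"
      val uvid = "36871049"
      val ssid = "58940725"
      val cip = "192.168.186.1"
      val rowKey = s"${sstime}_${uvid}_${ssid}_${cip}_42"

      val uvRegex = "^\\d+_" + uvid + ".*$"          // matches on the uvid field
      val vvRegex = "^\\d+_\\d+_" + ssid + ".*$"     // matches on the ssid field
      val ipRegex = "^\\d+_\\d+_\\d+_" + cip + ".*$" // matches on the cip field

      println(rowKey.matches(uvRegex)) // true
      println(rowKey.matches(vvRegex)) // true
      println(rowKey.matches(ipRegex)) // true
    }
  }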

dao-MysqlUtil

package cn.yang.dao

import cn.yang.TongjiBean
import com.mchange.v2.c3p0.ComboPooledDataSource

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat

object MysqlUtil {
  val c3p0 = new ComboPooledDataSource()

  def saveToMysql(tongjiBean: TongjiBean) = {
    var conn: Connection = null
    var ps1: PreparedStatement = null
    var rs: ResultSet = null
    var ps2: PreparedStatement = null
    var ps3: PreparedStatement = null
    try {
      conn = c3p0.getConnection
      /*
       * Logic:
       * 1. Query today's row in the mysql table tongji2.
       * 2. If there is no row for today yet, insert a new one;
       *    if there already is one, update it by accumulating the counters.
       */
      // Format today's date, e.g. 2022-04-24 ("yyyy", not "YYYY", which is the week-based year)
      val sdf = new SimpleDateFormat("yyyy-MM-dd")
      val todayTime = sdf.format(tongjiBean.sstime.toLong)
      // First query tongji2: if today's row already exists, accumulate; otherwise insert
      ps1 = conn.prepareStatement("select * from tongji2 where reporttime=?")
      ps1.setString(1, todayTime)
      // Execute the query and get the result set
      rs = ps1.executeQuery()
      if (rs.next()) {
        // A row for today already exists: accumulate
        ps2 = conn.prepareStatement(
          "update tongji2 set pv=pv+?,uv=uv+?,vv=vv+?,newip=newip+?,newcust=newcust+? where reporttime=?")
        ps2.setInt(1, tongjiBean.pv)
        ps2.setInt(2, tongjiBean.uv)
        ps2.setInt(3, tongjiBean.vv)
        ps2.setInt(4, tongjiBean.newip)
        ps2.setInt(5, tongjiBean.newcust)
        ps2.setString(6, todayTime)
        ps2.executeUpdate()
      } else {
        // No row for today yet: insert a new one
        ps3 = conn.prepareStatement("insert into tongji2 values(?,?,?,?,?,?)")
        ps3.setString(1, todayTime)
        ps3.setInt(2, tongjiBean.pv)
        ps3.setInt(3, tongjiBean.uv)
        ps3.setInt(4, tongjiBean.vv)
        ps3.setInt(5, tongjiBean.newip)
        ps3.setInt(6, tongjiBean.newcust)
        ps3.executeUpdate()
      }
    } catch {
      case t: Exception => t.printStackTrace()
    } finally {
      if (ps3 != null) ps3.close
      if (ps2 != null) ps2.close
      if (rs != null) rs.close
      if (ps1 != null) ps1.close
      if (conn != null) conn.close
    }
  }
}
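One thing the class above leaves implicit is where ComboPooledDataSource gets its JDBC settings: by default c3p0 reads them from a c3p0-config.xml or c3p0.properties file on the classpath. If you prefer to configure it in code, a sketch might look like the following; the driver class, URL, user and password are placeholders taken from this tutorial's environment and should be adjusted to yours.

  import com.mchange.v2.c3p0.ComboPooledDataSource

  // Sketch: programmatic c3p0 setup as an alternative to c3p0-config.xml on the classpath.
  // URL, user and password below are placeholders for this environment.
  object C3p0Setup {
    def buildDataSource(): ComboPooledDataSource = {
      val ds = new ComboPooledDataSource()
      ds.setDriverClass("com.mysql.jdbc.Driver") // matches mysql-connector-java 5.1.x
      ds.setJdbcUrl("jdbc:mysql://hadoop01:3306/weblog?characterEncoding=utf8")
      ds.setUser("root")
      ds.setPassword("root")
      ds.setInitialPoolSize(3)
      ds.setMaxPoolSize(10)
      ds
    }
  }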

pojo-LogBean

package cn.yang.pojo

case class LogBean(url: String,
                   urlname: String,
                   uvid: String,
                   ssid: String,
                   sscount: String,
                   sstime: String,
                   cip: String)

TongjiBean

package cn.yang

case class TongjiBean(sstime: String,
                      pv: Int,
                      uv: Int,
                      vv: Int,
                      newip: Int,
                      newcust: Int)

③ Start Spark Streaming (run Driver).

④ Start Tomcat, open the tracking page, and check that Spark Streaming receives data.

⑤ Start HBase:

cd /home/software/hbase-2.4.2/bin/

sh start-hbase.sh

sh hbase shell

Create the table: create 'webtable','cf1'

⑥ Start Tomcat, run Driver again, then scan the webtable table; it should now contain data.

⑦ In MySQL, create the statistics table in the weblog database:

create table tongji2(reporttime date,pv int,uv int,vv int,newip int,newcust int);

⑧ Run the program, visit the tracking page, and finally query the data in MySQL. These MySQL results are the final output of this part of the project.
