
Flume + MR + Hive + Sqoop + MySQL clickstream pipeline (HDFS + MR + Hive + Sqoop)

Flume

Flume is used to monitor a specified path in real time and periodically write the collected data to HDFS.

 

Sample flume.conf

#Define the agent name and the names of its source, channel and sink

f1.sources = r1

f1.channels = c1

f1.sinks = k1

 

#Configure the source

f1.sources.r1.type = spooldir

f1.sources.r1.spoolDir = /test_dir/aaa

 

#Configure the channel

f1.channels.c1.type = memory

f1.channels.c1.capacity = 10000

f1.channels.c1.transactionCapacity = 1000

 

#Define an interceptor that adds a timestamp to each event

f1.sources.r1.interceptors = i1

#The host interceptor would attach the agent's hostname to each event; it is not used here, so it is commented out and only the timestamp interceptor takes effect

#f1.sources.r1.interceptors.i1.type = host

#f1.sources.r1.interceptors.i1.hostHeader = hostname

f1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

 

 

#Configure the sink

f1.sinks.k1.type = hdfs

f1.sinks.k1.hdfs.path = hdfs://192.168.8.101:9000/flume/events/%Y%m%d/

f1.sinks.k1.hdfs.filePrefix = events-

f1.sinks.k1.hdfs.fileType = DataStream

#Set minBlockReplicas to 1 so block replication events do not trigger extra file rolls (avoids small files)

f1.sinks.k1.hdfs.minBlockReplicas=1

#Do not roll files based on the number of events

f1.sinks.k1.hdfs.rollCount = 0

#Format used when writing to HDFS

f1.sinks.k1.hdfs.writeFormat=Text

#Roll the HDFS file when it reaches 100 MB (104857600 bytes)

f1.sinks.k1.hdfs.rollSize = 104857600

#Do not roll files based on a time interval (rollInterval = 0 disables time-based rolling)

f1.sinks.k1.hdfs.rollInterval = 0

#Wire the source and sink to the channel

f1.sources.r1.channels = c1

f1.sinks.k1.channel = c1

Start Flume with the following command:

flume-ng agent -n f1 --conf /opt/flume-1.7.0/conf/ -f /test_dir/local2hdfs.conf -Dflume.root.logger=INFO,console

Here f1 is the agent name.
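Once the agent is running, a quick way to confirm that events are landing in HDFS is to list the day's output directory (the path and %Y%m%d layout come from the sink configuration above; the command itself is just a suggested check):

# List today's Flume output directory (path pattern from f1.sinks.k1.hdfs.path)
hdfs dfs -ls /flume/events/$(date +%Y%m%d)/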

MapReduce

First, package the MapReduce program and upload the jar to the server. Then run the four jobs in order: data cleaning, session creation, PageViews generation, and Visits generation.

 

 hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.one.logClean

 

 hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.two.logSession

 

hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.three.PageViews

 

hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.four.VisitsInfo
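Before moving on to Hive, it can be worth confirming that the jobs produced output in HDFS. The paths below are assumptions based on the LOAD DATA statement in the Hive section; adjust them to the output directories your jobs actually use.

# Check that the PageViews output exists and peek at a few lines (paths assumed)
hdfs dfs -ls /clickstream/pageviews/20181013/
hdfs dfs -cat /clickstream/pageviews/20181013/part-* | head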

Hive

Note: only the pageviews table is loaded and analyzed below; the visits table follows exactly the same steps and is left for you to complete.

//Create the database

create database web_click;

//Switch to the database

use web_click;

//Create the non-partitioned pageviews table, used to load the MapReduce output

create table pageviews(session string,ip string,requestdate string,requesttime string,visitpage string, staytime string,step string) row format delimited fields terminated by " ";

 

//Load the data

load data inpath "hdfs://hadoop:9000/clickstream/pageviews/20181013/*" overwrite into table pageviews;

 

//The general pattern for loading one day's data into the partitioned ods_pageviews table (the table definition and the full statements follow):

Insert into table ods_pageviews partition(inputDate="2016-09-18") select ... from pageviews

 

//Create the partitioned ods_pageviews table and insert each day's data

create table ods_pageviews(session string,ip string,viewtime string,visitpage string, staytime string,step string) partitioned by(inputDate string) row format delimited fields terminated by " ";

 

insert overwrite table ods_pageviews partition(inputDate="2016-09-18") select session,ip,concat(requestdate,"-",requesttime),visitpage,staytime,step from pageviews where requestdate="2016-09-18";

 

insert overwrite table ods_pageviews partition(inputDate="2016-09-19") select session,ip,concat(requestdate,"-",requesttime),visitpage,staytime,step from pageviews where requestdate="2016-09-19";

The ods_pageviews table now contains one partition per inputDate (screenshot from the original post omitted).
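If you want to check the partition layout from the command line (a simple verification step, not part of the original workflow), Hive can list the partitions directly:

# List the partitions of ods_pageviews, one per inputDate
hive -e "use web_click; show partitions ods_pageviews;"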

//Create the time dimension table ods_dim_pageviews_time, with the date as the partition column

create table ods_dim_pageviews_time(time string,year string,month string,day string,hour string,minutes string,seconds string) partitioned by(inputDate String) row format delimited fields terminated by ' ';

 

//Insert the time values for each day

insert overwrite table ods_dim_pageviews_time partition(inputDate='2016-09-18') select distinct pv.viewtime, substring(pv.viewtime,0,4),substring(pv.viewtime,6,2),substring(pv.viewtime,9,2),substring(pv.viewtime,12,2),substring(pv.viewtime,15,2),substring(pv.viewtime,18,2) from ods_pageviews as pv where inputDate="2016-09-18";

 

insert overwrite table ods_dim_pageviews_time partition(inputDate='2016-09-19') select distinct pv.viewtime, substring(pv.viewtime,0,4),substring(pv.viewtime,6,2),substring(pv.viewtime,9,2),substring(pv.viewtime,12,2),substring(pv.viewtime,15,2),substring(pv.viewtime,18,2) from ods_pageviews as pv where inputDate="2016-09-19";

Sample rows of ods_dim_pageviews_time (screenshot omitted).

//Create the URL dimension table ods_dim_pageviews_url, with the date as the partition column

create table ods_dim_pageviews_url(visitpage string,host string,path string,query string) partitioned by(inputDate string) row format delimited fields terminated by ' ';

 

insert overwrite table ods_dim_pageviews_url partition(inputDate='2016-09-18') select distinct pv.visitpage,b.host,b.path,b.query from (select * from ods_pageviews where inputDate='2016-09-18') pv lateral view parse_url_tuple(concat('https://localhost',pv.visitpage),'HOST','PATH','QUERY') b as host,path,query;

 

insert overwrite table ods_dim_pageviews_url partition(inputDate='2016-09-19') select distinct pv.visitpage,b.host,b.path,b.query from (select * from ods_pageviews where inputDate='2016-09-19') pv lateral view parse_url_tuple(concat('https://localhost',pv.visitpage),'HOST','PATH','QUERY') b as host,path,query;

 

Sample rows of ods_dim_pageviews_url (screenshot omitted).

//Query the top 20 pages by daily PV count

create table visitpageTop as select op.visitpage as path,count(*) as num from ods_pageviews as op join ods_dim_pageviews_url as opurl on (op.visitpage = opurl.visitpage) join ods_dim_pageviews_time as optime on (optime.time = op.viewtime) where optime.year='2016' and optime.month='09' and optime.day='19' group by op.visitpage order by num desc limit 20;
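Because visitpageTop is created with CREATE TABLE AS SELECT and no ROW FORMAT clause, it uses Hive's default field delimiter (\001) and lives under the default warehouse directory; the Sqoop export in the next section relies on both. If in doubt, they can be confirmed with a quick check (not from the original post):

# Show the table's HDFS location and SerDe/delimiter details
hive -e "use web_click; describe formatted visitpageTop;"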

 

Sqoop

Use Sqoop to export the data to MySQL so it can later be displayed on the web.

Prerequisite: the target table must already exist in MySQL.

//Create the MySQL database

create database web_click;

use web_click;

//Create the MySQL table; its columns and types must match the exported Hive table

create table visitpageTop(path varchar(100),num bigint(20));

 

//Export with Sqoop

sqoop export --connect "jdbc:mysql://192.168.8.101:3306/web_click?useUnicode=true&characterEncoding=utf-8" --username root --password 123456 --table visitpageTop --fields-terminated-by "\001" --export-dir /user/hive/warehouse/web_click.db/visitpagetop

 

The exported data in MySQL (screenshot omitted).
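A quick way to look at the exported rows is the standard mysql client, reusing the connection details from the Sqoop command (shown here only as a convenience check):

# Inspect the exported top-20 table in MySQL
mysql -h 192.168.8.101 -uroot -p123456 -e "select * from web_click.visitpageTop order by num desc limit 20;"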

Shell script

To make it easier to run the pipeline automatically, the steps above are put into a shell script.

This is only a sample script and cannot be run once a day as-is, because the dates inside it are hard-coded (a sketch of one way to parameterize the date follows the script).

#!/bin/bash
echo "Running the MapReduce jobs:"
hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.one.logClean
echo "Data cleaning finished..."
hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.two.logSession
echo "Session creation finished..."
hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.three.PageViews
echo "PageViews data generated..."
hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.four.VisitsInfo
echo "VisitsInfo data generated..."
echo "Inserting data into the Hive tables..."
/opt/spark-2.2.1-bin-hadoop2.6/bin/spark-sql --master spark://hadoop:7077 -f /test_dir/pageviews01.sql
echo "Data inserted into pageviews, ods_pageviews and ods_dim_pageviews_time..."
hive -f /test_dir/pageviews02.sql
echo "Data inserted into the URL dimension table ods_dim_pageviews_url..."
/opt/spark-2.2.1-bin-hadoop2.6/bin/spark-sql --master spark://hadoop:7077 -f /test_dir/pageviews03.sql
echo "Top 20 pages by daily PV generated..."
echo "Exporting the data to MySQL with Sqoop..."
sqoop export --connect "jdbc:mysql://192.168.8.101:3306/web_click?useUnicode=true&characterEncoding=utf-8" --username root --password 123456 --table visitpageTop --fields-terminated-by "\001" --export-dir /user/hive/warehouse/web_click.db/visitpagetop
echo "Export succeeded, pipeline finished..."
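One way to make the script usable for a daily run (a rough sketch, not the author's implementation): compute yesterday's date in the shell and pass it to the Hive scripts with --hivevar, after replacing the hard-coded "2016-09-18"/"2016-09-19" literals inside pageviews01.sql, pageviews02.sql and pageviews03.sql with ${hivevar:inputDate}. The rewritten SQL files are assumed here and are not shown in this article.

#!/bin/bash
# Sketch: run the Hive part of the pipeline for yesterday's partition.
# Assumes the .sql files reference ${hivevar:inputDate} instead of hard-coded dates.
DT=$(date -d "1 day ago" +%Y-%m-%d)
hive --hivevar inputDate="$DT" -f /test_dir/pageviews01.sql
hive --hivevar inputDate="$DT" -f /test_dir/pageviews02.sql
hive --hivevar inputDate="$DT" -f /test_dir/pageviews03.sql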

 

 
