Flume is used to monitor a specified path in real time and periodically write the collected data into HDFS.
Sample flume.conf:
# Define the agent name and the names of its source, channel and sink
f1.sources = r1
f1.channels = c1
f1.sinks = k1

# Configure the source
f1.sources.r1.type = spooldir
f1.sources.r1.spoolDir = /test_dir/aaa

# Configure the channel
f1.channels.c1.type = memory
f1.channels.c1.capacity = 10000
f1.channels.c1.transactionCapacity = 1000

# Define an interceptor that adds a timestamp to each event
f1.sources.r1.interceptors = i1
# The host interceptor (records the hostname of the agent's server) is not used here
#f1.sources.r1.interceptors.i1.type = host
#f1.sources.r1.interceptors.i1.hostHeader = hostname
f1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Configure the sink
f1.sinks.k1.type = hdfs
f1.sinks.k1.hdfs.path = hdfs://192.168.8.101:9000/flume/events/%Y%m%d/
f1.sinks.k1.hdfs.filePrefix = events-
f1.sinks.k1.hdfs.fileType = DataStream
# Consider a block written after one replica, so replication events do not trigger extra rolls (small files)
f1.sinks.k1.hdfs.minBlockReplicas = 1
# Do not roll files based on event count
f1.sinks.k1.hdfs.rollCount = 0
# Write plain text to HDFS
f1.sinks.k1.hdfs.writeFormat = Text
# Roll a new HDFS file once it reaches 100 MB
f1.sinks.k1.hdfs.rollSize = 104857600
# Do not roll files based on a time interval
f1.sinks.k1.hdfs.rollInterval = 0

# Wire the source and sink to the channel
f1.sources.r1.channels = c1
f1.sinks.k1.channel = c1
Run Flume with the following command:
flume-ng agent -n f1 --conf /opt/flume-1.7.0/conf/ -f /test_dir/local2hdfs.conf -Dflume.root.logger=INFO,console
Here f1 is the agent name.
First, package the MapReduce program, upload the jar to the virtual machine, and then run data cleaning, session construction, and generation of the PageViews and Visits data in order:
hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.one.logClean
hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.two.logSession
hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.three.PageViews
hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.four.VisitsInfo
Note: only the pageviews data is loaded and analyzed below. The visits data follows exactly the same steps; completing it is left to the reader.
-- Create the database
create database web_click;
-- Use the database
use web_click;
-- Create the non-partitioned pageviews table, used to load the MapReduce output
create table pageviews(
  session string, ip string, requestdate string, requesttime string,
  visitpage string, staytime string, step string)
row format delimited fields terminated by " ";
-- Load the data
load data inpath "hdfs://hadoop:9000/clickstream/pageviews/20181013/*" overwrite into table pageviews;
Each day's data is then inserted into a partitioned table with a statement of the form:
Insert into table ods_pageviews partition(inputDate="2016-09-18") select ... from pageviews
-- Create the partitioned table ods_pageviews and insert each day's data into it
create table ods_pageviews(
  session string, ip string, viewtime string, visitpage string,
  staytime string, step string)
partitioned by(inputDate string)
row format delimited fields terminated by " ";
insert overwrite table ods_pageviews partition(inputDate="2016-09-18") select session,ip,concat(requestdate,"-",requesttime),visitpage,staytime,step from pageviews where requestdate="2016-09-18";
insert overwrite table ods_pageviews partition(inputDate="2016-09-19") select session,ip,concat(requestdate,"-",requesttime),visitpage,staytime,step from pageviews where requestdate="2016-09-19"; |
The ods_pageviews partitions now look like this:
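You can list the partitions in Hive; the expected output below assumes only the two inserts above have been run:

show partitions ods_pageviews;
-- inputdate=2016-09-18
-- inputdate=2016-09-19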
-- Create the time dimension table ods_dim_pageviews_time, partitioned by date
create table ods_dim_pageviews_time(
  time string, year string, month string, day string,
  hour string, minutes string, seconds string)
partitioned by(inputDate String)
row format delimited fields terminated by ' ';
-- Populate the time dimension table for each day
insert overwrite table ods_dim_pageviews_time partition(inputDate='2016-09-18') select distinct pv.viewtime, substring(pv.viewtime,0,4),substring(pv.viewtime,6,2),substring(pv.viewtime,9,2),substring(pv.viewtime,12,2),substring(pv.viewtime,15,2),substring(pv.viewtime,18,2) from ods_pageviews as pv where inputDate="2016-09-18";

insert overwrite table ods_dim_pageviews_time partition(inputDate='2016-09-19') select distinct pv.viewtime, substring(pv.viewtime,0,4),substring(pv.viewtime,6,2),substring(pv.viewtime,9,2),substring(pv.viewtime,12,2),substring(pv.viewtime,15,2),substring(pv.viewtime,18,2) from ods_pageviews as pv where inputDate="2016-09-19";
Sample data:
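A quick way to inspect a few rows; the commented line is illustrative, derived from the substring logic above rather than actual output:

select * from ods_dim_pageviews_time where inputDate='2016-09-18' limit 3;
-- e.g. a viewtime of 2016-09-18-00:00:18 is split into
-- 2016-09-18-00:00:18  2016  09  18  00  00  18  2016-09-18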
-- Create the URL dimension table ods_dim_pageviews_url, partitioned by date
create table ods_dim_pageviews_url(
  visitpage string, host string, path string, query string)
partitioned by(inputDate string)
row format delimited fields terminated by ' ';
insert overwrite table ods_dim_pageviews_url partition(inputDate='2016-09-18') select distinct pv.visitpage,b.host,b.path,b.query from (select * from ods_pageviews where inputDate='2016-09-18') pv lateral view parse_url_tuple(concat('https://localhost',pv.visitpage),'HOST','PATH','QUERY') b as host,path,query;
insert overwrite table ods_dim_pageviews_url partition(inputDate='2016-09-19') select distinct pv.visitpage,b.host,b.path,b.query from (select * from ods_pageviews where inputDate='2016-09-19') pv lateral view parse_url_tuple(concat('https://localhost',pv.visitpage),'HOST','PATH','QUERY') b as host,path,query;
Data in the table:
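For reference, parse_url_tuple splits each visitpage (prefixed with https://localhost above) into host, path and query. A quick check; the commented line is illustrative, assuming a hypothetical visitpage such as /about/me.html?from=nav:

select * from ods_dim_pageviews_url where inputDate='2016-09-18' limit 3;
-- /about/me.html?from=nav  localhost  /about/me.html  from=nav  2016-09-18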
Query the top 20 pages by total daily PV:
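A minimal HiveQL sketch for this step, writing the result into a visitpageTop table whose columns match the MySQL table and Sqoop export directory used below; the table name, the use of visitpage as the page path, and the single fixed date are assumptions:

-- Top 20 pages by PV for one day; adjust the date (or group by day) as needed
create table visitpageTop as
select visitpage as path, count(*) as num
from ods_pageviews
where inputDate = '2016-09-18'
group by visitpage
order by num desc
limit 20;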
Export the data into MySQL with Sqoop so it can later be displayed on the web.
Prerequisite: the target table must already exist in MySQL.
-- Create the MySQL database
create database web_click;
use web_click;
-- Create the MySQL table; its columns and types must match the exported Hive table
create table visitpageTop(path varchar(100), num bigint(20));
Export with Sqoop:
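A sketch of the export command, consistent with the one used in the automation script at the end of this post; host, credentials and paths come from that script and may need adjusting for your environment:

sqoop export \
  --connect "jdbc:mysql://192.168.8.101:3306/web_click?useUnicode=true&characterEncoding=utf-8" \
  --username root --password 123456 \
  --table visitpageTop \
  --fields-terminated-by "\001" \
  --export-dir /user/hive/warehouse/web_click.db/visitpagetop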
The data in MySQL after the import:
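A simple verification query in MySQL, assuming the export above succeeded:

select * from visitpageTop order by num desc limit 20;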
To make the pipeline easier to automate, the steps above are collected in a shell script.
Note that this is only a sample script; it cannot simply be run once a day, because the dates inside it are hard-coded.
#!/bin/bash

echo "Starting the MapReduce jobs:"

hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.one.logClean
echo "Data cleaning finished..."

hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.two.logSession
echo "Session construction finished..."

hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.three.PageViews
echo "PageViews data generated..."

hadoop jar /mr-1.0-SNAPSHOT.jar cn.mapreduce.four.VisitsInfo
echo "VisitsInfo data generated..."

echo "Inserting data into the Hive tables..."

/opt/spark-2.2.1-bin-hadoop2.6/bin/spark-sql --master spark://hadoop:7077 -f /test_dir/pageviews01.sql
echo "Data inserted into pageviews, ods_pageviews and ods_dim_pageviews_time..."

hive -f /test_dir/pageviews02.sql
echo "Data inserted into the URL dimension table ods_dim_pageviews_url..."

/opt/spark-2.2.1-bin-hadoop2.6/bin/spark-sql --master spark://hadoop:7077 -f /test_dir/pageviews03.sql
echo "Top 20 pages by daily PV generated..."

echo "Exporting data to MySQL with Sqoop..."

sqoop export --connect "jdbc:mysql://192.168.8.101:3306/web_click?useUnicode=true&characterEncoding=utf-8" --username root --password 123456 --table visitpageTop --fields-terminated-by "\001" --export-dir /user/hive/warehouse/web_click.db/visitpagetop

echo "Export succeeded, pipeline finished..."