MapReduce
Hive
SparkSQL
SparkCore
Azkaban / crontab
Hive + HBase(SQL)
HBase + Phoenix
Sqoop export to MySQL or HBase
Integration of the three frameworks: Spring + Struts2 + MyBatis ==> SSM
ECharts for charting the results (see the query sketch after this list)
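The SSM layer reads the indicator tables that Sqoop exports into MySQL and hands the results to ECharts. Below is only a minimal sketch of how that might look; the package, mapper name, method name and the "PV per day" indicator are illustrative assumptions, not part of the original project.

package com.mazh.aura.dao;

import java.util.List;
import java.util.Map;

import org.apache.ibatis.annotations.Select;

/**
 * Hypothetical MyBatis mapper for the pageviews table exported by Sqoop into
 * the MySQL database "biyesheji". It computes one example indicator:
 * page views (PV) per day, for the ECharts front end to chart.
 */
public interface PageViewsMapper {

    // Each Map is one row, e.g. {"pv_day": "2013-09-18", "pv": 1234}
    @Select("SELECT SUBSTR(Timestr, 1, 10) AS pv_day, COUNT(*) AS pv "
            + "FROM pageviews GROUP BY SUBSTR(Timestr, 1, 10) ORDER BY pv_day")
    List<Map<String, Object>> countPvPerDay();
}

The Struts2 action (or service layer) would then serialize this list to JSON and feed it to an ECharts line or bar chart.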
Table creation scripts (Hive and MySQL)
-- Hive external tables over the MapReduce output on HDFS
create external table if not exists weblog (
  Valid boolean, Remote_addr String, Remote_user String, Time_local String,
  Request String, Status String, Body_bytes_sent String,
  Http_referer String, Http_user_agent String)
row format delimited fields terminated by '\001'
location 'hdfs://192.168.227.166:9000/weblog/output';

create external table if not exists pageviews (
  Session String, key String, Remote_addr String, Timestr String,
  Tequest String, Step int, Staylong int, Referal String,
  Useragent String, Bytes_send String, Status String)
row format delimited fields terminated by '\001'
location 'hdfs://192.168.227.166:9000/weblog/pageviews';

create external table if not exists visits (
  Session String, Remote_addr String, InTime String, OutTime String,
  InPage String, OutPage String, Referal String, PageVisits int)
row format delimited fields terminated by '\001'
location 'hdfs://192.168.227.166:9000/weblog/visitout';

-- MySQL tables that receive the Sqoop export
create table if not exists weblog (
  Valid Boolean, Remote_addr Text, Remote_user Text, Time_local Text,
  Request Text, Status Text, Body_bytes_sent Text,
  Http_referer Text, Http_user_agent Text);

-- `key` must be quoted here because KEY is a reserved word in MySQL
create table if not exists pageviews (
  Session Text, `key` Text, Remote_addr Text, Timestr Text, Tequest Text,
  Step int, Staylong int, Referal Text, Useragent Text,
  Bytes_send Text, Status Text);

create table if not exists visits (
  Session Text, Remote_addr Text, InTime Text, OutTime Text,
  InPage Text, OutPage Text, Referal Text, PageVisits int);

# Export the Hive result data to MySQL:
sqoop export \
  --connect jdbc:mysql://192.168.227.166:3306/biyesheji \
  --username root \
  --password 123456 \
  --table pageviews \
  --export-dir /weblog/pageviews/part-r-00000 \
  --input-fields-terminated-by '\001'

# List the tables in a given MySQL database:
sqoop list-tables \
  --connect jdbc:mysql://192.168.227.166:3306/biyesheji \
  --username root \
  --password 123456
ETL code for the data warehouse (clickstream pageviews model)
package com.mazh.aura.clickstream;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Locale;
import java.util.UUID;

import com.mazh.aura.mrbean.WebLogBean;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Turns the cleaned web logs into the clickstream "pageviews" model.
 *
 * Input: the cleaned log records.
 *
 * - Splits the records into sessions and assigns each visit (session) a session id (random UUID).
 * - For every page visited within a session, records the request time, URL, stay time
 *   and the page's step number within that session.
 * - Keeps referral_url, body_bytes_send and useragent.
 */
public class ClickStreamPageView {

    static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {

        Text k = new Text();
        WebLogBean v = new WebLogBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            String[] fields = line.split("\001");
            if (fields.length < 9) return;

            // Populate the WebLogBean with the split fields
            v.set("true".equals(fields[0]), fields[1], fields[2], fields[3], fields[4],
                    fields[5], fields[6], fields[7], fields[8]);

            // Only valid records go on to the next stage
            if (v.isValid()) {
                k.set(v.getRemote_addr());
                /**
                 * k : the client IP address
                 * v : the whole line converted into a WebLogBean
                 */
                context.write(k, v);
            }
        }
    }

    static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {

        Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
            ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();

            // First collect all access records of this client so they can be sorted by time
            try {
                for (WebLogBean bean : values) {
                    WebLogBean webLogBean = new WebLogBean();
                    try {
                        BeanUtils.copyProperties(webLogBean, bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    beans.add(webLogBean);
                }

                // Sort the beans in chronological order
                Collections.sort(beans, new Comparator<WebLogBean>() {

                    @Override
                    public int compare(WebLogBean o1, WebLogBean o2) {
                        try {
                            Date d1 = toDate(o1.getTime_local());
                            Date d2 = toDate(o2.getTime_local());
                            if (d1 == null || d2 == null) return 0;
                            return d1.compareTo(d2);
                        } catch (Exception e) {
                            e.printStackTrace();
                            return 0;
                        }
                    }
                });

                /**
                 * The logic below separates the ordered beans into visits and numbers the
                 * pages of each visit with a step counter.
                 * Core idea: compare the timestamps of adjacent records; if the gap is
                 * less than 30 minutes the two records belong to the same session,
                 * otherwise they belong to different sessions.
                 */
                int step = 1;
                String session = UUID.randomUUID().toString();
                for (int i = 0; i < beans.size(); i++) {
                    WebLogBean bean = beans.get(i);

                    // If there is only one record, output it directly
                    if (1 == beans.size()) {
                        // Use a default stay time of 60 s
                        v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001"
                                + bean.getRequest() + "\001" + step + "\001" + 60 + "\001" + bean.getHttp_referer() + "\001"
                                + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
                        context.write(NullWritable.get(), v);
                        session = UUID.randomUUID().toString();
                        break;
                    }

                    // With more than one record, skip the first one; it is output while handling the second
                    if (i == 0) {
                        continue;
                    }

                    // Time difference between this record and the previous one
                    long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));

                    if (timeDiff < 30 * 60 * 1000) {
                        // Gap below 30 minutes: output the previous page view as part of the current session
                        v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001"
                                + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + step + "\001"
                                + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                                + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001"
                                + beans.get(i - 1).getStatus());
                        context.write(NullWritable.get(), v);
                        step++;
                    } else {
                        // Gap of 30 minutes or more: output the previous page view (default 60 s stay time)
                        // and reset step so that a new visit begins
                        v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001"
                                + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + (step) + "\001"
                                + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                                + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001"
                                + beans.get(i - 1).getStatus());
                        context.write(NullWritable.get(), v);
                        // After outputting the previous record, reset the step counter and start a new session
                        step = 1;
                        session = UUID.randomUUID().toString();
                    }

                    // If this is the last record, output it directly
                    if (i == beans.size() - 1) {
                        // Use a default stay time of 60 s
                        v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001"
                                + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001"
                                + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
                        context.write(NullWritable.get(), v);
                    }
                }

            } catch (ParseException e) {
                e.printStackTrace();
            }
        }

        private String toStr(Date date) {
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
            return df.format(date);
        }

        private Date toDate(String timeStr) throws ParseException {
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
            return df.parse(timeStr);
        }

        private long timeDiff(String time1, String time2) throws ParseException {
            Date d1 = toDate(time1);
            Date d2 = toDate(time2);
            return d1.getTime() - d2.getTime();
        }

        private long timeDiff(Date time1, Date time2) throws ParseException {
            return time1.getTime() - time2.getTime();
        }
    }

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ClickStreamPageView.class);

        job.setMapperClass(ClickStreamMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(WebLogBean.class);

        job.setReducerClass(ClickStreamReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileInputFormat.setInputPaths(job, new Path("/weblog/preout"));
        FileOutputFormat.setOutputPath(job, new Path("/weblog/pageviews"));

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }
}
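The WebLogBean (com.mazh.aura.mrbean.WebLogBean) used above is not listed in this section. The following is only a sketch of what such a Hadoop Writable bean might look like, reconstructed from the set() call and the getters the job uses; the real class in the project may differ in field order or extra helpers.

package com.mazh.aura.mrbean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Hypothetical reconstruction of WebLogBean: one cleaned log record with a
 * validity flag and eight string fields, serializable as a MapReduce value.
 */
public class WebLogBean implements Writable {

    private boolean valid = true;
    private String remote_addr;
    private String remote_user;
    private String time_local;
    private String request;
    private String status;
    private String body_bytes_sent;
    private String http_referer;
    private String http_user_agent;

    public void set(boolean valid, String remote_addr, String remote_user, String time_local,
                    String request, String status, String body_bytes_sent,
                    String http_referer, String http_user_agent) {
        this.valid = valid;
        this.remote_addr = remote_addr;
        this.remote_user = remote_user;
        this.time_local = time_local;
        this.request = request;
        this.status = status;
        this.body_bytes_sent = body_bytes_sent;
        this.http_referer = http_referer;
        this.http_user_agent = http_user_agent;
    }

    // Getters/setters are needed both by the job code and by BeanUtils.copyProperties
    public boolean isValid() { return valid; }
    public void setValid(boolean valid) { this.valid = valid; }
    public String getRemote_addr() { return remote_addr; }
    public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
    public String getRemote_user() { return remote_user; }
    public void setRemote_user(String remote_user) { this.remote_user = remote_user; }
    public String getTime_local() { return time_local; }
    public void setTime_local(String time_local) { this.time_local = time_local; }
    public String getRequest() { return request; }
    public void setRequest(String request) { this.request = request; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
    public String getBody_bytes_sent() { return body_bytes_sent; }
    public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; }
    public String getHttp_referer() { return http_referer; }
    public void setHttp_referer(String http_referer) { this.http_referer = http_referer; }
    public String getHttp_user_agent() { return http_user_agent; }
    public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeBoolean(valid);
        out.writeUTF(remote_addr);
        out.writeUTF(remote_user);
        out.writeUTF(time_local);
        out.writeUTF(request);
        out.writeUTF(status);
        out.writeUTF(body_bytes_sent);
        out.writeUTF(http_referer);
        out.writeUTF(http_user_agent);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        valid = in.readBoolean();
        remote_addr = in.readUTF();
        remote_user = in.readUTF();
        time_local = in.readUTF();
        request = in.readUTF();
        status = in.readUTF();
        body_bytes_sent = in.readUTF();
        http_referer = in.readUTF();
        http_user_agent = in.readUTF();
    }
}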
Design and develop a website user behavior analysis system. The main modules are:
Data collection system: responsible for gathering the various kinds of user access data.
Raw data storage system: responsible for storing the large volume of collected data on a platform where it can be analyzed.
Raw data cleaning system: responsible for formatting and cleaning the raw data and extracting the records in the format the business needs.
Usable data storage module: responsible for loading the cleaned data into the data warehouse so that it can be queried and analyzed with HQL to produce the different indicators each business scenario needs (see the query sketch after this list).
Data analysis and presentation system: responsible for displaying the computed indicators so that analysts can judge how user-friendly and complete the site is, spot problems on the site, and use the characteristics of user behavior to shape different strategies.
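For the HQL analysis step mentioned above, the sketch below runs one example indicator query against the pageviews external table through HiveServer2. It is only an illustration under stated assumptions: the HiveServer2 address and port, the login user, the hive-jdbc dependency on the classpath, and the chosen indicator (page views per requested URL) are not taken from the original project.

package com.mazh.aura.analysis;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * Hypothetical example: compute one indicator (page views per request) from
 * the Hive "pageviews" table via the Hive JDBC driver and HiveServer2.
 */
public class PageViewIndicatorQuery {

    public static void main(String[] args) throws Exception {
        // Assumed HiveServer2 endpoint; adjust host, port and database as needed
        String url = "jdbc:hive2://192.168.227.166:10000/default";

        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(url, "hadoop", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT tequest, COUNT(*) AS pv "
                   + "FROM pageviews GROUP BY tequest ORDER BY pv DESC LIMIT 20")) {
            // Print the top 20 requested URLs and their page view counts
            while (rs.next()) {
                System.out.println(rs.getString(1) + "\t" + rs.getLong(2));
            }
        }
    }
}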