赞
踩
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
package cn.xxx.bigdata.weblog.mrbean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Raw data object: one parsed nginx access-log record.
 *
 * <p>{@link #write} encodes {@code null} string fields as empty strings, so after a
 * write/readFields round trip they come back as {@code ""}.
 *
 * @author vander
 * @date 2019-05-31
 */
public class WebLogBean implements Writable {
    private boolean valid = true; // whether the record is considered valid
    private String remote_addr; // client IP address
    private String remote_user; // client user name ("-" placeholder is ignored)
    private String time_local; // access time
    private String request; // requested URL
    private String status; // response status; 200 on success
    private String body_bytes_sent; // size of the response body sent to the client
    private String http_referer; // page the request was linked from
    private String http_user_agent; // client browser information

    /** Sets every field at once. */
    public void set(boolean valid, String remote_addr, String remote_user, String time_local,
            String request, String status, String body_bytes_sent, String http_referer,
            String http_user_agent) {
        this.valid = valid;
        this.remote_addr = remote_addr;
        this.remote_user = remote_user;
        this.time_local = time_local;
        this.request = request;
        this.status = status;
        this.body_bytes_sent = body_bytes_sent;
        this.http_referer = http_referer;
        this.http_user_agent = http_user_agent;
    }

    public String getRemote_addr() {
        return this.remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getRemote_user() {
        return this.remote_user;
    }

    public void setRemote_user(String remote_user) {
        this.remote_user = remote_user;
    }

    public String getTime_local() {
        return this.time_local;
    }

    public void setTime_local(String time_local) {
        this.time_local = time_local;
    }

    public String getRequest() {
        return this.request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return this.status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBody_bytes_sent() {
        return this.body_bytes_sent;
    }

    public void setBody_bytes_sent(String body_bytes_sent) {
        this.body_bytes_sent = body_bytes_sent;
    }

    public String getHttp_referer() {
        return this.http_referer;
    }

    public void setHttp_referer(String http_referer) {
        this.http_referer = http_referer;
    }

    public String getHttp_user_agent() {
        return this.http_user_agent;
    }

    public void setHttp_user_agent(String http_user_agent) {
        this.http_user_agent = http_user_agent;
    }

    public boolean isValid() {
        return this.valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }

    /**
     * Renders the record as \001-separated text:
     * valid, remote_addr, remote_user, time_local, request, status, body_bytes_sent,
     * http_referer, http_user_agent. Null fields are rendered as "null".
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("\001").append(getRemote_addr());
        sb.append("\001").append(getRemote_user());
        sb.append("\001").append(getTime_local());
        sb.append("\001").append(getRequest());
        sb.append("\001").append(getStatus());
        sb.append("\001").append(getBody_bytes_sent());
        sb.append("\001").append(getHttp_referer());
        sb.append("\001").append(getHttp_user_agent());
        return sb.toString();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.valid = in.readBoolean();
        this.remote_addr = in.readUTF();
        this.remote_user = in.readUTF();
        this.time_local = in.readUTF();
        this.request = in.readUTF();
        this.status = in.readUTF();
        this.body_bytes_sent = in.readUTF();
        this.http_referer = in.readUTF();
        this.http_user_agent = in.readUTF();
    }

    /** Serializes the fields in the order {@link #readFields} reads them; null becomes "". */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeBoolean(this.valid);
        out.writeUTF(nullToEmpty(this.remote_addr));
        out.writeUTF(nullToEmpty(this.remote_user));
        out.writeUTF(nullToEmpty(this.time_local));
        out.writeUTF(nullToEmpty(this.request));
        out.writeUTF(nullToEmpty(this.status));
        out.writeUTF(nullToEmpty(this.body_bytes_sent));
        out.writeUTF(nullToEmpty(this.http_referer));
        out.writeUTF(nullToEmpty(this.http_user_agent));
    }

    /** writeUTF rejects null, so substitute the empty string. */
    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }
}
package cn.xxx.bigdata.weblog.mrbean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Analysis data object: one page view within a session.
 *
 * <p>{@link #write} encodes {@code null} string fields as empty strings (matching
 * {@code WebLogBean}), so a partially populated bean no longer fails serialization with a
 * NullPointerException.
 *
 * @author vander
 * @date 2019-05-31
 */
public class PageViewsBean implements Writable {
    private String session; // session id
    private String remote_addr; // visitor IP address
    private String timestr; // request time
    private String request; // requested URL
    private int step; // sequence number of this page within its session
    private String staylong; // time spent on this page
    private String referal; // referring URL
    private String useragent; // client user-agent string
    private String bytes_send; // response body size
    private String status; // response status code

    /** Sets every field at once. Note that {@code useragent} is the third parameter. */
    public void set(String session, String remote_addr, String useragent, String timestr,
            String request, int step, String staylong, String referal, String bytes_send,
            String status) {
        this.session = session;
        this.remote_addr = remote_addr;
        this.useragent = useragent;
        this.timestr = timestr;
        this.request = request;
        this.step = step;
        this.staylong = staylong;
        this.referal = referal;
        this.bytes_send = bytes_send;
        this.status = status;
    }

    public String getSession() {
        return this.session;
    }

    public void setSession(String session) {
        this.session = session;
    }

    public String getRemote_addr() {
        return this.remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getTimestr() {
        return this.timestr;
    }

    public void setTimestr(String timestr) {
        this.timestr = timestr;
    }

    public String getRequest() {
        return this.request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public int getStep() {
        return this.step;
    }

    public void setStep(int step) {
        this.step = step;
    }

    public String getStaylong() {
        return this.staylong;
    }

    public void setStaylong(String staylong) {
        this.staylong = staylong;
    }

    public String getReferal() {
        return this.referal;
    }

    public void setReferal(String referal) {
        this.referal = referal;
    }

    public String getUseragent() {
        return this.useragent;
    }

    public void setUseragent(String useragent) {
        this.useragent = useragent;
    }

    public String getBytes_send() {
        return this.bytes_send;
    }

    public void setBytes_send(String bytes_send) {
        this.bytes_send = bytes_send;
    }

    public String getStatus() {
        return this.status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.session = in.readUTF();
        this.remote_addr = in.readUTF();
        this.timestr = in.readUTF();
        this.request = in.readUTF();
        this.step = in.readInt();
        this.staylong = in.readUTF();
        this.referal = in.readUTF();
        this.useragent = in.readUTF();
        this.bytes_send = in.readUTF();
        this.status = in.readUTF();
    }

    /** Serializes the fields in the order {@link #readFields} reads them; null becomes "". */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(nullToEmpty(this.session));
        out.writeUTF(nullToEmpty(this.remote_addr));
        out.writeUTF(nullToEmpty(this.timestr));
        out.writeUTF(nullToEmpty(this.request));
        out.writeInt(this.step);
        out.writeUTF(nullToEmpty(this.staylong));
        out.writeUTF(nullToEmpty(this.referal));
        out.writeUTF(nullToEmpty(this.useragent));
        out.writeUTF(nullToEmpty(this.bytes_send));
        out.writeUTF(nullToEmpty(this.status));
    }

    /** writeUTF rejects null, so substitute the empty string. */
    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }
}
package cn.xxx.bigdata.weblog.mrbean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Analysis data object: one visit (session) summarized by its first and last page view.
 *
 * <p>{@link #write} encodes {@code null} string fields as empty strings, so a partially
 * populated bean no longer fails serialization with a NullPointerException.
 *
 * @author vander
 * @date 2019-05-31
 */
public class VisitBean implements Writable {
    private String session; // session id
    private String remote_addr; // visitor IP address
    private String inTime; // time the visit started
    private String outTime; // time the visit ended
    private String inPage; // URL where the visit started
    private String outPage; // URL where the visit ended
    private String referal; // referring URL
    private int pageVisits; // number of pages visited

    /** Sets every field at once. */
    public void set(String session, String remote_addr, String inTime, String outTime,
            String inPage, String outPage, String referal, int pageVisits) {
        this.session = session;
        this.remote_addr = remote_addr;
        this.inTime = inTime;
        this.outTime = outTime;
        this.inPage = inPage;
        this.outPage = outPage;
        this.referal = referal;
        this.pageVisits = pageVisits;
    }

    public String getSession() {
        return this.session;
    }

    public void setSession(String session) {
        this.session = session;
    }

    public String getRemote_addr() {
        return this.remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getInTime() {
        return this.inTime;
    }

    public void setInTime(String inTime) {
        this.inTime = inTime;
    }

    public String getOutTime() {
        return this.outTime;
    }

    public void setOutTime(String outTime) {
        this.outTime = outTime;
    }

    public String getInPage() {
        return this.inPage;
    }

    public void setInPage(String inPage) {
        this.inPage = inPage;
    }

    public String getOutPage() {
        return this.outPage;
    }

    public void setOutPage(String outPage) {
        this.outPage = outPage;
    }

    public String getReferal() {
        return this.referal;
    }

    public void setReferal(String referal) {
        this.referal = referal;
    }

    public int getPageVisits() {
        return this.pageVisits;
    }

    public void setPageVisits(int pageVisits) {
        this.pageVisits = pageVisits;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.session = in.readUTF();
        this.remote_addr = in.readUTF();
        this.inTime = in.readUTF();
        this.outTime = in.readUTF();
        this.inPage = in.readUTF();
        this.outPage = in.readUTF();
        this.referal = in.readUTF();
        this.pageVisits = in.readInt();
    }

    /** Serializes the fields in the order {@link #readFields} reads them; null becomes "". */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(nullToEmpty(this.session));
        out.writeUTF(nullToEmpty(this.remote_addr));
        out.writeUTF(nullToEmpty(this.inTime));
        out.writeUTF(nullToEmpty(this.outTime));
        out.writeUTF(nullToEmpty(this.inPage));
        out.writeUTF(nullToEmpty(this.outPage));
        out.writeUTF(nullToEmpty(this.referal));
        out.writeInt(this.pageVisits);
    }

    /**
     * Renders the visit as \001-separated text: session, remote_addr, inTime, outTime,
     * inPage, outPage, referal, pageVisits.
     */
    @Override
    public String toString() {
        return this.session + "\001" + this.remote_addr + "\001" + this.inTime + "\001"
                + this.outTime + "\001" + this.inPage + "\001" + this.outPage + "\001"
                + this.referal + "\001" + this.pageVisits;
    }

    /** writeUTF rejects null, so substitute the empty string. */
    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }
}
package cn.xxx.bigdata.weblog.utils; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; public abstract class DateUtil { public static String getYestDate() { Calendar instance = Calendar.getInstance(); instance.add(5, -1); Date time = instance.getTime(); String format = new SimpleDateFormat("yyyy-MM-dd").format(time); return format; } }
package cn.xxx.bigdata.weblog.pre;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;

import cn.xxx.bigdata.weblog.mrbean.WebLogBean;

/**
 * Cleans raw access-log lines into {@link WebLogBean} instances.
 *
 * @author vander
 * @date 2019-05-31
 */
public class WebLogParser {

    /** Raw log timestamp format, e.g. {@code 18/Sep/2013:06:49:18}. Not thread-safe. */
    public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
    /** Normalized timestamp format written to time_local. Not thread-safe. */
    public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

    /** Placeholder stored in time_local when the timestamp cannot be parsed. */
    private static final String INVALID_TIME = "-invalid_time-";

    /**
     * Parses one space-separated log line.
     *
     * <p>The record is flagged invalid when its status is >= 400 or not an integer, or when its
     * timestamp cannot be parsed.
     *
     * @param line raw log line
     * @return the parsed bean, or {@code null} when the line has fewer than 12 space-separated
     *     tokens
     */
    public static WebLogBean parser(String line) {
        String[] arr = line.split(" ");
        if (arr.length <= 11) {
            return null;
        }

        WebLogBean webLogBean = new WebLogBean();
        webLogBean.setRemote_addr(arr[0]);
        webLogBean.setRemote_user(arr[1]);

        // Drop the first character of the time token (presumably the '[' that opens nginx's
        // time field). split(" ") yields an empty token on consecutive spaces, so guard it
        // instead of letting substring(1) throw.
        String rawTime = arr[3].isEmpty() ? "" : arr[3].substring(1);
        String time_local = formatDate(rawTime);
        if (time_local == null || time_local.isEmpty()) {
            time_local = INVALID_TIME;
        }
        webLogBean.setTime_local(time_local);
        webLogBean.setRequest(arr[6]);
        webLogBean.setStatus(arr[8]);
        webLogBean.setBody_bytes_sent(arr[9]);
        webLogBean.setHttp_referer(arr[10]);
        // The user agent contains spaces and was split apart; rejoin it with its separators.
        webLogBean.setHttp_user_agent(joinFrom(arr, 11));

        if (isErrorStatus(webLogBean.getStatus())) {
            webLogBean.setValid(false);
        }
        if (INVALID_TIME.equals(webLogBean.getTime_local())) {
            webLogBean.setValid(false);
        }
        return webLogBean;
    }

    /**
     * Marks {@code bean} invalid when its request URL is not one of {@code pages}. The bean is
     * not removed, only flagged.
     */
    public static void filtStaticResource(WebLogBean bean, Set<String> pages) {
        if (!pages.contains(bean.getRequest())) {
            bean.setValid(false);
        }
    }

    /**
     * Converts a raw timestamp from {@link #df1} format to {@link #df2} format.
     * Synchronized because the two shared SimpleDateFormat instances are not thread-safe.
     *
     * @return the reformatted time, or {@code null} if {@code time_local} cannot be parsed
     */
    public synchronized static String formatDate(String time_local) {
        try {
            return df2.format(df1.parse(time_local));
        } catch (ParseException ignored) {
            // Best effort by design: the caller treats null as an invalid timestamp.
        }
        return null;
    }

    /** Joins {@code arr[from..]} with single spaces. */
    private static String joinFrom(String[] arr, int from) {
        StringBuilder sb = new StringBuilder(arr[from]);
        for (int i = from + 1; i < arr.length; i++) {
            sb.append(' ').append(arr[i]);
        }
        return sb.toString();
    }

    /** Returns true when {@code status} is an HTTP status >= 400 or is not an integer. */
    private static boolean isErrorStatus(String status) {
        try {
            return Integer.parseInt(status) >= 400;
        } catch (NumberFormatException e) {
            // A non-numeric status means a garbled line: flag it instead of failing the task.
            return true;
        }
    }
}
package cn.xxx.bigdata.weblog.pre;

import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cn.xxx.bigdata.weblog.mrbean.WebLogBean;
import cn.xxx.bigdata.weblog.utils.DateUtil;

/**
 * Step 1: preprocesses the raw logs (map-only job).
 *
 * <p>Each line is parsed, its time is converted to a normalized format, and the record is
 * flagged valid or invalid. Every parseable record is written out as
 * {@link WebLogBean#toString()} text.
 *
 * @author vander
 * @date 2019-05-31
 */
public class WeblogPreProcess extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();
        // TODO: the paths could be passed in via args.
        // Resolve "yesterday" once so the input and output paths cannot straddle midnight.
        String day = DateUtil.getYestDate();
        String inputPath = "hdfs://node01:9000/weblog/" + day + "/input";
        String outputPath = "hdfs://node01:9000/weblog/" + day + "/weblogPreOut";

        // Remove any previous output so the job can be rerun for the same day.
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:9000"), conf);
        if (fileSystem.exists(new Path(outputPath))) {
            fileSystem.delete(new Path(outputPath), true);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(WeblogPreProcess.class);
        job.setMapperClass(WeblogPreProcessMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path[] { new Path(inputPath) });
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // URLs of the site's real content pages; requests for anything else are flagged invalid.
        Set<String> pages = new HashSet<String>();
        Text k = new Text();
        NullWritable v = NullWritable.get();

        /** Loads the content-page whitelist into the map task's memory. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            this.pages.add("/about");
            this.pages.add("/black-ip-list/");
            this.pages.add("/cassandra-clustor/");
            this.pages.add("/finance-rhive-repurchase/");
            this.pages.add("/hadoop-family-roadmap/");
            this.pages.add("/hadoop-hive-intro/");
            this.pages.add("/hadoop-zookeeper-intro/");
            this.pages.add("/hadoop-mahout-roadmap/");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            WebLogBean webLogBean = WebLogParser.parser(value.toString());
            if (webLogBean != null) {
                // Flag static resources (js/images/css, i.e. anything outside the whitelist).
                WebLogParser.filtStaticResource(webLogBean, this.pages);
                this.k.set(webLogBean.toString());
                context.write(this.k, this.v);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new WeblogPreProcess(), args);
        System.exit(run);
    }
}
package cn.xxx.bigdata.weblog.clickstream; import java.io.IOException; import java.net.URI; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.Locale; import java.util.UUID; import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import cn.xxx.bigdata.weblog.mrbean.WebLogBean; import cn.xxx.bigdata.weblog.utils.DateUtil; /** * 步骤二:将清洗之后的日志梳理出点击流pageviews模型数据 * * * @author vander * @date 2019年5月31日 */ public class ClickStreamPageView extends Configured implements Tool { public int run(String[] args) throws Exception { Configuration conf = super.getConf(); //TODO 可以通过args进行传参 String inputPath = "hdfs://node01:9000/weblog/" + DateUtil.getYestDate() + "/weblogPreOut"; String outputPath = "hdfs://node01:9000/weblog/" + DateUtil.getYestDate() + "/pageViewOut"; FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:9000"), conf); if (fileSystem.exists(new Path(outputPath))) { fileSystem.delete(new Path(outputPath), true); } Job job = Job.getInstance(conf); job.setJarByClass(ClickStreamPageView.class); job.setMapperClass(ClickStreamMapper.class); job.setReducerClass(ClickStreamReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(WebLogBean.class); job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path[] { new Path(inputPath) }); FileOutputFormat.setOutputPath(job, new Path(outputPath)); boolean b = job.waitForCompletion(true); return b ? 0 : 1; } static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> { Text k = new Text(); WebLogBean v = new WebLogBean(); protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, WebLogBean>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\001"); if (fields.length < 9) { return; } //将切分出来的各字段set到weblogbean中 this.v.set("true".equals(fields[0]), fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]); //只有有效记录才进入后续处理 if (this.v.isValid()) { //此处用ip地址来标识用户 this.k.set(this.v.getRemote_addr()); context.write(this.k, this.v); } } } static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> { Text v = new Text(); protected void reduce(Text key, Iterable<WebLogBean> values, Reducer<Text, WebLogBean, NullWritable, Text>.Context context) throws IOException, InterruptedException { ArrayList<WebLogBean> beans = new ArrayList(); // 先将一个用户的所有访问记录中的时间拿出来排序 try { for (WebLogBean bean : values) { WebLogBean webLogBean = new WebLogBean(); try { BeanUtils.copyProperties(webLogBean, bean); } catch (Exception e) { e.printStackTrace(); } beans.add(webLogBean); } //将bean按时间先后顺序排序 Collections.sort(beans, new Comparator<WebLogBean>() { public int compare(WebLogBean o1, WebLogBean o2) { try { Date d1 = ClickStreamPageView.ClickStreamReducer.this.toDate(o1.getTime_local()); Date d2 = ClickStreamPageView.ClickStreamReducer.this.toDate(o2.getTime_local()); if ((d1 == null) || (d2 == null)) { return 0; } return d1.compareTo(d2); } catch (Exception e) { e.printStackTrace(); } return 0; } }); /** * 以下逻辑为:从有序bean中分辨出各次visit,并对一次visit中所访问的page按顺序标号step * 核心思想: * 
就是比较相邻两条记录中的时间差,如果时间差<30分钟,则该两条记录属于同一个session * 否则,就属于不同的session * */ int step = 1; String session = UUID.randomUUID().toString(); for (int i = 0; i < beans.size(); i++) { WebLogBean bean = (WebLogBean) beans.get(i); // 如果仅有1条数据,则直接输出 if (1 == beans.size()) { // 设置默认停留时长为60s this.v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + 60 + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus()); context.write(NullWritable.get(), this.v); session = UUID.randomUUID().toString(); break; } // 如果不止1条数据 if (i != 0) { // 求近两次时间差 long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(((WebLogBean) beans.get(i - 1)).getTime_local())); // 如果本次-上次时间差<30分钟,则输出前一次的页面访问信息 if (timeDiff < 1800000L) { this.v.set(session + "\001" + key.toString() + "\001" + ((WebLogBean) beans.get(i - 1)).getRemote_user() + "\001" + ((WebLogBean) beans.get(i - 1)).getTime_local() + "\001" + ((WebLogBean) beans.get(i - 1)).getRequest() + "\001" + step + "\001" + timeDiff / 1000L + "\001" + ((WebLogBean) beans.get(i - 1)).getHttp_referer() + "\001" + ((WebLogBean) beans.get(i - 1)).getHttp_user_agent() + "\001" + ((WebLogBean) beans.get(i - 1)).getBody_bytes_sent() + "\001" + ((WebLogBean) beans.get(i - 1)).getStatus()); context.write(NullWritable.get(), this.v); step++; } else { // 如果本次-上次时间差>30分钟,则输出前一次的页面访问信息且将step重置,以分隔为新的visit this.v.set(session + "\001" + key.toString() + "\001" + ((WebLogBean) beans.get(i - 1)).getRemote_user() + "\001" + ((WebLogBean) beans.get(i - 1)).getTime_local() + "\001" + ((WebLogBean) beans.get(i - 1)).getRequest() + "\001" + step + "\001" + 60 + "\001" + ((WebLogBean) beans.get(i - 1)).getHttp_referer() + "\001" + ((WebLogBean) beans.get(i - 1)).getHttp_user_agent() + "\001" + ((WebLogBean) beans.get(i - 1)).getBody_bytes_sent() + "\001" + ((WebLogBean) beans.get(i - 
1)).getStatus()); context.write(NullWritable.get(), this.v); // 输出完上一条之后,重置step编号 step = 1; session = UUID.randomUUID().toString(); } // 如果此次遍历的是最后一条,则将本条直接输出 if (i == beans.size() - 1) { // 设置默认停留市场为60s this.v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + 60 + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus()); context.write(NullWritable.get(), this.v); } } } } catch (ParseException e) { e.printStackTrace(); } } private String toStr(Date date) { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US); return df.format(date); } private Date toDate(String timeStr) throws ParseException { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US); return df.parse(timeStr); } private long timeDiff(String time1, String time2) throws ParseException { Date d1 = toDate(time1); Date d2 = toDate(time2); return d1.getTime() - d2.getTime(); } private long timeDiff(Date time1, Date time2) throws ParseException { return time1.getTime() - time2.getTime(); } } public static void main(String[] args) throws Exception { int run = ToolRunner.run(new Configuration(), new ClickStreamPageView(), args); System.exit(run); } } package cn.xxx.bigdata.weblog.clickstream; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import cn.xxx.bigdata.weblog.mrbean.PageViewsBean; import cn.xxx.bigdata.weblog.mrbean.VisitBean; import cn.xxx.bigdata.weblog.utils.DateUtil; /** * 步骤三:输入数据:pageviews模型结果数据,从pageviews模型结果数据中进一步梳理出visit模型 * * * @author vander * @date 2019年5月31日 */ public class ClickStreamVisit extends Configured implements Tool { public int run(String[] args) throws Exception { Configuration conf = super.getConf(); //TODO 可以通过args进行传参 String inputPath = "hdfs://node01:9000/weblog/" + DateUtil.getYestDate() + "/pageViewOut"; String outPutPath = "hdfs://node01:9000/weblog/" + DateUtil.getYestDate() + "/clickStreamVisit"; FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:9000"), conf); if (fileSystem.exists(new Path(outPutPath))) { fileSystem.delete(new Path(outPutPath), true); } Job job = Job.getInstance(conf); job.setJarByClass(ClickStreamVisit.class); job.setMapperClass(ClickStreamVisitMapper.class); job.setReducerClass(ClickStreamVisitReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(PageViewsBean.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(VisitBean.class); FileInputFormat.setInputPaths(job, new Path[] { new Path(inputPath) }); FileOutputFormat.setOutputPath(job, new Path(outPutPath)); boolean res = job.waitForCompletion(true); return res ? 
0 : 1; } static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> { PageViewsBean pvBean = new PageViewsBean(); Text k = new Text(); protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, PageViewsBean>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\001"); int step = Integer.parseInt(fields[5]); this.pvBean.set(fields[0], fields[1], fields[2], fields[3], fields[4], step, fields[6], fields[7], fields[8], fields[9]); this.k.set(this.pvBean.getSession()); context.write(this.k, this.pvBean); } } static class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> { protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Reducer<Text, PageViewsBean, NullWritable, VisitBean>.Context context) throws IOException, InterruptedException { // 将pvBeans按照step排序 ArrayList<PageViewsBean> pvBeansList = new ArrayList(); for (PageViewsBean pvBean : pvBeans) { PageViewsBean bean = new PageViewsBean(); try { BeanUtils.copyProperties(bean, pvBean); pvBeansList.add(bean); } catch (Exception e) { e.printStackTrace(); } } Collections.sort(pvBeansList, new Comparator<PageViewsBean>() { public int compare(PageViewsBean o1, PageViewsBean o2) { return o1.getStep() > o2.getStep() ? 
1 : -1; } }); // 取这次visit的首尾pageview记录,将数据放入VisitBean中 VisitBean visitBean = new VisitBean(); // 取visit的首记录 visitBean.setInPage(((PageViewsBean) pvBeansList.get(0)).getRequest()); visitBean.setInTime(((PageViewsBean) pvBeansList.get(0)).getTimestr()); // 取visit的尾记录 visitBean.setOutPage(((PageViewsBean) pvBeansList.get(pvBeansList.size() - 1)).getRequest()); visitBean.setOutTime(((PageViewsBean) pvBeansList.get(pvBeansList.size() - 1)).getTimestr()); // visit访问的页面数 visitBean.setPageVisits(pvBeansList.size()); // 来访者的ip visitBean.setRemote_addr(((PageViewsBean) pvBeansList.get(0)).getRemote_addr()); // 本次visit的referal visitBean.setReferal(((PageViewsBean) pvBeansList.get(0)).getReferal()); visitBean.setSession(session.toString()); context.write(NullWritable.get(), visitBean); } } public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new ClickStreamVisit(), args); } }
weblogparser.jar
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。