
Big Data: A MapReduce Development Example (Weblog Analysis)

Add the dependency

	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-core</artifactId>
		<version>1.2.1</version>
	</dependency>

Define the Writable bean classes

package cn.xxx.bigdata.weblog.mrbean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * Raw data object: one nginx access-log record
 * 
 * 
 * @author vander
 * @date 2019-05-31
 */
public class WebLogBean implements Writable {
	private boolean valid = true;// whether the record is valid
	private String remote_addr;// client IP address
	private String remote_user;// client user name; "-" means not provided
	private String time_local;// access time and time zone
	private String request;// requested URL and HTTP protocol
	private String status;// response status code; 200 means success
	private String body_bytes_sent;// size of the response body sent to the client
	private String http_referer;// URL of the referring page
	private String http_user_agent;// client browser (user agent) information

	public void set(boolean valid, String remote_addr, String remote_user, String time_local, String request,
			String status, String body_bytes_sent, String http_referer, String http_user_agent) {
		this.valid = valid;
		this.remote_addr = remote_addr;
		this.remote_user = remote_user;
		this.time_local = time_local;
		this.request = request;
		this.status = status;
		this.body_bytes_sent = body_bytes_sent;
		this.http_referer = http_referer;
		this.http_user_agent = http_user_agent;
	}

	public String getRemote_addr() {
		return this.remote_addr;
	}

	public void setRemote_addr(String remote_addr) {
		this.remote_addr = remote_addr;
	}

	public String getRemote_user() {
		return this.remote_user;
	}

	public void setRemote_user(String remote_user) {
		this.remote_user = remote_user;
	}

	public String getTime_local() {
		return this.time_local;
	}

	public void setTime_local(String time_local) {
		this.time_local = time_local;
	}

	public String getRequest() {
		return this.request;
	}

	public void setRequest(String request) {
		this.request = request;
	}

	public String getStatus() {
		return this.status;
	}

	public void setStatus(String status) {
		this.status = status;
	}

	public String getBody_bytes_sent() {
		return this.body_bytes_sent;
	}

	public void setBody_bytes_sent(String body_bytes_sent) {
		this.body_bytes_sent = body_bytes_sent;
	}

	public String getHttp_referer() {
		return this.http_referer;
	}

	public void setHttp_referer(String http_referer) {
		this.http_referer = http_referer;
	}

	public String getHttp_user_agent() {
		return this.http_user_agent;
	}

	public void setHttp_user_agent(String http_user_agent) {
		this.http_user_agent = http_user_agent;
	}

	public boolean isValid() {
		return this.valid;
	}

	public void setValid(boolean valid) {
		this.valid = valid;
	}

	public String toString() {
		StringBuilder sb = new StringBuilder();
		sb.append(this.valid);
		sb.append("\001").append(getRemote_addr());
		sb.append("\001").append(getRemote_user());
		sb.append("\001").append(getTime_local());
		sb.append("\001").append(getRequest());
		sb.append("\001").append(getStatus());
		sb.append("\001").append(getBody_bytes_sent());
		sb.append("\001").append(getHttp_referer());
		sb.append("\001").append(getHttp_user_agent());
		return sb.toString();
	}

	public void readFields(DataInput in) throws IOException {
		this.valid = in.readBoolean();
		this.remote_addr = in.readUTF();
		this.remote_user = in.readUTF();
		this.time_local = in.readUTF();
		this.request = in.readUTF();
		this.status = in.readUTF();
		this.body_bytes_sent = in.readUTF();
		this.http_referer = in.readUTF();
		this.http_user_agent = in.readUTF();
	}

	public void write(DataOutput out) throws IOException {
		out.writeBoolean(this.valid);
		out.writeUTF(null == this.remote_addr ? "" : this.remote_addr);
		out.writeUTF(null == this.remote_user ? "" : this.remote_user);
		out.writeUTF(null == this.time_local ? "" : this.time_local);
		out.writeUTF(null == this.request ? "" : this.request);
		out.writeUTF(null == this.status ? "" : this.status);
		out.writeUTF(null == this.body_bytes_sent ? "" : this.body_bytes_sent);
		out.writeUTF(null == this.http_referer ? "" : this.http_referer);
		out.writeUTF(null == this.http_user_agent ? "" : this.http_user_agent);
	}
}
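
As a quick sanity check, the Writable contract above can be exercised outside of Hadoop with a plain DataOutput/DataInput round trip. The class below is a hypothetical sketch, not part of the original project; the demo package name and the sample field values are made up. Note that write() and readFields() must handle the fields in exactly the same order.

package cn.xxx.bigdata.weblog.demo;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import cn.xxx.bigdata.weblog.mrbean.WebLogBean;

// Hypothetical demo: serialize a WebLogBean and read it back.
public class WebLogBeanRoundTrip {
	public static void main(String[] args) throws Exception {
		WebLogBean in = new WebLogBean();
		// Made-up sample values, in the order expected by set()
		in.set(true, "192.168.1.10", "-", "2019-05-31 10:15:00",
				"/hadoop-hive-intro/", "200", "5124",
				"\"http://example.com/\"", "\"Mozilla/5.0\"");

		// write() serializes the fields one by one
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		in.write(new DataOutputStream(bytes));

		// readFields() must read them back in the same order
		WebLogBean out = new WebLogBean();
		out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

		// toString() joins the fields with the '\001' separator used downstream
		System.out.println(out);
	}
}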


package cn.xxx.bigdata.weblog.mrbean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * Analysis object: page-view (click-stream) record
 * 
 * 
 * @author vander
 * @date 2019-05-31
 */
public class PageViewsBean implements Writable {
	private String session;// session id
	private String remote_addr;// visitor IP address
	private String timestr;// request time
	private String request;// requested URL
	private int step;// order of this page view within the session
	private String staylong;// time spent on the page, in seconds
	private String referal;// referrer URL
	private String useragent;// client user agent
	private String bytes_send;// bytes sent to the client
	private String status;// response status code

	public void set(String session, String remote_addr, String useragent, String timestr, String request, int step,
			String staylong, String referal, String bytes_send, String status) {
		this.session = session;
		this.remote_addr = remote_addr;
		this.useragent = useragent;
		this.timestr = timestr;
		this.request = request;
		this.step = step;
		this.staylong = staylong;
		this.referal = referal;
		this.bytes_send = bytes_send;
		this.status = status;
	}

	public String getSession() {
		return this.session;
	}

	public void setSession(String session) {
		this.session = session;
	}

	public String getRemote_addr() {
		return this.remote_addr;
	}

	public void setRemote_addr(String remote_addr) {
		this.remote_addr = remote_addr;
	}

	public String getTimestr() {
		return this.timestr;
	}

	public void setTimestr(String timestr) {
		this.timestr = timestr;
	}

	public String getRequest() {
		return this.request;
	}

	public void setRequest(String request) {
		this.request = request;
	}

	public int getStep() {
		return this.step;
	}

	public void setStep(int step) {
		this.step = step;
	}

	public String getStaylong() {
		return this.staylong;
	}

	public void setStaylong(String staylong) {
		this.staylong = staylong;
	}

	public String getReferal() {
		return this.referal;
	}

	public void setReferal(String referal) {
		this.referal = referal;
	}

	public String getUseragent() {
		return this.useragent;
	}

	public void setUseragent(String useragent) {
		this.useragent = useragent;
	}

	public String getBytes_send() {
		return this.bytes_send;
	}

	public void setBytes_send(String bytes_send) {
		this.bytes_send = bytes_send;
	}

	public String getStatus() {
		return this.status;
	}

	public void setStatus(String status) {
		this.status = status;
	}

	public void readFields(DataInput in) throws IOException {
		this.session = in.readUTF();
		this.remote_addr = in.readUTF();
		this.timestr = in.readUTF();
		this.request = in.readUTF();
		this.step = in.readInt();
		this.staylong = in.readUTF();
		this.referal = in.readUTF();
		this.useragent = in.readUTF();
		this.bytes_send = in.readUTF();
		this.status = in.readUTF();
	}

	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.session);
		out.writeUTF(this.remote_addr);
		out.writeUTF(this.timestr);
		out.writeUTF(this.request);
		out.writeInt(this.step);
		out.writeUTF(this.staylong);
		out.writeUTF(this.referal);
		out.writeUTF(this.useragent);
		out.writeUTF(this.bytes_send);
		out.writeUTF(this.status);
	}
}


package cn.xxx.bigdata.weblog.mrbean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * Analysis object: visit (session) record
 * 
 * 
 * @author vander
 * @date 2019-05-31
 */
public class VisitBean implements Writable {
	private String session;// session id
	private String remote_addr;// visitor IP address
	private String inTime;// time the visit started
	private String outTime;// time the visit ended
	private String inPage;// first URL visited (entry page)
	private String outPage;// last URL visited (exit page)
	private String referal;// referrer URL
	private int pageVisits;// number of pages visited

	public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage,
			String referal, int pageVisits) {
		this.session = session;
		this.remote_addr = remote_addr;
		this.inTime = inTime;
		this.outTime = outTime;
		this.inPage = inPage;
		this.outPage = outPage;
		this.referal = referal;
		this.pageVisits = pageVisits;
	}

	public String getSession() {
		return this.session;
	}

	public void setSession(String session) {
		this.session = session;
	}

	public String getRemote_addr() {
		return this.remote_addr;
	}

	public void setRemote_addr(String remote_addr) {
		this.remote_addr = remote_addr;
	}

	public String getInTime() {
		return this.inTime;
	}

	public void setInTime(String inTime) {
		this.inTime = inTime;
	}

	public String getOutTime() {
		return this.outTime;
	}

	public void setOutTime(String outTime) {
		this.outTime = outTime;
	}

	public String getInPage() {
		return this.inPage;
	}

	public void setInPage(String inPage) {
		this.inPage = inPage;
	}

	public String getOutPage() {
		return this.outPage;
	}

	public void setOutPage(String outPage) {
		this.outPage = outPage;
	}

	public String getReferal() {
		return this.referal;
	}

	public void setReferal(String referal) {
		this.referal = referal;
	}

	public int getPageVisits() {
		return this.pageVisits;
	}

	public void setPageVisits(int pageVisits) {
		this.pageVisits = pageVisits;
	}

	public void readFields(DataInput in) throws IOException {
		this.session = in.readUTF();
		this.remote_addr = in.readUTF();
		this.inTime = in.readUTF();
		this.outTime = in.readUTF();
		this.inPage = in.readUTF();
		this.outPage = in.readUTF();
		this.referal = in.readUTF();
		this.pageVisits = in.readInt();
	}

	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.session);
		out.writeUTF(this.remote_addr);
		out.writeUTF(this.inTime);
		out.writeUTF(this.outTime);
		out.writeUTF(this.inPage);
		out.writeUTF(this.outPage);
		out.writeUTF(this.referal);
		out.writeInt(this.pageVisits);
	}

	public String toString() {
		return this.session + "\001" + this.remote_addr + "\001" + this.inTime + "\001" + this.outTime + "\001"
				+ this.inPage + "\001" + this.outPage + "\001" + this.referal + "\001" + this.pageVisits;
	}
}

Utility class

package cn.xxx.bigdata.weblog.utils;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public abstract class DateUtil {
	
	public static String getYestDate() {
		// Yesterday's date as "yyyy-MM-dd", used by the jobs to build their HDFS paths
		Calendar instance = Calendar.getInstance();
		instance.add(Calendar.DAY_OF_MONTH, -1);
		Date time = instance.getTime();
		return new SimpleDateFormat("yyyy-MM-dd").format(time);
	}

}
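
The Calendar-based version works, but it leans on a Calendar field constant and on SimpleDateFormat, which is not thread-safe. If the project can use Java 8, an equivalent sketch based on java.time would look like the following; the class name is an assumption and is not part of the original code.

package cn.xxx.bigdata.weblog.utils;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical Java 8 variant of DateUtil.getYestDate(); produces the same
// "yyyy-MM-dd" string that the jobs use to build their HDFS paths.
public abstract class DateUtilJava8 {

	public static String getYestDate() {
		return LocalDate.now().minusDays(1)
				.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
	}
}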

Preprocessing

package cn.xxx.bigdata.weblog.pre;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;

import cn.xxx.bigdata.weblog.mrbean.WebLogBean;

/**
 * Raw-data cleaning: parse one log line into a WebLogBean
 * 
 * 
 * @author vander
 * @date 2019-05-31
 */
public class WebLogParser {
	public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
	public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

	public static WebLogBean parser(String line) {
		WebLogBean webLogBean = new WebLogBean();
		String[] arr = line.split(" ");
		if (arr.length > 11) {
			webLogBean.setRemote_addr(arr[0]);
			webLogBean.setRemote_user(arr[1]);
			String time_local = formatDate(arr[3].substring(1));
			if ((null == time_local) || ("".equals(time_local))) {
				time_local = "-invalid_time-";
			}
			webLogBean.setTime_local(time_local);
			webLogBean.setRequest(arr[6]);
			webLogBean.setStatus(arr[8]);
			webLogBean.setBody_bytes_sent(arr[9]);
			webLogBean.setHttp_referer(arr[10]);
			if (arr.length > 12) {
				StringBuilder sb = new StringBuilder();
				for (int i = 11; i < arr.length; i++) {
					sb.append(arr[i]);
				}
				webLogBean.setHttp_user_agent(sb.toString());
			} else {
				webLogBean.setHttp_user_agent(arr[11]);
			}
			if (Integer.parseInt(webLogBean.getStatus()) >= 400) {
				webLogBean.setValid(false);
			}
			if ("-invalid_time-".equals(webLogBean.getTime_local())) {
				webLogBean.setValid(false);
			}
		} else {
			webLogBean = null;
		}
		return webLogBean;
	}

	public static void filtStaticResource(WebLogBean bean, Set<String> pages) {
		if (!pages.contains(bean.getRequest())) {
			bean.setValid(false);
		}
	}

	// SimpleDateFormat is not thread-safe, hence the synchronized keyword
	public synchronized static String formatDate(String time_local) {
		try {
			return df2.format(df1.parse(time_local));
		} catch (ParseException e) {
			// Unparseable time: fall through and return null so the caller can mark the record invalid
		}
		return null;
	}
}
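
To see what the parser produces, it can be run on a single line. The access-log line in the sketch below is made up for illustration and follows the nginx combined format the parser expects (fields separated by single spaces); the demo class itself is not part of the original project.

package cn.xxx.bigdata.weblog.pre;

import cn.xxx.bigdata.weblog.mrbean.WebLogBean;

// Hypothetical demo class, for local testing only.
public class WebLogParserDemo {
	public static void main(String[] args) {
		// Made-up nginx access-log line; positions 0, 1, 3, 6, 8, 9, 10 and 11+ map to the bean fields
		String line = "192.168.1.10 - - [31/May/2019:10:15:00 +0800] \"GET /hadoop-hive-intro/ HTTP/1.1\" "
				+ "200 5124 \"http://example.com/\" \"Mozilla/5.0\"";
		WebLogBean bean = WebLogParser.parser(line);
		// With the parser above this should print: true /hadoop-hive-intro/ 2019-05-31 10:15:00
		System.out.println(bean.isValid() + " " + bean.getRequest() + " " + bean.getTime_local());
	}
}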


package cn.xxx.bigdata.weblog.pre;

import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cn.xxx.bigdata.weblog.mrbean.WebLogBean;
import cn.xxx.bigdata.weblog.utils.DateUtil;
/**
 * 
 * Step 1:
 * Process the raw log: keep only real page-view requests, convert the time format,
 * fill missing fields with defaults, and mark each record as valid or invalid.
 * 
 * 
 * @author vander
 * @date 2019-05-31
 */
public class WeblogPreProcess extends Configured implements Tool {
	public int run(String[] args) throws Exception {
		Configuration conf = super.getConf();
		// TODO: the input and output paths could be passed in via args (see the helper sketch after this class)
		String inputPath = "hdfs://node01:9000/weblog/" + DateUtil.getYestDate() + "/input";
		String outputPath = "hdfs://node01:9000/weblog/" + DateUtil.getYestDate() + "/weblogPreOut";
		FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:9000"), conf);
		if (fileSystem.exists(new Path(outputPath))) {
			fileSystem.delete(new Path(outputPath), true);
		}
		Job job = Job.getInstance(conf);
		job.setJarByClass(WeblogPreProcess.class);
		job.setMapperClass(WeblogPreProcessMapper.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.setInputPaths(job, new Path[] { new Path(inputPath) });
		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		job.setNumReduceTasks(0);
		boolean res = job.waitForCompletion(true);
		return res ? 0 : 1;
	}

	static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		// Holds the site's meaningful page URLs, used to filter the log records
		Set<String> pages = new HashSet<String>();
		Text k = new Text();
		NullWritable v = NullWritable.get();

		// Load the list of useful page URLs (in practice from an external config file) into the
		// map task's memory; here they are hard-coded for the example
		protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
				throws IOException, InterruptedException {
			this.pages.add("/about");
			this.pages.add("/black-ip-list/");
			this.pages.add("/cassandra-clustor/");
			this.pages.add("/finance-rhive-repurchase/");
			this.pages.add("/hadoop-family-roadmap/");
			this.pages.add("/hadoop-hive-intro/");
			this.pages.add("/hadoop-zookeeper-intro/");
			this.pages.add("/hadoop-mahout-roadmap/");
		}

		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			WebLogBean webLogBean = WebLogParser.parser(line);
			if (webLogBean != null) {
				// Filter out static resources such as js, images and css
				WebLogParser.filtStaticResource(webLogBean, this.pages);

				this.k.set(webLogBean.toString());
				context.write(this.k, this.v);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration configuration = new Configuration();
		int run = ToolRunner.run(configuration, new WeblogPreProcess(), args);
		System.exit(run);
	}
}
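
The TODO in run() (repeated in the two jobs that follow) notes that the paths could be passed in via args. One hypothetical way to do that while keeping the date-based defaults is a small helper like the one below; the class and method names are made up and not part of the original project. run() could then call JobPathArgs.inputPath(args, ...) and JobPathArgs.outputPath(args, ...) instead of hard-coding the two strings.

package cn.xxx.bigdata.weblog.utils;

// Hypothetical helper for the "pass paths via args" TODO:
// use args[0]/args[1] when supplied, otherwise fall back to the date-based defaults.
public class JobPathArgs {

	public static String inputPath(String[] args, String defaultPath) {
		return (args != null && args.length > 0) ? args[0] : defaultPath;
	}

	public static String outputPath(String[] args, String defaultPath) {
		return (args != null && args.length > 1) ? args[1] : defaultPath;
	}
}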

Analyze and build the model data

package cn.xxx.bigdata.weblog.clickstream;

import java.io.IOException;
import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Locale;
import java.util.UUID;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cn.xxx.bigdata.weblog.mrbean.WebLogBean;
import cn.xxx.bigdata.weblog.utils.DateUtil;

/**
 * Step 2: turn the cleaned log into click-stream "pageviews" model data
 * 
 * 
 * @author vander
 * @date 2019-05-31
 */
public class ClickStreamPageView extends Configured implements Tool {
	public int run(String[] args) throws Exception {
		Configuration conf = super.getConf();
		// TODO: the input and output paths could be passed in via args
		String inputPath = "hdfs://node01:9000/weblog/" + DateUtil.getYestDate() + "/weblogPreOut";
		String outputPath = "hdfs://node01:9000/weblog/" + DateUtil.getYestDate() + "/pageViewOut";
		FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:9000"), conf);
		if (fileSystem.exists(new Path(outputPath))) {
			fileSystem.delete(new Path(outputPath), true);
		}
		Job job = Job.getInstance(conf);
		job.setJarByClass(ClickStreamPageView.class);
		job.setMapperClass(ClickStreamMapper.class);
		job.setReducerClass(ClickStreamReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(WebLogBean.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.setInputPaths(job, new Path[] { new Path(inputPath) });
		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		boolean b = job.waitForCompletion(true);
		return b ? 0 : 1;
	}

	static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {
		Text k = new Text();
		WebLogBean v = new WebLogBean();

		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, WebLogBean>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();

			String[] fields = line.split("\001");
			if (fields.length < 9) {
				return;
			}
			// Set the split fields into the WebLogBean
			this.v.set("true".equals(fields[0]), fields[1], fields[2], fields[3], fields[4], fields[5], fields[6],
					fields[7], fields[8]);
			// Only valid records go on to the next stage
			if (this.v.isValid()) {
				// The IP address is used here to identify the user
				this.k.set(this.v.getRemote_addr());
				context.write(this.k, this.v);
			}
		}
	}

	static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {
		Text v = new Text();

		protected void reduce(Text key, Iterable<WebLogBean> values,
				Reducer<Text, WebLogBean, NullWritable, Text>.Context context)
				throws IOException, InterruptedException {
			ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();
			// First collect all of this user's records into a list so they can be sorted by time
			try {
				for (WebLogBean bean : values) {
					WebLogBean webLogBean = new WebLogBean();
					try {
						BeanUtils.copyProperties(webLogBean, bean);
					} catch (Exception e) {
						e.printStackTrace();
					}
					beans.add(webLogBean);
				}
				// Sort the beans in chronological order
				Collections.sort(beans, new Comparator<WebLogBean>() {
					public int compare(WebLogBean o1, WebLogBean o2) {
						try {
							Date d1 = toDate(o1.getTime_local());
							Date d2 = toDate(o2.getTime_local());
							if ((d1 == null) || (d2 == null)) {
								return 0;
							}
							return d1.compareTo(d2);
						} catch (Exception e) {
							e.printStackTrace();
						}
						return 0;
					}
				});
				/**
				 * The logic below splits the ordered beans into individual visits and numbers
				 * the pages inside each visit with a sequential step.
				 * Core idea: compare the time gap between two adjacent records; if the gap is
				 * less than 30 minutes the two records belong to the same session, otherwise
				 * a new session (visit) starts.
				 */
				int step = 1;
				String session = UUID.randomUUID().toString();
				for (int i = 0; i < beans.size(); i++) {
					WebLogBean bean = (WebLogBean) beans.get(i);
					// If there is only one record for this user, output it directly
					if (1 == beans.size()) {
						// Use a default stay time of 60s
						this.v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001"
								+ bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + 60
								+ "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001"
								+ bean.getBody_bytes_sent() + "\001" + bean.getStatus());
						context.write(NullWritable.get(), this.v);
						session = UUID.randomUUID().toString();
						break;
					}
					// More than one record
					if (i != 0) {
						// Time difference between this record and the previous one
						long timeDiff = timeDiff(toDate(bean.getTime_local()),
								toDate(((WebLogBean) beans.get(i - 1)).getTime_local()));
						// If the gap to the previous record is < 30 minutes, the two belong to the
						// same session: output the previous page view
						if (timeDiff < 1800000L) {
							this.v.set(session + "\001" + key.toString() + "\001"
									+ ((WebLogBean) beans.get(i - 1)).getRemote_user() + "\001"
									+ ((WebLogBean) beans.get(i - 1)).getTime_local() + "\001"
									+ ((WebLogBean) beans.get(i - 1)).getRequest() + "\001" + step + "\001"
									+ timeDiff / 1000L + "\001" + ((WebLogBean) beans.get(i - 1)).getHttp_referer()
									+ "\001" + ((WebLogBean) beans.get(i - 1)).getHttp_user_agent() + "\001"
									+ ((WebLogBean) beans.get(i - 1)).getBody_bytes_sent() + "\001"
									+ ((WebLogBean) beans.get(i - 1)).getStatus());
							context.write(NullWritable.get(), this.v);
							step++;
						} else {
							// If the gap is >= 30 minutes, output the previous page view with the
							// default stay time and reset step, marking the start of a new visit
							this.v.set(session + "\001" + key.toString() + "\001"
									+ ((WebLogBean) beans.get(i - 1)).getRemote_user() + "\001"
									+ ((WebLogBean) beans.get(i - 1)).getTime_local() + "\001"
									+ ((WebLogBean) beans.get(i - 1)).getRequest() + "\001" + step + "\001" + 60
									+ "\001" + ((WebLogBean) beans.get(i - 1)).getHttp_referer() + "\001"
									+ ((WebLogBean) beans.get(i - 1)).getHttp_user_agent() + "\001"
									+ ((WebLogBean) beans.get(i - 1)).getBody_bytes_sent() + "\001"
									+ ((WebLogBean) beans.get(i - 1)).getStatus());
							context.write(NullWritable.get(), this.v);
							// After outputting the previous record, reset the step counter and start a new session
							step = 1;
							session = UUID.randomUUID().toString();
						}
						// If this is the last record in the list, output it directly
						if (i == beans.size() - 1) {
							// Use a default stay time of 60s
							this.v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001"
									+ bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + 60
									+ "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001"
									+ bean.getBody_bytes_sent() + "\001" + bean.getStatus());
							context.write(NullWritable.get(), this.v);
						}
					}
				}
			} catch (ParseException e) {
				e.printStackTrace();
			}
		}

		private String toStr(Date date) {
			SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
			return df.format(date);
		}

		private Date toDate(String timeStr) throws ParseException {
			SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
			return df.parse(timeStr);
		}

		private long timeDiff(String time1, String time2) throws ParseException {
			Date d1 = toDate(time1);
			Date d2 = toDate(time2);
			return d1.getTime() - d2.getTime();
		}

		private long timeDiff(Date time1, Date time2) throws ParseException {
			return time1.getTime() - time2.getTime();
		}
	}

	public static void main(String[] args) throws Exception {
		int run = ToolRunner.run(new Configuration(), new ClickStreamPageView(), args);
		System.exit(run);
	}
}



package cn.xxx.bigdata.weblog.clickstream;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cn.xxx.bigdata.weblog.mrbean.PageViewsBean;
import cn.xxx.bigdata.weblog.mrbean.VisitBean;
import cn.xxx.bigdata.weblog.utils.DateUtil;

/**
 * Step 3: take the pageviews model output as input and derive the visit model from it
 * 
 * 
 * @author vander
 * @date 2019-05-31
 */
public class ClickStreamVisit extends Configured implements Tool {
	public int run(String[] args) throws Exception {
		Configuration conf = super.getConf();
		// TODO: the input and output paths could be passed in via args
		String inputPath = "hdfs://node01:9000/weblog/" + DateUtil.getYestDate() + "/pageViewOut";
		String outPutPath = "hdfs://node01:9000/weblog/" + DateUtil.getYestDate() + "/clickStreamVisit";
		FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:9000"), conf);
		if (fileSystem.exists(new Path(outPutPath))) {
			fileSystem.delete(new Path(outPutPath), true);
		}
		Job job = Job.getInstance(conf);
		job.setJarByClass(ClickStreamVisit.class);
		job.setMapperClass(ClickStreamVisitMapper.class);
		job.setReducerClass(ClickStreamVisitReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(PageViewsBean.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(VisitBean.class);
		FileInputFormat.setInputPaths(job, new Path[] { new Path(inputPath) });
		FileOutputFormat.setOutputPath(job, new Path(outPutPath));

		boolean res = job.waitForCompletion(true);
		return res ? 0 : 1;
	}

	static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> {
		PageViewsBean pvBean = new PageViewsBean();
		Text k = new Text();

		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, PageViewsBean>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] fields = line.split("\001");
			int step = Integer.parseInt(fields[5]);

			this.pvBean.set(fields[0], fields[1], fields[2], fields[3], fields[4], step, fields[6], fields[7],
					fields[8], fields[9]);
			this.k.set(this.pvBean.getSession());
			context.write(this.k, this.pvBean);
		}
	}

	static class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {
		protected void reduce(Text session, Iterable<PageViewsBean> pvBeans,
				Reducer<Text, PageViewsBean, NullWritable, VisitBean>.Context context)
				throws IOException, InterruptedException {
			// Sort the pvBeans by step
			ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>();
			for (PageViewsBean pvBean : pvBeans) {
				PageViewsBean bean = new PageViewsBean();
				try {
					BeanUtils.copyProperties(bean, pvBean);
					pvBeansList.add(bean);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			Collections.sort(pvBeansList, new Comparator<PageViewsBean>() {
				public int compare(PageViewsBean o1, PageViewsBean o2) {
					return Integer.compare(o1.getStep(), o2.getStep());
				}
			});
			// Take the first and last pageview records of this visit and copy them into a VisitBean
			VisitBean visitBean = new VisitBean();
			// Entry record of the visit
			visitBean.setInPage(pvBeansList.get(0).getRequest());
			visitBean.setInTime(pvBeansList.get(0).getTimestr());
			// Exit record of the visit
			visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest());
			visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr());
			// Number of pages viewed in this visit
			visitBean.setPageVisits(pvBeansList.size());
			// Visitor's IP address
			visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr());
			// Referrer of this visit
			visitBean.setReferal(pvBeansList.get(0).getReferal());
			visitBean.setSession(session.toString());
			context.write(NullWritable.get(), visitBean);
		}
	}

	public static void main(String[] args) throws Exception {
		// Exit with the job's return code, for consistency with the other two drivers
		int run = ToolRunner.run(new Configuration(), new ClickStreamVisit(), args);
		System.exit(run);
	}
}

Export the jar package

weblogparser.jar
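
The exported jar can be submitted step by step with the standard hadoop jar command (hadoop jar weblogparser.jar <main class>), or the three jobs can be chained from a single entry point. The driver below is a hypothetical sketch whose package and class name are made up; each step reads the previous step's output directory, so the order matters.

package cn.xxx.bigdata.weblog;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

import cn.xxx.bigdata.weblog.clickstream.ClickStreamPageView;
import cn.xxx.bigdata.weblog.clickstream.ClickStreamVisit;
import cn.xxx.bigdata.weblog.pre.WeblogPreProcess;

// Hypothetical driver that chains the three jobs in order.
public class WeblogJobRunner {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Step 1: clean the raw nginx log
		if (ToolRunner.run(conf, new WeblogPreProcess(), args) != 0) {
			System.exit(1);
		}
		// Step 2: build the pageviews (click-stream) model
		if (ToolRunner.run(conf, new ClickStreamPageView(), args) != 0) {
			System.exit(1);
		}
		// Step 3: build the visit model from the pageviews output
		System.exit(ToolRunner.run(conf, new ClickStreamVisit(), args));
	}
}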
