Master creating a MapReduce project in IDEA
Understand the basic principles and execution flow of MapReduce
Read and understand the source code of Hadoop's official WordCount example
Master the basic approach to MapReduce programming
Understand the processing logic of the Map and Reduce functions
Be able to write MapReduce programs that handle simple tasks
【Project Background】
A campus community website has several million registered users. The web server keeps a log of logins: each time a user logs in, the user's student ID is appended to the log file. We have the raw data file login.log from one day in 2019, containing 8 million records. A few sample lines:
2016001,20190319
2016002,20190319
2016001,20190319
The operations team wants to obtain users' login counts on a regular basis for user behavior analysis, so that it can draw up effective operating plans.
【Solution】
Dissect MapReduce computation layer by layer, combining theory with hands-on practice.
Use the MapReduce framework to solve the campus community website's data analysis problem.
【Project Tasks】
▲ Task 1: MapReduce fundamentals
▲ Task 2: Setting up the MapReduce development environment
▲ Task 3: Counting visits to the campus community website
▲ Task 4: Sorting the visit counts
【Extension Tasks】
▲ Task 5: Finding the highest score in a score table
▲ Task 6: Merging and deduplicating the data in two files
Start the big data lab platform and log in to its management console in a browser; prepare the software packages for configuring the MapReduce development environment.
The experiments can be tested either locally or on a Hadoop cluster, but in both cases the code is written in IDEA. (If the project is built with Maven, a hadoop-client dependency matching your Hadoop version is normally all that is required.)
To test on a Hadoop cluster, first set the cluster up: see "Hadoop platform setup".
The following demonstrates how to test locally:
【Downloading and Installing IDEA】
【Configuring Windows Environment Variables】
(1) Unpack Hadoop to the D: drive and configure the environment variables (set HADOOP_HOME and add %HADOOP_HOME%\bin to PATH).
(2) Unpack winutils and place it in the bin directory of the Hadoop installation.
(3) Configure the JDK environment variables (JAVA_HOME).
Configuration complete.
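If you would rather not touch the system-wide environment variables for purely local runs, a common workaround (an assumption on our part, not one of the original steps) is to point Hadoop at the winutils location from code, at the top of main() in any of the driver classes below:

System.setProperty("hadoop.home.dir", "D:\\hadoop"); // hypothetical path: wherever Hadoop + winutils were unpacked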
Start the big data lab platform and log in to the management console in a browser; prepare the experiment data and the WordCount code.
Write the WordCount code by hand:
package com.lx.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount_1 {
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // quickly set up a default Log4j configuration
// load the Hadoop configuration
Configuration conf = new Configuration();
// create the job
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount_1.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
Path in = new Path("D:\\Users\\ASUS\\IdeaProjects\\mapreduce\\src\\main\\resources\\text1");
Path out = new Path("D:\\Users\\ASUS\\IdeaProjects\\mapreduce\\src\\main\\resources\\WordCount_1");
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
// MapReduce is a distributed computing model proposed by Google, originally for search, to handle computation over massive data sets.
// An MR job consists of two phases, Map and Reduce; the user only implements the map() and reduce() functions to obtain a distributed computation.
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
// Writable wrapper types provided by Hadoop
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
// example input:
//   hello world hello tom
//   hello jack
// map() tokenizes each input line and emits a (word, 1) pair for every token
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
//(hello,1)
//(world,1)
//(hello,1)
}
}
}
// the reducer sums the counts collected for each word
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
//(hello,1+1+1) (hello,3)
//(world,1) (world,1)
//(tom,1) (tom,1)
//(jack,1) (jack,1)
}
}
}
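One difference from the official Hadoop WordCount: the official example also registers the reducer as a combiner, which pre-aggregates counts on the map side and cuts shuffle traffic. Adding it here would be one extra line in main():

job.setCombinerClass(IntSumReducer.class); // safe because integer summing is associative and commutative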
【Data Preparation】
Dear Bear River
Car Car River
Dear Car Bear
【Data Upload】
Upload the experiment data to HDFS with the HDFS shell (if you have no Hadoop cluster, test locally instead).
(1) First create a file t.txt under /opt/tools/, open it, and enter:
Dear Bear River
Car Car River
Dear Car Bear
(2) Create a directory on HDFS with:
hdfs dfs -mkdir /test
Then upload t.txt to HDFS with:
hdfs dfs -put /opt/tools/t.txt /test
(3) Run the job (note: main() above hardcodes local paths; for a cluster run, replace them with new Path(args[0]) and new Path(args[1]) so the paths on the command line take effect):
hadoop jar /opt/tools/mapreduce.jar com.lx.mr.WordCount_1 /test/t.txt /out/tt.txt
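When the job finishes, the output path is a directory containing the reducer's part files. For the sample input above, the counts would be:

hdfs dfs -cat /out/tt.txt/part-r-00000
Bear	2
Car	3
Dear	2
River	2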
【Task 3: Counting Visits to the Campus Community Website】Sample records from login.log (student ID, login date); the CountSum_2 program below counts how many logins occurred on each date:
2016001,20190319
2016002,20190319
2016001,20190319
package com.lx.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
public class CountSum_2 {
/**
 * input line:  2016001,20190319
 * map output:  (20190319, 1)
 */
private static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
// split the line on the comma
String[] arry = line.split(",");
String keyout = arry[1];
context.write(new Text(keyout), one);
}
}
/**
 * reduce input (values grouped by key):
 * (20190319, (1, 1, 1, 1))
 * (20190318, (1))
 **/
private static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
/**
 * reduce output:
 * (20190319, 4)
 * (20190318, 1)
 */
}
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // quickly set up a default Log4j configuration
// configuration information
Configuration conf = new Configuration();
// create the job
Job job = Job.getInstance(conf, "CountSum_2");
job.setJarByClass(CountSum_2.class); // required so the cluster can locate the jar containing this class
// specify the Mapper and Reducer classes
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// types of the map output
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// types of the final (reduce) output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// set the input and output paths
FileInputFormat.addInputPath(job, new Path("D:\\Users\\ASUS\\IdeaProjects\\mapreduce\\src\\main\\resources\\test2"));
FileOutputFormat.setOutputPath(job, new Path("D:\\Users\\ASUS\\IdeaProjects\\mapreduce\\src\\main\\resources\\CountSum_2"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
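For the three sample records shown earlier, every line carries the same date, so this job would output a single line:

20190319	3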
【Task 4: Sorting the Visit Counts】Sample input (date, visit count); the OneSort_3 program below sorts the records by visit count:
20160101,100
20160102,200
20160103,151
20160120,120
20160121,1000
package com.lx.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
// Hadoop sorts map output keys by default, so emitting the visit count as the key sorts the records
public class OneSort_3 {
public static class Map extends Mapper<Object, Text, IntWritable, Text> {
/**
 * input line:  20160101,100
 * map output:  (100, 20160101) ——> reduce output: (20160101, 100)
 */
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] data = line.split(",");
String date = data[0];
int times = Integer.parseInt(data[1]);
context.write(new IntWritable(times), new Text(date));
}
}
public static class MyReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value, key);
}
}
}
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // quickly set up a default Log4j configuration
Path in = new Path("D:\\Users\\ASUS\\IdeaProjects\\mapreduce\\src\\main\\resources\\test3");
Path out = new Path("D:\\Users\\ASUS\\IdeaProjects\\mapreduce\\src\\main\\resources\\OneSort_31");
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "OneSort_3");
job.setJarByClass(OneSort_3.class);
job.setMapperClass(Map.class);
job.setReducerClass(MyReduce.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
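With the default comparator, IntWritable keys come out in ascending order, so for the sample data the output is 20160101 100, 20160120 120, 20160103 151, 20160102 200, 20160121 1000. If descending order is wanted, one option (a sketch of our own, not part of the original code) is to nest a reversed comparator inside OneSort_3 and register it before submitting the job:

// reverse the default IntWritable byte-level ordering so the shuffle sorts keys descending
public static class DescendingIntComparator extends IntWritable.Comparator {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return -super.compare(b1, s1, l1, b2, s2, l2);
    }
}
// in main(), before waitForCompletion:
// job.setSortComparatorClass(DescendingIntComparator.class);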
【Task 5: Finding the Highest Score per Subject】Sample input (subject, score):
语文,95
数学,98
英语,99
英语,88
语文,78
数学,100
The HighestScore (driver) class:
package com.lx.mrh;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
public class HighestScore {
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // quickly set up a default Log4j configuration
Path in = new Path("D:\\Users\\ASUS\\IdeaProjects\\mapreduce\\src\\main\\resources\\text4");
Path out = new Path("D:\\Users\\ASUS\\IdeaProjects\\mapreduce\\src\\main\\resources\\HighestScore");
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "HighestScore");
job.setJarByClass(HighestScore.class);
job.setMapperClass(ScoreMapper.class);
job.setReducerClass(ScoreReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
The ScoreMapper class:
package com.lx.mrh;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * input lines          grouped for reduce:
 * 语文,95   ——> (语文, (95, 78))
 * 数学,98   ——> (数学, (98, 100))
 * 英语,99   ——> (英语, (99, 88))
 * 英语,88
 * 语文,78
 * 数学,100
 */
public class ScoreMapper extends Mapper<Object, Text, Text, IntWritable> {
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] data = line.split(",");
context.write(new Text(data[0]), new IntWritable(Integer.parseInt(data[1])));
}
}
The ScoreReducer class:
package com.lx.mrh;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class ScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int high = 0; // track the highest score seen so far (assumes scores are non-negative)
for (IntWritable value : values) {
if (value.get() > high) {
high = value.get();
}
}
context.write(key, new IntWritable(high));
}
}
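For the sample data, each subject keeps only its maximum, so the output would be:

数学	100
英语	99
语文	95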
【Task 6: Merging and Deduplicating Two Files】Contents of XX.txt:
20160102
20160103
20160105
Contents of YY.txt:
20160101
20160102
20160106
Note: the two files must be placed in the same folder.
The input path then only needs to point at that folder; FileInputFormat reads every file under the directory, so the two files are processed together.
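Alternatively (our own note, not in the original), FileInputFormat.addInputPath may be called once per file to add the two inputs explicitly:

// add each file explicitly instead of pointing at the folder
FileInputFormat.addInputPath(job, new Path("D:\\Users\\ASUS\\IdeaProjects\\mapreduce\\src\\main\\resources\\date\\XX.txt"));
FileInputFormat.addInputPath(job, new Path("D:\\Users\\ASUS\\IdeaProjects\\mapreduce\\src\\main\\resources\\date\\YY.txt"));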
package com.lx.mrh;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
// Note: "Combiner" is only this example's class name; no Hadoop combiner is configured here.
public class Combiner {
public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
/**
 * map output, one (line, 1) pair per input line:
* 20160102 1
* 20160103 1
* 20160105 1
* 20160101 1
* 20160102 1
* 20160106 1
*/
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
context.write(value, new IntWritable(1));
}
}
public static class MyReduce extends Reducer<Text, IntWritable, Text, NullWritable> {
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// write each distinct line exactly once; the values are ignored
context.write(key, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
BasicConfigurator.configure(); // quickly set up a default Log4j configuration
Path in = new Path("D:\\Users\\ASUS\\IdeaProjects\\mapreduce\\src\\main\\resources\\date");
Path out = new Path("D:\\Users\\ASUS\\IdeaProjects\\mapreduce\\src\\main\\resources\\outdate");
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Combiner");
job.setJarByClass(Combiner.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
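For XX.txt and YY.txt above, the shuffle groups identical lines under one key and the reducer writes each key once, so the output is the deduplicated union:

20160101
20160102
20160103
20160105
20160106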