赞
踩
实验材料及说明
在Ubuntu系统的/学号(每个人用自己的学号)/salesInfo目录下,有买家的购买记录文件Sales,该文件记录了买家的id,购买商品的id以及购买日期,文件为名为Sales。Sales包含:买家ID、商品ID、购买日期三个字段,数据以“\t”进行分割,样本数据及格式如下:
买家ID 商品ID 购买日期
1000181 1000481 2021-04-04 16:54:31
2000001 1001597 2021-04-07 15:07:52
2000001 1001560 2021-04-07 15:08:27
2000042 1001368 2021-04-08 08:20:30
2000067 1002061 2021-04-08 16:45:33
2000056 1003289 2021-04-12 10:50:55
2000056 1003290 2021-04-12 11:57:35
2000056 1003292 2021-04-12 12:05:29
2000054 1002420 2021-04-14 15:24:12
2000055 1001679 2021-04-14 19:46:04
2000054 1010675 2021-04-14 15:23:53
2000054 1002429 2021-04-14 17:52:45
2000076 1002427 2021-04-14 19:35:39
2000054 1003326 2021-04-20 12:54:44
2000056 1002420 2021-04-15 11:24:49
2000064 1002422 2021-04-15 11:35:54
2000056 1003066 2021-04-15 11:43:01
2000056 1003055 2021-04-15 11:43:06
2000056 1010183 2021-04-15 11:45:24
2000056 1002422 2021-04-15 11:45:49
2000056 1003100 2021-04-15 11:45:54
2000056 1003094 2021-04-15 11:45:57
2000056 1003064 2021-04-15 11:46:04
2000056 1010178 2021-04-15 16:15:20
2000076 1003101 2021-04-15 16:37:27
2000076 1003103 2021-04-15 16:37:05
2000076 1003100 2021-04-15 16:37:18
2000076 1003066 2021-04-15 16:37:31
要求根据要求撰写实验报告,实验报告需要包括实验原理、算法设计思路、代码、代码调试说明、实验过程中碰到的问题和代码改进建议等内容。
一、实验目的
掌握MapReduce的统计排序和HDFS的读写功能。即,要求调用HDFS的Java API,将salesInfo目录下的Sales文件上传到HDFS文件系统的/Sales目录下;调用MapReduce的Java API统计HDFS文件系统/Sales目录下的销售文件中每个买家购买商品的数量;将上一步结果输出到Ubuntu系统,的/data/hdfs目录下,同时请输出降序排序的结果,并按照收藏的商品数从大到小排列。
二、实验环境
(1)Linux Ubuntu 18.XX
(2)jdk-8u162-linux-x64
(3)hadoop-2.X.X-XXX
(4)hadoop-2.X.X-eclipse- XXX.jar
(5)eclipse-java-juno-SR2-linux-gtk-x86_64.tar.gz
(6)hbase-1.X.X-XXX
三、实验原理或流程
1.MapReduce的工作原理
在分布式计算中,MapReduce框架负责处理了并行编程里分布式存储、工作调度,负载均衡、容错处理以及网络通信等复杂问题,现在我们把处理过程高度抽象为Map与Reduce两个部分来进行阐述,其中Map部分负责把任务分解成多个子任务,Reduce部分负责把分解后多个子任务的处理结果汇总起来。
在Hadoop中,每个MapReduce任务都被初始化为一个Job,每个Job又可以分为两种阶段:map阶段和reduce阶段。这两个阶段分别用两个函数表示,即map函数和reduce函数。map函数接收一个<key,value>形式的输入,然后同样产生一个<key,value>形式的中间输出,Hadoop函数接收一个如<key,(list of values)>形式的输入,然后对这个value集合进行处理,每个reduce产生0或1个输出,reduce的输出也是<key,value>形式的。
①Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。读取输入文件的内容,并解析成键值对(<key, value>)的形式,输入文件中的每一行被解析成一个<key, value>对,每个<key, value>对调用一次map()函数。用户写map()函数,对输入的<key,value>对进行处理,并输出新的<key,value>对。对得到的<key,value>进行分区操作。不同分区的数据,按照key值进行排序和分组,具有相同key值的value则放到同一个集合中。
②Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。对于多个map任务的输出,按照不同的分区,通过网络传输到不同的Reduce节点。对多个map任务的输出结果进行合并、排序,用户书写reduce函数,对输入的key、value进行处理,得到新的key、value输出结果。将reduce的输出结果保存在文件中。
在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成Combine和Reduce过程中的处理。还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输出和输入路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务。
2.MapReduce框架的作业运行流程
①ResourceManager:是YARN资源控制框架的中心模块,负责集群中所有资源的统一管理和分配。它接收来自NM(NodeManager)的汇报,建立AM,并将资源派送给AM(ApplicationMaster)。
②NodeManager:简称NM,NodeManager是ResourceManager在每台机器上的代理,负责容器管理,并监控他们的资源使用情况(cpu、内存、磁盘及网络等),以及向ResourceManager提供这些资源使用报告。
③ApplicationMaster:YARN中每个应用都会启动一个AM,负责向RM申请资源,请求NM启动Container,并告诉Container做什么事情。
④Container:资源容器。YARN中所有的应用都是在Container之上运行的。AM也是在Container上运行的,不过AM的Container是RM申请的。Container是YARN中资源的抽象,它封装了某个节点上一定量的资源(CPU和内存两类资源)。Container由ApplicationMaster向ResourceManager申请的,由ResouceManager中的资源调度器异步分配给ApplicationMaster。Container的运行是由ApplicationMaster向资源所在的NodeManager发起的,Container运行时需提供内部执行的任务命令(可以是任何命令,比如java、Python、C++进程启动命令均可)以及该命令执行所需的环境变量和外部资源(比如词典文件、可执行文件、jar包等)。
另外,一个应用程序所需的Container分为两大类,如下:
①运行ApplicationMaster的Container:这是由ResourceManager(向内部的资源调度器)申请和启动的,用户提交应用程序时,可指定唯一的ApplicationMaster所需的资源。
②运行各类任务的Container:这是由ApplicationMaster向ResourceManager申请的,并为了ApplicationMaster与NodeManager通信以启动的。
以上两类Container可能在任意节点上,它们的位置通常而言是随机的,即ApplicationMaster可能与它管理的任务运行在一个节点上。
(2)用流程图表示算法的实现过程
四、实验内容
实验内容需要包括以下内容:
在hdfs中创建文件夹:
package yang; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class mkdirDemo { public static void main(String [] args) throws IOException, URISyntaxException { FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), new Configuration()); fs.mkdirs(new Path("/Sales"));//创建文件夹 // fs.create(new Path("/testsales.txt"));//创建文件 fs.close(); System.out.println("success"); } }
创建文件结果:
在hdfs中上传文件:
package yang; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class uplownd { public static void copyFromLocal(String source, String dest)throws IOException, URISyntaxException { // 读取hadoop文件系统的配置 Configuration conf = new Configuration(); URI uri = new URI("hdfs://localhost:9000"); // FileSystem是用户操作HDFS的核心类,它获得URI对应的HDFS文件系统 FileSystem fileSystem = FileSystem.get(uri, conf); // 源文件路径 Path srcPath = new Path(source); // 目的路径 Path dstPath = new Path(dest); // 查看目的路径是否存在 if (!(fileSystem.exists(dstPath))) { // 如果路径不存在,即刻创建 fileSystem.mkdirs(dstPath); } // 得到本地文件名称 String filename = source.substring(source.lastIndexOf('/') + 1,source.length()); try { // 将本地文件上传到HDFS fileSystem.copyFromLocalFile(srcPath, dstPath); System.out.println("File " + filename + " copied to " + dest); } catch (Exception e) { System.err.println("Exception caught! :" + e); System.exit(1); } finally { fileSystem.close(); } } }
上传文件结果:
在hdfs中进行统计
package yang; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class shangpintongji { public static class doMapper extends Mapper<Object, Text, Text, IntWritable> { //第一个Object表示输入key的类型;第二个Text表示输入value的类型;第三个Text表示表示输出键的类型;第四个IntWritable表示输出值的类型 public static final IntWritable one = new IntWritable(1); public static Text word = new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {//抛出异常 StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");//以空格分割 //StringTokenizer是Java工具包中的一个类,用于将字符串进行拆分 word.set(tokenizer.nextToken()); //返回当前位置到下一个分隔符之间的字符串 context.write(word, one); //将word存到容器中,记一个数 } } public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(); job.setJobName("shangpintongji"); job.setJarByClass(shangpintongji.class); job.setMapperClass(doMapper.class); job.setReducerClass(doReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path in=new Path("hdfs://localhost:9000/Sales/Sales"); Path out=new Path("hdfs://localhost:9000/shiyan/out1"); FileInputFormat.addInputPath(job, in); FileOutputFormat.setOutputPath(job, out); boolean flag = job.waitForCompletion(true); System.out.println(flag); System.exit(flag? 0 : 1); } }
统计结果:
降序排序:
package liangchenchen; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class paixu { public static class Map extends Mapper<Object , Text , IntWritable,Text >{ private static Text goods=new Text(); private static IntWritable num=new IntWritable(); public void map(Object key,Text value,Context context)throws IOException,InterruptedException{ String line=value.toString(); String arr[]=line.split("\t"); num.set(Integer.parseInt(arr[1])); goods.set(arr[0]); context.write(num, goods); } } public static class Reduce extends Reducer< IntWritable, Text, IntWritable, Text> //input is temperature (IntWritable) and data (Text) { //private static IntWritable result= new IntWritable(); public void reduce(IntWritable key,Iterable<Text> values,Context context) throws IOException, InterruptedException{ //reduce端接收到<key,value-list>之后, //将输入的key直接复制给输出的key,用for循环遍历value-list并将里面的元素设置为输出的value, //然后将<key,value>逐一输出,根据value-list中元素的个数决定输出的次数。 for(Text val:values){ System.out.println("-------"+val); context.write(key,val); //output data of the same temperature } } } //使Sort阶段的Key降序排列的比较器 public static class IntWritableDecreasingComparator extends IntWritable.Comparator { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); } } public static void main(String[] args) throws IOException,ClassNotFoundException,InterruptedException{ Configuration conf=new Configuration(); Job job =new Job(conf,"sort"); job.setJarByClass(paixu.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path in=new Path("hdfs://localhost:9000/shiyan/out/part-r-00000"); Path out=new Path("hdfs://localhost:9000/shiyan/out2"); FileInputFormat.addInputPath(job,in); FileOutputFormat.setOutputPath(job,out); job.setSortComparatorClass(IntWritableDecreasingComparator.class); System.exit(job.waitForCompletion(true)?0:1); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。