当前位置:   article > 正文

编写MapReduce程序,统计每个买家收藏商品数量,实现统计排序功能_使用mapreduce统计数量占比

使用mapreduce统计数量占比

实验材料及说明
Ubuntu系统的/学号(每个人用自己的学号)/salesInfo目录下,有买家的购买记录文件Sales,该文件记录了买家的id,购买商品的id以及购买日期,文件为名为Sales。Sales包含:买家ID、商品ID、购买日期三个字段,数据以“\t”进行分割,样本数据及格式如下:
买家ID 商品ID 购买日期
1000181 1000481 2021-04-04 16:54:31
2000001 1001597 2021-04-07 15:07:52
2000001 1001560 2021-04-07 15:08:27
2000042 1001368 2021-04-08 08:20:30
2000067 1002061 2021-04-08 16:45:33
2000056 1003289 2021-04-12 10:50:55
2000056 1003290 2021-04-12 11:57:35
2000056 1003292 2021-04-12 12:05:29
2000054 1002420 2021-04-14 15:24:12
2000055 1001679 2021-04-14 19:46:04
2000054 1010675 2021-04-14 15:23:53
2000054 1002429 2021-04-14 17:52:45
2000076 1002427 2021-04-14 19:35:39
2000054 1003326 2021-04-20 12:54:44
2000056 1002420 2021-04-15 11:24:49
2000064 1002422 2021-04-15 11:35:54
2000056 1003066 2021-04-15 11:43:01
2000056 1003055 2021-04-15 11:43:06
2000056 1010183 2021-04-15 11:45:24
2000056 1002422 2021-04-15 11:45:49
2000056 1003100 2021-04-15 11:45:54
2000056 1003094 2021-04-15 11:45:57
2000056 1003064 2021-04-15 11:46:04
2000056 1010178 2021-04-15 16:15:20
2000076 1003101 2021-04-15 16:37:27
2000076 1003103 2021-04-15 16:37:05
2000076 1003100 2021-04-15 16:37:18
2000076 1003066 2021-04-15 16:37:31
要求根据要求撰写实验报告,实验报告需要包括实验原理、算法设计思路、代码、代码调试说明、实验过程中碰到的问题和代码改进建议等内容。
一、实验目的
掌握MapReduce的统计排序和HDFS的读写功能。即,要求调用HDFS的Java API,将salesInfo目录下的Sales文件上传到HDFS文件系统的/Sales目录下;调用MapReduce的Java API统计HDFS文件系统/Sales目录下的销售文件中每个买家购买商品的数量;将上一步结果输出到Ubuntu系统,的/data/hdfs目录下,同时请输出降序排序的结果,并按照收藏的商品数从大到小排列。
二、实验环境
(1)Linux Ubuntu 18.XX
(2)jdk-8u162-linux-x64
(3)hadoop-2.X.X-XXX
(4)hadoop-2.X.X-eclipse- XXX.jar
(5)eclipse-java-juno-SR2-linux-gtk-x86_64.tar.gz
(6)hbase-1.X.X-XXX
三、实验原理或流程
1.MapReduce的工作原理
在分布式计算中,MapReduce框架负责处理了并行编程里分布式存储、工作调度,负载均衡、容错处理以及网络通信等复杂问题,现在我们把处理过程高度抽象为Map与Reduce两个部分来进行阐述,其中Map部分负责把任务分解成多个子任务,Reduce部分负责把分解后多个子任务的处理结果汇总起来。
在Hadoop中,每个MapReduce任务都被初始化为一个Job,每个Job又可以分为两种阶段:map阶段和reduce阶段。这两个阶段分别用两个函数表示,即map函数和reduce函数。map函数接收一个<key,value>形式的输入,然后同样产生一个<key,value>形式的中间输出,Hadoop函数接收一个如<key,(list of values)>形式的输入,然后对这个value集合进行处理,每个reduce产生0或1个输出,reduce的输出也是<key,value>形式的。
①Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。读取输入文件的内容,并解析成键值对(<key, value>)的形式,输入文件中的每一行被解析成一个<key, value>对,每个<key, value>对调用一次map()函数。用户写map()函数,对输入的<key,value>对进行处理,并输出新的<key,value>对。对得到的<key,value>进行分区操作。不同分区的数据,按照key值进行排序和分组,具有相同key值的value则放到同一个集合中。
②Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。对于多个map任务的输出,按照不同的分区,通过网络传输到不同的Reduce节点。对多个map任务的输出结果进行合并、排序,用户书写reduce函数,对输入的key、value进行处理,得到新的key、value输出结果。将reduce的输出结果保存在文件中。
在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成Combine和Reduce过程中的处理。还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输出和输入路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务。
2.MapReduce框架的作业运行流程
①ResourceManager:是YARN资源控制框架的中心模块,负责集群中所有资源的统一管理和分配。它接收来自NM(NodeManager)的汇报,建立AM,并将资源派送给AM(ApplicationMaster)。
②NodeManager:简称NM,NodeManager是ResourceManager在每台机器上的代理,负责容器管理,并监控他们的资源使用情况(cpu、内存、磁盘及网络等),以及向ResourceManager提供这些资源使用报告。
③ApplicationMaster:YARN中每个应用都会启动一个AM,负责向RM申请资源,请求NM启动Container,并告诉Container做什么事情。
④Container:资源容器。YARN中所有的应用都是在Container之上运行的。AM也是在Container上运行的,不过AM的Container是RM申请的。Container是YARN中资源的抽象,它封装了某个节点上一定量的资源(CPU和内存两类资源)。Container由ApplicationMaster向ResourceManager申请的,由ResouceManager中的资源调度器异步分配给ApplicationMaster。Container的运行是由ApplicationMaster向资源所在的NodeManager发起的,Container运行时需提供内部执行的任务命令(可以是任何命令,比如java、Python、C++进程启动命令均可)以及该命令执行所需的环境变量和外部资源(比如词典文件、可执行文件、jar包等)。
另外,一个应用程序所需的Container分为两大类,如下:
①运行ApplicationMaster的Container:这是由ResourceManager(向内部的资源调度器)申请和启动的,用户提交应用程序时,可指定唯一的ApplicationMaster所需的资源。
②运行各类任务的Container:这是由ApplicationMaster向ResourceManager申请的,并为了ApplicationMaster与NodeManager通信以启动的。
以上两类Container可能在任意节点上,它们的位置通常而言是随机的,即ApplicationMaster可能与它管理的任务运行在一个节点上。

(2)用流程图表示算法的实现过程
在这里插入图片描述

四、实验内容
实验内容需要包括以下内容:
在hdfs中创建文件夹:

package yang;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class mkdirDemo {
	public static void main(String [] args) throws IOException, URISyntaxException {
	FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), new Configuration());
	fs.mkdirs(new Path("/Sales"));//创建文件夹
//	fs.create(new Path("/testsales.txt"));//创建文件
	fs.close();		
	System.out.println("success");
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

创建文件结果:

在hdfs中上传文件:

package yang;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class uplownd { 
	    public static void copyFromLocal(String source, String dest)throws IOException, URISyntaxException {
	        // 读取hadoop文件系统的配置
	        Configuration conf = new Configuration();
	        URI uri = new URI("hdfs://localhost:9000");
	        // FileSystem是用户操作HDFS的核心类,它获得URI对应的HDFS文件系统
	        FileSystem fileSystem = FileSystem.get(uri, conf);
	        // 源文件路径
	        Path srcPath = new Path(source);
	        // 目的路径
	        Path dstPath = new Path(dest);
	        // 查看目的路径是否存在
	        if (!(fileSystem.exists(dstPath))) {
	            // 如果路径不存在,即刻创建
	            fileSystem.mkdirs(dstPath);
	        }
	        // 得到本地文件名称
	        String filename = source.substring(source.lastIndexOf('/') + 1,source.length());
	        try {
	            // 将本地文件上传到HDFS
	            fileSystem.copyFromLocalFile(srcPath, dstPath);
	            System.out.println("File " + filename + " copied to " + dest);
	        } catch (Exception e) {
	            System.err.println("Exception caught! :" + e);
	            System.exit(1);
	        } finally {
	            fileSystem.close();
	        }
	    }
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

上传文件结果:
在这里插入图片描述

在hdfs中进行统计

package yang;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class shangpintongji  {
	 public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
	//第一个Object表示输入key的类型;第二个Text表示输入value的类型;第三个Text表示表示输出键的类型;第四个IntWritable表示输出值的类型  
	  public static final IntWritable one = new IntWritable(1);
	  public static Text word = new Text();
	  @Override
	  protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {//抛出异常 
	   StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");//以空格分割
	//StringTokenizer是Java工具包中的一个类,用于将字符串进行拆分 
	   word.set(tokenizer.nextToken());
	//返回当前位置到下一个分隔符之间的字符串
	  context.write(word, one);
	//将word存到容器中,记一个数
	  }
	 }
	 public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	  private IntWritable result = new IntWritable();
	  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
	    throws IOException, InterruptedException {
	   int sum = 0;
	   for (IntWritable value : values) {
	    sum += value.get();
	   }
	   result.set(sum);
	   context.write(key, result);
	  }
	 } 
	 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		  Job job = Job.getInstance();
		  job.setJobName("shangpintongji");
		  job.setJarByClass(shangpintongji.class);
		  job.setMapperClass(doMapper.class);
		  job.setReducerClass(doReducer.class);
		  job.setOutputKeyClass(Text.class);
		  job.setOutputValueClass(IntWritable.class);
	      Path in=new Path("hdfs://localhost:9000/Sales/Sales");
		  Path out=new Path("hdfs://localhost:9000/shiyan/out1");
		  FileInputFormat.addInputPath(job, in);
		  FileOutputFormat.setOutputPath(job, out);
		  boolean flag = job.waitForCompletion(true);
		  System.out.println(flag);
		  System.exit(flag? 0 : 1);
		 }
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

统计结果:
在这里插入图片描述在这里插入图片描述
在这里插入图片描述

降序排序:

package liangchenchen;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class paixu {
	  public static class Map extends Mapper<Object , Text , IntWritable,Text >{
	    	private static Text goods=new Text();
	    	private static IntWritable num=new IntWritable();
	    	public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
	    		String line=value.toString();
	    		String arr[]=line.split("\t");
	    		num.set(Integer.parseInt(arr[1]));
	    		goods.set(arr[0]);
	    		context.write(num, goods);
	    	}
	    }
	    public static class Reduce extends Reducer< IntWritable, Text, IntWritable, Text>
	  //input is temperature (IntWritable) and data (Text)
	  {
	  	//private static IntWritable result= new IntWritable();
	  	
	  	public void reduce(IntWritable key,Iterable<Text> values,Context context) 
	  			throws IOException, InterruptedException{
	  		//reduce端接收到<key,value-list>之后,
	  		//将输入的key直接复制给输出的key,用for循环遍历value-list并将里面的元素设置为输出的value,
	  		//然后将<key,value>逐一输出,根据value-list中元素的个数决定输出的次数。
	  		
	  		for(Text val:values){
	  			System.out.println("-------"+val);
	  			context.write(key,val);  //output data of the same temperature
	  			}
	  		}
	  	}
	  //使Sort阶段的Key降序排列的比较器
		  public static class IntWritableDecreasingComparator extends
			IntWritable.Comparator {
		           public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			       return -super.compare(b1, s1, l1, b2, s2, l2);
		}
		}
		    
		        public static void main(String[] args) throws IOException,ClassNotFoundException,InterruptedException{
		            Configuration conf=new Configuration();
		             Job job =new Job(conf,"sort");
		             job.setJarByClass(paixu.class);
		             job.setMapperClass(Map.class);
		             job.setReducerClass(Reduce.class);
		             job.setOutputKeyClass(IntWritable.class);
		             job.setOutputValueClass(Text.class);
		             job.setInputFormatClass(TextInputFormat.class);
		             job.setOutputFormatClass(TextOutputFormat.class);
		             Path in=new Path("hdfs://localhost:9000/shiyan/out/part-r-00000");
		   		     Path out=new Path("hdfs://localhost:9000/shiyan/out2");
		             FileInputFormat.addInputPath(job,in);
		             FileOutputFormat.setOutputPath(job,out);
		             job.setSortComparatorClass(IntWritableDecreasingComparator.class);
		             System.exit(job.waitForCompletion(true)?0:1);

		       }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/781056
推荐阅读
相关标签
  

闽ICP备14008679号