当前位置:   article > 正文

HDFS和MapReduce综合实训_第1关:wordcount词频统计

第1关:wordcount词频统计

第一关 wordCount词频统计

 import java.io.IOException;
import java.util.StringTokenizer;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class WordCount {
  /*
  * MapReduceBase类:实现Mapper和Reducer接口的基类    
  * Mapper接口:
  * WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类要实现此接口。    
  */  
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
  /*
  *LongWritable,IntWritable,Text是Hadoop中实现的用于封装Java数据类型的类,这些类实现了WritableComparable接口,
  *都能够被串行化,便于在分布式环境中进行数据交换,可以视为long,int,String数据类型的替代。
  */
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();//Text实现了BinaryComparable类,可以作为key值
  /*
  * Mapper接口中的map方法:
  * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
  * 映射一个单个的输入<K1,V1>对到一个中间输出<K2,V2>对
  * 中间输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。  
  * OutputCollector接口:收集Mapper和Reducer输出的<K,V>对。
  * OutputCollector接口的collect(k, v)方法:增加一个(k,v)对到output
  * Reporter 用于报告整个应用的运行进度
     */  
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
  /*
       * 原始数据(以test1.txt为例):
     *  tale as old as time
    true as it can be
    beauty and the beast
    map阶段,数据如下形式作为map的输入值:key为偏移量
      <0  tale as old as time>
      <21 world java hello>
      <39 you me too>
       */
       
       /**
     * 解析(Spliting)后以得到键值对<K2,V2>(仅以test1.txt为例)
         * 格式如下:前者是键值,后者数字是值
         * tale 1
         * as 1
         * old 1
         * as 1
         * time 1
         * true 1
         * as 1
         * it 1
         * can 1
         * be 1
         * beauty 1
         * and 1
         * the 1
         * beast 1
         * 这些键值对作为map的输出数据
         */
  //****请补全map函数内容****//
  /*********begin*********/
StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }

 



  /*********end**********/
    }
  }
 
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
   /*
   * reduce过程是对输入键值对洗牌(Shuffing)形成<K2,list(V2)>格式数据(仅以test1.txt为例):
   * (tablie [1])
   * (as [1,1,1])
   * (old [1])
   * (time [1])
   * (true [1])
   * (it [1])
   * (can [1])
   * (be [1])
   * (beauty [1])
   * (and [1])
   * (the [1])
   * (beast [1])
   * 作为reduce的输入
   *
   */
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
    //****请补全reduce对<k2, list(v2)> 进行合计得到list(<k3,v3>)过程****//
  /*********begin*********/
int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   




  /*********end**********/
  //****请将list(<k3,v3>)统计输出****//
  /*********begin*********/




  /*********end**********/
  }
}
  public static void main(String[] args) throws Exception {
    /**
       * JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作
       * 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等
       */  
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  /*
  * 需要配置输入和输出的HDFS的文件路径参数
  * 可以使用"Usage: wordcount <in> <out>"实现程序运行时动态指定输入输出
  */
    if (otherArgs.length != 2) {
       System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");//Job(Configuration conf,String jobName)设置job名称
    job.setJarByClass(WordCount.class);//为job设置Mapper类
      /*********begin*********/
      //****请为job设置Mapper类****//
        // job.setCombinerClass(IntSumReducer.class);//为job设置Combiner类
job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);





      //****请为job设置Reduce类****//
      //****请设置输出key的参数类型****//
      //****请设置输出value的类型****//
     

      /*********end**********/
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//为map-reduce任务设置InputFormat实现类,设置输入路径
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//为map-reduce任务设置OutputFormat实现类,设置输出路径
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

第二关 HDFS文件读写

import java.io.IOException;

import java.sql.Date;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;


 

public class hdfs {

    public static void main(String[] args) throws IOException {

//throws IOException捕获异常声明

//****请根据提示补全文件创建过程****//

/*********begin*********/

     Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(conf);

            System.out.println(fs.getUri());  

//实现文件读写主要包含以下步骤:

//读取hadoop文件系统配置

//实例化设置文件,configuration类实现hadoop各模块之间值的传递

//FileSystem是hadoop访问系统的抽象类,获取文件系统, FileSystem的get()方法得到实例fs,然后fs调动create()创建文件,调用open()打开文件,调用close()关闭文件

           

//*****请按照题目填写要创建的路径,其他路径及文件名无法被识别******//

         Path file = new Path("/user/hadoop/myfile");

/*********end**********/

        if (fs.exists(file)) {

             System.out.println("File exists.");

        } else

            {

//****请补全使用文件流将字符写入文件过程,使用outStream.writeUTF()函数****//

                /*********begin*********/

                FSDataOutputStream outStream = fs.create(file);

           outStream.writeUTF("china cstor cstor cstor china");

         outStream.close();

               

               

               

                /*********end**********/

           

        }

       

//****请补全读取文件内容****//

/*********begin*********/

// 提示:FSDataInputStream实现接口,使Hadoop中的文件输入流具有流式搜索和流式定位读取的功能

    FSDataInputStream inStream = fs.open(file);

        String data = inStream.readUTF();    

       

       

/*********end**********/

       

//输出文件状态

//FileStatus对象封装了文件的和目录的元数据,包括文件长度、块大小、权限等信息

        FileSystem hdfs = file.getFileSystem(conf);

        FileStatus[] fileStatus = hdfs.listStatus(file);

        for(FileStatus status:fileStatus)

        {

           System.out.println("FileOwer:"+status.getOwner());//所有者

           System.out.println("FileReplication:"+status.getReplication());//备份数

           System.out.println("FileModificationTime:"+new Date(status.getModificationTime()));//目录修改时间

           System.out.println("FileBlockSize:"+status.getBlockSize());//块大小

        }

        System.out.println(data);

        System.out.println("Filename:"+file.getName());

        inStream.close();

        fs.close();

    }

  }


第三关 倒排索引

import java.io.IOException;

import java.util.HashMap;

import java.util.Hashtable;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.util.Iterator;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;

public class InvertedIndex {

    public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text>

    {

        public void map(LongWritable key, Text value, Context context)  

                throws IOException, InterruptedException

         

        {  

            FileSplit fileSplit = (FileSplit)context.getInputSplit();

            String fileName = fileSplit.getPath().getName();

           

            String word;

            IntWritable frequence=new IntWritable();

            int one=1;

            Hashtable<String,Integer>   hashmap=new Hashtable();//key关键字设置为String

            StringTokenizer itr = new StringTokenizer(value.toString());

           

//****请用hashmap定义的方法统计每一行中相同单词的个数,key为行值是每一行对应的偏移****//

/*********begin*********/

for(;itr.hasMoreTokens(); )

            {  

               

                word=itr.nextToken();

                if(hashmap.containsKey(word)){

                    hashmap.put(word,hashmap.get(word)+1);

                }else{

                    hashmap.put(word, one);

               

                }

           

            }




 

/*********end**********/            

                           

            for(Iterator<String> it=hashmap.keySet().iterator();it.hasNext();){

                word=it.next();

                frequence=new IntWritable(hashmap.get(word));

                Text fileName_frequence = new Text(fileName+"@"+frequence.toString());//以<K2,“单词 文件名@出现频次”> 的格式输出

                context.write(new Text(word),fileName_frequence);

            }

           

        }

    }

    public static class InvertedIndexCombiner extends Reducer<Text,Text,Text,Text>{

        protected void reduce(Text key,Iterable<Text> values,Context context)

                        throws IOException ,InterruptedException{

//****请合并mapper函数的输出,并提取“文件@1”中‘@’后面的词频,以<K2,list(“单词 文件名@出现频次”)>的格式输出****//

/*********begin*********/

String fileName="";

            int sum=0;

            String num;

            String s;

            for (Text val : values) {

                   

                    s= val.toString();

                    fileName=s.substring(0, val.find("@"));

                    num=s.substring(val.find("@")+1, val.getLength());

                    sum+=Integer.parseInt(num);

            }

        IntWritable frequence=new IntWritable(sum);

        context.write(key,new Text(fileName+"@"+frequence.toString()));




 

/*********end**********/                

        }

    }

   

    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>

    {   @Override

        protected void reduce(Text key, Iterable<Text> values, Context context)

                throws IOException, InterruptedException

        {   Iterator<Text> it = values.iterator();

            StringBuilder all = new StringBuilder();

            if(it.hasNext())  all.append(it.next().toString());

            for(;it.hasNext();) {

                all.append(";");

                all.append(it.next().toString());                  

            }

//****请输出最终键值对list(K3,“单词", “文件1@频次; 文件2@频次;...")****//

/*********begin*********/


 

context.write(key, new Text(all.toString()));


 

/*********end**********/        

        }

    }

    public static void main(String[] args)

    {

        if(args.length!=2){

            System.err.println("Usage: InvertedIndex <in> <out>");

            System.exit(2);

        }

       

      try {

                Configuration conf = new Configuration();

                String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

               

                Job job = new Job(conf, "invertedindex");

                job.setJarByClass(InvertedIndex.class);

                job.setMapperClass(InvertedIndexMapper.class);

            //****请为job设置Combiner类****//

/*********begin*********/

job.setCombinerClass(InvertedIndexCombiner.class);

/*********end**********/                                

                job.setReducerClass(InvertedIndexReducer.class);

               

                job.setOutputKeyClass(Text.class);

            //****请设置输出value的类型****//

/*********begin*********/

job.setOutputValueClass(Text.class);

/*********end**********/                                    

                FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

                FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

               

                System.exit(job.waitForCompletion(true) ? 0 : 1);

     

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

第四关 网页排序-PageRank算法

import java.io.IOException;

import java.text.DecimalFormat;

import java.text.NumberFormat;

import java.util.StringTokenizer;

import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class PageRank {

  public static class MyMapper   extends Mapper<Object, Text, Text, Text>

  {

        private Text id = new Text();

        public void map(Object key, Text value, Context context ) throws IOException, InterruptedException

        {

            String line = value.toString();

//判断是否为输入文件

            if(line.substring(0,1).matches("[0-9]{1}"))

            {

                  boolean flag = false;

                  if(line.contains("_"))

                  {

                        line = line.replace("_","");

                        flag = true;

                  }

//对输入文件进行处理

                  String[] values = line.split("\t");

                  Text t = new Text(values[0]);

                  String[] vals = values[1].split(" ");

                  String url="_";//保存url,用作下次计算

                  double pr = 0;

                  int i = 0;

                  int num = 0;

                 

                  if(flag)

                  {

                      i=2;

                      pr=Double.valueOf(vals[1]);

                      num=vals.length-2;

                  }

                  else

                  {

                      i=1;

                      pr=Double.valueOf(vals[0]);

                      num=vals.length-1;

                  }

                 

                  for(;i<vals.length;i++)

                  {

                      url=url+vals[i]+" ";

                      id.set(vals[i]);

                      Text prt = new Text(String.valueOf(pr/num));

                      context.write(id,prt);

                  }

                  context.write(t,new Text(url));

              }

          }

  }

  public static class MyReducer  extends Reducer<Text,Text,Text,Text>

  {

              private Text result = new Text();

              private Double pr = new Double(0);

             

         public void reduce(Text key, Iterable<Text> values,  Context context  ) throws IOException, InterruptedException

         {

              double sum=0;

              String url="";

             

//****请通过url判断否则是外链pr,作计算前预处理****//

/*********begin*********/

             

              for(Text val:values)

              {

                  if(!val.toString().contains("_"))

                  {

                      sum=sum+Double.valueOf(val.toString());

                  }

                  else

                 {

                      url=val.toString();

                  }

              }

              pr=0.15+0.85*sum;

              String str=String.format("%.3f",pr);

              result.set(new Text(str+" "+url));

              context.write(key,result);

/*********end**********/            

     

             

//****请补全用完整PageRank计算公式计算输出过程,q取0.85****//

/*********begin*********/


 

/*********end**********/    

          }

 }

    public static void main(String[] args) throws Exception

    {

             String paths="file:///tmp/input/Wiki0";//输入文件路径,不要改动

            String path1=paths;

            String path2="";

            for(int i=1;i<=5;i++)//迭代5次

              {

                System.out.println("This is the "+i+"th job!");

                System.out.println("path1:"+path1);

                System.out.println("path2:"+path2);

                Configuration conf = new Configuration();

                Job job = new Job(conf, "PageRank");

                path2=paths+i;    

                job.setJarByClass(PageRank.class);

                job.setMapperClass(MyMapper.class);

        //****请为job设置Combiner类****//

/*********begin*********/

job.setCombinerClass(MyReducer.class);

/*********end**********/                    

                job.setReducerClass(MyReducer.class);

                job.setOutputKeyClass(Text.class);

                job.setOutputValueClass(Text.class);

                FileInputFormat.addInputPath(job, new Path(path1));

                FileOutputFormat.setOutputPath(job, new Path(path2));

                path1=path2;      

             job.waitForCompletion(true);

            System.out.println(i+"th end!");

        }

      }

 }

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/388236
推荐阅读
相关标签
  

闽ICP备14008679号