当前位置:   article > 正文

MapReduce初体验——统计指定文本文件中每一个单词出现的总次数_编写mapreduce程序,在给定的文本中统计输出每一个单词出现的总次数。 输入数据tes

编写mapreduce程序,在给定的文本中统计输出每一个单词出现的总次数。 输入数据tes

        相信小伙伴们看了小菌上一篇博客《什么是MapReduce(入门篇)》后,对MapReduce的概念有了更深的认知!本篇博客,小菌为大家带来的则是MapReduce的实战——统计指定文本文件中每一个单词出现的总次数。

        我们先来确定初始的数据源,即wordcount.txt文件!

在这里插入图片描述

        跟之前使用API一样,我们同样需要在IDEA中使用JAVA代码来书写MapReduce!

项目POM文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast</groupId>
    <artifactId>mapreduce</artifactId>
    <version>1.0-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.Hadoop</groupId>
            <artifactId>Hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Hadoop</groupId>
            <artifactId>Hadoop-common</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Hadoop</groupId>
            <artifactId>Hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.Hadoop</groupId>
            <artifactId>Hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>
</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

定义一个mapper类

package demo02;

//首先要定义四个泛型的类型
//keyin:  LongWritable    valuein: Text
//keyout: Text            valueout:IntWritable

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @Auther: 封茗囧菌
 * @Date: 2019/11/11 17:43
 * @Description:
 * 需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数
 */
 
// long,int,String  ... 是java中的数据类型,hadoop中并不识别.hadoop中有对应的数据类型
public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {

    //map  方法的生命周期:  框架每传一行数据就被调用一次
    //key :  这一行的起始点在文件中的偏移量
    //value : 这一行的内容

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/**
 * 代码中  key   是行首字母的【偏移量】-->无规律可言,行首字母到所有内容最前端的
 *        value  是一行真正的数据
 */

        //1.将Text类型的value  转换成 string
        String datas = value.toString();

        //2.将这一行用 " " 切分出各个单词
        String[] words = datas.split(" ");

        //3.遍历数组,输出<单词,1>【一个单词输出一次】
        for (String word : words) {

            //输出数据
            //context   上下文对象
            context.write(new Text(word),new LongWritable(1));

        }

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

定义一个reducer类

package demo02;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Auther: 封茗囧菌
 * @Date: 2019/11/11 17:56
 * @Description:
 */
//计算单词的总和
public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {


    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

        /**
         * key 表示的是单词
         * values 表示的是好多个1
         */

        //定义一个计数,用于求和
        int sum = 0;

        //遍历这一组  kv  的所有v ,累加到count中
        for (LongWritable value: values) {

            //.get可以将LongWritable类型转换成Integer
            sum +=value.get();

        }
         //输出结果
        context.write(key, new LongWritable(sum));

    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

定义一个主类,用来描述job并提交job

package demo02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @Auther: 封茗囧菌
 * @Date: 2019/11/11 18:06
 * @Description:
 */
public class WordCountRunner{
    //把业务逻辑相关的信息(哪个是mapper,哪个是reducer,要处理的数据在哪里,输出的结果放哪里……)描述成一个job对象
    //把这个描述好的job提交给集群去运行

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
         
        //创建一个Configuration实体类对象
        Configuration conf = new Configuration();
        Job wcjob = Job.getInstance(conf);
        // 指定我这个job所在的jar包
        // wcjob.setJar("/home/hadoop/wordcount.jar");
        wcjob.setJarByClass(WordCountRunner.class);

        wcjob.setMapperClass(WordCountMapper.class);
        wcjob.setReducerClass(WordCountReducer.class);

        //设置我们的业务逻辑Mapper 类的输出 key 和 value  的数据类型
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setMapOutputValueClass(LongWritable.class);

        //设置我们的业务逻辑 Reducer 类的输入key 和 value 的数据类型
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(LongWritable.class);

        long startTime=System.currentTimeMillis();   //获取开始时间

         
        //指定要处理的数据所在的位置
        FileInputFormat.setInputPaths(wcjob,"G:\\wordcount.txt");
        //指定处理完成之后的结果所保存的位置
        FileOutputFormat.setOutputPath(wcjob, new Path("E:\\result"));

        // 向yarn集群提交这个job
        boolean res = wcjob.waitForCompletion(true);
        long endTime=System.currentTimeMillis(); //获取结束时间
        System.out.println(res?0:1);
        System.out.println("程序运行时间: "+(endTime-startTime)+"ms");


    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

错误提醒:如果遇到这个错误,
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/":root:supergroup:drwxr-xr-x
直接将hdfs-site.xml当中的权限关闭即可

<property>
                <name>dfs.permissions</name>
                <value>false</value>
</property>
  • 1
  • 2
  • 3
  • 4

        重启hdfs集群,重新运行即可。(可以把项目打成jar包放在虚拟机上运行,也可以在IDEA上直接运行!这里介绍的是在IDEA上运行的效果)

        让我们来查看一下效果!
        在我们本地的E盘上,出现了result文件夹
在这里插入图片描述
        打开进入并用Notepad++ 打开文件查看内容!发现统计的结果已经呈现在里面了!说明我们的程序运行成功了!
在这里插入图片描述
        思路回顾:
        每读取一行数据,MapReduce就会调用一次map方法,在map方法中我们把每行数据用空格" "分隔成一个数组,遍历数组,把数组中的每一个元素作为key,1作为value作为map的输出传递给reduce。reduce把收集到的数据根据key值进行分区,把每个分区的内容进行单独计算,并把结果输出。

        本期的分享就到这里了,小伙伴们有什么疑惑或好的建议可以积极在评论区留言~,小菌会持续更新新鲜好玩的技术,喜欢的小伙伴们不要忘了点赞,关注小菌呐ヾ(◍°∇°◍)ノ゙。






在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/337653
推荐阅读
相关标签
  

闽ICP备14008679号