当前位置:   article > 正文

hadoop学习笔记_export hadoop

export hadoop

前言: 之前学习hadoop总结的笔记。

1. hadoop 伪分布式式安装
    1.1 修改ip地址,立即生效(service network restart)
    1.2 关闭防火墙
        1.2.1 关闭命令 service iptables stop
        1.2.2 查看防火墙状态 service iptables status
        1.2.3 关闭防火墙自动运行 
            1.2.3.1 关闭命令 chkconfig iptables off 
            1.2.3.2 执行查看 chkconfig --list | grep iptables
    1.3 设置主机名
        1.3.1 修改命令 vi /etc/sysconfig/network 重启查看 (hostname)
    1.4 SSH配置
        1.4.1 ssh-keygen -t rsa (免密码登陆)
        1.4.2 cd ~/.ssh 进行操作 cp id_rsa.pub authorized_keys 
    1.5 ip与hostname绑定
        1.5.1 执行命令 vi /etc/hosts
    1.6 安装jdk和hadoop
        1.6.1 tar -zxvf jdk-1.7.1.tar.gz  tar -zxvf hadoop.tar.gz 
        1.6.2 mv 文件夹 别名
    1.7 配置 etc/profile
        1.7.1  

  1. export JAVA_HOME=/usr/local/jdk
  2.             export SCALA_HOME=/usr/local/scala
  3.             export SPARK_HOME=/usr/local/spark
  4.             export JRE_HOME=${JAVA_HOME}/jre
  5.             export HADOOP_HOME=/usr/local/hadoop
  6.             export CLASS_PATH=.:${JAVA_HOME}/lib:${JAVA_HOME}/lib
  7.             export PATH=.:${PATH}:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${HADOOP_HOME}/bin


        1.7.2 验证: java -version 、 hadoop -version
    1.8 修改hadoop配置文件
        1.8.1 hadoop/conf 
            1.8.1.1 hadoop-env.sh 设置jdk目录
            1.8.1.2 core-site.xml               

  1. <property>
  2.                     <name>fs.default.name</name>
  3.                     <value>hdfs://hadoop:9000</value>
  4.                 </property>
  5.                 <property>
  6.                     <name>hadoop.tmp.dir</name>
  7.                     <value>/usr/local/hadoop/tmp</value>
  8.                 </property>

      1.8.1.3 hdfs-site.xml               

  1.   <property>
  2.                     <name>dfs.replication</name>
  3.                     <value>1</value>
  4.                 </property>
  5.                 <property>
  6.                     <name>dfs.permissions</name>
  7.                     <value>false</value>
  8.                 </property>
  9.                 <property>
  10.                     <name>dfs.name.dir</name>
  11.                     <value>/usr/local/hadoop/hdfs/name</value>
  12.                 </property>

            1.8.1.4 mapred-site.xml

  1. <property>
  2.                     <name>mapred.job.tracker</name>
  3.                     <value>hadoop:9001</value>
  4.                 </property>

        1.8.2 hadoop namenode -format 进行格式化配置文件
            1.8.2.1 start-all    启动hadoop
            1.8.2.2 jps             查看hadoop当前进程
                3249 JobTracker
                3371 TaskTracker
                3176 SecondaryNameNode
                3494 Jps
                2966 NameNode
                3073 DataNode
        1.8.3 C:\Windows\System32\drivers\etc 进行本地映射访问
2. HDFS(Hadoop Distributed File System)
    2.1 hadoop 
        2.1.1 hadoop fs -ls hdfs://hadoop:9000/ 完整的HDFS文件系统访问路径
        2.1.2 hadoop fs -lsr hdfs://hadoop:9000/ 完整的HDFS文件系统列表访问路径
        2.1.3 hadoop fs -ls / 同上一样的效果
        2.1.4 hadoop fs -ls mkdir /File_name 在HDFS文件系统中 创建文件夹或者文件
        2.1.5 hadoop fs -put 'source_path' 'target_source' 上传文件到hdfs
    2.2  hdfs-shell
        2.2.1 -ls      <path> 对hdfs下一级目录查看
        2.2.2 -lsr     <path> 对hdfs目录递归查看
        2.2.3 -mkdir   <path> 创建目录
        2.2.4 -put     <src>   <des> 从linux上传文件到hdfs
        2.2.5 -get     <des>   <src> 从hdfs下载到linux
        2.2.6 -text    <path> 查看文件内容
        2.2.7 -rm      <path> 删除文件
        2.2.8 -rmr     <path> 递归删除文件
        2.2.9 -help ls          hdfs-shell帮助
3. java进程pid
    3.1  NameNode(是整个文件系统的管理节点,接受用户的操作请求)
        3.1.1 fsimage:元数据镜像文件(储存某一时间内NameNode元数据信息)
        3.1.2 edits: 操作日志文件
        3.1.3 fstime: 保存最近一次checkpoint 的时间
    3.2 SecondaryNameNode(HA的一个解决方案)
        3.2.1 从NameNode的(fsimage,edits)下载元数据信息,然后把二者合并,然后生成新的fsimage在本地保存,然后将它推送到NameNode并重置NameNode的edits
        3.2.2 默认安装在NameNode的节点上(不安全)
    3.3 DataNode(提供真实数据的存储服务)
        3.3.1 基本存储单位(block:块)。HDFS默认Block大小为64MB,以256MB为例,Block=256/64=4块(超过Block按Block划分,如果不足按实际大小划分)
        3.3.2 HDFS中,如果一个文件小于一个数据块大小,并不占用整个数据块存储空间
        3.3.3 Replication 多复本。默认三个
4.对HDFS进行操作
    4.1 URL的方式访问HDFS       

  1. URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); //设置URL的访问形式支持hdfs
  2.         <!--Exception in thread "main" java.net.MalformedURLException: unknown protocol: hdfs-->
  3.         final URL url = new URL(path);
  4.         final InputStream in = url.openStream();
  5.         /*
  6.          * in 输入流
  7.          * out 输出流
  8.          * buffsize 缓存大小
  9.          * close 传输完成后是否关闭  
  10.          */
  11.         IOUtils.copyBytes(in, System.out, 1024, true); 

    4.2 FileSystem操作       

  1.  //FileSystem
  2.          /*
  3.          *   URI arg0 :hdfs路径
  4.          *   Configuration arg1:
  5.          */ 
  6.         final FileSystem fileSystem =    FileSystem.get(new URI(path), new Configuration());
  7.         4.2.1  //创建文件夹 hadoop fs -mkdir file_name
  8.             fileSystem.mkdirs(new Path("/b"));
  9.         4.2.2  //删除文件夹 hadoop fs -rm file_name
  10.             /*
  11.             * Path arg0:hdfs路径
  12.             * boolean arg1:退出删除:如果是文件夹(true),文件(true or false) 
  13.             */
  14.             fileSystem.delete(new Path("/b"),true);
  15.         4.2.3 //上传文件 hadoop fs -put  <src>   <des>
  16.             final FileInputStream in = new FileInputStream(new File("D:/file.txt"));
  17.             final FSDataOutputStream out = fileSystem.create(new Path("/c/"));
  18.             IOUtils.copyBytes(in, out, 1024,true);
  19.         4.2.4 //下载文件 hadoop fs -get file_name  <des>   <src>
  20.             final FSDataInputStream in =  fileSystem.open(new Path("/c/"));
  21.             IOUtils.copyBytes(in, System.out, 1024,true);
  22.         4.2.5 //浏览文件 hadoop fs -text <path>
  23.             final FileStatus[] fs =     fileSystem.listStatus(new Path("/c/"));
  24.             for (FileStatus fileStatus : fs) {
  25.                 System.out.println(fileStatus.isDir()?"文件":"文件夹");
  26.                 System.out.println(fileStatus.getPath().toString() + "  : " + fileStatus.getBlockSize() + " : " + fileStatus.getPermission() + ":" + fileStatus.getLen() );
  27.             }

5.RPC (Remote Procedure Call)远程过程调用--指的是不是同一个进程(至少两个过程[调用方、被调用方])
    5.1 client 主动发起请求,指定ip和port的server方法,把结果返回给 client
    5.2 RPC 是hadoop的基础 
        5.2.1 服务端 MyServer               

  1.  /** Construct an RPC server.
  2.                  * @param instance the instance whose methods will be called 实例的方法将被调用
  3.                  * @param conf the configuration to use 配置使用
  4.                  * @param bindAddress the address to bind on to listen for connection 地址绑定监听连接
  5.                  * @param port the port to listen for connections on 端口上监听连接
  6.                  * @param numHandlers the number of method handler threads to run 方法处理程序线程的数量
  7.                  * @param verbose whether each call should be logged 每个调用是否应该被记录
  8.                  */
  9.             final Server server = RPC.getServer(new MyProtocol(), address, port, new Configuration());
  10.             server.start();

        5.2.2 客户端 MyClient             

  1. /**
  2.                * Get a proxy connection to a remote server 得到一个代理连接到远程服务器
  3.                * @param   Class<? extends VersionedProtocol> protocol protocol class 协议类
  4.                * @param long clientVersion client version 客户端版本
  5.                * @param  InetSocketAddress addr remote address 远程地址
  6.                * @param Configuration  conf configuration to use  配置使用
  7.                * @param  int connTimeout time in milliseconds before giving up 时间以毫秒为单位之前放弃
  8.                * @return the proxy
  9.                * @throws IOException if the far end through a RemoteException
  10.                */
  11.              MyProtocolInterface protocol =    (MyProtocolInterface) RPC.waitForProxy( MyProtocolInterface.class, 
  12.                 23456L, new InetSocketAddress(MyServer.address, MyServer.port), new Configuration());
  13.              String result = protocol.hello("hadoop"); //代码调用发生在服务端
  14.              System.out.println(result);
  15.              RPC.stopProxy(protocol);//关闭网络连接    

        5.2.3 协议接口 
              报错解决
            /* Exception in thread "main" java.lang.IllegalArgumentException: rpc.MyProtocol is not an interface */
            public interface MyProtocolInterface  extends VersionedProtocol {
            public abstract String hello(String name);        
            }
    5.3 RPC 实例理解
        5.3.1 RPC 是一个远程过程调用
        5.3.2 当客户端调用服务端的方法,意味调用的是服务端的对象的方法
        5.3.3 如果服务端的对象允许客户端进行调用,那么这个对象必须实现VersionedProtocol接口,因为RPC内部使用的是JDK的代理对象
6. MapReduce 一种分布式框架
    6.1 MR(Map和Reduce),用户需要实现map()和reduce()两个函数,这两个函数的形象是key、value对,表示函数的输入信息
    6.2 MapReduce 执行顺序
        6.2.1 读取hdfs中的文件,每一个解析成<k,v> 每一个k,v调用一次map()函数
        6.2.2 覆盖map()函数,接收产生的<k,v>处理 转换成新的<k,v>输出
            6.2.1.1                     

  1.  /** @param K1 key the input key.   k1  每一行的起始位置 偏移量(offset)
  2.                    * @param V1 value the input value. v1  每一行中的文本的内容
  3.                    * @param OutputCollector<K2, V2> output collects mapped keys and values. k2 每一行中的每个单词
  4.                    * @param Reporter reporter facility to report progress. v2 每一行中的每个单词出现的次数,默认1
  5.                    */
  6.                    static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
  7.                         @Override
  8.                         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
  9.                                final String[] splited = value.toString().split("/");
  10.                                for (String world : splited) {
  11.                                  context.write(new Text(world), new LongWritable(1L));
  12.                             }
  13.                         }
  14.                    }

        6.2.3 对产生的<k,v> 和新产生的 <k,v> 进行一个分区(默认是一个区)
        6.2.4 对不同分区中的数据进行排序(按照k)、分组。分组指的是相同key的value放到一个集合中。
        6.2.5 (可选)对分组后的数据进行规约。
        6.2.6 多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点上。
        6.2.7 对多个map的输出进行合并、排序。覆盖reduce函数,接收的是分组后的数据,实现自己的业务逻辑,处理后,产生新的<k,v>输出。
        6.2.8 对reduce输出的<k,v>写到hdfs中。
            6.2.7.1  

  1.  /*
  2.                    * @param k2 key the key. k2 每一行中的每个单词
  3.                    * @param Iterator<V2>  values the list of values to reduce.v2 每一行中的每个单词出现的次数,默认1
  4.                    * @param   OutputCollector<K3, V3> output to collect keys and combined values.  k3  整个文件中的不同单词
  5.                    * @param  Reporter reporter facility to report progress.  v3  整个文件中不同单词出现的次数
  6.                    */
  7.                    static class MyReducer extends Reducer<Text,LongWritable, Text, LongWritable >{
  8.                         @Override
  9.                         protected void reduce(Text k2, Iterable<LongWritable> v2s, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
  10.                         Long sum = 0l;
  11.                         for (LongWritable v2 : v2s) {
  12.                                 sum += v2.get();
  13.                             }
  14.                            context.write(k2, new LongWritable(sum));
  15.                         }
  16.                    }      

         6.2.9 提交作业到jobtracker
                

  1. Job job = new Job(new Configuration(), KpiApp.class.getSimpleName());
  2.                  FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),new Configuration());
  3.                  if(fileSystem.exists(new Path(OUTPUT_PATH))){
  4.                     fileSystem.delete(new Path(OUTPUT_PATH), true);
  5.                  }
  6.                  //指定输入文件
  7.                  FileInputFormat.setInputPaths(job, INPUT_PATH);
  8.                  //指定哪个类用格式输入文件
  9.                  job.setInputFormatClass(TextInputFormat.class);--可省略                 
  10.                  //设置字定义mapper类
  11.                  job.setMapperClass(MyMapper.class);
  12.                 // 指定<k2,v2>输出类型
  13.                  job.setMapOutputKeyClass(Text.class);--可省略
  14.                  job.setMapOutputValueClass(KpiWriterable.class);--如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
  15.                  //指定分区
  16.                  job.setPartitionerClass(HashPartitioner.class);--可省略
  17.                  job.setNumReduceTasks(1);            --可省略    
  18.                  //4-4 TODO 排序、分组、(可选)归纳
  19.                  //设置自定义reduce
  20.                  job.setReducerClass(MyReducer.class);
  21.                  //指定<k3,v3>输出类型
  22.                  job.setMapOutputKeyClass(Text.class);
  23.                  job.setMapOutputValueClass(KpiWriterable.class);                 
  24.                  //指定输出到哪里
  25.                  FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
  26.                  //设置输出文件的格式化类
  27.                  job.setOutputFormatClass(TextOutputFormat.class);--可省略                 
  28.                  //提交到jobtracker
  29.                  job.waitForCompletion(true);


        6.2.10 mapreduce执行过程 
            6.2.10.1 map、reduce键值对格式
                    函数        输入键值对    输出键值对
                    map()         <k1,v1>      <k2,v2>
                    reduce()     <k2,{v2}>      <k3,v3>    
            6.2.10.2 map任务处理
                6.2.10.2.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
                6.2.10.2.2 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
                6.2.10.2.3 对输出的key、value进行分区。
                6.2.10.2.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
                6.2.10.2.5 (可选)分组后的数据进行归约。
            6.2.10.3 reduce任务处理
                6.2.10.3.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
                6.2.10.3.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
                6.2.10.3.3 把reduce的输出保存到文件中.    
        6.2.11 JobTracker
            6.2.11.1 负责接收用户提交的作业,负责启动、跟踪任务执行。
            6.2.11.2 JobSubmissionProtocol是JobClient与JobTracker通信的接口。
            6.2.11.3 InterTrackerProtocol是TaskTracker与JobTracker通信的接口。
        6.2.12 TaskTracker
            6.2.12.1 负责执行任务。
        6.2.13 JobClient
            6.2.13.1 是用户作业与JobTracker交互的主要接口。
            6.2.13.2 负责提交作业的,负责启动、跟踪任务执行、访问任务状态和日志等。
        6.2.14 MapReduce驱动默认的设置
            InputFormat(输入)    TextInputFormat
            MapperClass(map类)    IdentityMapper
            MapOutputKeyClass    LongWritable
            MapOutputValueClass    Text
            PartitionerClass    HashPartitioner
            ReduceClass        IdentityReduce
            OutputKeyClass        LongWritable
            OutputValueClass    Text
            OutputFormatClass    TextOutputFormat
        6.2.15 Writable接口<MR的任意Key和Value必须实现Writable接口.>
            6.2.15.1 Writable接口, 是根据 DataInput 和 DataOutput 实现的简单、有效的序列化对象.                

  1. public interface Writable {
  2.                   /** 
  3.                    * Serialize the fields of this object to <code>out</code>.
  4.                    */
  5.                   void write(DataOutput out) throws IOException;
  6.                   /** 
  7.                    * Deserialize the fields of this object from <code>in</code>.  
  8.                    */
  9.                   void readFields(DataInput in) throws IOException;
  10.                 }

            6.2.15.2 MR的任意key必须实现WritableComparable接口
                public interface WritableComparable<T> extends Writable, Comparable<T> {}
        6.2.16 MapReduce 0.*版本的书写
            6.2.16.1 final JobConf job = new JobConf(Old.class);
                 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
                 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
                 JobClient.runJob(job);

        6.2.17 MapReduce 1.*版本的书写
            6.2.17.1 final Job job = new Job(new Configuration(), KpiApp.class.getSimpleName());
                 FileInputFormat.setInputPaths(job, INPUT_PATH);
                 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
                 job.waitForCompletion(true);

        6.2.18 /**
             * 问:为什么使用Combiner?
             * 答:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短。
             * 
             * 问:为什么Combiner不作为MR运行的标配,而是可选步骤哪?
             * 答:因为不是所有的算法都适合使用Combiner处理,例如求平均数。
             *
             * 问:Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作哪?
             * 答:combiner操作发生在map端的,处理一个任务所接收的文件中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据。
             *
             */
      6.3 代码如何提交到jobtracker
        6.3.1 调用的job.waitForCompletion(true)进行提交
            6.3.1.1 实际上执行的 connect();
                    info = jobClient.submitJobInternal(conf);
            6.3.1.2 在connect()方法中,实际上创建了一个JobClient对象。
                   在调用该对象的构造方法时,获得了JobTracker的客户端代理对象JobSubmissionProtocol。
                   JobSubmissionProtocol的实现类是JobTracker。
            6.3.1.3 在jobClient.submitJobInternal(conf)方法中,调用了
                JobSubmissionProtocol.submitJob(...),
                即执行的是JobTracker.submitJob(...)。
7. Hadoop的基本类型(必须实现Writerable接口)
    7.1 java类型和hadoop类型对照
        7.1.1 Long    LongWritable
              Integer    IntWritable
              Boolean    BooleanWritable
              String    Text
        7.1.2 java类型如何转化为hadoop基本类型
            7.1.2.1 调用hadoop类型的构造方法,或者调用set()方法。
                    new LongWritable(123L);
        7.1.3 hadoop基本类型如何转化为java类型
            7.1.2.2 对于Text,需要调用toString()方法,其他类型调用get()方法
8. Reduce获取命令行
    8.1

  1. public class WorldCount extends Configured implements Tool{
  2.     public static void main(String[] args) throws Exception {
  3.         ToolRunner.run(new Configuration(), new CacheFileApp(), args);  
  4.     }
  5.     @Override
  6.     public int run(String[] args) throws Exception {
  7.             Path in = new Path(args[0]);  
  8.         Path out = new Path(args[1]);  
  9.     }
  10. }

9. Combiners(组合器)编程
    9.1 每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
    9.2 combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。
    9.3 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。
    9.4 注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。
    9.5  

  1. static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
  2.            @Override
  3.         protected void reduce(Text k2, Iterable<LongWritable> v2s, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
  4.               System.out.println("输入分组:" + k2.toString() +",....");
  5.                long times = 0L;
  6.                for (LongWritable count : v2s) {
  7.                 times += count.get();
  8.                 //输入的k2,v2    的键值对数量
  9.                 System.out.println("键值对:<"+ k2.toString()  +", " + count.get()+">");
  10.             }
  11.                context.write(k2,new LongWritable(times));
  12.            }
  13.        }            

10. Partitioner(分区)
    10.1 Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。
    10.2 HashPartitioner是mapreduce的默认partitioner。
        10.2.1 计算方法
            which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。
    10.3 分区的例子必须打成jar运行(job.setJarByClass)
        10.3.1     

  1. static class MyPartitioner extends HashPartitioner<Text, KpiWriterable> {
  2.             @Override
  3.             public int getPartition(Text key, KpiWriterable value, int numReduceTasks) {
  4.                 return (key.toString().length() == 11) ? 0 : 1;
  5.             }
  6.         }

        10.3.2 根据业务需要,产生多个输出文件
         10.3.3 多个reduce任务在运行,提高整体job的运行效率
11. Shuffle
    11.1 每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
    11.2 写磁盘前,要partition,sort。如果有combiner,combine排序后数据。
    11.3 等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。
        11.1.1 Reducer通过Http方式得到输出文件的分区。
        11.2.2 TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。
        11.3.3 排序阶段合并map输出。然后走Reduce阶段。
12. hadoop分布式安装
    12.1 分布结构
        12.1.1 主节点(1个,是hadoop0):NameNode、JobTracker、SecondaryNameNode
        12.1.2 从节点(2个,是hadoop1、hadoop2):DataNode、TaskTracker
    12.2 各节点重新产生ssh加密文件
    11.3 编辑各个节点的/etc/hosts,在该文件中含有所有节点的ip与hostname的映射信息
    12.4 两两节点之间的SSH免密码登陆
        12.4.1 ssh-copy-id -i  hadoop1
        12.4.2 scp /root/.ssh/authorized_keys   hadoop1:/root/.ssh/ 
    12.5 把hadoop0的hadoop目录下的logs和tmp删除
    12.6 把hadoop0中的jdk、hadoop文件夹复制到hadoop1和hadoop2节点
        12.6.1 scp  -r /usr/local/jdk
        12.6.2 hadoop1:/usr/local/
    12.7 把hadoop0的/etc/profile复制到hadoop1和hadoop2节点,在目标节点中执行source  /etc/profile
    12.8 编辑hadoop0的配置文件slaves,改为从节点的hostname,分别是hadoop1和hadoop2
    12.9 格式化,在hadoop0节点执行hadoop namenode -format
    12.10 启动,在hadoop0节点执行start-all.sh
    12.11 注意:对于配置文件core-site.xml和mapred-site.xml在所有节点中都是相同的内容。
    12.12 动态的增加一个hadoop节点
        12.12.1 配置新节点的环境
        12.12.2 把新节点的hostname配置到主节点的slaves文件中
        12.12.3 在新节点,启动进程
                12.12.3.1 hadoop-daemon.sh start datanode
                12.12.3.2 hadoop-daemon.sh start tasktracker
        12.12.4 在主节点执行脚本 hadoop dfsadmin -refreshNodes
13. ZooKeeper 是 Google 的 Chubby一个开源的实现,是 Hadoop 的分布式协调服务
    13.1 它包含一个简单的原语集,分布式应用程序可以基于它实现同步服务,配置维护和命名服务等
    13.2 可以用来保证数据在ZooKeeper集群之间的数据的事务性一致。
    13.3 搭建ZooKeeper服务器集群
        13.3.1 ZooKeeper服务器集群规模不小于3个节点,要求各服务器之间系统时间要保持一致。
        13.3.2 在hadoop0的/usr/local目录下,解压缩zk....tar.gz,设置环境变量
        13.3.3 在conf目录下,修改文件 vi zoo_sample.cfg  zoo.cfg
        13.3.4 编辑该文件,执行vi zoo.cfg
            13.3.4.1 修改dataDir=/usr/local/zk/data
            13.3.4.2 新增server.0=hadoop0:2888:3888
            13.3.4.3 server.1=hadoop1:2888:3888
            13.3.4.4 server.2=hadoop2:2888:3888
        13.3.5 创建文件夹mkdir /usr/local/zk/data
        13.3.6 在data目录下,创建文件myid,值为0
        13.3.7 把zk目录复制到hadoop1和hadoop2中
        13.3.8 把hadoop1中相应的myid的值改为1
            13.3.8.1 把hadoop2中相应的myid的值改为2
        13.3.9 启动,在三个节点上分别执行命令zkServer.sh start
        13.3.10 检验,在三个节点上分别执行命令zkServer.sh status 
14. Hive(Hadoop Database)
    14.1 一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。
         HBase利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为协调工具。
        14.1.1 表(table),储存管理数据
        14.1.2 行健(row key),类似mysql的主键,行健是HBase表天然自带的
        14.1.3 HBase中列族是需要在定义表时指定的,列是在插入记录时动态增加的。
            HBase表中的数据,每个列族单独一个文件。
        14.1.4 时间戳(timestamp),列(也称作标签、修饰符)的一个属性。
            行键和列确定的单元格,可以存储多个数据,每个数据含有时间戳属性,数据具有版本特性。    
            如果不指定时间戳或者版本,默认取最新的数据。
        14.1.5 存储的数据都是字节数组。
        14.1.6 表中的数据是按照行键的顺序物理存储的。

    14.2 HBase的物理模型
        14.2.1 HBase是适合海量数据(如20PB)的秒级简单查询的数据库。
        14.2.2 HBase表中的记录,按照行键进行拆分, 拆分成一个个的region。
            许多个region存储在region server(单独的物理机器)中的。
            这样,对表的操作转化为对多台region server的并行查询。
        14.2.3.HBase的体系结构
            14.2.3.1 HBase是主从式结构,HMaster、HRegionServer
    14.3 Hive 伪分布安装
        14.3.1 解压缩、重命名、设置环境变量
        14.3.2 修改$HBASE_HOME/conf/hbase-env.sh,修改内容如下:            
            export JAVA_HOME=/usr/local/jdk
            export HBASE_MANAGES_ZK=true
        14.3.3 修改$HBASE_HOME/conf/hbase-site.xml,修改内容如下:            

  1. <property>
  2.               <name>hbase.rootdir</name>
  3.               <value>hdfs://hadoop:9000/hbase</value>
  4.             </property>
  5.             <property>
  6.               <name>hbase.cluster.distributed</name>
  7.               <value>true</value>
  8.             </property>
  9.             <property>
  10.               <name>hbase.zookeeper.quorum</name>
  11.               <value>hadoop</value>
  12.             </property>
  13.             <property>
  14.               <name>dfs.replication</name>
  15.               <value>1</value>
  16.             </property>

        14.3.4 (可选)文件regionservers的内容为hadoop
        14.3.5 启动hbase,执行命令start-hbase.sh
        启动hbase之前,确保hadoop是运行正常的,并且可以写入文件
        14.3.6 验证:
            14.3.6.1 执行jps,发现新增加了3个java进程,分别是HMaster、HRegionServer、HQuorumPeer
            14.3.6.2 使用浏览器访问http://hadoop:60010
        14.3.7 hbase的机群搭建过程
            14.3.7.1 集群结构
                14.3.7.1.1 主节点(hmaster)是hadoop
                14.3.7.1.2 从节点(region server)是hadoop1和hadoop2
            14.3.7.2 修改hadoop0上的hbase的几个文件
                14.3.7.2.1 修改hbase-env.sh的最后一行export HBASE_MANAGES_ZK=false
                14.3.7.2.2 修改hbase-site.xml文件的hbase.zookeeper.quorum的值为hadoop,hadoop1,hadoop2
                14.3.7.2.3 修改regionservers文件(存放的region server的hostname),内容修改为hadoop1、hadoop2
            14.3.7.2 复制hadoop中的hbase文件夹到hadoop1、hadoop2中    
            14.3.7.3 复制hadoop中的/etc/profile到hadoop1、hadoop2中,在hadoop1、hadoop2上执行source /etc/profile
            14.3.7.4 首先启动hadoop,然后启动zookeeper集群。
        最后在hadoop上启动hbase集群。
15. Pig 基于hadoop的一个数据处理的框架
    15.1 MapReduce是使用java进行开发的,Pig有一套自己的数据处理语言,Pig的数据处理过程要转化为MR来运行。
        15.1.1 Pig的数据处理语言是数据流方式的,类似于初中做的数学题。
        15.1.2 Pig数据类型
            15.1.2.1 int、long、float、double、chararry、bytearray
            15.1.2.2 复合数据类型:Map、Tuple、Bag  
            15.1.2.3 Bag的类型如{('age',31),('name','张三')}
    15.2 安装Pig
        15.2.1 把pig-0.11.1.tar.gz复制到/usr/local下
        15.2.2 使用命令tar -zxvf  pig-0.11.1.tar.gz解压缩
        15.2.3 使用命令mv pig-0.11.1  pig 进行重命名
        15.2.4 编辑文件vi /etc/profile 设置环境变量
        15.2.5 编辑文件$PIG_HOME/conf/pig.properties,增加两行如下内容
            fs.default.name=hdfs://hadoop:9000
            mapred.job.tracker=hadoop:9001   

 


 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/634034
推荐阅读
相关标签
  

闽ICP备14008679号