赞
踩
Hadoop简介
Hadoop项目由多个子项目组成。与其他项目不同,这个项目更像一个生态系统。其中,核心项目包括HDFS、MapReduce框架、YARN和ZooKeeper。HDFS是一个符合Hadoop要求的分布式文件系统的实现。这个定义看起来比较复杂,其实解释起来一点儿都不难。首先Hadoop本身不包括文件系统,只有一个规范。任何实现了这些规范的文件系统都可以被Hadoop识别并使用。HDFS只是一个实现了这些规范的软件。如果读者有兴趣可以自己实现一个。MapReduce是一个计算框架。这个框架提供很多API和一组编程规范。开发人员可以借助这个框架制定计算规则,并且向Hadoop提交作业。YARN可以理解为一个工具,这个工具帮助各种分布式计算框架在分布式环境中工作。ZooKeeper是个HA工具。这个工具能够保证Hadoop集群能在一两台master宕机的情况下,任然可以正常工作。
好吧,大家请想象如下场景。我们需要一个分布式环境以完成复杂的计算任务。这个环境有着如下要求。首先,这个系统能够联合众多廉价的设备,而不是成本高昂的设备。目前市场上有昂贵的中型机、大型机等高端服务器,也有更多的PC结构的x86服务器。这些服务器价格更低。第二、我们希望这个环境更开放,能够适应尽量多的计算要求。基于这个基本的需求,我们可以做如下几个工作。显然我们需要一个分布式的文件系统保存数据。然后我们需要一个规则来满足计算要求。因为这些计算工作往往是在集群内的多台机器上完成的,而不像我们熟悉的Office或其他软件是运行在一台PC或其他的什么单独的设备上的。为了提供更大的自由度,这个规则被分为两个部分。一个是更基础一些的规则,一个是更常用一些的规则。更基础一些的规则说明如何和分布式系统交流,另一个规则则规定开发人员如何处理数据。最后,为了保证集群能够在现实环境中工作,我们需要提供HA的功能。实验室环境允许集群宕机。但是现实环境要求更严格,在部分节点宕机后,集群仍要继续工作。
现在我们有了四个规范。第一个是分布式计算环境规范,包括分布式文件系统规范。第二个是分布式环境的使用规范。第三个是弹性计算规范。第四个是高可用性规范。到目前为止我们还没有任何具体的实现,只有一堆写着纸上的规范。
接下来我们可以逐步实现这些规范。HDFS、MapReduce、YARN、ZooKeeper等都可以看成是这些规范的实现。如果读者有兴趣,同样可以自己尝试着实现这些规范。
除了这些核心项目,hadoop还包括像Pig、HBase、HLive这些常用的工具。
Ambari是一个Web界面的Hadoop集群管理工具。
Avro 是一个数据序列化工具。
Cassandra 是一个工作在Hadoop集群之上的数据库。
HBase 是一个运行在Hadoop之上的NoSQL数据库。
HIVE是数据仓库软件。
Mahout 是一个运行在Hadoop之上的机器学习和数据挖掘工具。
Pig 一个并行计算的框架。
Spark一个层级比较高的并行计算框架。
我相信,随着深度学习的发展,Hadoop的生态圈还会逐步扩大。越来越多的新项目将会加入这个阵营。
Hadoop安装及配置
一. 准备阶段
1. 网络环境规划。
首先您需要对目标网络进行规划。在这里,我们使用VMWare虚拟机模拟一个拥有7台设备的简单集群环境。不熟悉VMWare的同学可以使用其他虚拟机代替,比如Virtual Box或KVM。在后面的部分,我们会讨论Hadoop在单机和伪分布模式下的部署和配置。届时,读者可以任选一台作为练习设备。
设备名及IP规划如下:
表格 1 集群环境下的设备规划表
设备名 | IP | 角色 |
hadoop-01 | 192.168.31.201 | master |
hadoop-02 | 192.168.31.202 | second master |
hadoop-03 | 192.168.31.203 | slave |
hadoop-04 | 192.168.31.204 | slave |
hadoop-05 | 192.168.31.205 | slave |
hadoop-06 | 192.168.31.206 | slave |
hadoop-07 | 192.168.31.207 | slave |
2. 安装Linux系统。这里我们选择CentOS 7作为例子。
3. 修改操作系统的IP地址以及DNS。
系统安装完后需要将系统IP设置为规划IP。在CentOS 7系统中,您可以选用nmcli命令查看正在使用的连接名称。
$ nmcli connection show |
本例中,系统连接名称为eno16777736。在系统的/etc/sysconfig/network-scripts目录下可以找到ifcfg-eno16777736文件。该文件为该连接的配置文件。
$ ls /etc/sysconfig/network-scripts ifcfg-eno16777736 |
接下来,您需要使用VI修改ifcfg-eno16777736文件,不熟悉VI命令的同学可以参考附件中的VI简易教程。修改/etc下的文件首先需要使用root用户登录系统。不加参数的su命令为切换到root用户。exit命令为退出当前用户。
$ su Password: ###### $ vi /etc/sysconfig/network-scripts/ifcfg-eno16777736 |
命令行界面将会进入VI普通模式。
TYPE=Ethernet BOOTPROTO=static DEFROUTE=yes PEERROUTES=yes IPV4_FAILURE_FATAL=no IPV6INIT=yes IPV6_AUTOCONF=yes IPV6_DEFROUTE=yes IPV6_PEERDNS=yes IPV6_PEERROUTERS=yes IPV6_FAILURE_FATAL=no NAME=eno16777736 UUID=61eb247f-41e8-bc5f-8443b8c948b0 DEVICE=eno16777736 NOBOOT=yes IPADDR=192.168.31.201 NETMASK=255.255.255.0 GATEWAY=192.168.31.1 ~ ~ ~ ~ “/etc/sysconfig/network-scripts/ifcfg-eno16777736” 20L, 373C |
接下来我们需要修改DNS设置。同样在root身份下,使用VI编辑/etc/resolv.conf文件。
$ vi /etc/resolv.conf |
命令行界面将会进入VI普通模式。切换到编辑模式后,我们需要在该文件中添加目标DNS的IP地址。
nameserver 192.168.31.1 |
修改完成后,网络服务需要重新启动以加载修改后的配置文件。
$ service network restart Restarting network (via systemctl) [ OK ] |
接下来,我们可以使用PING命令验证上面的配置是否正确。
$ ping www.baidu.com PING www.a.shifen.com (111.13.100.91) 56(84) bytes of data. 64 bytes from 111.13.100.91: icmp_seq=1 ttl=53 time=7.60 ms 64 bytes from 111.13.100.91: icmp_seq=1 ttl=53 time=9.26 ms 64 bytes from 111.13.100.91: icmp_seq=1 ttl=53 time=7.30 ms |
4. 修改主机名。
CentOS 7默认的主机名为localhost,我们需要将其修改为设备规划表中的设备名,以区别网络中的设备。修改主机名需要修改/etc/hostname文件和/etc/hosts文件。前者修改主机名,后者修改主机名的IP映射。如果您的网络中存在DNS,则需要修改DNS中相关设置。
$ vi /etc/hostname 首先编辑hostname文件 |
Hadoop-01 在VI界面中编辑主机名后保存退出 |
$ vi /etc/hosts 编辑hosts文件 |
192.168.31.201 hadoop-01 在VI界面中添加主机名的IP映射 |
5. 创建用户及用户组。
6. 创建SSH密钥,实现SSH“免密”登录。
Hadoop需要频繁通过SSH与集群中其他设备进行通讯。如果以默认的用户名/密码作为验证手段,会极大的增加管理员的工作量,并且降低整个集群的工作效率。所以我们需要使用证书验证的方式,实现“免密”登录。
首先我们需要创建密钥。密钥包括公钥和私钥两种。对于加密技术比较感兴趣的同学可以参考附件中Open SSL的参考手册深入学习。这里我们使用ssh-keygen命令产生密钥。加密算法,我们选择RSA算法。
$ ssh-keygen –t rsa Generating public/private rsa key pair. Enter file in which to save the key (/home/Hadoop/.ssh/id_rsa): Created directory ‘/home/Hadoop/.ssh’. Enter passphrase (empty for no passphrase): Enter same passphrase again: Your identification has been saved in /home/Hadoop/.ssh/id_rsa. Your public key has been saved in /home/Hadoop/.ssh/id_rsa.pub. The key fingerprint is: f7:35:99:7c:17:14:49:45:66:a9:74:5e:4d:b2:ec:b0 Hadoop@hadoop-01 The key’s randomart image is: +--[ RSA 2048]----+ | o*%| | o.0o| | o *..| | * +.| | S . E B o| | | +-----------------+ $ cat .ssh/id_rsa.pub >> .ssh/authorized_keys 生成authorized_keys $ chmod –R 700 .ssh 修改.ssh目录以及其下所有文件的访问权限 |
生成密钥后,我们可以使用SSH登录到本机,验证我们的密钥是否安装正确。
$ ssh Hadoop@hadoop-01 The authenticity of host 'hadoop-01 (::1)' can't be established. ECDSA key fingerprint is e3:0c:d1:5c:ea:2e:cd:3f:af:2f:a5:09:31:ed:30:c4. Are you sure you want to continue connecting (yes/no) ? |
由于.ssh目录下的host_knowns文件中没有注册hadoop-01这个设备,所以系统提示hadoop-01这台目标设备不可信任,询问是否要继续。我们选择yes后,系统会将hadoop-01这台设备自动加入host_knowns文件,下次再使用ssh登录时就不会再提示了。
Enter passphrase for key ‘/home/Hadoop/.ssh/id_rsa’: Last login: Mon Sep 19 08:34:01 2016 |
这里需要输入密钥的密码。如果密码为空,则表示该密钥不需要密码,也就不会出现上面的提示。虽然集群一般工作在内网环境中,环境安全性比较好,并且密码为空这种方法比较简单。但是我并不推荐大家使用这种方法。密码为空的密钥已经丧失了其作为安全工具的意义。从安全的角度来讲并不是一种好的方式。作为替代方案,我们推荐使用ssh-agent。在Tom White所著的《Hadoop权威指南》一书中也提到了这种方法。
ssh-agent是一个密钥缓存,可以帮助用户转发密钥,实现免密登录。首先我们在命令行安装ssh-agent。
$ yum install ssh-agent |
安装完成后,修改/etc/ssh/ssh_config文件。添加ForwardAgent yes。或者在命令行执行:
$ echo "ForwardAgent yes" >> /etc/ssh/ssh_config |
接下来修改用户名下的ssh配置:
$ touch ~/.ssh/config $ssh-agent bash $ssh-add |
ssh-add命令添加密钥。使用带-i的ssh-add命令可以查看ssh-agent中已保存的密钥。另外,在.bashrc文件中加入相应的脚本,可以实现开机启动。
if [ -f ~/.agent.env ]; then . ~/.agent.env >/dev/null if ! kill -0 $SSH_AGENT_PID >/dev/null 2>&1; then echo "Stale agent file found. Spawning new agent..." eval `ssh-agent |tee ~/.agent.env` ssh-add fi else echo "Starting ssh-agent..." eval `ssh-agent |tee ~/.agent.env` ssh-add fi |
7. 安装Java。
Hadoop运行在JVM中,所以需要先安装Java环境。Java环境分为JRE和JDK两种。JRE是Java Runtime Environment的缩写。JDK是Java Development Kit的缩写。其中JDK包含开发工具以及JRE。Java环境的发行版本中有GNU组织的GCC工具,也有OpenJDK等不同的版本。目前还没有相应的测试数据表明这些兼容的环境可以安全的执行Hadoop。所以本文推荐以Oracle的JDK为Java环境。可选择的JDK版本为:
表格 2 可选JDK版本 (出自hadoop官网)
版本 | 状态 | 测试报告提供方 |
oracle 1.7.0_15 | Good | Cloudera |
oracle 1.7.0_21 | Good (4) | Hortonworks |
oracle 1.7.0_45 | Good | Pivotal |
openjdk 1.7.0_09-icedtea | Good (5) | Hortonworks |
oracle 1.6.0_16 | Avoid (1) | Cloudera |
oracle 1.6.0_18 | Avoid | Many |
oracle 1.6.0_19 | Avoid | Many |
oracle 1.6.0_20 | Good (2) | LinkedIn, Cloudera |
oracle 1.6.0_21 | Good (2) | Yahoo!, Cloudera |
oracle 1.6.0_24 | Good | Cloudera |
oracle 1.6.0_26 | Good(2) | Hortonworks, Cloudera |
oracle 1.6.0_28 | Good | |
oracle 1.6.0_31 | Good(3, 4) | Cloudera, Hortonworks |
这里我们选择较新的版本 oracle 1.7.0_15作为Java环境。首先我们需要从oracle官网下载安装包。JDK在官网上有rpm和tar.gz两种格式。RPM是CentOS下默认的安装包格式,可以使用rpm命令直接安装。tar.gz本身是压缩的打包文件,无需安装,解压即可。我们这里选择tar.gz格式的版本下载。关于rpm格式版本的安装和配置请参阅附件相关内容。在hadoop的官方wiki文档中明确指出,JDK需要使用64位的。
下载安装后,我们可以通过SCP命令上传到Linux服务器。
$ scp jdk_1.7.0_15.tar.gz hadoop@hadoop-01:/home/hadoop |
回到hadoop-01,解压在/home/hadoop目录下上传上来的jdk_1.7.0_15.tar.gz文件,并且修改解压后的文件归属及访问权限。然后将解压后的文件夹移至/opt目录。
$ tar –xzvf jdk_1.7.0_15.tar.gz 解压 $ chown Hadoop:Hadoop jdk_1.7.0_15 修改目录归属 $ chmod –R 700 jdk_1.7.0_15 修改目录访问权限 $ sudo mv jdk_1.7.0_15 /opt/jdk_1.7.0_15 移动解压后的目录到/opt下 |
接下来我们设置环境变量。使用vi编辑.bashrc文件,添加如下内容:
export JAVA_HOME=/opt/jdk_1.7.0_15 export CLASSPATH=${JAVA_HOME}/lib export PATH=${PATH}:${JAVA_HOME}/bin |
配置结束后,使用source命令重载~/.bashrc文件,使刚才的设置生效。然后在命令行输入java –version命令查看刚才的设置是否正确。
8. 下载和解压Hadoop,与JDK类似,将解压后的目录移至/opt目录下。
二. 安装部署Hadoop
Hadoop的部署分为单机、伪分布和分布式三种模式。我们首先介绍单机部署。单机部署常见于开发环境,方便开发人员调试。单机模式下,Hadoop运行在单Java进程中,不需要其他配置,解压后可以直接运行样例程序。
现在我们可以执行例子来感受下 Hadoop 的运行。Hadoop 附带了丰富的例子,包括 wordcount、terasort、join、grep 等。
$ ./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar 可以查看所有的例子 |
在此我们选择运行 grep 例子,我们将 input 文件夹中的所有文件作为输入,筛选当中符合正则表达式 dfs[a-z.]+ 的单词并统计出现的次数,最后输出结果到 output文件夹中。运行完毕后,我们可以再output目录中找到输出结果。
$ cd ~ 进入hadoop用户目录 $ mkdir input 创建输入目录 $ cd /opt/hadoop* 进入hadoop目录 $ ./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar grep ~/input ~/output ‘dfs[a-z.]+’ … 省略日志输出 $ cat ~/output/* 使用cat命令检查执行结果 1 dfsadmin |
对于其他样例,用户可以自行试验。需要注意的是,Hadoop本身不会自动覆盖结果文件,所以再次运行前需要先将output目录删除。
1. Hadoop伪分布
所谓的伪分布式是指在一台设备中,开启两个独立的java进程。。一个作为NameNode进程,另一个作为DataNode进程运行。此时,这台设备既是NameNode,又是DataNode。
1. 首先,我们需要编辑hadoop用户目录下的.bashrc文件,将Hadoop加入环境变量中。
export HADOOP_HOME=/opt/hadoop-2.7.3 export HADOOP_INSTALL=$HADOOP_HOME export HADOOP_MAPRED_HOME=$HADOOP_HOME export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME export YARN_HOME=$HADOOP_HOME export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin |
2.
我并不建议大家修改hadoop-env.sh文件,因为这样有可能出现错误,导致hadoop运行异常。如果需要修改该文件,建议大家先复制备份。
Hadoop 的配置文件位于 /opt/hadoop/hadoop/etc/hadoop/ 中,伪分布式需要修改2个配置文件 core-site.xml 和 hdfs-site.xml 。Hadoop的配置文件是 xml 格式,每个配置以声明property的name和value的方式来实现。
<!-- core-site.xml --> <?xml version=”1.0”?> <configuration> <property> <name>hadoop.tmp.dir</name> <value>file:/home/hadoop/tmp</value> <final>true</final> <description>临时目录被设定在hadoop用户目录下<description> </property> <property> <name>fs.defaultFS</name> <value>hdfs://localhost:9000</value> <final>true</final> </property> </configuration>
<!-- hdfs-site.xml --> <?xml version=”1.0”?> <configuration> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>dfs.namenode.name.dir</name> <value>file:/home/hadoop/tmp/dfs/name</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>file:/home/hadoop/tmp/dfs/data</value> </property> </configuration> |
2. 配置文件修改完毕后执行hdfs命令,格式化HDFS。
INFO common.Storage: Storage directory /home/deepmind/tmp/dfs/ name has been successfully formatted. INFO namenode.FSImageFormatProtobuf: Saving image file /home/d eepmind/tmp/dfs/name/current/fsimage.ckpt_0000000000000000000 using no compressi on INFO namenode.FSImageFormatProtobuf: Image file /home/deepmind /tmp/dfs/name/current/fsimage.ckpt_0000000000000000000 of size 355 bytes saved i n 0 seconds. INFO namenode.NNStorageRetentionManager: Going to retain 1 ima ges with txid >= 0 INFO util.ExitUtil: Exiting with status 0 |
3.
日志信息中“Storagedirectory /home/deepmind/tmp/dfs/ namehas been successfully formatted.”表示HDFS已经格式化完成。如果日志中提示“Exitingwith status 0”也是格式化成功的标志。如果退出状态不是0,那么格式化过程就会出错。具体错误请仔细查看日志。
接下来启动NameNode和DataNode。
$ ./sbin/start-dfs.sh |
启动过程中会有“Areyou sure you want to continue connecting (yes/no)?”提示,输入yes,同意连接即可。启动过程结束后,我们可以使用jps命令查看Java进程,也可以使用浏览器打开http://localhost:50070,查看更多信息。
$ jps 2986 DataNode 3146 SecondaryNameNode 3261 Jps 2862 NameNode |
Figure 1 启动完成后页面
2. Hadoop集群配置
首先我们要介绍高可用性(High Avaliability)和仲裁日志管理 (Quorum Journal Manager)的概念。在Hadoop2.0之前,集群环境中,NameNode只能有一个。一旦NameNode无法访问,比如NameNode设备宕机,整个集群就无法访问了。在这种情况下,一般只有两种方法。
● 当计划外的事故发生的时候,整个集群无法访问。只能在管理员重新启动NameNode设备后,重新投入使用。
● 管理员安排停产维护的时间窗口,在此时间计划内,机群无法工作。
Hadoop 2.0之后的高可用性功能解决了以上的问题。它允许集群内存在一个主动式(或被动式)的热备节点。在活动的NameNode节点不能正常工作,或管理员主动发起切换的情况下,NameNode的职责将会快速切换到备用节点上。
一个典型的Hadoop集群可以拥有两个独立的NameNode节点。在同一时间点,一台NameNode被设置为活动节点,另外一台被设置为备用节点。活动节点负责处理所有客户端对机群的的操作。热备节点只是简单的作为从属节点,等待在必要时完成状态的快速切换。
为了保持热备节点和活动节点在状态上的同步,两个节点之间需要通过叫”JournalNodes”的守护进程通讯。当活动节点执行任何命名空间修改时,它会持续记录一份关于大多数JournalNode的修改记录。热备节点可以访问这些记录,并且持续观察这些记录的变化。当NameNode职责切换发生的时候,在热备节点切换为活动节点前,热备节点需要确认它已经从所有的JounalNode节点读取了相关信息。并且热备节点需要从这些信息中推断命名空间状态是否已经全部同步完成。
为了实现快速切换,热备节点必须有最新的关于数据块在机群中位置的信息。所以DataNode必须将数据块位置信息发送到活动和热备这两个NameNode节点。
保证同一时刻,集群内只有一台活动的NameNode是非常重要的。否则,命名空间状态很快就会在两台NameNode之间产生偏差。这会导致数据丢失或者其他错误的结果。为了防止这种情况出现,JournalNode在同一时刻只允许一台NameNode对其进行写操作。切换操作后,新的NameNode将接管“写”操作权,可以写入JournalNode了。
实际操作中,我们首先需要修改hdfs-site.xml文件。首先我们要定义NameService。NameService可以看成是一个集群的总称。一个NameService可以包括多个NameNode。每个NameNode下注册若干DataNode。
<!-- 定义Name Service --> <property> <name>dfs.nameservices</name> <value>ns1</value> </property> <!-- 定义Name Service下的NameNode节点 --> <property> <name>dfs.ha.namenodes.ns1</name> <value>nn1,nn2</value> </property> <!-- 定义各NameNode节点的HTTP和RPC服务地址 --> <property> <name>dfs.namenode.rpc-address.ns1.nn1</name> <value>hadoop-01:8020</value> </property> <property> <name>dfs.namenode.rpc-address. ns1.nn2</name> <value>hadoop-02:8020</value> </property> <property> <name>dfs.namenode.http-address.ns1.nn1</name> <value>hadoop-01:50070</value> </property> <property> <name>dfs.namenode.http-address.ns1.nn2</name> <value>hadoop-02:50070</value> </property> |
如果您使用了https协议,那么dfs.namenode.http-address属性需要换为dfs.namenode.https-address。
在集群环境中,活动NameNode需要和热备NameNode同步数据。两个NameNode必须有可读写的共享目录用于同步数据。这个目录应该可以被两个NameNode挂载,并且具有读写权限。这个属性在dfs.namenode.shared.edits.dir中设置。
<property> <name>dfs.namenode.shared.edits.dir</name> <value>file:///mnt/filer1/dfs/ha-name-dir-shared</value> </property> |
DFS客户端需要知道当前的活动NameNode节点是哪台设备。Hadoop提供dfs.client.failover.proxy.provider.[nameserviceID]属性告诉客户端应该使用那种方法获取活动NameNode信息。
<property> <name>dfs.client.failover.proxy.provider.mycluster</name> <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value> </property> |
还记得前面提到的吗?一个集群中只允许有一个活动的NameNode。当切换发生的时候,我们必须指明隔离方法来确保之前的活动NameNode节点已经处于待机或其进程已经结束。常用的隔离方法是sshfence。该方法需要用户指明SSH私钥的位置。
<property> <name>dfs.ha.fencing.methods</name> <value>sshfence</value> </property> <property> <name>dfs.ha.fencing.ssh.private-key-files</name> <value>/home/hadoop/.ssh/id_rsa</value> </property> |
另外一种方法是执行shell脚本。这种方法要求hadoop集群建立在Shell环境下。该环境一般应用于Linux、Unix或Unix类操作系统中。
<property> <name>dfs.ha.fencing.methods</name> <value>shell(/path/to/my/script.sh arg1 arg2 ...)</value> </property> |
不同于sshfence的方法。shell脚本直接运行在操作系统层面。同时,为了提供更大的灵活性,hadoop允许将配置文件中已的属性作为环境变量传入shell脚本。作为参数的属性名需要将名中的‘.’替换为‘_’。比如dfs.ha.fencing.methods就相应的替换为dfs_ha_fencing_methods。如果脚本执行成功则返回0,否则hadoop将会尝试启动下一个隔离方法。这里需要说明的是,默认的隔离方法没有超时设置,如果用户需要设置超时处理,就需要自己编写脚本了。
最后,管理员需要修改core-site.xml文件,指明默认的Name Service,并且添加Jornual Node的共享目录。
<property> <name>fs.defaultFS</name> <value>hdfs://mycluster</value> </property> <property> <name>dfs.journalnode.edits.dir</name> <value>/path/to/journal/node/local/data</value> </property> |
大家还记得在伪分布式中fs.defaultFS是怎么配置的吗?伪分布式中fs.defaultFS指向本机。在集群环境中,fs.defaultFS指向NameService。
MapReduce客户端应用
首先我们要明白,Hadoop以服务器的方式独立运行。并且它的运行状况可以被运维人员监控。开发人员使用Hadoop暴露的API与Hadoop通信。常见的方法是通过ClientAPI、Steaming或者Pipes。Client API支持Java、Python、C等多种语言。每种语言我们都可以找到相应的类库。Streaming和Pipes提供符合Unix规范的访问方式。由于本书不是一般专门介绍Hadoop的书,有兴趣的读者可以在Hadoop的官网上获取更多信息。这里主要介绍MapReduce框架下的编程方法。
MapReduce是一个编程框架。这个框架意味着MapReduce提供了一套API、生命周期和通信方法。ClientAPI大家可以通过在线文档获得API列表和更详细的信息。我们这里只关注一些我们常用的API。首先是Job、Mapper和Reducer类。Hadoop更提倡大家通过实现抽象类来实现自己的目的。但是如果我们仔细观察这些抽象类的定义,我们就会发现,正如我们之前所说的那样,Hadoop通过定义一系列的接口来定义上面提到的规范。
MapReduce计算框架,顾名思义,整个计算过程分为Map和Reduce两个阶段。在Map阶段,我们的数据被分成多个单位。比如我们有100TB的数据,在Map阶段,这些数据可以被划分为10,000个或更多的部分并且对每一个单独的部分进行计算。Reduce阶段则将Map计算的计算结果进行汇总,然后继续计算。这就像是统计局的工作。无数政府部门将数据汇总到统计局。然后统计局再进行计算,最后得出统计结论,再向政府和公众汇报。但是在现实情况中往往由于需要汇总的Map结果太多,Reduce面对这些结果往往有点儿力不从心。所以我们引入一个新的计算节点,Combine。Combine其实就是Reduce。让我们重新看下Reduce的工作。Reduce以Map过程的计算结果作为自身的输入,然后根据这些数据进行进一步的计算。也就是说Reduce的工作包括两部分。一部分是合并数据,另一部分是进一步的计算。现在我们加入一个新的Reduce,让之前的Reduce的工作更单纯的集中在计算的部分。新加入的Reduce就是Combine。
好吧,现在我们给出一个例子来分析MapReduce的编程方法。这个例子就是Hadoop官方附带的例子,在hadoop/share/mapreduce/hadoop-mapreduce-examples-2.7.3.jar中包含这个例子的二进制实现。
首先我们需要启动Hadoop。还记得我们之前介绍过的,Hadoop是一个项目集合吗?启动Hadoop也就意味着启动Hadoop下的各个子项目,他们是Hadoop-Common、HDFS、YARN。MapReduce是一个计算框架,所以不需要启动。当我们执行我们的程序的时候,MapReduce框架实际上会随着我们的程序启动。别忘了,MapReduce首先是一套API。
我们还是需要简单介绍一下HDFS的一些操作命令。在以后的章节中我们会逐步介绍我们会用到的命令。和其他的文件系统一样,HDFS有自己的一套命令。这些命令被设计成我们更熟悉的Unix风格。HDFS的一般命令格式形如以下形式。
bin/hadoop fs <args> |
ls
调用形式:hadoop fs -ls [-d] [-h] [-R] <args>
参数:
• -d: 文件目录按照一般文件形式显示。
• -h: 以方便阅读的形式显示文件大小。
• -R: 递归显示子文件中的文件。
样例:
hadoop fs -ls hdfs://192.168.31.209:9000/
退出代码:
0表示成功,-1表示失败。
mkdir
调用方式: hadoop fs -mkdir [-p] <paths>
在单节点模式下,paths参数就是本地目录。在伪分布式和分布式部署下,paths是core-site.xml文件中fs.defaultFS的value部分。
参数:
• -p参数表示递归的创建子目录。
样例:
• hadoopfs -mkdir /user/hadoop/dir1 /user/hadoop/dir2
• hadoop fs -mkdirhdfs://nn1.example.com/user/hadoop/dir hdfs://nn2.example.com/user/hadoop/dir
退出代码:
0表示成功,-1表示失败。
put
调用形式: hadoop fs -put<localsrc> ... <dst>
将本地数据提交到HDFS系统中。
参数:
localsrc 是输入文件的列表,一系列本地文件。
dst 是目标文件,一般是hdfs文件。
样例:
• hadoop fs -put localfile/user/hadoop/hadoopfile
• hadoopfs -put localfile1 localfile2 /user/hadoop/hadoopdir
• hadoop fs -put localfilehdfs://nn.example.com/hadoop/hadoopfile
• hadoop fs -put - hdfs://nn.example.com/hadoop/hadoopfileReads the input from stdin.
退出代码:
0代表成功,-1代表成功。
get
调用格式: hadoop fs -get [-ignorecrc] [-crc]<src> <localdst>
从HDFS中获取文件到本地。
样例:
• hadoopfs -get /user/hadoop/file localfile
• hadoop fs -get hdfs://nn.example.com/user/hadoop/filelocalfile
退出代码:
0表示成功,-1表示失败。
找一个文本文件,比如README.txt之类的,通过put方法上传到HDFS文件系统。因为MapReduce只能从HDFS中读取数据。下面是样例代码:
import java.io.IOException; import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1); private Text word = new Text();
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } |
代码分为三部分,Mapper的实现、Reducer的实现和WordCount。WordCount类包含main函数,是整个程序的入口。需要注意的是,main函数中创建了Job对象。Job对象。Job对象设置Mapper类、Combine类和Reducer类,以及Key和Value的类型。
这里要重点解释的是FileInputFormat.addInputPath和FileOutputFormat.setOutputPath。这两条API的调用过程中需要两个参数,第一个是Job对象,第二个是数据输入或输出的路径。这个路径的开始是由core-site.xml中fs.defaultFS中制定的。在伪分布式部署环境中这个值往往是localhost、本机主机名或本机IP地址。在分布式部署环境中,这个值是namespace。
但是大家不要忘了,Hadoop是工作在集群环境下,并且往往面对超出内存容量的数据量。对操作系统原理还有影响的同学应该记得虚拟内存是怎么回事。也就是说Hadoop也需要借鉴虚拟内存的理念,通过占用一定的磁盘空间形成一个平均读写速度小于内存大于硬盘,同时,尺寸大于内存而小于磁盘的联合空间。这样就需要序列化技术。最简单的序列化就是把内存数据按顺序保存到文件中。从文件读取数据到内存变成对象的过程称之为反序列化。序列化和反序列化互为逆操作。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。