[创建文件夹]
sudo mkdir /data
cd /
ls -all
[改变拥有者]
sudo chown neworigin:neworigin /data/
[拷贝]
cp /mnt/hgfs/BigData/第四天/jdk-8u121-linux-x64.tar.gz /data/
[解压]
tar -xzvf jdk-8u121-linux-x64.tar.gz
[查看路径]
pwd
[/etc/environment]
>sudo nano /etc/environment
JAVA_HOME=/data/jdk1.8.0_121
PATH="$PATH:/data/jdk1.8.0_121/bin"
>source /etc/environment
[查看环境]
>java -version
hadoop配置
[拷贝]
cp /mnt/hgfs/BigData/第四天/hadoop-2.7.0.tar.gz /data/
[解压]
tar -xzvf hadoop-2.7.0.tar.gz
[etc/environment]
HADOOP_HOME=/data/hadoop-2.7.0
PATH=$PATH:/data/hadoop-2.7.0/bin:/data/hadoop-2.7.0/sbin
[测试]
>hadoop version
[配置文件]
>cd /data/hadoop-2.7.0/etc/hadoop
[core-site.xml]
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost/</value>
</property>
</configuration>
[hdfs-site.xml]
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
--------------------------------------
cp mapred-site.xml.template mapred-site.xml
[mapred-site.xml]
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
[yarn-site.xml]
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>localhost</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
[设置免密登录]
[安装]
sudo apt-get install ssh
[生成秘钥]
ssh-keygen -t rsa[根目录执行]或
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
cd ~/.ssh/
[拷贝到公钥]
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys 或
cp ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys
>ssh localhost
>exit
>ssh localhost
[启动hadoop]
>hdfs namenode -format [工作禁止使用]
>start-all.sh
[查看进程]
>jps
[暂停hadoop]
>stop-all.sh
[webui]
s100:50070
[进程]
DataNode //数据节点 DN
ResourceManager//资源管理器 RM
NameNode //名称节点 NN
NodeManager //节点管理器 NM
SecondaryNameNode//辅助名称节点 2NN
[hadoop shell]
>hadoop fs
[创建目录]
>hadoop fs -mkdir -p /user/neworigin/
[上传文件]
>echo helloworld > 1.txt
>hadoop fs -put ~/1.txt /user/neworigin/
[查看文件系统]
>hadoop fs -lsr 或
>hadoop fs -ls -R 或
>hadoop fs -ls -R /
[修改文件权限]
>hadoop fs -chmod 777 1.txt
[删除文件]
>hadoop fs -rm 1.txt
[读文件]
>hadoop fs -cat 1.txt 或
>hadoop fs -text 1.txt
>hdfs dfs -cat 1.txt
[集群搭建]
3台虚拟机(s100, s101, s102)
s100 -- master
s101 -- slave1
s102 -- slave2
[配置网络]
/etc/network/interfaces
[配置网络映射]
/etc/hosts
[更改主机名称]
/etc/hostname
------------------
[创建目录]
>sudo mkdir /data (s100, s101, s102)
>sudo chown neworigin:neworigin /data
[无密登录ssh]
[s100, s101, s102]
>sudo apt-get install ssh //安装
>rm -rf ~/.ssh
[s100]
>ssh-keygen -t rsa -f ~/.ssh/id_rsa//在s100(生产公钥)
>cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
>ssh-copy-id s101//把s100的公钥拷贝到s101
>ssh-copy-id s102//把s100的公钥拷贝到s102
>ssh localhost
>exit
>ssh s101
>exit
>ssh s102
>exit
[s100]
cat ~/.ssh/authorized_keys
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEy23ilBVz3NmX5SniIBtxgLT/aFDCCxdc5eTApyjfXg4ISHYcfXsYxDAtqtW9SJQD7KIRvVmRn9hO4nA5MWQVAmPINP96bh7k1eDp8i+1ObKxTd1GXBAhG3dUg3Z7NqOjFBZCMJpwovsR6opajI02g5a27d6YAxZqbBP7RCzIgfuaVEuHqn2HtOA5f7A+eXcNpyb3bvJxmbMe4gUrPQtP+gIS9T13wBKK0EibojpQ52ZKEZUXJFMpX5EThymhBanSVe4KUr8/jmHGQRTMsQMqv2sPNRyL4Sq/C3KsneX4lJt8j8ubPZvzdMOiwQxdYFDn32qsp19BOjlioZpv2JkZ neworigin@s100
[s101]
cat ~/.ssh/authorized_keys
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEy23ilBVz3NmX5SniIBtxgLT/aFDCCxdc5eTApyjfXg4ISHYcfXsYxDAtqtW9SJQD7KIRvVmRn9hO4nA5MWQVAmPINP96bh7k1eDp8i+1ObKxTd1GXBAhG3dUg3Z7NqOjFBZCMJpwovsR6opajI02g5a27d6YAxZqbBP7RCzIgfuaVEuHqn2HtOA5f7A+eXcNpyb3bvJxmbMe4gUrPQtP+gIS9T13wBKK0EibojpQ52ZKEZUXJFMpX5EThymhBanSVe4KUr8/jmHGQRTMsQMqv2sPNRyL4Sq/C3KsneX4lJt8j8ubPZvzdMOiwQxdYFDn32qsp19BOjlioZpv2JkZ neworigin@s100
[s102]
cat ~/.ssh/authorized_keys
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEy23ilBVz3NmX5SniIBtxgLT/aFDCCxdc5eTApyjfXg4ISHYcfXsYxDAtqtW9SJQD7KIRvVmRn9hO4nA5MWQVAmPINP96bh7k1eDp8i+1ObKxTd1GXBAhG3dUg3Z7NqOjFBZCMJpwovsR6opajI02g5a27d6YAxZqbBP7RCzIgfuaVEuHqn2HtOA5f7A+eXcNpyb3bvJxmbMe4gUrPQtP+gIS9T13wBKK0EibojpQ52ZKEZUXJFMpX5EThymhBanSVe4KUr8/jmHGQRTMsQMqv2sPNRyL4Sq/C3KsneX4lJt8j8ubPZvzdMOiwQxdYFDn32qsp19BOjlioZpv2JkZ neworigin@s100
[多台主机执行相同命令]
[/usr/local/bin/]
>sudo nano xcall
#!/bin/bash
#获取参数个数
pcount=$#
if((pcount<1));then
echo no args;
exit;
fi
for((host=100;host<103;host=host+1));do
echo ------------s$host-----------------
ssh s$host $@
done
[发送文件]
[scp]
>scp -r /home/neworigin/Desktop/1.txt neworigin@s101:/home/neworigin/Desktop/
[rsync]
远程同步工具,主要用于备份和镜像;支持链接,设备等等;速度快,避免复制相同内容的文件数据;不支持两个远程主机间的复制
>rsync -rvl /home/neworigin/Desktop/1.txt neworigin@s101:/home/neworigin/Desktop/
#!/bin/bash
pcount=$#
if((pcount<1));then
echo no args
exit
fi
p1=$1
fname=`basename $p1`
#echo $fname
pdir=`cd -P $(dirname $p1);pwd`
#echo $pdir
cuser=`whoami`
for((host=101;host<103;host=host+1));do
echo -------------s$host---------------
rsync -rvl $pdir/$fname $cuser@s$host:$pdir
done
[免密码]
>sudo passwd
>su root
>sudo nano /etc/sudoers
neworigin ALL=(ALL:ALL) NOPASSWD:ALL
[安装jdk]
>xsync /data/jdk/
[/etc/environment]
JAVA_HOME=/data/jdk1.8.0_121
PATH="$PATH:/data/jdk1.8.0_121/bin"
[复制]
>cd /data/hadoop-2.7.0/etc
>cp -rf hadoop/ hadoop_tmp
[core-site.xml]
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://s100/</value>
</property>
</configuration>
[hdfs-site.xml]
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>/home/neworigin/hadoop/hdfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/home/neworigin/hadoop/hdfs/data</value>
</property>
<property>
<name>dfs.namenode.checkpoint.dir</name>
<value>/home/neworigin/hadoop/hdfs/namesecondary</value>
</property>
</configuration>
[yarn-site.xml]
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>s100</value>
</property>
<property>
<name>yarn.nodemanager.local-dirs</name>
<value>/home/neworigin/hadoop/nm-local-dir</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>16384</value>
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>16</value>
</property>
</configuration>
[slaves]
s100
s101
s102
[新建文件夹]
>xcall mkdir /home/neworigin/hadoop/
[发送]
>xsync /data/hadoop-2.7.0
[配置环境s101, s102]
HADOOP_HOME=/data/hadoop-2.7.0
PATH="$PATH:/data/hadoop-2.7.0/bin:/data/hadoop-2.7.0/sbin"
[启动s100]
>hdfs namenode -format
>start-all.sh //启动
[进程]
>xcall jps
------------s100-----------------
15824 ResourceManager
16065 NodeManager
15122 NameNode
16514 Jps
15358 DataNode
15646 SecondaryNameNode
------------s101-----------------
9233 DataNode
9468 NodeManager
9710 Jps
------------s102-----------------
9192 NodeManager
9434 Jps
8957 DataNode
[webui]
s100:50070
>stop-all.sh //暂停
[jar整理]
tests
sources
core-site.xml    hadoop-common-2.7.0.jar                   core-default.xml
hdfs-site.xml    hadoop-hdfs-2.7.0.jar                     hdfs-default.xml
yarn-site.xml    hadoop-yarn-common-2.7.0.jar              yarn-default.xml
mapred-site.xml  hadoop-mapreduce-client-core-2.7.0.jar    mapred-default.xml
[win7配置]
计算机->属性->高级系统设置->环境变量->系统变量
[不要有中文]
HADOOP_HOME=D:\hadoop-2.7.0
Path=%Path%;D:\hadoop-2.7.0\bin;D:\hadoop-2.7.0\sbin
[core-site.xml]
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://s100/</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/neworigin/hadoop</value>
</property>
</configuration>
[hdfs-site.xml]
<configuration>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file://${hadoop.tmp.dir}/hdfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file://${hadoop.tmp.dir}/hdfs/data</value>
</property>
<property>
<name>dfs.namenode.checkpoint.dir</name>
<value>file://${hadoop.tmp.dir}/hdfs/namesecondary</value>
</property>
</configuration>
[yarn-site.xml]
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>s100</value>
</property>
<property>
<name>yarn.nodemanager.local-dirs</name>
<value>file://${hadoop.tmp.dir}/nm-local-dir</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>16384</value>
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>16</value>
</property>
</configuration>
[logs日志]
/data/hadoop-2.7.0/logs
gedit hadoop-neworigin-datanode-s100.log
[hdfs shell]
hadoop fs -mkdir -p /home/neworigin/hadoop/ //创建目录
hadoop fs -put ~/1.txt /home/neworigin/hadoop/ //上传数据
[hdfs-site.xml]
<configuration>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file://${hadoop.tmp.dir}/hdfs/name1,file://${hadoop.tmp.dir}/hdfs/name2</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file://${hadoop.tmp.dir}/hdfs/data1,file://${hadoop.tmp.dir}/hdfs/data2</value>
</property>
<property>
<name>dfs.namenode.checkpoint.dir</name>
<value>file://${hadoop.tmp.dir}/hdfs/namesecondary</value>
</property>
</configuration>
[修过之后重新启动]
>xcall rm -rf /home/neworigin/hadoop
>xcall rm -rf /home/neworigin/hadoop
>start-all.sh
>hadoop fs -put ~/1.txt /home/neworigin/hadoop
>hadoop fs -copyFromLocal ~/1.txt /home/neworigin/hadoop
>hadoop fs -copyToLocal /home/neworigin/hadoop/1.txt ~/
>hadoop fs -ls -R /
>hadoop fs -chmod 777 /home/neworigin/hadoop/1.txt
//显示当前目录结构
>hadoop fs -ls /
//递归显示目录结构
>hadoop fs -lsr /
>hadoop fs -ls -R /
//统计目录下各文件大小
>hadoop fs -du /home/neworigin/hadoop
//汇总统计目录下文件大小
>hadoop fs -dus /home/neworigin/hadoop
//统计文件(夹)数量(文件夹数量、文件数量、文件总大小信息)
>hadoop fs -count /home/neworigin/hadoop/
//移动
>hadoop fs -mv /home/neworigin/hadoop/1.txt /home/neworigin/data/
//复制
>hadoop fs -cp /home/neworigin/data/1.txt /home/neworigin/hadoop/
//删除文件/空白文件夹
>hadoop fs -rm -skipTrash /home/neworigin/data/1.txt
//递归删除
>hadoop fs -rmr -skipTrash /home/neworigin/data 或
>hadoop fs -rm -r -skipTrash /home/neworigin/data
//-moveFromLocal 从本地移动
>hadoop fs -moveFromLocal ~/1.txt /home/neworigin/data/
//getmerge 合并到本地
>hadoop fs -getmerge /home/neworigin/ abc.txt
//-cat 查看文件内容
>hadoop fs -cat /home/neworigin/data/1.txt
>hadoop fs -text /home/neworigin/data/1.txt
//-touchz 创建空白文件
>hadoop fs -touchz /home/neworigin/data/2.txt
//-stat 显示文件的统计信息(“%b %n %o %r %Y”依次表示文件大小、文件名称、块大小、副本数、访问时间)
>hadoop fs -stat '%b %n %o %r %Y' /home/neworigin/data/1.txt
//-tail 查看文件尾部内容(-tail 查看文件尾部内容)
>hadoop fs -tail -f /home/neworigin/data/1.txt
//-chmod 修改文件权限
//-chown 修改属主
//-chgrp 修改属组
>hadoop fs -chgrp neworigin /home/neworigin/data/2.txt
//-help 帮助
>hadoop fs -help
[查找fsimage]
xcall ls -all -R /home/neworigin/hadoop/ | grep image
[hdfs API]
新建java项目(Hadoop01)
新建libs文件夹,拷贝143jar包到libs下面
选中143jar包,build path->add
新建java文件URLSytem
package org.neworigin.com;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
public class URLSytem {
public static void main(String[] args) throws Exception {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
//url路径
String urlStr = "hdfs://s100:8020/home/neworigin/hadoop/1.txt";
//url对象
URL url = new URL(urlStr);
//url链接
URLConnection conn = url.openConnection();
//打开输入流
InputStream input = conn.getInputStream();
//文件输出流
FileOutputStream fileout = new FileOutputStream("F:/BigData/hadoop01.txt");
byte [] buf = new byte[1024];
int len = -1;
while((len = input.read(buf)) != -1){
fileout.write(buf, 0, len);
}
input.close();
fileout.close();
System.out.println("over");
}
}
[读写]
private static void checkReturnValue(boolean rv, File p,
FsPermission permission
) throws IOException {
if (!rv) {
throw new IOException("Failed to set permissions of path: " + p +
" to " +
String.format("%04o", permission.toShort()));
}
}
package org.neworigin;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class TestFileSystem {
//声明变量
private static FileSystem fs;
//初始化fs
public static void iniCof(){
try {
Configuration conf = new Configuration();
fs = FileSystem.get(conf);
} catch (Exception e) {
}
}
//写操作
public static void writeFile() throws Exception {
Path path = new Path("hdfs://s100:8020/1.txt");
//输出流
FSDataOutputStream dos = fs.create(path);
dos.write("123456".getBytes());
dos.close();
System.out.println("over");
}
//读文件
public static void readFile() throws Exception {
Path path = new Path("hdfs://s100:8020/1.txt");
//输入流
FSDataInputStream fis = fs.open(path);
//文件输出流
FileOutputStream fileOutputStream = new FileOutputStream("F:/BigData/hadoop02.txt");
//拷贝
IOUtils.copyBytes(fis, fileOutputStream, 1024);
fis.close();
fileOutputStream.close();
System.out.println("over");
}
//写操作,指定副本
public static void writeFileWithReplacation() throws Exception {
Path path = new Path("hdfs://s100:8020/1.txt");
//输出流
FSDataOutputStream dos = fs.create(path, (short)2);
dos.write("123456".getBytes());
dos.close();
System.out.println("over");
}
//读文件,读取指定部分
public static void seekFile() throws Exception {
Path path = new Path("hdfs://s100:8020/1.txt");
//输入流
FSDataInputStream fis = fs.open(path);
//设置偏移量
fis.seek(4);
//文件输出流
FileOutputStream fileOutputStream = new FileOutputStream("F:/BigData/hadoop03.txt");
//拷贝
IOUtils.copyBytes(fis, fileOutputStream, 1024);
fis.close();
fileOutputStream.close();
System.out.println("over");
}
public static void getBlockLocation() throws Exception {
//创建路径
Path path = new Path("hdfs://s100:8020/1.txt");
//得到文件状态
FileStatus fss = fs.getFileStatus(path);
//得到文件长度
long length = fss.getLen();
//得到文件块
BlockLocation[] locs = fs.getFileBlockLocations(fss, 0, length);
for (BlockLocation location : locs) {
System.out.println(location.getOffset());
}
}
public static void main(String[] args) throws Exception {
//调用
iniCof();
//writeFile();
//readFile();
getBlockLocation();
}
}
权限问题:
计算机->属性->高级系统设置->环境变量->系统变量->新建:
变量名:HADOOP_USER_NAME
变量值: neworigin(虚拟机用户名)
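除了配置Windows环境变量,也可以在代码里直接指定访问HDFS所用的用户。下面是一个最小示意(主机名、用户名沿用上文,仅供参考):
//通过FileSystem.get的第三个参数指定用户,效果等同于设置HADOOP_USER_NAME
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://s100:8020/"), conf, "neworigin");
System.out.println(fs.getFileStatus(new Path("/")).getOwner());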
定时器
[crontab]
>/etc/init.d/cron start//启动
>/etc/init.d/cron stop//暂停
>sudo nano hello
[1]
1-59 * * * * echo "hello">>/root/1.txt
[2]
* * * * * echo "hello">>/root/1.txt
[3]
* * * * 3 echo "hello">>/root/1.txt
>crontab -u neworigin /home/neworigin/hello
>crontab -l //查看列表
>crontab -e //重新编辑
[hdfs-site.xml]
//副本数
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
//最小副本
<property>
<name>dfs.namenode.replication.min</name>
<value>1</value>
</property>
//最大副本数
<property>
<name>dfs.replication.max</name>
<value>512</value>
</property>
[webui 端口]
<property>
<name>dfs.namenode.http-address</name>
<value>0.0.0.0:50070</value>
</property>
[文件一致性]
public static void syncFile() throws Exception {
Path path = new Path("/data2");
FSDataOutputStream fsDataOutputStream = fs.create(path);
fsDataOutputStream.write("hello".getBytes());
//刷新
fsDataOutputStream.flush();
//同步
fsDataOutputStream.sync();
fsDataOutputStream.close();//close函数隐含执行sync
System.out.println(fs.getFileStatus(path).getLen());
}
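补充示意(非原文内容):sync()在Hadoop 2.x中已标记为过时,语义上对应hflush()/hsync(),可以按下面的写法替换(沿用上文的静态fs):
public static void syncFile2() throws Exception {
    Path path = new Path("/data3");
    FSDataOutputStream out = fs.create(path);
    out.write("hello".getBytes());
    out.hflush();  //保证数据对新的reader可见(已到达datanode内存)
    out.hsync();   //进一步要求datanode把数据刷写到磁盘
    out.close();   //close隐含一次flush
    System.out.println(fs.getFileStatus(path).getLen());
}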
[集群间数据复制]
>distcp
>hadoop distcp hdfs://s100/1.txt hdfs://s200/test
//覆盖原来的
>hadoop distcp -overwrite hdfs://s100/1.txt hdfs://s200/test
//更新
>hadoop distcp -update hdfs://s100/1.txt hdfs://s200/test
[s200]
/test/1.txt
[存档]
>hadoop archive -archiveName file.har -p /data/file /data
[查看归档数据]
>hadoop fs -ls har:///data/file.har
源文件
hdfs
#读取日志,上传符合规则
/home/neworigin/data1/1.txt
/home/neworigin/data1/2.txt
/home/neworigin/data1/3.txt
fileName=1.txt
fileName=2.txt
fileName=3.txt
[upFileToHdfs]
#!/bin/bash
#源文件
src_path=/home/neworigin/data1/
#hdfs
des_path=/data/file/
#读取日志,上传符合规则
ls $src_path | while read fileName
do
if [ "3.txt" = "$fileName" ]; then
hadoop fs -put $src_path/$fileName $des_path
fi
done
去IOE
-------------
指去掉IBM小型机、Oracle数据库、EMC共享存储设备,用廉价的集群(Cluster)方案替代。
符号链接:
这样配置环境变量的时候就可以这样写了:
$>ssh localhost
--------------
1.独立模式(standalone|local)
nothing!
本地文件系统。
不需要启用单独进程。
2.pesudo(伪分布模式)
等同于完全分布式,只有一个节点。
SSH: //(Socket),
//public + private
//server : sshd ps -Af | grep sshd
//clint : ssh
//ssh-keygen:生成公私秘钥。
//authorized_keys 需要使用644
//ssh 192.168.231.201 yes
[配置文件]
core-site.xml //fs.defaultFS=hdfs://localhost/
hdfs-site.xml //replication=1
mapred-site.xml//
yarn-site.xml //
3.full distributed(完全分布式)
让命令行提示符显式完整路径
---------------------------
1.编辑profile文件,添加环境变量PS1
[/etc/profile]
export PS1='[\u@\h `pwd`]\$'
2.source
$>source /etc/profile
配置hadoop,使用符号连接的方式,让三种配置形态共存。
----------------------------------------------------
1.创建三个配置目录,内容等同于hadoop目录
${hadoop_home}/etc/local
${hadoop_home}/etc/pesudo
${hadoop_home}/etc/full
2.创建符号连接
$>ln -s
3.对hdfs进行格式化
$>hadoop namenode -format
4.修改hadoop配置文件,手动指定JAVA_HOME环境变量
[${hadoop_home}/etc/hadoop/hadoop-env.sh]
...
export JAVA_HOME=/soft/jdk
...
5.启动hadoop的所有进程
$>start-all.sh
6.启动完成后,出现以下进程
$>jps
33702 NameNode
33792 DataNode
33954 SecondaryNameNode
29041 ResourceManager
34191 NodeManager
7.查看hdfs文件系统
$>hdfs dfs -ls /
8.创建目录
$>hdfs dfs -mkdir -p /user/centos/hadoop
9.通过webui查看hadoop的文件系统
http://localhost:50070/
10.停止hadoop所有进程
$>stop-all.sh
11.centos防火墙操作
[centos 6.5及之前的版本]
$>sudo service iptables stop //停止服务
$>sudo service iptables start //启动服务
$>sudo service iptables status //查看状态
[centos7]
$>sudo systemctl enable firewalld.service//"开机启动"启用
$>sudo systemctl disable firewalld.service//"开机自启"禁用
$>sudo systemctl start firewalld.service//启动防火墙
$>sudo systemctl stop firewalld.service//停止防火墙
$>sudo systemctl status firewalld.service//查看防火墙状态
[开机自启]
$>sudo chkconfig iptables on //"开机自启"启用(centos6)
$>sudo chkconfig iptables off //"开机自启"禁用(centos6)
hadoop的端口
-----------------
50070 //namenode http port
50075 //datanode http port
50090 //secondarynamenode http port
8020 //namenode rpc port
50010 //datanode rpc port
hadoop四大模块
-------------------
common
hdfs //namenode + datanode + secondarynamenode
mapred
yarn //resourcemanager + nodemanager
启动脚本
-------------------
1.start-all.sh//启动所有进程
2.stop-all.sh //停止所有进程
3.start-dfs.sh//
4.start-yarn.sh
[hdfs] start-dfs.sh stop-dfs.sh
NN
DN
2NN
[yarn] start-yarn.sh stop-yarn.sh
RM
NM
修改主机名
-------------------
1./etc/hostname
s201
2./etc/hosts
127.0.0.1 localhost
192.168.231.201 s201
192.168.231.202 s202
192.168.231.203 s203
192.168.231.204 s204
完全分布式
--------------------
1.克隆3台client(centos7)
右键centos-7-->管理->克隆-> ... -> 完整克隆
2.启动client
3.启用客户机共享文件夹。
4.修改hostname和ip地址文件
[/etc/hostname]
s202
[/etc/sysconfig/network-scripts/ifcfg-ethxxxx]
...
IPADDR=..
5.重启网络服务
$>sudo service network restart
6.修改/etc/resolv.conf文件
nameserver 192.168.231.2
7.重复以上3 ~ 6过程.
准备完全分布式主机的ssh
-------------------------
1.删除所有主机上的/home/centos/.ssh/*
2.在s201主机上生成密钥对
$>ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
3.将s201的公钥文件id_rsa.pub远程复制到202 ~ 204主机上。
并放置/home/centos/.ssh/authorized_keys
$>scp id_rsa.pub centos@s201:/home/centos/.ssh/authorized_keys
$>scp id_rsa.pub centos@s202:/home/centos/.ssh/authorized_keys
$>scp id_rsa.pub centos@s203:/home/centos/.ssh/authorized_keys
$>scp id_rsa.pub centos@s204:/home/centos/.ssh/authorized_keys
4.配置完全分布式(${hadoop_home}/etc/hadoop/)
[core-site.xml]
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://s201/</value>
</property>
</configuration>
[hdfs-site.xml]
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
</configuration>
[mapred-site.xml]
不变
[yarn-site.xml]
<?xml version="1.0"?>
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>s201</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
[slaves]
s202
s203
s204
[hadoop-env.sh]
...
export JAVA_HOME=/soft/jdk
...
5.分发配置
$>cd /soft/hadoop/etc/
$>scp -r full centos@s202:/soft/hadoop/etc/
$>scp -r full centos@s203:/soft/hadoop/etc/
$>scp -r full centos@s204:/soft/hadoop/etc/
6.删除符号连接
$>cd /soft/hadoop/etc
$>rm hadoop
$>ssh s202 rm /soft/hadoop/etc/hadoop
$>ssh s203 rm /soft/hadoop/etc/hadoop
$>ssh s204 rm /soft/hadoop/etc/hadoop
7.创建符号连接
$>cd /soft/hadoop/etc/
$>ln -s full hadoop
$>ssh s202 ln -s /soft/hadoop/etc/full /soft/hadoop/etc/hadoop
$>ssh s203 ln -s /soft/hadoop/etc/full /soft/hadoop/etc/hadoop
$>ssh s204 ln -s /soft/hadoop/etc/full /soft/hadoop/etc/hadoop
8.删除临时目录文件
$>cd /tmp
$>rm -rf hadoop-centos
$>ssh s202 rm -rf /tmp/hadoop-centos
$>ssh s203 rm -rf /tmp/hadoop-centos
$>ssh s204 rm -rf /tmp/hadoop-centos
9.删除hadoop日志
$>cd /soft/hadoop/logs
$>rm -rf *
$>ssh s202 rm -rf /soft/hadoop/logs/*
$>ssh s203 rm -rf /soft/hadoop/logs/*
$>ssh s204 rm -rf /soft/hadoop/logs/*
10.格式化文件系统
$>hadoop namenode -format
11.启动hadoop进程
$>start-all.sh
rsync
------------------
四个机器均安装rsync命令。
远程同步.
$>sudo yum install rsync
将root用户实现无密登录
------------------------
1.同
编写脚本
---------------
1.xcall.sh
2.xsync.sh
xsync.sh /home/etc/a.txt
rsync -lr /home/etc/a.txt centos@s202:/home/etc
ssh权限问题
----------------
1.~/.ssh/authorized_keys
644
2.$/.ssh
700
3.root
配置SSH
-------------
生成密钥对 $>ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
添加认证文件 $>cat ~/.ssh/id_rsa.pub > ~/.ssh/authorized_keys
权限设置,文件和文件夹权限除了自己之外,别人不可写。 $>chmod 700 ~/.ssh $>chmod 644 ~/.ssh/authorized_keys
scp
----------
远程复制.
rsync
---------
远程同步,支持符号链接。
rsync -lr xxx xxx
完全分布式
---------------
1.配置文件
[core-site.xml]
fs.defaultFS=hdfs://s201:8020/
[hdfs-site.xml]
replication=1 //伪分布
replication=3 //完全分布
[mapred-site.xml]
mapreduce.framework.name=yarn
[yarn-site.xml]
rm.name=s201
[slaves]
s202
s203
s204
2.分发文件
a)ssh
openssh-server//sshd
openssh-clients//ssh
openssh //ssh-keygen
b)scp/rsync
3.格式化文件系统
$>hadoop namenode -format
4.启动hadoop所有进程
//start-dfs.sh + start-yarn.sh
$>start-all.sh
5.xcall.sh jps
/usr/local/bin/jps
/usr/local/bin/java
6.查看jps进程
$>xcall.sh jps
7.关闭centos的防火墙
$>sudo service iptables stop //centos 6.5及以前:start/stop/status/restart
$>sudo systemctl stop firewalld //centos 7:start/stop/status/restart
$>sudo systemctl disable firewalld //禁止开机自启
$>sudo systemctl enable firewalld //开机自启
8.最终通过webui
http://s201:50070/
符号连接
----------------
1.修改符号连接的owner
$>chown -h centos:centos xxx//-h:针对连接本身,而不是所指文件.
2.修改符号链接
$>ln -sfT index.html index//覆盖原有的连接。
hadoop模块
-------------------
common //
hdfs //
mapreduce //
yarn //
进程
------------------
[hdfs] start-dfs.sh
NameNode NN
DataNode DN
SecondaryNameNode 2NN
[yarn] start-yarn.sh
ResourceManager RM
NodeManager NM
hdfs常用命令
--------------------
$>hdfs dfs -mkdir /user/centos/hadoop
$>hdfs dfs -ls -R /user/centos/hadoop
$>hdfs dfs -lsr /user/centos/hadoop
$>hdfs dfs -put index.html /user/centos/hadoop
$>hdfs dfs -get /user/centos/hadoop/index.html a.html
$>hdfs dfs -rm -r -f /user/centos/hadoop
no route
--------------------
关闭防火墙。
$>su root
$>xcall.sh "service firewalld stop"
$>xcall.sh "systemctl disable firewalld"
hdfs
--------------------
500G
1024G = 2T/4T
切割。
寻址时间:10ms左右
磁盘速率 : 100M /s
64M
128M //让寻址时间占用读取时间的1%.
1ms
1 / 100
size = 181260798
block-0 : 134217728
block-1 : 47043070
--------------------
b0.no : 1073741829
b1.no : 1073741830
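把上面的数字用一小段代码验证一下(示意片段):
long size = 181260798L;
long blockSize = 134217728L;          //128M:寻址10ms按读取时间的1%算≈1s传输,乘以100M/s磁盘速率
long fullBlocks = size / blockSize;   //1 -> block-0 : 134217728
long tail = size % blockSize;         //block-1 : 47043070
System.out.println(fullBlocks + "个整块 + 尾块" + tail + "字节");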
HA
-----------------------
high availability,高可用性。通常用几个9衡量。
99.999%
SPOF:
-----------------------
single point of failure,单点故障。
secondarynamenode
----------------------
找到所有的配置文件
----------------------
1.tar开hadoop-2.7.3.tar.gz
hadoop-2.7.3\share\hadoop\common\hadoop-common-2.7.3.jar\core-default.xml
hadoop-2.7.3\share\hadoop\hdfs\hadoop-hdfs-2.7.3.jar\hdfs-default.xml
hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-core-2.7.3.jar\mapred-default.xml
hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-common-2.7.3.jar\yarn-default.xml
本地模式
-----------
[core-site.xml]
fs.defaultFS=file:/// //默认值
配置hadoop临时目录
---------------------
1.配置[core-site.xml]文件
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://s201/</value>
</property>
<!--- 配置新的本地目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/home/centos/hadoop</value>
</property>
</configuration>
//以下属性均由hadoop.tmp.dir决定,在hdfs-site.xml文件中配置。
dfs.namenode.name.dir=file://${hadoop.tmp.dir}/dfs/name
dfs.datanode.data.dir=file://${hadoop.tmp.dir}/dfs/data
dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary
2.分发core-site.xml文件
$>xsync core-site.xml
3.格式化文件系统,只对namenode的本地目录进行初始化。
$>hadoop namenode -format//hdfs namenode -format
4.启动hadoop
$>start-dfs.sh
使用xcall.sh在所有节点上创建jps符号连接,指向/soft/jdk/bin/jps
------------------------------------------------------------------
1.切换到root用户
$>su root
2.创建符号连接
$>xcall.sh "ln -sfT /soft/jdk/bin/jps /usr/local/bin/jps"
3.修改jps符号连接的owner
$>xcall.sh "chown -h centos:centos /usr/local/bin/jps"
4.查看所有主机上的java进程
$>xcall.sh jps
在centos桌面版中安装eclipse
----------------------------
1.下载eclipse linux版
eclipse-jee-mars-R-linux-gtk-x86_64.tar.gz
2.tar开到/soft下,
$>tar -xzvf eclipse-jee-mars-R-linux-gtk-x86_64.tar.gz -C /soft
3.启动eclipse
$>cd /soft/eclipse
$>./eclipse & //后台启动
4.创建桌面快捷方式
$>ln -s /soft/eclipse/eclipse ~/Desktop/eclipse
5.
收集hadoop的所有jar包
-------------------------
使用hadoop客户端api访问hdfs
------------------------------
1.创建java项目
2.导入hadoop类库
......
----------------
1.API
Configuration //配置类,fs.defaultFS(file:///)
FileSystem //抽象类
2.核心配置文件
core-site.xml //文件系统 + 本地临时目录 hadoop.tmp.dir
hdfs-site.xml //relication=3
mapred-site.xml//yarn
yarn-site.xml //rm
3.blocksize
128m
寻址时间 ~= 10ms
磁盘的IO速率 = 100M/s
让寻址时间是读取时间的1 %.
4.centos
5.hadoop
hadoop/share/hadoop/common|hdfs|yarn|mapred|.../lib/jars
yum
------------------
管理依赖。
maven:
------------------
apache maven 。
1.下载maven软件
linux //apache-maven-3.3.9-bin.tar.gz
windows //
2.安装
解压即可。
3.配置环境变量
M2_HOME=安装目录
PATH=%M2_HOME%\bin;...
修改软件仓库
---------------------
1.conf/settings.xml
<mirrors>
<mirror>
<id>nexus-osc</id>
<mirrorOf>central</mirrorOf>
<name>Nexus osc</name>
<url>http://maven.oschina.net/content/groups/public/</url>
</mirror>
<mirror>
<id>nexus-osc-thirdparty</id>
<mirrorOf>thirdparty</mirrorOf>
<name>Nexus osc thirdparty</name>
<url>http://maven.oschina.net/content/repositories/thirdparty/</url>
</mirror>
</mirrors>
<profile>
<id>jdk-1.4</id>
<activation>
<jdk>1.4</jdk>
</activation>
<repositories>
<repository>
<id>nexus</id>
<name>local private nexus</name>
<url>http://maven.oschina.net/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>nexus</id>
<name>local private nexus</name>
<url>http://maven.oschina.net/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</profile>
2.修改本地库路径
[默认位置]
~/.m2/repository//windows:C:\Users\Administrator\.m2
//linux :/home/centos/.m2
[conf/settings.xml]
...
<localRepository>/path/to/local/repo</localRepository>
...
体验maven
-----------------
0.查看maven的帮助
cmd>cd bin
cmd>mvn help:system
1.创建项目
编写pom.xml(project object model)文件.
<?xml version="1.0"?>
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.it18zhang</groupId>
<artifactId>helloworld</artifactId>
<version>1.0.0</version>
</project>
2.创建文件夹
src/main/java //maven默认的源代码目录
3.创建java类
com/it18zhang/helloworld/HelloWorld.java
package com.it18zhang.helloworld;
public class HelloWorld{
public static void main(String[] args){
System.out.println("hello world");
}
}
4.编译java源码类
cmd>mvn clean compile
5.打包
搭建maven私服
-----------------------
1.下载maven web程序(war)
nexus.war
2.部署war文件
复制nexus.war到${tomcat_home}/webapps/即可.
3.配置私服的仓库地址
${tomcat_home}\webapps\nexus\WEB-INF\classes\nexus.properties
nexus-work=h:/maven-repo/nexus
runtime=${bundleBasedir}
nexus-app=${runtime}
4.启动tomcat
5.通过浏览器访问nexus
http://localhost:8080/nexus/
6.复制文件到相应目录下.
配置maven的settings文件,指向私服的地址。
-----------------------------------------
1.编辑settings.xml文件
[${m2_home}/conf/settings.xml]
<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
<pluginGroups>
</pluginGroups>
<proxies>
</proxies>
<servers>
<server>
<id>releases</id>
<username>admin</username>
<password>admin123</password>
</server>
<server>
<id>snapshots</id>
<username>admin</username>
<password>admin123</password>
</server>
</servers>
<mirrors>
<mirror>
<id>nexus</id>
<mirrorOf>*</mirrorOf>
<url>http://localhost:8080/nexus/content/groups/public/</url>
</mirror>
</mirrors>
<profiles>
<profile>
<id>nexus</id>
<repositories>
<repository>
<id>central</id>
<url>http://localhost:8080/nexus/content/repositories/central/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>central</id>
<url>http://localhost:8080/nexus/content/repositories/central/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</profile>
</profiles>
<activeProfiles>
<activeProfile>nexus</activeProfile>
</activeProfiles>
</settings>
maven常用命令
----------------
cmd>mvn clean //清除target目录
cmd>mvn clean compile//清除并编译
cmd>mvn clean package//清除并打包
使用archetype生成项目骨架
---------------------------
cmd>mvn archetype:generate -DarchetypeCatalog=internal
在eclipse中配置mvn
---------------------
1.eclipse ->首选项 -> maven -> user settings
global setting :C:\myprograms\apache-maven-3.3.9\conf\settings.xml
user settings :C:\myprograms\apache-maven-3.3.9\conf\settings.xml
自动解析本地仓库目录:C:\Users\Administrator\.m2\repository
2.ok
缺少winutils.exe文件以及xxx.dll文件
-------------------------------------
1.解压bin.rar文件,将bin/*所有文件覆盖到${hadoop_home}/bin下。
2.设置windows的环境变量
PATH=${hadoop_home}/bin;${hadoop_home}/sbin;...
3.将bin.rar/bin/*.dll + winutils.exe文件放到系统目录下。
系统目录下:
[win7]
C:\Windows\System32
C:\Windows\SysWOW64
windows上配置hadoop_home环境变量
----------------------------------
hadoop_home=d:\downloads\hadoop-2.7.3
path=%hadoop_home%\bin;%hadoop_home%\sbin;...
使用eclipse创建maven项目
--------------------------
1.eclipse -> new project -> maven project -> 指定location -> 选择 archetype 中使用 archetype-quickstart
replication //3
设置文件块
---------------------------
1.只有文件才有副本和块的概念。
文件blocksize默认是128m,最少块大小是1m.blocksize >= 最小值.
2.block默认是128m.
[hdfs-site.xml]
<configuration>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<!-- -->
<property>
<name>dfs.blocksize</name>
<value>2k</value>
</property>
<property>
<name>dfs.namenode.fs-limits.min-block-size</name>
<value>1024</value>
</property>
</configuration>
3.分发配置文件
$>xsync.sh hdfs-site.xml
4.通过hdfs命令查询key
$>hdfs getconf -confKey dfs.blocksize
5.重启hdfs
$>stop-dfs.sh
$>start-dfs.sh
6.put文件到hdfs
$>hdfs dfs -put 1.txt hadoop
7.使用API创建文件指定副本和块大小设置
@Test
public void putFile() throws Exception{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.231.201:8020/");
FileSystem fs = FileSystem.get(conf);
FSDataOutputStream out = fs.create(new Path("/user/centos/hadoop/a.txt"), true, 1024, (short)2, 1024);
IOUtils.copyBytes(new FileInputStream("D:\\README.txt"), out, 1024);
System.out.println("over");
}
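写入后可以再读一次文件元信息,确认create时指定的副本数和块大小已生效(示意):
@Test
public void checkFile() throws Exception{
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://192.168.231.201:8020/");
    FileSystem fs = FileSystem.get(conf);
    FileStatus st = fs.getFileStatus(new Path("/user/centos/hadoop/a.txt"));
    System.out.println("replication=" + st.getReplication() + ", blockSize=" + st.getBlockSize());
}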
文件最少的副本数
--------------------------
dfs.namenode.replication.min=1//dfs.replication.min(过时了)
dfs.replication.max=512//dfs.replication.max(没过时)
failover //容灾
fault tolerance //容错
512进行一次校验,Crc32实现校验算法,通过本地实现。(hadoop/bin/hadoop.dll)
chunk //512byte
checksum //36byte = 4(校验和类型) + 32(校验和值)
packet //每个packet包含多个chunk,
每个package //<= 65536
504
126 x 4 = 504
65049 - 65016 = 33
65016 126 0 0
createPacket(packetSize, chunksPerPacket, bytesCurBlock, currentSeqno++, false);
剖析文件写入过程
----------------------
1.DistributedFileSystem dfs = FileSystem.get(conf);
//RPC:remote procedure call,远程过程调用。
FSDataOutputStream out = dfs.create(path);
........
block
--------------
文件块.
128M
最小块:1m
dfs.blocksize=
packet
-------------
包.
64K
chunk
------------
小块
512
DFSOutputStream
-------------------
buf:4608 4.5K
public static enum Type {
NULL (CHECKSUM_NULL, 0),
CRC32 (CHECKSUM_CRC32, 4),
CRC32C(CHECKSUM_CRC32C, 4),
DEFAULT(CHECKSUM_DEFAULT, 0), // This cannot be used to create DataChecksum
MIXED (CHECKSUM_MIXED, 0); // This cannot be used to create DataChecksum
public final int id;
public final int size;
private Type(int id, int size) {
this.id = id;
this.size = size;
}
[-24, -43, -92, -69,
-5, -103, 51, -43,
98, -49, 26, -75,
-121, 78, -44, 96,
-90, 26, 46, 23,
-117, -116, -71, 22,
-70, 124, -58, -111,
-88, 14, -28, -28,
-80, -43, 55, 69]
ExtendedBlock(NN) : BP-333901420-192.168.231.201-1480490556202:blk_1073741842_1019
namenode存放的是filesystem的元数据(路径 + 副本个数 + 块大小 + 权限)
fault tolerance //容错
fail over //容灾
副本放置
------------------
副本放置的机架感知目的在可靠性、可用性、提升网络性能。
[/rack1]
192.168.127.201 S201
192.168.127.202 S202
192.168.127.203 S203
[/rack2]
192.168.127.204 S204 /rack2/S204
192.168.127.205 S205 /rack2/S205
自定义机架感知
-----------------
1.实现org.apache.hadoop.net.DNSToSwitchMapping
package com.it18zhang.myhadoop273_122;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.net.DNSToSwitchMapping;
/**
* 机架感知
*/
public class MyDNSToSwitchMapping implements DNSToSwitchMapping {
public List<String> resolve(List<String> names) {
List<String> list = new ArrayList<String>();
for(String name : names){
Integer ip = null ;
if(name.startsWith("192")){
ip = Integer.parseInt(name.substring(name.lastIndexOf(".") + 1));
}
else{
ip = Integer.parseInt(name.substring(1));
}
if(ip <= 203){
list.add("/rack1/S" + ip);
}
else{
list.add("/rack2/S" + ip);
}
}
return list;
}
public void reloadCachedMappings() {
}
public void reloadCachedMappings(List<String> names) {
}
}
2.配制core-site.xml
<property>
<name>net.topology.node.switch.mapping.impl</name>
<value>com.it18zhang.myhadoop273_122.MyDNSToSwitchMapping</value>
</property>
3.导出jar
使用eclipse的maven build菜单实现导出jar.
跳过测试
clean package -DskipTests
4.分发jar和配置文件到集群。
myhadoop273_122-0.0.1-SNAPSHOT.jar分发到${hadoop_home}/soft/hadoop/share/hadoop/common/lib/
core-site.xml分发到${hadoop_home}/soft/hadoop/etc/hadoop/
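部署前可以先在本地用一个main方法验证第1步中resolve的映射逻辑(示意,不需要连接集群):
public static void main(String[] args) {
    MyDNSToSwitchMapping mapping = new MyDNSToSwitchMapping();
    List<String> racks = mapping.resolve(java.util.Arrays.asList("192.168.127.202", "S204"));
    System.out.println(racks);   //期望输出 [/rack1/S202, /rack2/S204]
}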
hdfs文件系统的基本操作
------------------------
并行复制
-----------------------
$>hadoop distcp hdfs://192.168.231.201:8020/user/centos/hadoop hdfs://192.168.231.201:8020/user/centos/data
文件归档
---------------------
har归档产生一个目录,目录名称xxx.har,该目录下有相关数据文件.
_index //索引文件
part-0 //数据文件
$>hadoop archive -archiveName my.har -p hadoop /user/centos/ //归档
$>hdfs dfs -lsr har:///user/centos/my.har //查看归档内容
数据完整性
-------------------
校验和(checksum,CRC32)。
校验和使用32位的整数,占4个字节存储,开销小于被校验数据的1%。
dfs.bytes-per-checksum //指定每多少字节数据计算一次校验和(默认512),
不能超过io.file.buffer.size。
hdfs dfs -get -crc xx //下载文件时,同时下载校验和文件
hdfs dfs -get -ignoreCrc xx //下载文件时,不进行校验和处理
客户端读取文件可以关闭校验和
-----------------------------
提高读取速率,但不保证数据正确性。
FileSystem fs = FileSystem.get(conf);
fs.setVerifyChecksum(false);            //关闭客户端校验和验证
FSDataInputStream in = fs.open(path);   //之后的读取不再做校验
RawLocalFileSystem ----|> FileSystem
LocalFileSystem ----|> ChecksumFileSystem
datanode
-------------------
blk_xxxxx //块数据,没有元数据,纯粹的数据.
blk_xxxxx_1032.meta //校验和数据:每512字节数据对应4字节校验和,外加7个字节的头信息。
filesize    meta大小
11          11      //1个chunk:4 + 7
12          11      //1个chunk:4 + 7
6094        55      //12个chunk:12 x 4 + 7 = 55
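按这个规则可以直接算出.meta文件的大小(示意片段):
long fileSize = 6094;
long chunks = (fileSize + 511) / 512;   //每512字节数据一个chunk,向上取整 = 12
long metaSize = 7 + chunks * 4;         //7字节头 + 每个chunk 4字节CRC32 = 55
System.out.println(metaSize);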
压缩
---------------------
减少存储空间,提高传输效率。
压缩格式    压缩工具    压缩算法    文件扩展名    是否可以切割
----------------------------------------------------------------------------------
DEFLATE    N/A        DEFLATE    .deflate     No
gzip       gzip       DEFLATE    .gz          No
bzip2      bzip2      bzip2      .bz2         Yes
LZO        lzop       LZO        .lzo         No
LZ4        N/A        LZ4        .lz4         No
Snappy     N/A        Snappy     .snappy      No
Codec
-------------------
编解码器.
压缩格式 Hadoop 压缩编解码器
---------------------------------------------------
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
LZ4 org.apache.hadoop.io.compress.Lz4Codec
Snappy org.apache.hadoop.io.compress.SnappyCodec
package com.it18zhang.myhadoop273_122;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Test;
/**
* 测试压缩编解码器
*/
public class TestCodec {
/**
* 测试gzip
*/
@Test
public void testGzip() throws Exception{
//
Class clazz = GzipCodec.class;
Configuration conf = new Configuration();
//创建gzipo codec实例
CompressionCodec codec = ReflectionUtils.newInstance(clazz, conf);
long start = System.currentTimeMillis() ;
//得到压缩输出流
OutputStream out = codec.createOutputStream(new FileOutputStream("d:/codec/hadoop.gz"));
IOUtils.copyBytes(new FileInputStream("d:/codec/hadoop.pdf"), out, 1024);
out.close();
System.out.println(System.currentTimeMillis() - start);
}
/**
* 测试gzip
*/
@Test
public void testUnGzip() throws Exception{
//
Class clazz = GzipCodec.class;
Configuration conf = new Configuration();
//创建gzipo codec实例
CompressionCodec codec = ReflectionUtils.newInstance(clazz, conf);
long start = System.currentTimeMillis() ;
//解压器
Decompressor dcor = codec.createDecompressor();
//得到压缩输出流
InputStream in = codec.createInputStream(new FileInputStream("d:/codec/hadoop.gz"),dcor);
IOUtils.copyBytes(in,new FileOutputStream("d:/codec/hadoop_gz.pdf"), 1024);
in.close();
System.out.println(System.currentTimeMillis() - start);
}
}
在centos执行hadoop程序
--------------------------
1.在windows eclipse下编译程序并使用maven打包
mvn clean package -DskipTests
2.复制到centos下(共享目录)
3.进入centos的共享目录了
4.使用hadoop命令执行jar
$>hadoop jar myhadoop273_122-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_122.TestCodec ./README.txt
压缩性能评测
----------------------
压缩空间
压缩效率
解压缩效率
hadoop使用LZO编解码器
------------------------
1.安装lzo-2.06.tar.gz
a)tar文件
b)进入lz0-2.06目录
c)执行以下命令
$>su root
$>./configure //执行配置
$>make && make install//编译并安装
$>ls -al /usr/lib64 | grep lzo//查看
2.编译打包hadoop-lzo-master工程
a)先在centos上安装maven
1)tar maven开文件到/soft
$>tar -xzvf apache-maven-bin.tar.gz -C /soft
2)创建符号连接
$>cd /soft
$>ln -s apache-maven-xxx maven
3)配置环境变量
$>sudo nano /etc/profile
[/etc/profile]
...
export M2_HOME=/soft/maven
export PATH=$PATH:$M2_HOME/bin
$>source /etc/profile
4)备份settings.xml文件
$>cd /soft/maven/conf
$>cp settings.xml settings.xml.bak
5)配置maven settings文件.
[/soft/maven/conf/settings.xml]
<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
<!-- localRepository
| The path to the local repository maven will use to store artifacts.
|
| Default: ${user.home}/.m2/repository
<localRepository>/path/to/local/repo</localRepository>
-->
<!-- interactiveMode
| This will determine whether maven prompts you when it needs input. If set to false,
| maven will use a sensible default value, perhaps based on some other setting, for
| the parameter in question.
|
| Default: true
<interactiveMode>true</interactiveMode>
-->
<!-- offline
| Determines whether maven should attempt to connect to the network when executing a build.
| This will have an effect on artifact downloads, artifact deployment, and others.
|
| Default: false
<offline>false</offline>
-->
<!-- pluginGroups
| This is a list of additional group identifiers that will be searched when resolving plugins by their prefix, i.e.
| when invoking a command line like "mvn prefix:goal". Maven will automatically add the group identifiers
| "org.apache.maven.plugins" and "org.codehaus.mojo" if these are not already contained in the list.
|-->
<pluginGroups>
<!-- pluginGroup
| Specifies a further group identifier to use for plugin lookup.
<pluginGroup>com.your.plugins</pluginGroup>
-->
</pluginGroups>
<!-- proxies
| This is a list of proxies which can be used on this machine to connect to the network.
| Unless otherwise specified (by system property or command-line switch), the first proxy
| specification in this list marked as active will be used.
|-->
<proxies>
<!-- proxy
| Specification for one proxy, to be used in connecting to the network.
|
<proxy>
<id>optional</id>
<active>true</active>
<protocol>http</protocol>
<username>proxyuser</username>
<password>proxypass</password>
<host>proxy.host.net</host>
<port>80</port>
<nonProxyHosts>local.net|some.host.com</nonProxyHosts>
</proxy>
-->
</proxies>
<servers>
<server>
<id>releases</id>
<username>admin</username>
<password>admin123</password>
</server>
<server>
<id>snapshots</id>
<username>admin</username>
<password>admin123</password>
</server>
</servers>
<mirrors>
<mirror>
<id>nexus</id>
<mirrorOf>*</mirrorOf>
<url>http://192.168.231.1:8080/nexus/content/groups/public/</url>
</mirror>
</mirrors>
<profiles>
<profile>
<id>nexus</id>
<repositories>
<repository>
<id>central</id>
<!--
<url>http://s0:9090</url>
-->
<url>http://192.168.231.1:8080/nexus/content/repositories/central/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>central</id>
<!--
<url>http://s0:9090</url>
-->
<url>http://192.168.231.1:8080/nexus/content/repositories/central/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</profile>
</profiles>
<activeProfiles>
<activeProfile>nexus</activeProfile>
</activeProfiles>
<!-- activeProfiles
| List of profiles that are active for all builds.
|
<activeProfiles>
<activeProfile>alwaysActiveProfile</activeProfile>
<activeProfile>anotherAlwaysActiveProfile</activeProfile>
</activeProfiles>
-->
</settings>
b)解压hadoop-lzo-master.zip
$>unzip hadoop-lzo-master.zip
c)使用maven编译打包
$>cd hadoop-lzo-master
$>mvn clean package -DskipTests
hadoop压缩编解码器
--------------------
default
deflate
gz
bzip2
lzo //GPL
lz4
snappy //
在centos 7上配置snappy压缩编解码器
--------------------------------------
1.安装snappy
$>sudo yum install snappy
2.替换hadoop/lib/native/libhadoop.so文件.
$>cp libhadoop.so ${HADOOP_HOME}/lib/native/libhadoop.so.1.0.0
在centos 7上配置LZO压缩编解码器
--------------------------------------
1.安装LZO本地库
$>sudo yum install lzo.x86_64
2.编译打包hadoop-lzo-master
a.下载
略
b.tar
略
c.修改c源代码.
[src/main/native/impl/lzo/LzoCompressor.c]
//liblzo2 = dlopen(HADOOP_LZO_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
liblzo2 = dlopen("liblzo2.so.2", RTLD_LAZY | RTLD_GLOBAL);
[src/main/native/impl/lzo/LzoDeCompressor.c]
//liblzo2 = dlopen(HADOOP_LZO_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
liblzo2 = dlopen("liblzo2.so.2", RTLD_LAZY | RTLD_GLOBAL);
d.编译
$>mvn clean package -DskipTests
e.生成target文件夹,里面有.jar.
将编译生成的hadoop-lzo-master项目安装maven私服上。
---------------------------------------------------
1.在私服的central仓库目录中创建自己的目录.
H:\maven-repo\nexus\storage\central\com\hadoop\gplcompression\hadoop-lzo\0.4.20-SNAPSHOT\hadoop-lzo-0.4.20-SNAPSHOT.jar
H:\maven-repo\nexus\storage\central\com\hadoop\gplcompression\hadoop-lzo\0.4.20-SNAPSHOT\hadoop-lzo-0.4.20-SNAPSHOT-javadoc.jar
H:\maven-repo\nexus\storage\central\com\hadoop\gplcompression\hadoop-lzo\0.4.20-SNAPSHOT\hadoop-lzo-0.4.20-SNAPSHOT-sources.jar
H:\maven-repo\nexus\storage\central\com\hadoop\gplcompression\hadoop-lzo\0.4.20-SNAPSHOT\hadoop-lzo-0.4.20-SNAPSHOT.pom
2.手动更新maven私服的central仓库的索引.
a)使用账号登录私服:admin/admin123
b)刷新中央仓库的browse storage.
c)手动更新中央仓库的索引.
d)刷新索引browse index.
3.复制hadoop-lzo项目的依赖项,添加项目的pom.xml文件中
<dependency>
<groupId>com.hadoop.gplcompression</groupId>
<artifactId>hadoop-lzo</artifactId>
<version>0.4.20-SNAPSHOT</version>
</dependency>
4.手动复制hadoop-lzo-master项目到本地仓库
5.项目中使用com.hadoop.compression.lzo.LzoCodec类库
6.导出eclipse项目成jar包.
7.复制导出的jar到centos的共享目录下。
8.复制hadoop-lzo-0.4.20-SNAPSHOT.jar文件到/soft/hadoop/shared/hadoop/common/lib
9.执行程序
$>hadoop jar myhadoop273_122-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_122.TestCodec ./README.txt
执行程序
-------------
$>hadoop jar myhadoop273_122-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_122.TestCodec ./README.txt
.deflate     : zip size=2444 | comp time=2247 | decomp time=340
.deflate     : zip size=2444 | comp time=189  | decomp time=180
.gz          : zip size=2456 | comp time=208  | decomp time=164
.bz2         : zip size=2497 | comp time=319  | decomp time=164
.lz4         : zip size=3766 | comp time=98   | decomp time=127
.lzo_deflate : zip size=3548 | comp time=217  | decomp time=194
.snappy      : zip size=3554 | comp time=187  | decomp time=110
file size  //deflate(2444) < gz(2456) < bzip2(2497) < lzo(3548) < snappy(3554) < lz4(3766)
zip time   //lz4(98) < snappy(187) < gz(208) < lzo(217) < bz2(319) < deflate(2247|189)
unzip time //snappy(110) < lz4(127) < gz(164) = bz2(164) < deflate(340|180) < lzo(194)
使用CodecPool池化压缩编解码器
-------------------------------
/**
* 测试压缩
*/
public void testCompress(Class codecClass,String srcPath ) throws Exception{
//
Class clazz = codecClass ;
Configuration conf = new Configuration();
//创建gzipo codec实例
CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(clazz, conf);
/************************压缩器****************************/
Compressor compressor = CodecPool.getCompressor(codec);
//扩展名
String ext = codec.getDefaultExtension();
long start = System.currentTimeMillis() ;
File outFile = new File(srcPath + ext);
//得到压缩输出流,指定压缩器
OutputStream out = codec.createOutputStream(new FileOutputStream(outFile),compressor);
IOUtils.copyBytes(new FileInputStream(srcPath), out, 1024);
out.close();
//放回编解码器池
CodecPool.returnCompressor(compressor);
System.out.print(ext + "\t: zip size=" + outFile.length() + "\t| comp time=" + (System.currentTimeMillis() - start));
}
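调用方式示意(编解码器类都在org.apache.hadoop.io.compress包下,源文件沿用上文的d:/codec/hadoop.pdf):
@Test
public void testCompressAll() throws Exception{
    Class<?>[] codecs = {DefaultCodec.class, GzipCodec.class, BZip2Codec.class};
    for(Class<?> c : codecs){
        testCompress(c, "d:/codec/hadoop.pdf");   //依次压缩并打印大小和耗时
        System.out.println();
    }
}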
hadoop
-----------------
interface WritableComparable extends Writable, Comparable{
    write(DataOutput out);
    readFields(DataInput in);
    compareTo(o);    //来自Comparable
}
class IntWritable implements WritableComparable{
...
}
IntWritable 类似于 Integer,里面对int基本数据类型的包装。
ByteWritable
LongWritable
...
ArrayWritable
MapWritable
Text
-----------
可变的.
内部直接操作byte[].
使用utf8编码。
getLength()       //有效字节数(UTF-8编码后的长度)
getBytes().length //capacity,底层字节数组的容量,可能大于getLength()
charAt(int i)     //返回指定字节位置处的Unicode码点(int)
find("xxx")       //返回子串在字节数组中的起始字节偏移
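一个直观的小例子(示意):
Text t = new Text("hadoop中文");
System.out.println(t.getLength());      //12:6个英文字节 + 2个汉字各3字节
System.out.println(t.find("中"));       //6:返回的是字节偏移
System.out.println((char)t.charAt(0));  //h:charAt按字节位置返回码点(int)
t.set("hello");                         //Text是可变的,可以复用同一个对象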
NullWritable
-------------
占位符
单例设计模式(饿汉式)
没有参与串行和反串行过程.
ObjectWritable
---------------------
1.处理基本类型、String以及两种类型构成的数组.
2.自定义不可以。
RawComparator
--------------------
原生比较器.
直接根据byte[]比较大小。
速度快。
避免反串行化。
class WritableComparator implements RawComparator, Configurable{
...
compare(byte[] a1,int o1,int l1 , byte[] a2,int o2,int l2){
...
}
}
使用对比器比较对象的大小
--------------------------
1.实现类PersonComparator,该类实现RawComparator.
2.创建PersonComparator对象,调用compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法.
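一个最小的PersonComparator示意(假设Person串行化时最先写出一个int类型的id字段;这里借助WritableComparator,它实现了RawComparator):
public class PersonComparator extends WritableComparator {
    //直接在字节数组上读出最前面的int(id)做比较,避免反串行化整个对象
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int id1 = readInt(b1, s1);   //WritableComparator提供的静态工具方法
        int id2 = readInt(b2, s2);
        return Integer.compare(id1, id2);
    }
}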
序列化框架
-----------------
interface org.apache.hadoop.io.serializer.Serialization<T>{
//
boolean accept(Class<?> c);
//序列化器
Serializer<T> getSerializer(Class<T> c);
//反序列化
Deserializer<T> getDeserializer(Class<T> c);
}
//hadoop内置实现WritableSerialization
class org.apache.hadoop.io.serializer.WritableSerialization{
Serializer<T> getSerializer(Class<T> c){
return new WritableSerializer();
}
//反序列化
Deserializer<T> getDeserializer(Class<T> c){
return new WritableDeserializer(getConf(), c);
}
}
[core-default.xml]配置串行化系统
<property>
<name>io.serializations</name>
<value>org.apache.hadoop.io.serializer.WritableSerialization,
org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,
org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
</property>
为什么不用java串行化
----------------------
太复杂。
负载重。
avro
-----------------------
1.描述
是数据串行化系统,
丰富的数据结构
紧凑、快速、二进制数据格式。
容器型文件,存储持久化数据。
远程过程调用。
和动态语言的简单集成。
java
-----------------
ObjectOutputStream/ObjectInputStream
size()
hadoop
-----------------
writable
DataOutputStream/DataInputStream
avro
---------------
串行化系统。
json.
跨语言。
基于schema(数据结构).
在hadoop首选串行化技术。
avro特性
---------------
语言中立的串行化系统
多种语言可以进行处理
可压缩可切割
结构丰富
使用json定义schema
avro数据文件是自我描述。
RPC通信时,在连接握手时进行schema交换。
使用avro
------------------------
-1.工作过程
编译avsc文件
编写schema文件(.avsc) -------------> 生成java源文件 ------> 使用java类
0.搭建avro
a)下载avro相关jar包
avro-1.8.0.jar
avro-tools-1.8.0.jar
b)eclipse创建maven项目
c)导入pom.xml依赖
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-tools</artifactId>
<version>1.8.1</version>
</dependency>
1.创建schema
{
    "type" : "record",
    "namespace" : "Tutorialspoint",
    "name" : "Employee",
    "fields" : [
        { "name" : "Name", "type" : "string" },
        { "name" : "age", "type" : "int" }
    ]
}
2.编译schema
cmd>cd d:/avro
cmd>mkdir out
cmd>java -jar avro-tools-1.8.1.jar compile schema emp.avsc out
3.生成java类。
略
4.导入生成的类到eclipse项目下
5.编程实现串行(反串行)
@Test
public void testSerial() throws Exception{
Employee e = new Employee();
e.setName("tom");
e.setAge(12);
//创建数据写入器
DatumWriter<Employee> dw = new SpecificDatumWriter<Employee>(Employee.class);
DataFileWriter<Employee> fw = new DataFileWriter<Employee>(dw);
fw.create(e.getSchema(),new File("d:/avro/employee.avro"));
fw.append(e);
fw.append(e);
fw.append(e);
fw.close();
System.out.println("over");
}
/**
* 测试反串行化
*/
@Test
public void testDeserial() throws Exception{
//创建阅读器
DatumReader<Employee> dr = new SpecificDatumReader<Employee>(Employee.class);
DataFileReader<Employee> fr = new DataFileReader(new File("d:/avro/employee.avro"), dr);
Employee e = new Employee() ;
while(fr.hasNext()){
fr.next(e);
System.out.println(e.getName() + "," + e.getAge());
}
fr.close();
}
不进行编译,直接通过API实现编程
---------------------------------
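一个不做代码生成、直接用GenericRecord的写法示意(schema文件沿用上文的d:/avro/emp.avsc,相关类在org.apache.avro.generic等包下):
@Test
public void testGenericSerial() throws Exception{
    Schema schema = new Schema.Parser().parse(new File("d:/avro/emp.avsc"));
    GenericRecord e = new GenericData.Record(schema);
    e.put("Name", "tom");
    e.put("age", 12);
    DatumWriter<GenericRecord> dw = new GenericDatumWriter<GenericRecord>(schema);
    DataFileWriter<GenericRecord> fw = new DataFileWriter<GenericRecord>(dw);
    fw.create(schema, new File("d:/avro/employee2.avro"));
    fw.append(e);
    fw.close();
}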
文件结构
--------------------
1.sequenceFile
二进制key - value.
Writer:1.不压缩 2.record压缩 3.block压缩
体验sequencefile
--------------------
数据格式:
SEQ[Version][KeyClass][ValueClass][CompressCodec]...
SequenceFile.Reader.next(key,value);
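读写SequenceFile的最小示意(假设fs、conf已按前文初始化,这里用的是2.7中仍可用的旧版构造方式):
Path p = new Path("/user/centos/seq/1.seq");
//写入:二进制key-value
SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, p, IntWritable.class, Text.class);
w.append(new IntWritable(1), new Text("tom"));
w.append(new IntWritable(2), new Text("tomas"));
w.close();
//读取:循环next直到返回false
SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf);
IntWritable key = new IntWritable();
Text value = new Text();
while(r.next(key, value)){
    System.out.println(key.get() + " : " + value);
}
r.close();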
1.桥接模式组建hadoop集群
2.编译hadoop源代码.
centos.
win7.
3.snappy + lzo
Hadoop snappy配置
[src/main/native/impl/lzo/LzoCompressor.c]
...
//liblzo2= dlopen(HADOOP_LZO_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
liblzo2 = dlopen("liblzo2.so.2", RTLD_LAZY | RTLD_GLOBAL);
[src/main/native/impl/lzo/LzoDeCompressor.c]
...
//liblzo2= dlopen(HADOOP_LZO_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
liblzo2 = dlopen("liblzo2.so.2", RTLD_LAZY | RTLD_GLOBAL);
java串行化
---------------
ObjectInputStream/ObjectOutputStream
protobuf
-----------------
Message PB
avro
----------------
串行化系统。
schema //json
跨语言。 //互操作性。
writable
----------------
DataOutputStream/DataInputStream //写入纯数据。
sequence File
----------------
序列文件。
key - value
可以切割的。
sync //同步点
none
record压缩 //value
block压缩 //(record...)block
排序:
--------------------
SequenceFile.Sorter//sort()
合并
-------------------
合并文件没有排序处理。
MapFile
-----------------
排序的SequenceFile,有索引。
写入时必须保证顺序性。
按照key来查找.
每个MapFile对应一个目录,该目录下有index和data两个文件,均为SequenceFile。
index : 是key到data中偏移量的映射
data : 纯粹的数据文件。
getClosest(); //得到下一个最近的记录。
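MapFile读写的最小示意(假设fs、conf已初始化,注意append必须按key升序):
String dir = "/user/centos/map";
MapFile.Writer w = new MapFile.Writer(conf, fs, dir, IntWritable.class, Text.class);
w.append(new IntWritable(1), new Text("tom"));    //key必须升序写入
w.append(new IntWritable(5), new Text("tomas"));
w.close();
MapFile.Reader r = new MapFile.Reader(fs, dir, conf);
Text v = new Text();
r.get(new IntWritable(5), v);                     //按key查找
System.out.println(v);
r.close();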
管理hadoop
-----------------------
namenode名称节点的本地目录
edit //编辑日志
fsimage //镜像文件
查看镜像文件
-----------------
hdfs oiv //offline fs image viewer
//hdfs oiv -i imageFile -p XML -o image.xml
hdfs oev //offline fs edit viewer,查看编辑日志(hdfs写动作过程)
//hdfs oev -i xxedit -p XML -o edit.xml
hdfs dfsadmin -rollEdits//滚动日志
安全模式
-----------------------------
hdfs dfsadmin -safemode enter//进入安全模式
hdfs dfsadmin -safemode leave//离开安全模式
hdfs dfsadmin -safemode get//查询
hdfs dfsadmin -safemode wait//等待
保存名字空间
---------------------------
保存名字空间。
保存当前的内存中名字空间到存储目录并reset log。要求安全模式.
检查点控制手段
------------------------------------
1.配置2nn检查点周期,默认是一小时(秒数)
[hdfs-site.xml]
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
<description>The number of seconds between two periodic checkpoints.
</description>
</property>
2.编辑日志超过64M,也会创建检查点。
将2nn配置成独立的节点
----------------------
[hdfs-site.xml]
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>192.168.231.206:50090</value>
<description>
The secondary namenode http server address and port.
</description>
</property>
nn故障的话,两种方式恢复工作
-----------------------------
1.复制2nn的数据到新的nn
2.启动nn时,使用-importCheckpoint选项进行检查点导入
hdfs namenode -importCheckpoint
metasave
-----------------------------
另存集群的元数据统计信息,保存文件到名称节点的logs/目录下
hdfs dfsadmin -metasave xx.meta
本地目录
-----------------------------
1.修改名称节点和数据节点的本地存放目录。
<property>
<name>hadoop.tmp.dir</name>
<value>/home/centos/hadoop</value>
</property>
2.修改名称节点本地目录
<property>
<name>dfs.namenode.name.dir</name>
<!-- 每个目录中的文件内容相同,启动副本作用. -->
<value>file:///home/centos/hadoop/name1,file:///home/centos/hadoop/name2</value>
</property>
3.修改数据节点本地目录
<property>
<name>dfs.datanode.data.dir</name>
<!-- 每个目录中的文件内容不同,数据节点有副本的。 -->
<value>file:///home/centos/hadoop/data1,file:///home/centos/hadoop/data2</value>
</property>
hadoop配额管理
--------------------
1.配额类型
space quota //空间配额
dir quota //目录配额
2.设置目录配额
hdfs dfsadmin -setQuota 2 /user/centos/hadoop
3.空间配额,至少>384M
hdfs dfsadmin -setSpaceQuota xxx hadoop
快照管理
--------------------
快速备份。
hdfs dfsadmin -allowSnapshot dir_name//启用指定目录快照
hdfs dfsadmin -disallowSnapshot dir_name//禁用指定目录快照
hdfs dfs [-createSnapshot <snapshotDir> [<snapshotName>]]//创建快照
hdfs dfs [-deleteSnapshot <snapshotDir> <snapshotName>]//删除快照
hdfs dfs -renameSnapshot hadoop sp_1 sp_2//重命名快照
hdfs lsSnapshottableDir//列出所有可以快照的目录
hdfs snapshotDiff hadoop sp_1 sp_2//比较两个快照的不同.
块扫描器
---------------------
数据节点每隔多少小时扫描块数据,进行校验和计算。
[hdfs-site.xml]
<property>
<name>dfs.datanode.scan.period.hours</name>
<!--小时数(3周)-->
<value>504</value>
<description>
0 //默认值==504
-1 //禁用
</description>
</property>
webui:
-----
http://datanode:50075/blockScannerReport?listblocks
均衡器(阈值threshold)
-----------------
start-balancer.sh
回收站,控制文件在trash中的存活时间(分钟数)
--------------------------------------
客户端和服务器端均可进行配置,优先级server > client端。
shell命令删除文件会进入trash
编程删除的文件直接删除了。
编程可以通过moveToTrash()进入回收站(见本节末尾的示意代码)。
从trash恢复文件只要移出trash即可。
expunge //
$>hadoop fs -expunge//清除超期的文件。
[core-site.xml]
<!-- 驻留间隔(分钟数) -->
<property>
<name>fs.trash.interval</name>
<value>1</value>
</property>
<!-- 回收站检查间隔(分钟) <= fs.trash.interval -->
<property>
<name>fs.trash.checkpoint.interval</name>
<value>1</value>
</property>
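上面提到的moveToTrash()用法示意(org.apache.hadoop.fs.Trash,假设fs、conf已初始化):
Trash trash = new Trash(fs, conf);
boolean moved = trash.moveToTrash(new Path("/user/centos/hadoop/1.txt"));
System.out.println(moved);   //true表示文件被移入回收站,而不是直接删除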
eclipse配置hadoop插件
-----------------------
1.解压hadoop2x-eclipse-plugin.zip
2.复制hadoop2x-eclipse-plugin\release\*.jar文件到eclipse-jee-mars-SR2-win32-x86_64\plugins
3.重启eclipse
4.配置hadoop
windows-->首选项->hadoop/mapreduce ->指定win7的hadoop安装目录
slaves
----------------------
是集群操作所需要访问的文件。
数据节点的上线和下线(commisstion/decommission)
------------------------------------------------
0.准备
[hdfs-site.xml]
<!-- 决定数据节点能否连接到nn -->
<property>
<!-- 指定一个文件的完整路径;没有指定,说明所有节点都可连接 -->
<name>dfs.hosts</name>
<value>/soft/hadoop/etc/hadoop/datanodes.host</value>
</property>
服役新节点
------------------
-2.克隆主机
-1.处理克隆机
ip /etc/sysconfig/network-scripts/ifcfg-ethxxx
hostname /etc/hostname
重启网络服务 // systemctl restart network
修改/etc/resolv.conf//
[s201]
hosts //分发
xcall.sh
xsync.sh
0.配置hdfs-site.xml添加include文件配置
a)在nn创建/soft/hadoop/etc/hadoop/datanodes.host
s202
s203
s204
s205
s206
b)配置dfs.hosts属性
[hdfs-site.xml]
<property>
<!-- 指定一个文件的完整路径;没有指定,说明所有节点都可连接 -->
<name>dfs.hosts</name>
<value>/soft/hadoop/etc/hadoop/datanodes.host</value>
</property>
[yarn-site.xml]
<property>
<name>yarn.resourcemanager.nodes.include-path</name>
<value>/soft/hadoop/etc/hadoop/nms.host</value>
</property>
1. Refresh the namenode
hdfs dfsadmin -refreshNodes
2. Update the slaves file
s202
s203
s204
s205
s206
3. Start the new DN on its own
$>hadoop-daemon.sh start datanode
4. Verify.
5. Rebalance
$>start-balancer.sh
Priority of include and exclude
------------------------------
include exclude
no no //cannot connect
no yes //cannot connect
yes no //can connect
yes yes //can connect, but is put into decommissioning state
Decommissioning an old node
------------------
1. Edit the configuration files
a)/soft/hadoop/etc/hadoop/datanodes.exclude.host
s206
b)/soft/hadoop/etc/hadoop/nms.exclude.host
s206
c)[hdfs-site.xml]
<property>
<!-- Full path to a file listing nodes that are not allowed to connect -->
<name>dfs.hosts.exclude</name>
<value>/soft/hadoop/etc/hadoop/datanodes.exclude.host</value>
</property>
d)[yarn-site.xml]
<property>
<!-- Full path to a file listing nodes that are not allowed to connect -->
<name>yarn.resourcemanager.nodes.exclude-path</name>
<value>/soft/hadoop/etc/hadoop/nms.exclude.host</value>
</property>
2. Refresh the NN and RM
hdfs dfsadmin -refreshNodes //refresh the NN
yarn rmadmin -refreshNodes //refresh the RM
3. Check the web UI: the datanode shows as decommissioning until its replicas have been copied elsewhere.
Once every retiring node reports Decommissioned, stop the retired datanode:
hadoop-daemon.sh stop datanode
4. Remove the retired node from the include file
5. Refresh the NN and RM again
hdfs dfsadmin -refreshNodes //refresh the NN
yarn rmadmin -refreshNodes //refresh the RM
6. Finally, remove the node from the slaves file
(omitted)
----------------
0. Distributed
An application made up of processes running on **different hosts** that **cooperate** to form the whole.
block.
1. Storage
distributed storage
2. Computation
distributed computation.
Managing Hadoop
----------------
1. Quotas
space quota
name (directory) quota
2. Snapshots
snapshot
3. Trash
trash //
4. Commission/decommission
dfs.hosts //hdfs
dfs.hosts.exclude //hdfs
GFS
--------------
google file system.
MR:
----------------
MapReduce.
Map and reduce.
A programming model.
Using MR to compute the yearly maximum temperature
--------------------------
1. 1901.gz + 1902.gz
2. Write the mapper
[MaxTempMapper.java]
package com.it18zhang.myhadoop273_1211;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* MR:Map
*/
public class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
//sentinel value for a missing temperature reading
private static final int MISSING = 9999;
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
//get the whole line of text
String line = value.toString();
//extract the year
String year = line.substring(15, 19);
//temperature variable
int airTemperature;
if (line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
//extract the quality code
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
3. Write the Reducer
package com.it18zhang.myhadoop273_1211;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* MR:Reduce
*/
public class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* reduce
*/
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
//maximum value
int maxValue = Integer.MIN_VALUE;
//find the maximum for this year
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
//output key
context.write(key, new IntWritable(maxValue));
}
}
4. Write the App (driver) class
package com.it18zhang.myhadoop273_1211;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* App
*/
public class App {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
//create the configuration
Configuration conf = new Configuration();
//create the job
Job job = Job.getInstance(conf);
//set the jar search class
job.setJarByClass(App.class);
//set the job name
job.setJobName("Max temperature");
//add the input path
FileInputFormat.addInputPath(job, new Path(args[0]));
//set the output path
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//set the Mapper class
job.setMapperClass(MaxTempMapper.class);
//set the Reducer class
job.setReducerClass(MaxTempReducer.class);
//set the output key type
job.setOutputKeyClass(Text.class);
//set the output value type
job.setOutputValueClass(IntWritable.class);
//run the job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
5. Export the jar
myhadoop273_122-0.0.1-SNAPSHOT.jar
6. Put the weather files into HDFS
7. Copy myhadoop273_122-0.0.1-SNAPSHOT.jar to the shared directory
8. Start the YARN daemons
start-yarn.sh
9. Verify
xcall.sh jps
10. Run the MR job with hadoop jar
$>hadoop jar myhadoop273_122-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_1211.App /user/centos/hadoop/data /user/centos/hadoop/data/out
word count
------------------
In pseudodistributed mode, we have to start daemons, and to do that using the supplied
scripts we need to have SSH installed. Hadoop doesn’t actually distinguish between
pseudodistributed and fully distributed modes; it merely starts daemons on the set of
hosts in the cluster (defined by the slaves file) by SSHing to each host and starting a
daemon process. Pseudodistributed mode is just a special case of fully distributed mode
in which the (single) host is localhost, so we need to make sure that we can SSH to
localhost and log in without having to enter a password.
class WCMapper extends Mapper<keyin, valuein, keyout, valueout>{
map(key, value, context){
...
String line = value.toString();
String[] arr = line.split(" ");
for(String w : arr){
context.write(new Text(w), new IntWritable(1));
}
}
}
reduce
-------------------
public void reduce(Text key, Iterable<IntWritable> values, Context ctx){
int count = 0;
for(IntWritable iw : values){
count += iw.get();
}
ctx.write(key, new IntWritable(count));
}
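A compilable version of the two sketches above. The names WCMapper/WCReducer follow the com.it18zhang.myhadoop273_1211.wc package used in the run commands, but the original classes are not listed in these notes, so treat this as a sketch; the driver is the MaxTemperature App with these two classes plugged in (plus job.setCombinerClass(WCReducer.class) if a combiner is wanted).
[WCMapper.java]
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // one (word, 1) pair per token on the line
        for (String w : value.toString().split(" ")) {
            if (!w.isEmpty()) {
                word.set(w);
                context.write(word, ONE);
            }
        }
    }
}
[WCReducer.java]
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum the 1s emitted for this word
        int count = 0;
        for (IntWritable v : values) {
            count += v.get();
        }
        context.write(key, new IntWritable(count));
    }
}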
hadoop jar myhadoop273_122-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_1211.wc.App /user/centos/hadoop/data2 /user/centos/hadoop/out2
Adjusting the cluster
-----------------
1. Make s206 the 2NN, and comment out the dfs.hosts and dfs.hosts.exclude properties
[hdfs-site.xml]
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.blocksize</name>
<value>128m</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///home/centos/hadoop/dfs/name1,file:///home/centos/hadoop/dfs/name2</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///home/centos/hadoop/dfs/data1,file:///home/centos/hadoop/dfs/data2</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>s206:50090</value>
</property>
<!--
<property>
<name>dfs.hosts</name>
<value>/soft/hadoop/etc/hadoop/datanodes.host</value>
</property>
<property>
<name>dfs.hosts.exclude</name>
<value>/soft/hadoop/etc/hadoop/datanodes.exclude.host</value>
</property>
<property>
<name>dfs.namenode.fs-limits.min-block-size</name>
<value>1024</value>
</property>
-->
</configuration>
2. In yarn-site.xml, comment out the include-path/exclude-path properties
<?xml version="1.0"?>
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>s201</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!--
<property>
<name>yarn.resourcemanager.nodes.include-path</name>
<value>/soft/hadoop/etc/hadoop/nms.host</value>
</property>
<property>
<name>yarn.resourcemanager.nodes.exclude-path</name>
<value>/soft/hadoop/etc/hadoop/nms.exclude.host</value>
</property>
-->
</configuration>
3. The slaves file now reads:
[slaves]
s202
s203
s204
s205
4. Comment out the trash settings in core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://s201/</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/centos/hadoop</value>
</property>
<!--
<property>
<name>fs.trash.interval</name>
<value>1</value>
</property>
<property>
<name>fs.trash.checkpoint.interval</name>
<value>1</value>
</property>
<property>
<name>net.topology.node.switch.mapping.impl</name>
<value>com.it18zhang.myhadoop273_122.MyDNSToSwitchMapping</value>
</property>
-->
</configuration>
combiner
------------------
A reduce pass on the map side; not every MR job can use one.
The combiner is just a Reducer class run on map output (valid only when the reduce logic is associative and commutative, e.g. sums and max).
MR
-----------
mapreduce
A programming model.
job(map + reduce)
Writing an MR job
---------------
1.Mapper
class xxx extends Mapper<KeyIn,ValueIn,KeyOut,ValueOut>{
map(){
...
}
}
2.Reducer
class yyy extends Reducer<KeyIn,ValueIn,KeyOut,ValueOut>{
reduce(){
}
}
3.App
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass();
job.setJobName()
FileInputFormat.addInputPath()
FileOutputFormat.setOutputPath()
job.setMapperClass()
job.setCombinerClass(Reducer.class);
job.setReducerClass();
job.setOutputKeyClass()
job.setOutputValueClass()
job.waitForCompletion(true) ? 0 : 1
Running the job
---------------
HADOOP_CONF_DIR
start-dfs.sh --config
hadoop --config /soft/hadoop/etc/local jar myhadoop273_122-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_1211.wc.App /home/centos/1.txt /home/centos/out
Tracing the MR workflow (local mode)
------------------------
MapReduce.Job
--> JobSubmitter.submitJob()
---> LocalJobRunner.submit()
--> LocalJobRunner$Job.start(); //starts the thread
-->LocalJobRunner$Job.run()
--> ThreadPoolExecutor.submit()//MapTaskRunnable
--> MapTaskRunnable.run();
--> MapTask.run();
--> WCMapper().run()
--> ThreadPoolExecutor.submit()//ReduceTaskRunnable
--> ReduceTaskRunnable.run();
--> ReduceTask.run()
--> WCReducer.run();
--> ...
Split size calculation rules
---------------------
Split rule: another split is only cut while remainingBytes / splitSize > 1.1
1. split minSize, split maxSize, block size
1 35
1 75
7 95
2.Math.max(minSize, Math.min(maxSize, blockSize));
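A tiny sketch of that formula with made-up numbers (the value rows above are left as they are):
[SplitSizeDemo.java]
public class SplitSizeDemo {
    // mirrors FileInputFormat: splitSize = max(minSize, min(maxSize, blockSize))
    static long splitSize(long minSize, long maxSize, long blockSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long noMax = Long.MAX_VALUE;                      // default split.maxsize
        System.out.println(splitSize(1, noMax, 128));     // 128: the block size wins
        System.out.println(splitSize(256, noMax, 128));   // 256: raising minSize gives splits larger than a block
        System.out.println(splitSize(1, 64, 128));        // 64: lowering maxSize gives splits smaller than a block
        // and a further split is only cut while remainingBytes / splitSize > 1.1
    }
}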
com.it18zhang.helloworld.HelloWorld
Remote debugging
----------------
0. Look at the jdwp help
java -agentlib:jdwp=help
1. Write the Java source
2. Export the jar to the remote host
3. On the remote host, start the Java program with the -agentlib:jdwp option (enables remote debugging)
java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8888 -cp HelloWorld.jar com.it18zhang.helloworld.HelloWorld
4. Set breakpoints in Eclipse
5. Attach to the remote Java program from the client-side Eclipse
debug --> remote debug application.
Remotely debugging an MR job on the Hadoop cluster
---------------------------------
1. Export the wc jar
2. Set the HADOOP_OPTS environment variable to the remote-debug options
export HADOOP_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8888
3. Run the MR job with hadoop jar
hadoop jar myhadoop273_122-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_1211.wc.App /user/centos/hadoop/data /user/centos/hadoop/out
4.
C:\Users\Administrator\.m2\repository\org\apache\hadoop\hadoop-yarn-common\2.7.3\hadoop-yarn-common-2.7.3.jar
IPC + RPC
----------------
IPC: inter-process communication.
RPC: remote procedure call.
1. Define the interface
public interface HelloWorldService extends VersionedProtocol {
//version id used by the RPC framework's version check (referenced by the client below)
long versionID = 1L;
public String sayHello(String str);
}
2. Implementation class
/**
* HelloWorldServiceImpl implementation
*/
public class HelloWorldServiceImpl implements HelloWorldService {
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return HelloWorldService.versionID;
}
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
throws IOException {
return null;
}
public String sayHello(String str) {
System.out.println(str);
return "hello " + str;
}
}
3. Create the server
package com.it18zhang.myhadoop273_1211.rpc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
public class MyServer {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Server server = new RPC.Builder(conf).setProtocol(HelloWorldService.class)
.setInstance(new HelloWorldServiceImpl())
.setBindAddress("localhost")
.setPort(8888)
.setNumHandlers(2)
.build();
server.start();
}
}
4. Create the client
package com.it18zhang.myhadoop273_1211.rpc;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
public class MyClient {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
HelloWorldService proxy = RPC.getProxy(HelloWorldService.class, HelloWorldService.versionID,
new InetSocketAddress("localhost", 8888), conf);
String result = proxy.sayHello("tom");
System.out.println(result);
}
}
NIO
-------------
ByteBuffer //HeapByteBuffer
Data runtime area
---------------------
method area //shared
heap //shared
java method stack
native method stack
program counter register
MR submission flow
--------------------
1. Local (LocalJobRunner)
2. Fully distributed (YarnRunner)
JMX
--------------------
Tracing an MR job across the cluster with nc
------------------------
hadoop jar myhadoop273_122-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_1211.wc.App /user/centos/hadoop/data2 /user/centos/hadoop/out
Adding a plugin to pom.xml to copy the built jar
------------------------------
1. Copy the maven-antrun-plugin plugin directory into the local repository
C:\Users\Administrator\.m2\repository\org\apache\maven\plugins
2. Edit pom.xml
[pom.xml]
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>*.xml</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.8</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<echo>--------- copying the jar to the shared directory ----------</echo>
<delete file="D:/downloads/bigdata/data/myhadoop273_122-0.0.1-SNAPSHOT.jar"></delete>
<copy file="target/myhadoop273_122-0.0.1-SNAPSHOT.jar" toDir="D:/downloads/bigdata/data"></copy>
</tasks>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Installing nc on CentOS
--------------------
$>sudo yum search nmap-ncat.x86_64 //search
$>sudo yum install nmap-ncat.x86_64 //install
$>which nc //check the install
$>nc -h //nc help
$>nc 192.168.231.1 8888 //connect to port 8888 on the Windows host
Starting the Hadoop job history server (on all nodes)
-----------------------
$xcall.sh "/soft/hadoop/sbin/mr-jobhistory-daemon.sh start historyserver" //start
$xcall.sh "/soft/hadoop/sbin/mr-jobhistory-daemon.sh stop historyserver" //stop
Sort spill during the map phase
-------------------------
1. Configuration changes
[mapred-site.xml]
<property>
<name>mapreduce.task.io.sort.mb</name>
<value>100</value>
<description>MB of memory used for sorting map output; once the buffer passes the spill threshold, a background thread spills it to disk as separate spill files</description>
</property>
<property>
<name>mapreduce.map.sort.spill.percent</name>
<value>0.80</value>
</property>
<!-- local directories where map intermediate data is stored -->
<property>
<name>mapreduce.cluster.local.dir</name>
<value>${hadoop.tmp.dir}/mapred/local</value>
</property>
2. Distribute the file
3. Create the local directories (on every node)
$>xcall.sh "mkdir -p /home/centos/hadoop/mapred/local"
4. Create the input file
5. Adjust the split size
hdfs :
hadoop jar myhadoop273_122-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_1211.wc.App
/user/centos/hadoop/data/big.txt /user/centos/hadoop/out
Data skew
------------------
1. Most of the data funnels into a few reduce tasks/nodes.
2. Redesign the key
key + random salt (see the sketch below)
3. Design a custom partition function
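A minimal sketch of option 2, salting a key with a random suffix so one hot key spreads over several reducers; a second pass (not shown) strips the suffix and merges the partial counts. The class name and tokenization are assumptions:
[SaltedWCMapper.java]
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SaltedWCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Random rnd = new Random();
    private int reduces;

    protected void setup(Context context) {
        reduces = Math.max(1, context.getNumReduceTasks());
    }

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String w : value.toString().split(" ")) {
            // emit "word_3" instead of "word": the salt decides which reducer gets it
            context.write(new Text(w + "_" + rnd.nextInt(reduces)), ONE);
        }
    }
}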
Input formats
-----------------
FileInputFormat
LF: line feed //newline \n
CR: carriage return //\r
TextInputFormat
SequenceFileAsTextInputFormat
NLineInputFormat
KeyValueTextInputFormat
MultipleInputs
WholeFileInputFormat //custom
Sqoop //data import/export tool
class InputFormat<K,V>
class DBInputFormat<T extends DBWritable> extends InputFormat<LongWritable, T>
class MyDBInputFormat extends DBInputFormat<MyDBWritable>
DBInputFormat --> DBWritable
-------------------------
1. Create the MySQL table
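The table can then be read back with DBInputFormat. A sketch of the record type plus the driver wiring; the class names and the wc(word, c) table (created in the DBOutput section below) are assumptions:
[DBInputDemo.java]
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBInputDemo {

    // one row of wc(word, c)
    public static class WcRecord implements Writable, DBWritable {
        public String word;
        public int count;
        public void write(DataOutput out) throws IOException { out.writeUTF(word); out.writeInt(count); }
        public void readFields(DataInput in) throws IOException { word = in.readUTF(); count = in.readInt(); }
        public void write(PreparedStatement st) throws SQLException { st.setString(1, word); st.setInt(2, count); }
        public void readFields(ResultSet rs) throws SQLException { word = rs.getString(1); count = rs.getInt(2); }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // same connection settings as the DBOutputFormat App below
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/big3", "root", "root");
        Job job = Job.getInstance(conf);
        job.setJarByClass(DBInputDemo.class);
        job.setInputFormatClass(DBInputFormat.class);
        // table wc, no WHERE condition, ordered by word, reading columns word and c
        DBInputFormat.setInput(job, WcRecord.class, "wc", null, "word", "word", "c");
        // the mapper then receives (LongWritable row id, WcRecord) pairs;
        // set mapper/reducer/output as usual and call job.waitForCompletion(true)
    }
}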
OutputFormat
----------------
1. Text output separator / compression
mapreduce.output.fileoutputformat.compress
2. MultipleOutputs
[App.java]
//register a named text output with its key/value types
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, Text.class, IntWritable.class);
//and a named SequenceFile output
MultipleOutputs.addNamedOutput(job, "seq", SequenceFileOutputFormat.class, Text.class, IntWritable.class);
[Reducer.java]
//the MultipleOutputs handle, created in setup() and closed in cleanup()
private MultipleOutputs<Text, IntWritable> mos;
protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
mos = new MultipleOutputs<Text,IntWritable>(context);
}
@SuppressWarnings("unchecked")
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int count = 0 ;
for(IntWritable iw : values){
count = count + iw.get();
}
mos.write("text", key, new IntWritable(count),"text-a");
mos.write("seq", key, new IntWritable(count),"seq-a");
}
protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
//**** close ****
mos.close();
}
Carriage return
-------------
\r 13
Line feed
\n 10
file:/d:/data/out/_temporary/0/_temporary/attempt_local1678884496_0001_r_000000_0
file:/d:/data/out/_temporary/0/task_local1678884496_0001_r_000000
DBOutput
--------------
1. Write the data into a DB
OutputFormat<K,V> ...
public class DBOutputFormat<K extends DBWritable, V> extends OutputFormat<K,V> {
...
}
2. Prepare the database table
create table wc(word varchar(50) primary key,c int);
3. Create the DBWritable
package com.it18zhang.myhadoop273_1211.outputformat.db;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
/**
* Custom database output record
*/
public class MyDBOutWritable implements Writable, DBWritable {
public String word ;
public int count;
//Hadoop serialization part
public void write(DataOutput out) throws IOException {
out.writeUTF(word);
out.writeInt(count);
}
public void readFields(DataInput in) throws IOException {
word = in.readUTF();
count = in.readInt();
}
//JDBC (database) part
public void write(PreparedStatement st) throws SQLException {
st.setString(1, word);
st.setInt(2, count);
}
public void readFields(ResultSet rs) throws SQLException {
word = rs.getString(1);
count = rs.getInt(2);
}
}
4. In the App, set the map output key/value types explicitly; by default they are assumed to match the reduce output types.
package com.it18zhang.myhadoop273_1211.outputformat.db;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
* App: write the word counts to MySQL via DBOutputFormat
*/
public class App {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: Word Count <input path> <output path>");
System.exit(-1);
}
// create the configuration
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(args[1]))) {
fs.delete(new Path(args[1]), true);
}
// create the job
Job job = Job.getInstance(conf);
// set the jar search class
job.setJarByClass(App.class);
// set the job name
job.setJobName("Word Count");
//
String driver = "com.mysql.jdbc.Driver" ;
String url = "jdbc:mysql://localhost:3306/big3" ;
String username = "root" ;
String password = "root" ;
//configure the database connection
DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
dbConf.setOutputFieldCount(2);
dbConf.setOutputFieldNames("word","c");
dbConf.setOutputTableName("wc");
DBConfiguration.configureDB(dbConf.getConf(), driver, url, username, password);
// add the input path
FileInputFormat.addInputPath(job, new Path(args[0]));
job.setOutputFormatClass(DBOutputFormat.class);
// set the Mapper class
job.setMapperClass(WCMapper.class);
// set the Reducer class
job.setReducerClass(WCReducer.class);
//**************** set the map output key/value types ********************
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// set the output key type
job.setOutputKeyClass(MyDBOutWritable.class);
// set the output value type
job.setOutputValueClass(NullWritable.class);
// run the job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
InputFormat
-------------------
OutputFormat
------------------
DBOutputFormat
Counter
------------------
1. Counters
2. Remote debugging
-agentlib:jdwp=transport=dt_socket,...
nc
hadoop jar myhadoop273_122-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_1211.App hadoop/data/19* hadoop/out
3. Counter limits
counter name max length = 64
max number of counters per job = 120
<property>
<name>mapreduce.job.counters.counter.name.max</name>
<value>200000</value>
</property>
<property>
<name>mapreduce.job.counters.max</name>
<value>200000</value>
</property>
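A minimal sketch of incrementing a user-defined counter from a mapper; the group/name strings and the length check are made up for illustration:
[CounterMapper.java]
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.length() < 93) {
            // group and counter names are arbitrary; totals show up in the job's counter report
            context.getCounter("Quality", "MALFORMED").increment(1);
            return;
        }
        context.getCounter("Quality", "OK").increment(1);
        context.write(new Text(line.substring(15, 19)), new IntWritable(1));
    }
}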
Map
-----------------
Output.flush()
-->sortAndSpill()
(QuickSort[HeapSort])
sorting
--------------------
hadoop jar myhadoop273_1216-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_1211.sort.App hadoop/data/19* hadoop/out
Sorting.
1. Partial sort
Hadoop does a partial sort by default (each reducer's output is sorted by key).
2. Total sort
a. use a single partition (one reducer)
b. custom partition function
pick sensible key ranges;
may cause data skew.
c. use a sampler to pick the partition boundaries (see the sketch after this list)
3. Secondary sort
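A sketch of option c: sampling the keys and letting TotalOrderPartitioner cut the ranges, on the SequenceFile<IntWritable, NullWritable> temperature keys described below. The class name, reducer count and partition-file path are assumptions; the identity mapper/reducer are used, so only the driver is shown:
[TotalSortApp.java]
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalSortApp {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TotalSortApp.class);
        job.setJobName("total sort");
        job.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(3);
        // partition boundaries come from a sampled partition file, not from hashing
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[1] + "_partitions"));
        // sample 10% of the keys, at most 10000 samples over at most 10 splits
        InputSampler.Sampler<IntWritable, NullWritable> sampler =
                new InputSampler.RandomSampler<IntWritable, NullWritable>(0.1, 10000, 10);
        InputSampler.writePartitionFile(job, sampler);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}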
Sorting
-----------------
1. Partial sort
Reduce: sorted by key.
WritableComparable
2. Total sort
a) one reducer
b) custom partitioner class
c) sampler
1. create the partition file.
SequenceFile, Key=IntWritable, V=NullWritable
3. Secondary sort
Auxiliary sort.
reduce --> KV --> the values need to be ordered too.
key-value
a) Design a composite key that extends WritableComparable (natural key + natural value)
public class CompKey implements WritableComparable<CompKey>{
public Text year = new Text();
public IntWritable temp = new IntWritable();
public void write(DataOutput out) throws IOException {
year.write(out);
temp.write(out);
}
public void readFields(DataInput in) throws IOException {
year.readFields(in);
temp.readFields(in);
}
/**
* compare composite keys
*/
public int compareTo(CompKey o) {
String y = year.toString();
int t = temp.get();
String y0 = o.year.toString();
int t0 = o.temp.get();
if(Integer.parseInt(y) != Integer.parseInt(y0) ){
return Integer.parseInt(y) - Integer.parseInt(y0) ;
}
return t - t0 ;
}
}
b) Define the partitioner class
It makes sure records with the same natural key go to the same partition.
public class YearPartitioner extends Partitioner<CompKey, NullWritable> {
public int getPartition(CompKey key, NullWritable value, int numPartitions) {
String year = key.year.toString();
return (year.hashCode() & Integer.MAX_VALUE) % numPartitions; //mask the sign bit so the partition is never negative
}
}
c) Define the grouping comparator
GroupComparator //decides which keys get grouped into one reduce() call.
public class YearGroupComparator extends WritableComparator{
public YearGroupComparator(){
super(CompKey.class,true);
}
public int compare(WritableComparable a, WritableComparable b) {
CompKey k1 = (CompKey)a ;
CompKey k2 = (CompKey)b ;
int y1 = Integer.parseInt(k1.year.toString());
int y2 = Integer.parseInt(k2.year.toString());
return y1 - y2 ;
}
}
d) Define the sort comparator, which controls how records within a group are ordered
public class CompKeyComparator extends WritableComparator {
public CompKeyComparator(){
super(CompKey.class,true);
}
public int compare(WritableComparable a, WritableComparable b) {
CompKey k1 = (CompKey)a ;
CompKey k2 = (CompKey)b ;
int y1 = Integer.parseInt(k1.year.toString());
int y2 = Integer.parseInt(k2.year.toString());
int t1 = k1.temp.get();
int t2 = k2.temp.get();
if(y1 != y2){
return y1 - y2 ;
}
return -(t1 - t2) ;
}
}
e) Define the mapper
public class WCMapper extends Mapper<Text, IntWritable, CompKey, NullWritable> {
protected void map(Text key, IntWritable value, Mapper<Text, IntWritable, CompKey, NullWritable>.Context context)
throws IOException, InterruptedException {
CompKey ck = new CompKey();
ck.year = key ;
ck.temp = value ;
context.write(ck, NullWritable.get());
}
}
f) Define the reducer
public class WCReducer extends Reducer<CompKey, NullWritable, Text,IntWritable> {
protected void reduce(CompKey key, Iterable<NullWritable> values,
Reducer<CompKey, NullWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
for(NullWritable w : values){
Text year = key.year;
IntWritable temp = key.temp ;
context.write(year, temp);
}
}
}
g) Write the App and set the related parameters
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: Word Count <input path> <output path>");
System.exit(-1);
}
// create the configuration
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(args[1]))) {
fs.delete(new Path(args[1]), true);
}
// create the job
Job job = Job.getInstance(conf);
job.setJarByClass(App.class);
job.setJobName("2nd sort");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//input format (SequenceFile)
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setNumReduceTasks(1);
job.setMapOutputKeyClass(CompKey.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
//set the partitioner class
job.setPartitionerClass(YearPartitioner.class);
//set the grouping comparator
job.setGroupingComparatorClass(YearGroupComparator.class);
job.setSortComparatorClass(CompKeyComparator.class);
job.waitForCompletion(true);
return;
}
join
-------------------
jdbc.
Similar to SQL joins via JDBC.
Inner join:
Outer join: fills in the missing side.
Temperature data
-------------------
1. Format of the temperature data (SequenceFile<Year, Temp>)
year temperature
2. Totally sort the data by year, with a secondary sort on temperature.
3. Use more than one reducer (5)
hadoop jar myhadoop273_1216-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_1211.distributedcache.App hadoop/data/19* hadoop/out
customers.txt
---------------
1 tom
2 tomas
3 tomaslee
4 tomson
orders.txt
---------------
1 no007 12.34 1
1 no007 12.34 1
1 no007 12.34 1
1 no007 12.34 1
1 no007 12.34 1
1 no007 12.34 1
....
MR
----------------
1. Map
map-side join
2. Reduce
reduce-side join
Map-side join
------------------
1. Prepare the data
[customers.txt]
[orders.txt]
2. Upload the files to HDFS
..
3. Create the MR job
public class JoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
//all customers, keyed by customer id
Map<String, String> map = new HashMap<String, String>();
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
URI uri = context.getCacheFiles()[0];
InputStream in = uri.toURL().openStream();
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line = null ;
while((line = br.readLine()) != null){
String[] arr = line.split(" ");
map.put(arr[0], arr[0] + " " + arr[1]);
}
}
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
//parse one order line
String line = value.toString();
String[] arr = line.split(" ");
String oid = arr[0];
String ono = arr[1];
String oprice = arr[2];
String cid = arr[3];
String cinfo = map.get(cid);
String info = cinfo + " " + ono + " " + oprice ;
context.write(new Text(info), NullWritable.get());
}
}
4.App
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
//create the configuration
Configuration conf = new Configuration();
conf.setInt("mapreduce.job.counters.max", 200000);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(new Path(args[1]))){
fs.delete(new Path(args[1]), true);
}
//add the customers file to the distributed cache
DistributedCache.addCacheFile(new URI("hadoop/join/data/customers.txt"), conf);
Job job = Job.getInstance(conf);
job.setJarByClass(App.class);
//set the job name
job.setJobName("MapJoin");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(JoinMapper.class);
//set the output key type
job.setOutputKeyClass(Text.class);
//set the output value type
job.setOutputValueClass(NullWritable.class);
//run the job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
5. Export and run
hadoop jar myhadoop273_1216-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_1211.join.map.App hadoop/join/data/orders.txt hadoop/join/out
Reduce-side join
-----------------
1. Define a composite key, similar to the secondary sort
package com.it18zhang.myhadoop273_1211.join.reduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
/**
* composite key
*/
public class CompKey implements WritableComparable<CompKey>{
public String id ;
//flag: 1 = Customer, 2 = Order
public int flag = 1 ;
public void write(DataOutput out) throws IOException {
out.writeUTF(id);
out.writeInt(flag);
}
public void readFields(DataInput in) throws IOException {
id = in.readUTF();
flag = in.readInt();
}
/**
* compare composite keys
*/
public int compareTo(CompKey o) {
int id0 = Integer.parseInt(id);
int id1 = Integer.parseInt(o.id);
//
if(flag != o.flag){
return flag - o.flag ;
}
return id0 - id1;
}
}
2. Define the partitioner class CIDPartitioner.java (see the sketch after the fragment below).
3. Create the JoinReducer; the first record in each group is the customer record.
4. Create the grouping comparator CIDGroupComparator.
5. Create the sort comparator.
6. App
7. Run
hadoop jar myhadoop273_1216-0.0.1-SNAPSHOT.jar com.it18zhang.myhadoop273_1211.join.reduce.App
compareTo(CompKey o){
if(flag != o.flag){
return flag - o.flag ;
}
if(flag == 1){
return cid - o.cid ;
}
return oid - o.oid ;
}
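The CIDPartitioner and the two comparators from steps 2, 4 and 5 are not listed in these notes; a sketch built on the CompKey class above, assuming the map output value is a Text carrying the original record:
[CIDPartitioner.java]
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CIDPartitioner extends Partitioner<CompKey, Text> {
    public int getPartition(CompKey key, Text value, int numPartitions) {
        // partition purely by customer id, masking the sign bit so the result is never negative
        return (key.id.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
[CIDGroupComparator.java]
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CIDGroupComparator extends WritableComparator {
    public CIDGroupComparator() {
        super(CompKey.class, true);
    }
    public int compare(WritableComparable a, WritableComparable b) {
        // same customer id => same reduce() call, regardless of the customer/order flag
        return Integer.parseInt(((CompKey) a).id) - Integer.parseInt(((CompKey) b).id);
    }
}
[CIDSortComparator.java]
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CIDSortComparator extends WritableComparator {
    public CIDSortComparator() {
        super(CompKey.class, true);
    }
    public int compare(WritableComparable a, WritableComparable b) {
        CompKey k1 = (CompKey) a;
        CompKey k2 = (CompKey) b;
        int c = Integer.parseInt(k1.id) - Integer.parseInt(k2.id);
        // within one customer id, the customer record (flag 1) sorts ahead of its orders (flag 2)
        return c != 0 ? c : (k1.flag - k2.flag);
    }
}
In the App these would be wired with job.setPartitionerClass(CIDPartitioner.class), job.setGroupingComparatorClass(CIDGroupComparator.class) and job.setSortComparatorClass(CIDSortComparator.class).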
hadoop
------------
Storage: distributed storage, HDFS, GFS
Computation: distributed computation, MR programming.