赞
踩
本文介绍了Hadoop体系中最重要的HDFS原理,包括Hadoop体系介绍、HDFS架构及重要组件介绍、运行流程分析、 数据一致性模型、数据压缩、小文件处理、序列化、容错、HDFS文件存储、通讯协议等,最后介绍了HDFS常用命令。
Hadoop由HDFS、MapReduce、HBase、Hive和ZooKeeper等成员组成,其中最基础最重要元素为底层用于存储集群中所有存储节点文件的文件系统HDFS(Hadoop Distributed File System)来执行MapReduce程序的MapReduce引擎。
HDFS是一个高度容错性的分布式文件系统,可以被广泛的部署于廉价的PC上。它以流式访问模式访问应用程序的数据,这大大提高了整个系统的数据吞吐量,因而非常适合用于具有超大数据集的应用程序中。
HDFS的架构如图所示。HDFS架构采用主从架构(master/slave)。
一个典型的HDFS集群包含一个NameNode节点和多个DataNode节点。NameNode节点负责整个HDFS文件系统中的文件的元数据的保管和管理,集群中通常只有一台机器上运行NameNode实例,DataNode节点保存文件中的数据,集群中的机器分别运行一个DataNode实例。
在HDFS中,NameNode节点被称为名称节点,DataNode节点被称为数据节点。DataNode节点通过心跳机制与NameNode节点进行定时的通信。
HDFS拥有如下特点:
NameNode是分布式文件系统中的管理者,存储文件系统的meta-data,主要负责管理文件系统的namespace(命名空间),集群配置信息,block(存储块)的复制。
这里的NameSpace
就是指HDFS类似经典的层级文件组织架构,可创建、删除文件,从一个目录移动文件到另一个文件,重命名文件或目录等。
NameSpace由NameNode管理,会记录所有对HDFS文件系统NameSpace的操作到EditsLog。
文件副本数被称为该文件的复制因子,该信息由NameNode保存。
保存最近一次checkpoint的时间
注:以上这些文件(Fsimage EditLog Fstime)是保存在linux的文件系统中,而不是HDFS
truncate
旧的EditLog(因为所记录的操作事务已经合并到新的Fsimage)dfs.namenode.checkpoint.period
指定两次checkpoint的最大时间间隔,默认3600秒。dfs.namenode.checkpoint.txns
文件系统操作事务累积阈值fs.checkpoint.size
规定edits文件的最大值,一旦超过这个值则强制checkpoint,不管是否到达最大时间间隔。默认大小是64M。fsimage
的文件中,每次保存fsimage之后到下次保存之间的所有HDFS相关操作,将会记录在editlog
文件中DataNode
的HeartBeat
,记录DataNode上报的block信息NameNode会维护一个fsimage文件,也就是namenode中metedata
的镜像,但是fsimage不会随时与namenode内存中的metedata保持一致,而是每隔一段时间通过合并edits文件来更新内容。Standby Namenode
主要工作就是合并fsimage和edits文件,以此更新NameNode的metedata。
NameNode上维护一个故障转移控制器进程(FailoverController,比如zkfc),监控着Active NameNode(心跳),并在失效时进行故障切换。
ZKFC可参考:
NameNode迁移执行过程:
从NameNode上下载元数据信息(fsimage,edits),然后把二者合并,生成新的fsimage,在本地保存,并将其推送到NameNode,替换旧的fsimage。
Fsimage
在该方案中主备 NameNode 之间通过一组JournalNode
同步元数据信息,一条数据只要成功写入多数的 JournalNode 即认为写入成功。 通常配置奇数个(2N+1)个 JournalNode,这样,只要 N+1 个写入成功就认为数据写入成功,此时最多容忍 N-1 个 JournalNode 挂掉,比如 3 个 JournalNode 时,最多允许 1 个 JournalNode 挂掉,5 个 JournalNode 时,最多允许 2 个 JournalNode 挂掉。
StandBy NameNode观察到EditLog变化,就会读取并更新其内部的Fsimage。 一旦Active NameNode挂掉,StandBy NameNode会保证读取所有EditLog。其实原理和上面通用的方案步骤相同。
基于QJM的HDFS高可用架构如下图所示:
Blockreport
,包含该DataNode的所有block列表hdfs文件块的副本,默认是三个。所以一般存hdfs文件的硬盘不用做RAID。
文件副本分别存在以下位置:
HDFS 运行在跨越大量机架的集群之上。两个不同机架上的节点是通过交换机实现通信的,在大多数情况下,相同机架上机器间的网络带宽优于在不同机架上的机器。
在开始的时候,每一个数据节点自检它所属的机架 id,然后在向名字节点注册的时候告知它的机架 id。HDFS 提供接口以便很容易地挂载检测机架标示的模块。一个简单但不是最优的方式就是将副本放置在不同的机架上,这就防止了机架故障时数据的丢失,并且在读数据的时候可以充分利用不同机架的带宽。这个方式均匀地将复制分散在集群中,这就简单地实现了组建故障时的负载均衡。然而这种方式增加了写的成本,因为写的时候需要跨越多个机架传输文件块。
下面展示了NameNode拥有的两个文件的副本元数据,及在多个DataNode上的分布情况:
因为Namenode保存所有文件有内存扩展限制,所以在超大集群扩展时会成为瓶颈。2.x引入Federal HDFS概念,允许启动多个NameNode来进行扩展,每个NameNode管理文件系统命名空间的一部分。例如,NameNode1管理/user目录下的所有文件,NameNode2管理/share目录下的所有文件。
每个NameNode会维护一个NameSpace Volume(命名空间卷),包括了命名空间的源数据,以及有一个容纳该Namespace下的文件的所有Block的Block池。也就是说,每个NameNode之间相互独立不影响。所以,DataNode需要注册到所有NameNode,并且存放来自多个Block池的Block。
在进行数据读写的流程分析前,我们先讲三个基本概念:
例如,在client端向DataNode传数据的时候,HDFSOutputStream会有一个chunk buffer,写满一个chunk后,会计算校验和并写入当前的chunk,之后再把带有校验和的chunk写入packet。
当一个packet写满后,packet会进入DataQueue
队列,其他的DataNode就是从这个DataQueue获取client端上传的数据并存储的。同时一个DataNode成功存储一个packet之后会返回一个ack packet
,放入ack Queue
中。
packet
(数据包),并写入内部DataQueue
(数据队列)。pipeline
管道。当DataStreamer把数据流式传输到管道中的第一个DataNode后,该DataNode会先存放数据包(packet),然后转发给流水线中的第二个DataNode。第二个DataNode收到后同样转发给第三个DataNode。注意这里是流的形式在管道传输,而不是一个block或一个文件,所以效率很高。ackQueue
用来存放那些在DataNode写数据成功后发回的 ack
。只有当所有管道中的DataNode都发回了 ack 后,才会将该packet
从管道中移除。flush
所有剩余的数据包到DataNode 管道中,然后联系NameNode来标记文件已经写入完成,并等待发回的ack
。关闭pipeline,将ackQueue
中的所有packet 放到 DataQueue
的前段,这样管道中失败DataNode节点下游的DataNode不会丢失任何packet。当前写入的Block(位于正确的DataNode的上的副本)会被给与新的标识然后传递给NameNode。这样做的目的是在失败的DataNode恢复后删掉这个不完整的Block(因为写入失败了)。
在出错DataNode从管道中删除后,会在两个工况良好的DataNode间建立新的pipeline。随后将该block的剩余的数据写入管道中正常的DataNode。
最后,NameNode将在新的其他DataNode上创建该block的副本。接下来,其他block就按正常流程处理即可。
这样情况很少发生。只要写入的副本数达到了dfs.namenode.replication.min
(默认为1)就会成功,然后会按dfs.replication
(默认为3)异步复制该block到其他节点上。
如果有更多副本,那就随机选择,原则是尽量在一个机架上少放分本
HDFS不能自己定义网络拓扑结构,必须手动配置。顺序关系如下:
同节点->同机架上的不同节点->同数据中心不同机架->不同数据中心
HDFS为了均衡性能设计了一套数据一致性模型。
HDFS写入的数据不能保证立即可见(如果是写入的新建的文件,文件可见数据不可见)。也就是说即使已经做了flush
操作该文件长度也有可能为0。
具体来说,数据流必须当数据超过一个Block后,这第一个和之后的Block才对新的reader可见。总之,正在写入的Block对其他reader不可见,除非已经写完该Block。
HDFS提供了FSDataOutputStream.hflush方法来使得数据对reader可见。注意,hflush不能保证数据被DataNode写入磁盘而只能保证到内存,也就是说如果机器断电等故障可能导致数据丢失。
还有一个hsync方法,和hflush很像,不同之处是hsync除了保证数据对reader可见还会强制持久化数据。hsync有一定性能开销,要适当选择hsync时机。
close方法关闭文件流,并且在此之前会执行hflush。
HDFS中如果存在大量小文件会占用大量NameNode内存,十分低效。但必须注意,比如1MB的小文件用128MB的Block存储,也只会实际占用1MB磁盘。
Hadoop存档文件或HAR文件,高效的文件存档工具,将文件存入HDFS Block,减少NameNode内存使用,对文件访问透明,甚至可以直接作为MapReduce的输入。HAR是通过Archive工具来创建,需要运行MR程序来并行处理输入文件。HAR主要由索引文件和数据文件组成。
建立HAR开销、不可修改、作为MR输入时低效
读写过程,数据完整性如何保持?
HDFS会对写入的所有数据计算校验和,并在读取时验证校验和(io.bytes.per.checksum)。一般使用CRC-32校验,占用4个字节。
具体来说,HDFS做法主要是通过校验和验证数据完整性。因为每个chunk中都有一个校验位,一个个chunk构成packet,一个个packet最终形成block,故可在block上求校验和。
需要注意的是,校验和文件也有可能损坏,但因为文件很小所以概率很小。
HDFS 的client端即实现了对 HDFS 文件内容的校验和 (checksum) 检查:
HDFS中文件块目录结构具体格式如下:
${dfs.datanode.data.dir}/
├── current
│ ├── BP-526805057-127.0.0.1-1411980876842
│ │ └── current
│ │ ├── VERSION
│ │ ├── finalized
│ │ │ ├── blk_1073741825
│ │ │ ├── blk_1073741825_1001.meta
│ │ │ ├── blk_1073741826
│ │ │ └── blk_1073741826_1002.meta
│ │ └── rbw
│ └── VERSION
└── in_use.lock
in_use.lock表示DataNode正在对文件夹进行操作
rbw是“replica being written”的意思,该目录用于存储用户当前正在写入的数据。
Block元数据文件(*.meta)由一个包含版本、类型信息的头文件和一系列校验值组成。校验和也正是存在其中。
Client写入数据时,会带上数据的校验和。在写入数据的管道上的DataNode收到来组Client或上游DataNode的数据后会先进行数据校验再存储数据和校验和到本地。如果PipeLine最后一个DataNode检验失败则会抛一个IOException子类异常给Client,由Client决定下一步操作(如重试)。
Clietn读取数据时会利用从DataNode读取到的校验和(隐藏文件内)和读到的数据进行校验比对。如果校验成功会通知DataNode,DataNode更新日志,可帮助检测磁盘损坏。
DataNode上运行着DataBlockScanner,定期自检其上的数据。
如果需要检查已损坏文件:
fileSystem.setVerifyChecksum(false)
,然后再调用open()
来读文件hdfs dfs -get -ignoreCrc
或hdfs dfs -copyToLocal -ignoreCrc
hdfs dfs -checksum
更多关于压缩的内容可见:
数据文件压缩可以减少磁盘空间开销,加速数据在网络和磁盘上传输速度。
LZO在实现了索引后可切分。可在Github-Hadoop-LZO下载LZO代码库,包含一些如索引工具的工具包,可以用来构建LZO切分点索引,就可以让LZO支持MR任务切分了。
以上压缩格式都有1(侧重压缩速度优化)到9(侧重压缩空间优化)的选项来控制速度和空间的权衡。使用例子:
gzip -9 file
压缩算法需要权衡空间和时间,压缩速度快的代价往往是压缩比低。
压缩格式 | Codec | 工具 | 算法 | 扩展名 | 是否可切分 | 特点 |
---|---|---|---|---|---|---|
Gzip | org.apache.hadoop.io.compress.GzipCodec | gzip | DEFLATE | .gz | 否 | GZip压缩比高,大部分Linux系统自带Gzip命令,Hadoop原生就支持使用很方便;速度较慢,而且不支持切片。 |
bzip2 | org.apache.hadoop.io.compress.Bzip2Codec | bzip2 | bzip2 | .bz2 | 是 | BZip2压缩比最高,但速度实在太慢了 |
LZ4 | org.apache.hadoop.io.compress.Lz4Codec | 无 | LZ4 | .lz4 | 否 | 压缩比强于gzip,压缩速度慢于gzip |
LZO | com.hadoop.compression.lzo.LzopCodec | lzop | LZOP | .lzo | 是(取决于使用的库) | 压缩比尚可,速度快,支持切片(需要建立索引,且文件修改后要重建索引,还需将 InputFormat 指定为Lzo)。支持hadoop native库,但不是Hadoop自带,需要自己安装。 |
Snappy | org.apache.hadoop.io.compress.SnappyCodec | snappy | 否 | 压缩比最低,但速度最快,但不支持切片。支持hadoop native库,但不是Hadoop自带,需要自己安装。需要注意的是,实际生产环境中Snappy的表现通常比 LZO 好,应当进行测试对比再决定。 |
注:以上Codec表示压缩/解压缩算法的一种实现。
注:是否可切分指的是搜索数据流的任意位置并进一步往下读取数据,可切分更适合MapReduce。
注:LzopCodec在LZO格式上附带了额外的文件头,如果要纯LZO请使用LzoCodec,扩展名为.lzo_deflate
注:Gzip不能切分的原因:比如一个Gzip压缩后的文件大小1GB,分为8个Block,但因Gzip采用DEFLATE算法将数据压缩存储在连续的压缩块中,这些压缩块有标记,所以无法从数据流中找出一个压缩块的开始、结束位置,这样一来就不能8个MapTask分别读取8个Block,而只能一个MapTask顺序处理所有Block,效率较低。
总的来说,LZO,Snappy压缩速度更快,比gzip快一个数量级,但压缩比较低。Snappy解压速度比LZO高出很多。
Hadoop应用处理的数据集非常大,因此需要借助于压缩,使用哪种压缩格式与待处理的文件的大小格式和所使用的工具相关。
对于gzip,MR不会直接尝试切分.gzip格式文件,而是要读取这个文件的所有块,然后用一个map任务处理所有这些块。但这会牺牲数据本地性,因为大多数文件快没有存储在该map任务的节点。
LZO可以在预处理的时候使用Hadoop LZO库中的索引工具,来构建切分点索引。
下面按照效率从高到低排列:
注意:对大文件来说,不要使用不支持切分整个文件的压缩格式,因为会失去数据的本地特性,进而造成 MapReduce应用效率低下
因为HDFS中各个节点通信用的RPC协议,该过程会将消息序列化成二进制流然后通过网络发送到远程节点。到达目的地后,再由接收端将二进制流反序列化为原始消息,做进一步处理。
RPC序列化格式的四大理想属性是:
Hadoop使用的是自己的序列化格式Writable,它符合紧凑、速度快,但扩展性不好。
主要方法为两个:
一个Hadoop序列化Int类型数据,以及反序列化为IntWritable的例子:
private static byte[] hadoopSerialize(Writable writable) throws IOException { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(outputStream); writable.write(dataOutputStream); dataOutputStream.close(); return outputStream.toByteArray(); } private static IntWritable hadoopDeserialize(byte[] bytes) throws IOException { IntWritable intWritable = new IntWritable(); ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); DataInputStream dataInputStream = new DataInputStream(inputStream); intWritable.readFields(dataInputStream); dataInputStream.close(); return intWritable; } @Test public void testDeserialize() throws IOException { // 封装Java int类型,初值111 IntWritable writable = new IntWritable(111); // 也可set值为112 writable.set(112); byte[] bytes = hadoopSerialize(writable); System.out.println("bytes.length=" + bytes.length); // 一个整数4个字节 Assert.assertEquals(bytes.length, 4); IntWritable readWritable = hadoopDeserialize(bytes); System.out.println("readWritable=" + readWritable.get()); Assert.assertEquals(readWritable.get(), 112); }
IDL,全称Interface Description Language
,即接口定义语言,可不依赖具体开发语言来进行序列化声明,系统自动为每种语言生成特定类型。
流行的序列化框架:
解决了Writable多语言支持不足的问题。
考虑到某些分布式场景中大对象存在单个文件中不能实现扩展,所以在HDFS有一些高层次容器。
SequenceFile
面向行
SequenceFile
是一个二进制key/value键值对持久数据结构
可以自己选择key(比如LongWritable类型所表示的时间戳),值可以使Writable类型(表示日志记录的数量)。
SequenceFile也可以作为小文件的容器,以获得更高效的存储和处理。
HDFS和MR针对大文件优化,所以处理小文件效率低,我们可以先用SequenceFile来包装小文件再进行统一高效存储和处理。
SequenceFile.writer写入时会在顺序写文件过程中插入特殊字符来分隔若干记录,称为同步标识。
注意该同步标识体积小,且始终处于每条记录的边界处即不会出现在记录内部。所以可以将SequenceFile作为MR输入,因为该文件可使用SequenceFileInputFormat读取并切分map处理。
SequenceFile写例子如下:
@Test public void testSequenceFileWriter() throws IOException { String[] DATA = { "One, two, buckle my shoe" , "Three, four, shut the door", "Five, six, pick up sticks", "Seven, eight, lay them straight", "Nine, ten, a big fat hen" }; String uri = "your path"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path path = new Path(uri); // 存在SequenceFile中的键值对不一定是Writable类型 // 只要能被指定的Serialization序列化/反序列化即可 IntWritable key = new IntWritable( ); Text value = new Text(); SequenceFile.Writer writer = null; try { // writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(path), // SequenceFile.Writer.filesystem(fs), // SequenceFile.Writer.keyClass(key.getClass()), // SequenceFile.Writer.valueClass(value.getClass())); // 创建SequenceFile,有很多重载方法, // 比如指定压缩类型、Progressable回调用于通知写入进度等 writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass()); for (int i = 0; i < 100; i++) { // 设定key和value,可复用这两个对象在append吼反复设置值 key.set(100 - i); value.set(DATA[i % DATA.length]); // writer.getLength为SequenceFile当前位置 System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value); // append到SequenceFile文末 writer.append(key, value); } }finally { // 关闭writer IOUtils.closeStream(writer); } }
以上代码以追加方式写入了100个Key-Value对到SequenceFile,Key为100到1的降序排列整数。该例子输出结果如下:
还可以通过调用Writer.sync
方法再数据流当前位置插入一个同步点,这样的SequenceFile可作为MR输入被切分使用(不同MapTask分别处理,参见SequenceFileInputFormat)。
SequenceFile读例子如下
@Test public void testSequenceFileReader() throws IOException { String uri = "your path"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path path = new Path(uri); SequenceFile.Reader reader = null; try { // 创建一个SequenceFileReader reader = new SequenceFile.Reader(fs, path, conf); // getKeyClass和getValueClass可获取SequenceFile中使用的类型 // 这样可以处理使用Writable键值对的任一SequenceFile Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); // 获取同步点,即同步标识位置 long position = reader.getPosition(); // reader.next方法迭代读取SequenceFile记录 // 读到的记录与使用的序列化框架(io.serializations属性)相关 // 比如Writable类型时next(key, value)读取到的是下一个键值对 // 而其他类型请使用其他方法 // 比如Thrift使用 next(Object key)和 getCurrentValue(Object value) while (reader.next(key, value)) { // 若是同步点(即同步标识位置)就用*表示 String syncSeen = reader.syncSeen() ? "*" : ""; System.out.printf(" [%s%s]t%slt%s(n", position, syncSeen, key, value); // beginning of next record position = reader.getPosition(); } } finally { IOUtils.closeStream(reader); } }
以上代码运行结果如下,其中带*的表示是同步点:
如果需要搜索指定位置内容,可以使用seek
方法但必须传入记录边界位置否则报错;也可以使用sync
方法定位到下一个同步点。
SequenceFile由文件头(包括魔法值、版本号、key/value的Class名称,数据压缩、用户定义的元数据、随机生成的该文件的同步标识(Sync
))和若干Record组成。
Record的Value默认未开启压缩,可按单条(Key无压缩)或Block压缩(多条Record)。下图分别为SequnceFile格式总览、未开启压缩Record、单Record压缩:
开启Block压缩如图:
可以使用如下命令以文本形式查看HDFS中的SequenceFile:
hadoop fs -text number.seq | head
该命令的原理是通过文件魔法值来探测到文件类型并转为文本,可识别Gzip
、SequenceFile
、Avro
、其他格式视为纯文本。
MapFile
面向行java.util,map
的持久化形式。列式存储和行存储区别如下:
面向列的格式中,文件中的行或Hive中的一张表被分割成若干row split
,每个Split面向列存储。
可参考:
每个Orc文件由1个或多个stripe组成以及末尾的File Footer
组成
每个stripe250MB大小
Stripe相当于RCFile里的RowGroup概念,不过大小由4MB提高到250MB,能提升顺序读的吞吐率。
每个Stripe里有三部分组成,分别是Index Data,Row Data,Stripe Footer。
Index Data
用于选择Stripe和RowGroup。可只扫描部分记录,即根据Row Group Index中的min/max跳过WHERE
条件中不包含的stripes,跳过大量不需要的数据。
Index Data包含每列的最大、最小值和每列所在的行,还可以包含bit field
、 bloom filter
。
行索引里面提供了偏移量,以跳过不需要的行到正确的压缩块位置。默认情况下,最多可以跳过10000行。
因为可以通过过滤预测跳过很多行,因而可以在表的secondary keys
进行排序,从而可以大幅减少执行时间。比如你的表的主分区是交易日期,那么你可以对次分区(state、zip code以及last name)进行排序。
Row Data
所有列按列为单位分隔组成的多行数据,并对每列进行编码,分为多个Stream存储。
Stripe Footer
存每个Stripe的行数、每个列的数据类型信息等。
File Footer
每个文件有一个File Footer,存有每个Stripe的行数以及每个Column的数据类型等信息。
PostScript
每个文件的尾部是一个PostScript
,这里面记录了整个文件的压缩类型以及FileFooter的长度信息等。
读取文件时,会seek
到文件尾部读PostScript,从里面解析到File Footer长度。
再读FileFooter,从里面解析到各个Stripe信息。
再读各个Stripe,即从后往前读。
可参考:
Parquet面向列,可在查询时跳过无关列
基于Google Dremel的数据模型和算法,支持嵌套编码
可以真正列式存储格式来保存深度嵌套结构的数据,读取嵌套字段也不需要其他字段。
自带多种列编码方式,还可以使用Snappy、Gzip、LZO等压缩算法进一步压缩!
实践中,我们采用LZO格式压缩的文件大小1T左右,使用Parquet+LZO后仅为400多GB!
支持NULL
在definition levels
游程长度编码,不会在数据中编码。
语言无关,广泛适用
Parquet文件格式规范parquet-format与语言无关,不与任何一种数据处理框架绑定在一起,适配多种语言和组件,而且不同语言有规范实现方便读写:
可切分并行处理
对比ORCFile
Parquetarquet主要特点是支持嵌套格式,ORCfile主要特点是strips中有轻量级的index data。
存储格式
parquet-format项目用来定义Parquet内部数据模型、存储格式、元数据等。
对象模型转换
对象模型即数据在内存中的表示模型,比如Avro、Protocol Buffers、Thrift等。
parquet-mr项目用来将外部数据类型与Parquet内部数据类型进行映射转换。flink-parquet
也是用这了该项目的parquet-avro
来定义Parquet文件元数据和parquet-hadoop
下的ParquetWriter
来写入Parquet数据。
基本物理数据类型
注意不包括String类型
逻辑数据类型
建立在基本类型之上,高效编码。可参考LogicalTypes.md
以下是所有逻辑数据类型的解释和底层基本类型实现方式
Parquet文件元数据有三个维度:
file
如果文件元数据损坏,则文件将丢失
Column Chunk
如果列元数据已损坏,则该列块将丢失,但其他行组中该列的列块不受影响
Page Header(一个Column Chunk多Page共用一个Page Header)
如果Page Header丢失,则该Chunk内所有Page数据丢失。
如果一个Page数据出问题了,则该Page丢失。
文件头Header
PAR1:4字节MagicNumber,识别Parquet文件格式
Block
每个Parquet文件含若干Block
Row Group
每个Block存一个Row Group,包含所有列的多行。
逻辑概念,用于对row进行分区。
一般在hdfs上推荐一个block为1GB,一个HDFS文件1个bolock。
Column Chunk
每个Row Group又由若干Column Chunk列块组成,且在一个Row Group内每个列精确一个Column Chunk。
Column Chunk按列划分,内含该列在该RowGroup的全部数据,这些数据在文件中是连续的。
Page
每个Column Chunk又由若干连续的Page
(这些Page类型可不同)组成,他们共享一个header。且Reader可以跳过不感兴趣的Page。
由于每个Page值都来自于同一个列,所以可以以Page为单位统一编码、压缩(编码和压缩信息放在PageMetadata),那么每个Page应该是不可再分割的。
每个Page包含3份信息,在PageHeader之后:
definition level
。required节点的该值为0,optional的该值为0或1。
Page Checksum
数据Page可以单独校验。 这允许在HDFS文件级别禁用校验和,以更好地支持单行查找。 Page校验和是使用标准CRC32算法对页的压缩数据(不包括页头本身)进行计算的。
PageIndex
文件可包含可选的列索引来提高读效率,参考PageIndex.md
dictionary page
每个page之前都可以选择是否需要dictionary page,它记录了该page所有不同的值,可增强处理速度、提高压缩率。
文件尾Footer
版本信息、模式信息、所有Block的元数据信息、4字节表示元数据长度、PAR1。读取时先读文件尾元数据,可定位文件块,所以可切分并行处理。
一个Parquet文件实例
4-byte magic number "PAR1" <Column 1 Chunk 1 + Column Metadata> <Column 2 Chunk 1 + Column Metadata> ... <Column N Chunk 1 + Column Metadata> <Column 1 Chunk 2 + Column Metadata> <Column 2 Chunk 2 + Column Metadata> ... <Column N Chunk 2 + Column Metadata> ... <Column 1 Chunk M + Column Metadata> <Column 2 Chunk M + Column Metadata> ... <Column N Chunk M + Column Metadata> File Metadata 4-byte length in bytes of file metadata 4-byte magic number "PAR1"
该文件包含N列,分为M个Row Group。
编码
由于每个Page值都来自于同一个列,所以可以以Page为单位统一编码。Parquet有多种编码方式(差值编码、连续重复值编码、字典编码),在写文件时根据列类型自动选择适合的,该编码方式会写入到文件元数据供Reader读取时识别。
Run Length
适用重复数据
Delta Encoding
适用有序数据集,例如 timestamp,自动生成的 ID,以及监控 metrics
Dictionary Encoding
适用小规模的数据集合,例如 IP 地址
Prefix Encoding
适用字符串的增量编码
更多内容,可参考:
压缩
编码后以Page为单位利用标准压缩算法进行压缩。Parquet默认不使用,可支持Snappy、Gzip、LZO等。
写时需要设置Parquet文件属性:
注意:
Block适当大可以包含较多行,扫描效率搞,提升顺序IO操作效率。
但由于Block读写需要缓存在内存,所以不能过大,更不应该超过HDFS Block,才能使得读每个Parquet文件块时只需从一个HDFS Block上读取不会跨DataNode节点。
所以可以同步扩大Parquet Block和HDFS Block为512MB-1GB,且一个HDFS文件一个Block。
文件将具有较小的Row Goup,可以更有效地抵抗损坏。
读取任一行时,需要decode以及解压缩包含的相关Page,所以Page可以适当小。但不能过小,更多的Page会带来额外的元数据开销。
顺序读取时,一般不会一次读1个Page,可以设page.size为8KB。
message
为根保存,
required
(精准一个)、optional
(0或1个)、repeated
(0个或多个)修饰,再加上数据类型和字段名称。气象记录例子如下:
message WeatherRecord
{
required int32 year;
required int32 temperature;
required binary stationId (UTF8);
}
再来一个复杂一点的地址簿的例子:
message AddressBook {
required string owner;
repeated string ownerPhoneNumbers;
repeated group contacts {
required string name;
optional string phoneNumber;
}
}
地址簿的表示可用下图描述:
在实际存储时,Schema的树结构有几个叶子节点就会存多少个列:
我们可使用Hive、Impala等处理Parquet文件读写,也可用低级API。
Parquet有可插拔的内存数据模型,促进Parquet文件格式和各种工具、组件集成(比Flink中Avro定义Parquet元数据)。
各组件使用Parquet时并行化处理粒度:
ClientProtocol
与NameNode机器配置的TCP端口的建立连接。DataNodeProtocol
协议与NameNode通信。所有DataNode都会周期性地向NameNode发送心跳,当网络异常即发送网络分区时,会导致部分DataNode无法连接到NameNode。此时NameNode会侦测到最近无心跳的DataNode(阈值默认为10分钟),并标记为死亡节点,不再转发新的客户端IO请求给他们,其上的数据也不再可用。
DataNode死亡还会使得某些Block的副本数少于指定数量,NameNode会一直追踪那些需要被复制的block,而且一旦需要(比如DataNode失联或硬盘损坏、副本崩溃、文件副本因子调大等)就会立刻构建、初始化Block副本。
当某个DataNode上的空闲磁盘空间下降到阈值时,某些主题的数据移动到其他DataNode。
可能会发生获取一个Block数据中途发生崩溃,原因如存储设备故障、网络故障、软件Bug等。
HDFS客户端实现了校验和来检查HDFS文件内容。创建HDFS文件时,同时会为该文件的每个Block计算校验和并将这些校验和存在单独的一个隐藏文件中(同一个namespace)。
当客户端读取文件内容时,会先通过关联的校验和文件来对接收到的文件进行校验匹配检查。当校验不通过时,客户端可以重选其他DataNode来获取该文件Block副本。
FsImage和EditLog如果挂了,可导致HDFS服务失效。可对NameNode设置,持有多个这两个文件的副本,同步更新。
也可以采用NFS共享存储或是JounalNode(推荐)对NameNode做HA处理。
或是用zkfc(ZKFailoverController)来检测NameNode在ZK中的状态,异常时切换为StandBy Namenode。
Snapshots支持在特定时刻存储数据副本。 快照功能的一种用途可以是将损坏的HDFS实例回滚到先前已知的工作良好的某个时间点。
Hadoop2.6.0提供了堆外内存持久化的能力,如果在数据刷入磁盘前重启服务,可能丢失数据。
可参见Memory Storage Support in HDFS
Archival Storage可将不断增长的存储容量与计算容量分离。可用那些更高密度和低成本的存储能力及较低的计算能力的节点可用于存储冷数据。
可参见Archival Storage, SSD & Memory
可参考:
hadoop dfs [genericOpitions] [-ls <path>] //显示目标路径当前目录下的所有文件 [-lsr <path>] //递归显示目标路径下的所有目录及文件(深度优先) [-du <path>] //以字节为单位显示目录中所有文件的大小,或该文件的大小(如果path为文件) [-dus <paht>] //以字节为单位显示目标文件大小(用于查看文件夹大小) [-count [-q] <path>] //将目录的大小、包含文件(包括文件)个数的信息输出到屏幕(标准stdout) [-mv <src> <dst>] //把文件或目录移动到目标路径,这个命令允许同时移动多个文件,但是只允许移动到一个目标路径中,参数中的最有一个文件夹即为目标路径 [-cp <src> <dst>] //复制文件或目录到目标路径,这个命令允许同时复制多个文件,如果复制多个文件,目标路径必须是文件夹 [-rm [-skipTrash] <path>] //删除文件,这个命令不能删除文件夹 [-rmr [-skipTrash] <path>] //删除文件夹及其下的所有文件 [-expunge] [-put <localsrc> ... <dst>] //从本地文件系统上传文件到HDFS中 [-copyFromLocal <localsrc> ... <dst>] //与put相同 [-moveFromLocal <localsrc> ... <dst>] //与put相同,但是文件上传之后会从本地文件系统中移除 [-get [-ignoreCrc] [-crc] <src> <localdst>] //复制文件到本地文件系统。这个命令可以选择是否忽视校验和,忽视校验和和下载主要用于挽救那些已经发生错误的文件 [-getmerge <src> <localdst> [addnl]] //将源目录中的所有文件进行排序并写入目标文件中,文件之间以换行符分隔 [-cat <src>] //在终端显示(标准输出stdout)文件中的内容,类似Linux系统中的cat [-text <src>] [-copyToLocal [-ignoreCrc] [-crc] <src> <localdst>] //与get相同 [-moveToLocal [-crc] <src> <localdst>] [-mkidr <path>] //创建文件夹 [-setrep [-R] [-w] <rep> <path/file>] //改变一个文件的副本个数。参数-R可以递归地对该目录下的所有文件做统一操作 [-touchz <path>] //类似Linux中的touch,创建一个空文件 [-test -[ezd] <path>] //将源文件输出为文本格式显示到终端上,通过这个命令可以查看TextRecordInputStream(SequenceFile等)或zip文件 [-stat [format] <path>] //以指定格式返回路径的信息 [-tail [-f] <file>] //在终端上显示(标准输出stdout)文件的最后1kb内容。-f选项的行为与LInux中一致,会持续监测先添加到文件中的内容,这在查看日志文件时会显得非常方便。 [-chmod [-R] <MODE[,MODE]...| OCTALMODE> PATH...] //改变文件的权限,只有文件的所有者或者是超级用户才能使用这个命令。-R可以递归地改变文件夹内的所有文件的权限 [-chown [-R] [OWNER] [:[GROUP] PATH...]] //改变文件的拥有者,-R可以递归地改变文件夹内所有文件的拥有者。同样,这个命令只有超级用户才能使用 [-chgrp [-R] GROUP PATH...] //改变文件所属的组,-R可以递归地改变文件夹内所有文件所属的组。这个命令必须是超级用户才能使用 [-help [cmd]] //这是命令的帮助信息
- 分页查看文件内容 hadoop dfs -text /xxx/20190813/abc |more - 查看目录下文件 hadoop dfs -ls -h /xxx/20190813 - 查看HDFS目录下的文件和子目录 hadoop dfs -ls hdfs_path - 在HDFS上创建文件夹 hadoop dfs -mkdir hdfs_path - 删除HDFS上的文件 hadoop dfs -rm hdfs_path - 删除HDFS上的文件夹 hadoop dfs -rmr hdfs_path - 将本地文件copy到HDFS上 hadoop dfs -put local_file hdfs_path - 复制HDFS文件到本地 hadoop dfs -get hdfs_file local_path - 查看HDFS上某文件的内容 hadoop dfs -cat hdfs_file - 查看HDFS上某文件内容行数 hadoop dfs -cat hdfs_file | wc -l - 查看gc目录大小 hadoop dfs -du -s -h /home/gc - 查看gc目录下所有文件大小,从小到大排序 hadoop dfs -du -s -h /home/gc/* | sort -n
注意 -text
和-cat
区别:
hdfs fsck /xxx/20190813/abc.lzo -locations -blocks -files
hdfs fsck /xxx/20190813 -openforwrite | grep OPENFORWRITE |awk -F ' ' '{print $1}'
hdfs debug recoverLease -path /xxx/20190813/abc.lzo
按文件大小倒序排序:
hadoop dfs -ls -S -h /xxx
按文件大小正序排序:
hadoop dfs -ls -S -h -r /xxx
按文件最后修改时间由旧到新排序
hadoop dfs -ls -S -t -r /xxx
按文件最后修改时间由新到旧排序
hadoop dfs -ls -S -t /xxx
hdfs dfsadmin -triggerBlockReport datanodeip:4004
参考:
-《Hadoop权威指南第四版》
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。