赞
踩
Hadoop对于压缩文件的支持
如果我们压缩的文件有相应压缩格式的扩展名(比如lzo,gz,bzip2等),hadoop就会根据扩展名去选择解码器解压。
hadoop对每个压缩格式的支持,详细见下表:
如果压缩的文件没有扩展名,则需 要在执行mapreduce任务的时候指定输入格式.
- hadoop jar /usr/home/hadoop/hadoop-0.20.2/contrib/streaming/hadoop-streaming-0.20.2-CD H3B4.jar
- -file /usr/home/hadoop/hello/mapper.py -mapper /usr/home/hadoop/hello/mapper.py
- -file /usr/home/hadoop/hello/reducer.py -reducer /usr/home/hadoop/hello/reducer.py
- -input lzotest -output result4
- -jobconf mapred.reduce.tasks=1
- *-inputformat org.apache.hadoop.mapred.LzoTextInputFormat*
hadoop下各种压缩算法的压缩比,压缩时间,解压时间见下表:
压缩算法 | 原始文件大小 | 压缩后的文件大小 | 压缩速度 | 解压缩速度 |
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO-bset | 8.3GB | 2GB | 4MB/s | 60.6MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/S | 74.6MB/s |
hadoop各种压缩算法的优缺点简述
在考虑如何压缩那些将由MapReduce处理的数据时,考虑压缩格式是否支持分割是很重要的。考虑存储在HDFS中的未压缩的文件,其大小为1GB,HDFS的块大小为64MB,所以该文件将被存储为16块,将此文件用作输入的MapReduce作业会创建1个输人分片(split ,也称为“分块”。对于block,我们统一称为“块”。)每个分片都被作为一个独立map任务的输入单独进行处理。
现在假设。该文件是一个gzip格式的压缩文件,压缩后的大小为1GB。和前面一样,HDFS将此文件存储为16块。然而,针对每一块创建一个分块是没有用的,因为不可能从gzip数据流中的任意点开始读取,map任务也不可能独立于其他分块只读取一个分块中的数据。gzip格式使用DEFLATE来存储压缩过的数据,DEFLATE将数据作为一系列压缩过的块进行存储。问题是,每块的开始没有指定用户在数据流中任意点定位到下一个块的起始位置,而是其自身与数据流同步。因此,gzip不支持分割(块)机制。
在这种情况下,MapReduce不分割gzip格式的文件,因为它知道输入是gzip压缩格式的(通过文件扩展名得知),而gzip压缩机制不支持分割机制。这样是以牺牲本地化为代价:一个map任务将处理16个HDFS块。大都不是map的本地数据。与此同时,因为map任务少,所以作业分割的粒度不够细,从而导致运行时间变长。
在我们假设的例子中,如果是一个LZO格式的文件,我们会碰到同样的问题,因为基本压缩格式不为reader提供方法使其与流同步。但是,bzip2格式的压缩文件确实提供了块与块之间的同步标记(一个48位的PI近似值),因此它支持分割机制。
对于文件的收集,这些问题会稍有不同。ZIP是存档格式,因此它可以将多个文件合并为一个ZIP文件。每个文件单独压缩,所有文档的存储位置存储在ZIP文件的尾部。这个属性表明ZIP文件支持文件边界处分割,每个分片中包括ZIP压缩文件中的一个或多个文件。
根据应用的具体情况来决定应该使用哪种压缩格式。就个人而言,更趋向于使用最快的速度压缩,还是使用最优的空间压缩?一般来说,应该尝试不同的策略,并用具有代表性的数据集进行测试,从而找到最佳方法。对于那些大型的、没有边界的文件,如日志文件,有以下选项。
存储未压缩的文件。
使用支持分割机制的压缩格式,如bzip2。
在应用中将文件分割成几个大的数据块,然后使用任何一种支持的压缩格式单独压缩每个数据块(可不用考虑压缩格式是否支持分割)。在这里,需要选择数据块的大小使压缩后的数据块在大小上相当于HDFS的块。
使用支持压缩和分割的Sequence File(序列文件)。
对于大型文件,不要对整个文件使用不支持分割的压缩格式,因为这样会损失本地性优势,从而使降低MapReduce应用的性能。
在hadoop中使用lzo的压缩算法可以减小数据的大小和数据的磁盘读写时间,在HDFS中存储压缩数据,可以使集群能保存更多的数据,延长集群的使用寿命。不仅如此,由于mapreduce作业通常瓶颈都在IO上,存储压缩数据就意味这更少的IO操作,job运行更加的高效。
但是在hadoop上使用压缩也有两个比较麻烦的地方:第一,有些压缩格式不能被分块,并行的处理,比如gzip。第二,另外的一些压缩格式虽然支持分块处理,但是解压的过程非常的缓慢,使job的瓶颈转移到了cpu上,例如bzip2。
如果能够拥有一种压缩算法,即能够被分块,并行的处理,速度也非常的快,那就非常的理想。这种方式就是lzo。
lzo的压缩文件是由许多的小的blocks组成(约256K),使的hadoop的job可以根据block的划分来split job。不仅如此,lzo在设计时就考虑到了效率问题,它的解压速度是gzip的两倍,这就让它能够节省很多的磁盘读写,它的压缩比的不如gzip,大约压缩出来的文件比gzip压缩的大一半,但是这样仍然比没有经过压缩的文件要节省20%-50%的存储空间,这样就可以在效率上大大的提高job执行的速度。
hadoop下lzo配置文档参考http://www.tech126.com/hadoop-lzo/
如何在MapReduce中使用压缩
1.输入的文件的压缩
如果输入的文件是压缩过的,那么在被MapReduce读取时,它们会被自动解压,根据文件扩展名来决定应该使用哪一个压缩解码器。
2.MapReduce作业的输出的压缩
如果要压缩MapReduce作业的输出,请在作业配置文件中将mapred.output.compress属性设置为true。将mapred.output.compression.codec属性设置为自己打算使用的压缩编码/解码器的类名。
如果为输出使用了一系列文件,可以设置mapred.output.compression.type属性来控制压缩类型,默认为RECORD,它压缩单独的记录。将它改为BLOCK,则可以压缩一组记录。由于它有更好的压缩比,所以推荐使用。
3.map作业输出结果的压缩
即使MapReduce应用使用非压缩的数据来读取和写入,我们也可以受益于压缩map阶段的中间输出。因为map作业的输出会被写入磁盘并通过网络传输到reducer节点,所以如果使用LZO之类的快速压缩,能得到更好的性能,因为传输的数据量大大减少了。以下代码显示了启用rnap输出压缩和设置压缩格式的配置属性。
- conf.setCompressMapOutput(true);
- conf.setMapOutputCompressorClass(GzipCodec.class);
本地压缩库
考虑到性能,最好使用一个本地库(native library)来压缩和解压。例如,在一个测试中,使用本地gzip压缩库减少了解压时间50%,压缩时间大约减少了10%(与内置的Java实现相比较)。表4-4展示了Java和本地提供的每个压缩格式的实现。井不是所有的格式都有本地实现(例如bzip2压缩),而另一些则仅有本地实现(例如LZO)。
压缩格式 | Java实现 | 本地实现 |
DEFLATE | 是 | 是 |
gzip | 是 | 是 |
bzip2 | 是 | 否 |
LZO | 否 | 是 |
Hadoop带有预置的32位和64位Linux的本地压缩库,位于库/本地目录。对于其他平台,需要自己编译库,具体请参见Hadoop的维基百科http://wiki.apache.org/hadoop/NativeHadoop。
本地库通过Java系统属性java.library.path来使用。Hadoop的脚本在bin目录中已经设置好这个属性,但如果不使用该脚本,则需要在应用中设置属性。
默认情况下,Hadoop会在它运行的平台上查找本地库,如果发现就自动加载。这意味着不必更改任何配置设置就可以使用本地库。在某些情况下,可能希望禁用本地库,比如在调试压缩相关问题的时候。为此,将属性hadoop.native.lib设置为false,即可确保内置的Java等同内置实现被使用(如果它们可用的话)。
文件的压缩有两大好处:1、可以减少存储文件所需要的磁盘空间;2、可以加速数据在网络和磁盘上的传输。尤其是在处理大数据时,这两大好处是相当重要的。
下面是一个使用gzip工具压缩文件的例子。将文件/user/hadoop/aa.txt进行压缩,压缩后为/user/hadoop/text.gz
1 package com.hdfs; 2 3 import java.io.IOException; 4 import java.io.InputStream; 5 import java.io.OutputStream; 6 import java.net.URI; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.FSDataInputStream; 10 import org.apache.hadoop.fs.FSDataOutputStream; 11 import org.apache.hadoop.fs.FileSystem; 12 import org.apache.hadoop.fs.Path; 13 import org.apache.hadoop.io.IOUtils; 14 import org.apache.hadoop.io.compress.CompressionCodec; 15 import org.apache.hadoop.io.compress.CompressionCodecFactory; 16 import org.apache.hadoop.io.compress.CompressionInputStream; 17 import org.apache.hadoop.io.compress.CompressionOutputStream; 18 import org.apache.hadoop.util.ReflectionUtils; 19 20 public class CodecTest { 21 //压缩文件 22 public static void compress(String codecClassName) throws Exception{ 23 Class<?> codecClass = Class.forName(codecClassName); 24 Configuration conf = new Configuration(); 25 FileSystem fs = FileSystem.get(conf); 26 CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf); 27 //指定压缩文件路径 28 FSDataOutputStream outputStream = fs.create(new Path("/user/hadoop/text.gz")); 29 //指定要被压缩的文件路径 30 FSDataInputStream in = fs.open(new Path("/user/hadoop/aa.txt")); 31 //创建压缩输出流 32 CompressionOutputStream out = codec.createOutputStream(outputStream); 33 IOUtils.copyBytes(in, out, conf); 34 IOUtils.closeStream(in); 35 IOUtils.closeStream(out); 36 } 37 38 //解压缩 39 public static void uncompress(String fileName) throws Exception{ 40 Class<?> codecClass = Class.forName("org.apache.hadoop.io.compress.GzipCodec"); 41 Configuration conf = new Configuration(); 42 FileSystem fs = FileSystem.get(conf); 43 CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf); 44 FSDataInputStream inputStream = fs.open(new Path("/user/hadoop/text.gz")); 45 //把text文件里到数据解压,然后输出到控制台 46 InputStream in = codec.createInputStream(inputStream); 47 IOUtils.copyBytes(in, System.out, conf); 48 IOUtils.closeStream(in); 49 } 50 51 //使用文件扩展名来推断二来的codec来对文件进行解压缩 52 public static void uncompress1(String uri) throws IOException{ 53 Configuration conf = new Configuration(); 54 FileSystem fs = FileSystem.get(URI.create(uri), conf); 55 56 Path inputPath = new Path(uri); 57 CompressionCodecFactory factory = new CompressionCodecFactory(conf); 58 CompressionCodec codec = factory.getCodec(inputPath); 59 if(codec == null){ 60 System.out.println("no codec found for " + uri); 61 System.exit(1); 62 } 63 String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension()); 64 InputStream in = null; 65 OutputStream out = null; 66 try { 67 in = codec.createInputStream(fs.open(inputPath)); 68 out = fs.create(new Path(outputUri)); 69 IOUtils.copyBytes(in, out, conf); 70 } finally{ 71 IOUtils.closeStream(out); 72 IOUtils.closeStream(in); 73 } 74 } 75 76 public static void main(String[] args) throws Exception { 77 //compress("org.apache.hadoop.io.compress.GzipCodec"); 78 //uncompress("text"); 79 uncompress1("hdfs://master:9000/user/hadoop/text.gz"); 80 } 81 82 }
首先执行77行进行压缩,压缩后执行第78行进行解压缩,这里解压到标准输出,所以执行78行会再控制台看到文件/user/hadoop/aa.txt的内容。如果执行79行的话会将文件解压到/user/hadoop/text,他是根据/user/hadoop/text.gz的扩展名判断使用哪个解压工具进行解压的。解压后的路径就是去掉扩展名。
进行文件压缩后,在执行命令./hadoop fs -ls /user/hadoop/查看文件信息,如下:
1 [hadoop@master bin]$ ./hadoop fs -ls /user/hadoop/ 2 Found 7 items 3 -rw-r--r-- 3 hadoop supergroup 76805248 2013-06-17 23:55 /user/hadoop/aa.mp4 4 -rw-r--r-- 3 hadoop supergroup 520 2013-06-17 22:29 /user/hadoop/aa.txt 5 drwxr-xr-x - hadoop supergroup 0 2013-06-16 17:19 /user/hadoop/input 6 drwxr-xr-x - hadoop supergroup 0 2013-06-16 19:32 /user/hadoop/output 7 drwxr-xr-x - hadoop supergroup 0 2013-06-18 17:08 /user/hadoop/test 8 drwxr-xr-x - hadoop supergroup 0 2013-06-18 19:45 /user/hadoop/test1 9 -rw-r--r-- 3 hadoop supergroup 46 2013-06-19 20:09 /user/hadoop/text.gz
第4行为压缩之前的文件,大小为520个字节。第9行为压缩后的文件,大小为46个字节。由此可以看出上面讲的压缩的两大好处了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。