赞
踩
之前我们对hdfs的操作主要是通过在linux命令行里进行的,而在实际的应用中,为了实现本地与HDFS 的文件传输,我们主要借助于eclipse的开发环境开发的javaAPI来实现对远程HDFS的文件创建,上传,下载和删除等操作
Hadoop中关于文件操作类基本上全部是在"org.apache.hadoop.fs"包中,Hadoop类库中最终面向用户提供的接口类是FileSystem,该类封装了几乎所有的文件操作,例如CopyToLocalFile、CopyFromLocalFile、mkdir及delete等。综上基本上可以得出操作文件的程序库框架:
operator( ) {
得到Configuration对象
得到FileSystem对象
进行文件操作
}
下面介绍实现上述程序库框架中各个操作的具体步骤
Java抽象类org.apache.hadoop.fs.FileSystem定义了hadoop的一个文件系统接口。该类是一个抽象类,通过以下两种静态工厂方法可以过去FileSystem实例:
public static FileSystem.get(Configuration conf) throws IOException
public static FileSystem.get(URI uri, Configuration conf) throws IOException
HDFS上的文件创建,上传,下载,删除等操作的具体方法实现:
(1)public boolean mkdirs(Path f) throws IOException
一次性新建所有目录(包括父目录), f是完整的目录路径。
(2)public FSOutputStream create(Path f) throws IOException
创建指定path对象的一个文件,返回一个用于写入数据的输出流
create()有多个重载版本,允许我们指定是否强制覆盖已有的文件、文件备份数量、写入文件缓冲区大小、文件块大小以及文件权限。
(3)public boolean copyFromLocal(Path src, Path dst) throws IOException
将本地文件拷贝到文件系统
(4)public boolean exists(Path f) throws IOException
检查文件或目录是否存在
(5)public boolean delete(Path f, Boolean recursive)
永久性删除指定的文件或目录,如果f是一个空目录或者文件,那么recursive的值就会被忽略。只有recursive=true时,一个非空目录及其内容才会被删除。
(6)FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、备份、修改时间、所有者以及权限信息
废话说了一大堆,还是直接上操作过程吧,就是简单的用java实现一个小的功能好在各个平台上都能运行。
首先前提条件是你要先把hadoop搭好,然后把hadoopAPI插件装到你的eclipse下
参考之前博客里的操作。
然后,再来看下面一波操作。
首先你要启动hadoop
./start-all.sh
然后需要下载hadoop的依赖包hadoop2lib.tar.gz,解压备用
新建JAVA项目,名为hadoop4
然后新建包
在hadoop4项目下创建目录,名为hadoop4lib,用于存放项目所需依赖包。
从hadoop2lib目录下拷贝所有jar包到项目下的hadoop4lib目录,就像这样
然后add to build path, 按住shift键选中第一个jar,然后点击最后一个,全选,右击,选择buildPath=》Add to Build Path,这样环境就准备好了
还记得新建的包吗?对,就是my.hdfs,再包里写自己的API就好了。其实就是新建类
比如:在my.hdfs包下,新建类MakeDir,程序功能是在HDFS的根目录下,创建名为hdfstest的目录。
package my.hdfs; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class MakeDir { public static void main(String[] args) throws IOException, URISyntaxException { Configuration conf = new Configuration(); String hdfsPath = "hdfs://localhost:9000"; FileSystem hdfs = FileSystem.get(new URI(hdfsPath), conf); String newDir = "/hdfstest"; boolean result = hdfs.mkdirs(new Path(newDir)); if (result) { System.out.println("Success!"); }else { System.out.println("Failed!"); } } }
新建类TouchFile,程序功能是在HDFS的目录/hdfstest下,创建名为touchfile的文件。
package my.hdfs; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class TouchFile { public static void main(String[] args) throws IOException, URISyntaxException { Configuration configuration = new Configuration(); String hdfsPath = "hdfs://localhost:9000"; FileSystem hdfs = FileSystem.get(new URI(hdfsPath), configuration); String filePath = "/hdfstest/touchfile"; FSDataOutputStream create = hdfs.create(new Path(filePath)); System.out.println("Finish!"); } }
创建类CopyFromLocalFile.class,程序功能是将本地linux操作系统上的文件/data/hadoop4/sample_data,上传到HDFS文件系统的/hdfstest目录下。
package my.hdfs; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class CopyFromLocalFile { public static void main(String[] args) throws IOException, URISyntaxException { Configuration conf = new Configuration(); String hdfsPath = "hdfs://localhost:9000"; FileSystem hdfs = FileSystem.get(new URI(hdfsPath), conf); String from_Linux = "/data/hadoop4/sample_data"; String to_HDFS = "/hdfstest/"; hdfs.copyFromLocalFile(new Path(from_Linux), new Path(to_HDFS)); System.out.println("Finish!"); } }
创建类CopyToLocalFile.class,程序功能是将HDFS文件系统上的文件/hdfstest/sample_data,下载到本地/data/hadoop4/copytolocal 。
package my.hdfs; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class CopyToLocalFile { public static void main(String[] args) throws IOException, URISyntaxException { Configuration conf = new Configuration(); String hdfsPath = "hdfs://localhost:9000"; FileSystem hdfs = FileSystem.get(new URI(hdfsPath), conf); String from_HDFS = "/hdfstest/sample_data"; String to_Linux = "/data/hadoop4/copytolocal"; hdfs.copyToLocalFile(false, new Path(from_HDFS), new Path(to_Linux)); System.out.println("Finish!"); } }
新建类ListFiles,程序功能是列出HDFS文件系统/hdfstest目录下,所有的文件,以及文件的权限、用户组、所属用户。
package my.hdfs; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class IteratorListFiles { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); String hdfspath = "hdfs://localhost:9000/"; FileSystem hdfs = FileSystem.get(URI.create(hdfspath), conf); String watchHDFS = "/"; iteratorListFile(hdfs, new Path(watchHDFS)); } public static void iteratorListFile(FileSystem hdfs, Path path) throws FileNotFoundException, IOException { FileStatus[] files = hdfs.listStatus(path); for (FileStatus file : files) { if (file.isDirectory()) { System.out.println(file.getPermission() + " " + file.getOwner() + " " + file.getGroup() + " " + file.getPath()); iteratorListFile(hdfs, file.getPath()); } else if (file.isFile()) { System.out.println(file.getPermission() + " " + file.getOwner() + " " + file.getGroup() + " " + file.getPath()); } } }
}
新建类LocateFile,程序功能是查看HDFS文件系统上,文件/hdfstest/sample_data的文件块信息。
package my.hdfs; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class LocateFile { public static void main(String[] args) throws IOException, URISyntaxException { Configuration conf = new Configuration(); String hdfsPath = "hdfs://localhost:9000"; FileSystem hdfs = FileSystem.get(new URI(hdfsPath), conf); Path file = new Path("/hdfstest/sample_data"); FileStatus fileStatus = hdfs.getFileStatus(file); BlockLocation[] location = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); for (BlockLocation block : location) { String[] hosts = block.getHosts(); for (String host : hosts) { System.out.println("block:" +block + " host:"+ host); } } }
}
新建类WriteFile,程序功能是在HDFS上,创建/hdfstest/writefile文件并在文件中写入内容“hello world hello data!”。
package my.hdfs; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class WriteFile { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); String hdfsPath = "hdfs://localhost:9000"; FileSystem hdfs = FileSystem.get(URI.create(hdfsPath), conf); String filePath = "/hdfstest/writefile"; FSDataOutputStream create = hdfs.create(new Path(filePath)); System.out.println("Step 1 Finish!"); String sayHi = "hello world hello data!"; byte[] buff = sayHi.getBytes(); create.write(buff, 0, buff.length); create.close(); System.out.println("Step 2 Finish!"); } }
在Eclipse里执行,然后在HDFS上查看实验结果。
hadoop fs -lsr /hdfstest
hadoop fs -cat /hdfstest/writefile
首先切换到/data/hadoop4目录下,将该目录下的所有文件删除(此时要求/data/hadoop4中必须全是文件,不能有目录)。
cd /data/hadoop4
rm -r /data/hadoop4/*
然后在该目录下新建两文件,分别命名为file1 ,file2。
touch file1
touch file2
向file1和file2中,分别输入内容如下
echo "hello file1" > file1
echo "hello file2" > file2
在my.hdfs包下,新建类PutMerge,程序功能是将Linux本地文件夹/data/hadoop4/下的所有文件,上传到HDFS上并合并成一个文件/hdfstest/mergefile。
package my.hdfs; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class PutMerge { public static void main(String[] args) throws IOException, URISyntaxException { Configuration conf = new Configuration(); String hdfsPath = "hdfs://localhost:9000"; FileSystem hdfs = FileSystem.get(new URI(hdfsPath), conf); FileSystem local = FileSystem.getLocal(conf); String from_LinuxDir = "/data/hadoop4/"; String to_HDFS = "/hdfstest/mergefile"; FileStatus[] inputFiles = local.listStatus(new Path(from_LinuxDir)); FSDataOutputStream out = hdfs.create(new Path(to_HDFS)); for (FileStatus file : inputFiles) { FSDataInputStream in = local.open(file.getPath()); byte[] buffer = new byte[256]; int bytesRead = 0; while ( (bytesRead = in.read(buffer) ) > 0) { out.write(buffer, 0, bytesRead); } in.close(); } System.out.println("Finish!"); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。