当前位置:   article > 正文

HDFS JAVA API

hdfs java api

Hadoop主要使用JAVA语言编写实现的,Hadoop不同的文件系统之间通过调用JAVA API进行交互。HDFS的命令行本质上就是JAVA API的应用。

常用 JAVA API 介绍

org.apache.hadoop.fs.FileSystem:一个通用文件系统的抽象基类,可以被分布式文件系统继承。所有可能使用Hadoop文件系统的代码都要使用到这个类。

org.apache.hadoop.fs.FileStatus:一个接口,用于向客户端展示系统中文件和目录的元数据,具体包括文件大小、块大小、副本信息、所有者、修改时间等,可通过FileSystem.listStatus()方法获得具体的实例对象。

org.apache.hadoop.fs.FSDataInputStream:文件输入流,用于读取Hadoop文件。

org.apache.hadoop.fs.FSDataOutputStream:文件输出流,用于写Hadoop文件。

org.apache.hadoop.conf.Configuration:访问配置项。所有的配置项的值,如果在core-site.xml中有对应的配置,则以core-site.xml为准。

org.apache.hadoop.fs.Path:用于表示Hadoop文件系统中的一个文件或者一个目录的路径。

org.apache.hadoop.fs.PathFilter:一个接口,通过实现PathFilter.accept(Path path)来判定是否接收路径path表示的文件或目录。

常用 JAVA API 操作

首先需要导入相关的依赖

  1. <dependency>
  2. <groupId>org.apache.hadoop</groupId>
  3. <artifactId>hadoop-client</artifactId>
  4. <version>3.1.3</version> # hadoop版本
  5. </dependency>

查询HDFS文件内容并输出

  1. public static void FileSystemcat(String path) throws Exception {
  2. Configuration conf = new Configuration();
  3. conf.set("fs.default.name", "hdfs://centos01:9000"); //设置HDFS访问地址
  4. FileSystem fs = FileSystem.get(conf); //取得FileSystem文件系统实例
  5. InputStream in = fs.open(new Path(path)); //打开文件输入流
  6. IOUtils.copyBytes(in, System.out, 4096, false); //输出文件内容
  7. IOUtils.closeStream(in); //关闭输入流
  8. fs.close();
  9. }

说明:

在运行HDFS程序之前,需要先初始化Configuration对象,该对象的主要作用是读取HDFS 系统配置信息,也就是安装Hadoop时候的配置文件,例如core-site.xml,hdfs-site.xml等文件。FileSystem是一个普通的文件系统API,可以使用静态工厂方法取得FileSystem实例,并传入Configuration对象参数。通过调用FileSystem对象的open()方法,取得文件的输入流。该方法实际上返回的是一个FSDataInputStream对象,而不是标准的java.io类对象。FSDataInputStream类是继承了java.io.DataInputStream类的一个特殊类,支持随机访问,因此可以从流的任意位置读取数据。FSDataInputStream类的主要作用是使用DataInputStream包装一个输入流,并且使用BufferedInputStream实现对输入的缓冲。

创建目录

  1. public static void createDir(String path) throws IOException {
  2. Configuration conf = new Configuration();
  3. conf.set("fs.default.name", "hdfs://centos01:9000");
  4. FileSystem fs = FileSystem.get(conf);
  5. boolean isok = hdfs.mkdirs(new Path(path)); //创建目录
  6. if (isok) {
  7. System.out.println("创建目录成功!");
  8. } else {
  9. System.out.println("创建目录失败!");
  10. }
  11. fs.close();
  12. }

说明:

使用FileSystem的创建目录方法mkdirs(),可以创建未存在的父目录,就像java.io.File的mkdirs()方法一样,如果目录创建成功,则返回true,失败则返回false。

创建文件

  1. public static void createFile(String path,String w) throws Exception {
  2. Configuration conf = new Configuration();
  3. conf.set("fs.default.name", "hdfs://centos01:9000");
  4. FileSystem fs = FileSystem.get(conf);
  5. FSDataOutputStream outputStream = fs.create(new Path(path)); //打开一个输出流
  6. outputStream.write(w.getBytes()); //写入文件内容
  7. outputStream.close();
  8. fs.close();
  9. System.out.println("文件创建成功!");
  10. }

说明:

FileSystem实例的create()方法返回一个文件输出流对象FSDataOutputStream,该类继承了java.io.DataOutputStream类。与FSDataOutputStream不同的是,FSDataOutputStream类不支持随机访问,因此不能从文件的任意位置写入数据,只能从文件末尾追加数据。create()方法有多个重载方法,允许指定是否强制覆盖已有文件(默认覆盖)、文件副本数量、写入文件的缓冲大小、文件块大小、文件权限许可等。在调用create()方法时,还可以传入一个Progressable对象,这是一个接口,其中定义了一个progress()回调方法,使用该方法可以得知数据被写入数据节点的进度。代码可以改为

  1. public static void createFile(String path,String w) throws Exception {
  2. Configuration conf = new Configuration();
  3. conf.set("fs.default.name", "hdfs://centos01:9000");
  4. FileSystem fs = FileSystem.get(conf);
  5. InputStream in=new BufferedInputStream(new FileInputStream(w));
  6. FSDataOutputStream outputStream = fs.create(new Path(path)),
  7. new Progressable(){
  8. @Override
  9. public void progress(){
  10. System.out.println(".");
  11. }
  12. });
  13. IOUtils.copyBytes(in,outputStream ,4096,false);
  14. fs.close();
  15. System.out.println("文件创建成功!");
  16. }

则可以通过在控制台打印“.”来显示上传创建进度。

删除文件

  1. public static void deleteFile(String path) throws Exception {
  2. Configuration conf = new Configuration();
  3. conf.set("fs.default.name", "hdfs://centos01:9000");
  4. FileSystem fs = FileSystem.get(conf);
  5. boolean isok = fs.deleteOnExit(new Path(path)); //删除文件
  6. if (isok) {
  7. System.out.println("删除成功!");
  8. } else {
  9. System.out.println("删除失败!");
  10. }
  11. fs.close();
  12. }

说明:

使用FileSystem的deleteOnExit()方法,可以对HDFS文件系统中已经存在的文件进行删除。

遍历文件和目录

  1. public static void listStatus(String path) throws Exception {
  2. Configuration conf = new Configuration();
  3. conf.set("fs.default.name", "hdfs://centos01:9000");
  4. FileSystem hdfs;= FileSystem.get(conf);
  5. FileStatus[] fs = hdfs.listStatus(new Path(path)); //遍历HDFS上的文件和目录
  6. if (fs.length > 0) {
  7. for (FileStatus f : fs) {
  8. showDir(f, hdfs);
  9. }
  10. }
  11. }
  12. private static void showDir(FileStatus f, FileSystem hdfs) throws Exception {
  13. Path path = f.getPath();
  14. System.out.println(path); //输出文件或目录的路径
  15. if (f.isDirectory()) {
  16. FileStatus[] file = hdfs.listStatus(path); //如果是目录,则递归遍历该目录下的所有子目录或文件
  17. if (file.length > 0) {
  18. for (FileStatus files : file) {
  19. showDir(files, hdfs);
  20. }
  21. } else {
  22. System.out.println("当前并未创建目录或文件");
  23. }
  24. }
  25. }

说明:

通过调用FileSystem的listStatus()方法获得指定路径下的一级子目录及文件,并将结果存储于FileStatus类型的数组中,然后循环遍历该数组,当遇到目录时,再次调用listStatus()方法取得该目录下的所有子目录及文件,从而能够递归取得指定路径下的所有目录及文件。

查看文件元数据

  1. public static void fileStatusCat(String path) throws Exception {
  2. Configuration conf = new Configuration();
  3. conf.set("fs.default.name", "hdfs://centos01:9000");
  4. FileSystem fs = FileSystem.get(conf);
  5. FileStatus fileStatus = fs.getFileStatus(new Path(path));
  6. if (fileStatus.isDirectory()) {
  7. System.out.println("文件夹详细信息:");
  8. } else {
  9. System.out.println("文件详细信息:");
  10. }
  11. // 输出元数据信息
  12. System.out.println("文件路径:" + fileStatus.getPath());
  13. System.out.println("文件修改日期:" + new Timestamp(fileStatus.getModificationTime()).toString());
  14. System.out.println("文件上次访问日期:" + new Timestamp(fileStatus.getAccessTime()).toString());
  15. System.out.println("文件长度:" + fileStatus.getLen());
  16. System.out.println("文件路径:" + fileStatus.getPath());
  17. System.out.println("文件备份数:" + fileStatus.getReplication());
  18. System.out.println("文件块大小:" + fileStatus.getBlockSize());
  19. System.out.println("文件所有者:" + fileStatus.getOwner());
  20. System.out.println("文件所在分组:" + fileStatus.getGroup());
  21. System.out.println("文件的权限:" + fileStatus.getPermission().toString());
  22. }

说明:

使用FileSystem的getFileStatus()方法,可以获得HDFS文件系统中的文件或目录的元数据信息,包括文件路径、文件修改日期、文件上次访问日期、文件长度、文件备份数、文件大小等。getFileStatus()方法返回一个FileStatus对象,元数据信息则封装在了该对象中。

从本地上传文件

  1. public static void copyFromLocal(String hPath, String localPath) throws Exception {
  2. Configuration conf = new Configuration();
  3. conf.set("fs.default.name", "hdfs://centos01:9000");
  4. FileSystem fs = FileSystem.get(conf);
  5. Path src = new Path(localPath); //本地目录/文件
  6. Path dst = new Path(hPath); //HDFS目录/文件
  7. fs.copyFromLocalFile(src, dst); //复制上传本地文件至HDFS文件系统中
  8. System.out.println("文件上传成功!");
  9. fs.close();
  10. }

说明:

使用FileSystem的copyFromLocalFile()方法,可以将操作系统本地的文件上传到HDFS文件系统中,该方法需要传入两个Path类型的参数,分别代表本地目录/文件和HDFS目录/文件。

从HDFS下载文件

  1. public static void downloadToLocal(String path, String localPath) throws Exception {
  2. Configuration conf = new Configuration();
  3. conf.set("fs.default.name", "hdfs://centos01:9000");
  4. FileSystem fs = FileSystem.get(conf);
  5. Path src = new Path(path); //HDFS目录/文件
  6. Path dst = new Path(localPath); //本地目录/文件
  7. fs.copyToLocalFile(false, src, dst); //从HDFS文件系统中复制下载文件至本地
  8. System.out.println("文件下载成功!");
  9. fs.close();
  10. }

说明:

使用FileSystem的copyToLocalFile()方法,可以将HDFS文件系统中的文件下载到操作系统本地,该方法需要传入两个Path类型的参数,分别代表本地目录/文件和HDFS目录/文件。

检测目录是否存在

  1. public static Boolean isExist(String path) throws Exception {
  2. try {
  3. Configuration conf = new Configuration();
  4. conf.set("fs.default.name", "hdfs://centos01:9000");
  5. FileSystem fs = FileSystem.get(conf);
  6. FileStatus fileStatus = fs.getFileStatus(new Path(path));
  7. if (fileStatus.isDirectory()) { //判断文件是否存在
  8. return true;
  9. }
  10. return false;
  11. } catch (Exception e) {
  12. return false;
  13. }
  14. }

说明:

使用FileSystem的getFileStatus()方法,可以对该方法返回的类的isDirectory()方法判断文件是否存在。

追加文件内容

  1. public static void appendFile(String w,String path) throws Exception {
  2. Configuration conf = new Configuration();
  3. conf.set("fs.default.name", "hdfs://centos01:9000");
  4. conf.setBoolean("dfs.support.append", true); //配置可追加
  5. conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure", true);
  6. conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
  7. FileSystem fs = FileSystem.get(conf);
  8. FSDataOutputStream outputStream = fs.append(new Path(path)); //要追加的文件路径
  9. outputStream.write(w.getBytes(StandardCharsets.UTF_8)); //要追加的文件内容
  10. outputStream.close();
  11. fs.close();
  12. }

说明:

dfs.support.append:设置允许文件追加内容。

dfs.client.block.write.replace-datanode-on-failure:如果在写入管道中存在一个DataNode或者网络故障时,那么DFSClient将尝试从管道中删除失败的DataNode,然后继续尝试剩下的DataNodes进行写入。

dfs.client.block.write.replace-datanode-on-failure.policy:设置NEVER为永远不添加新的DataNode。

使用FileSystem的append()方法,提供要追加的文件路径,返回FSDataOutputStream类型,就可以使用FSDataOutputStream 追加文件内容。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/740519
推荐阅读
相关标签
  

闽ICP备14008679号