当前位置:   article > 正文

(1)Hadoop核心 -- java代码对HDFS的操作_fs.listfiles(new path("/"), true);

fs.listfiles(new path("/"), true);

之前已经介绍了如何搭建CentOS虚拟机并且安装Hadoop,使用命令成功访问操作Hadoop的hdfs,接下来介绍如果使用java 代码操作Hadoop的hdfs.

一、环境准备

1.CentOS7

2.Hadoop3.1.1

3.SpringBoot2.1.0

代码地址:springboot集成hadoop项目代码

二、开发准备

说明:因为后面设置了虚拟机固定IP为192.168.2.2 替换掉之前的地址即可,rpc访问hdfs地址增加了一层为/hadoop/hdfs/xx

1.使用IDEA新建一个SpringBoot项目,这个很简单不懂的可以看我之前发的文章或者上网搜一下就可以了,这个是我的项目。

2.pom添加hadoop所依赖的jar

在刚建的SpringBoot项目的pom.xml文件里添加hadoop的依赖包hadoop-common, hadoop-client, hadoop-hdfs

  1. <dependency>
  2. <groupId>org.apache.hadoop</groupId>
  3. <artifactId>hadoop-common</artifactId>
  4. <version>3.1.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.hadoop</groupId>
  8. <artifactId>hadoop-hdfs</artifactId>
  9. <version>3.1.1</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.hadoop</groupId>
  13. <artifactId>hadoop-client</artifactId>
  14. <version>3.1.1</version>
  15. </dependency>

 3.启动hadoop服务

怎么启动hadoop的hdfs服务可以看我之前文章 hdfs启动 

首先我们进入linux的hadoop安装目录,然后进入sbin目录下,执行start-dfs.sh即可

  1. cd /usr/local/hadoop/hadoop-3.1.1
  2. /usr/local/hadoop/hadoop-3.1.1

启动后我们就可以访问下hdfs的文件管理系统,地址就是hdfs-site.xml中配置的地址,我的linux虚拟机IP是192.168.2.2,所以我的访问地址是 http://192.168.2.2:50070/explorer.html#/  就可以看到我们的文件管理了。

三、开始编码

对HDFS操作设计以下几个主要的类:

Configuration:封装了客户端或者服务器的配置信息

FileSystem:此类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作通过FileSystem的静态方法get获得该对象,例:FileSystem hdfs = FileSystem.get(conf);

FSDataInputStream:这是HDFS中的输入流,通过由FileSystem的open方法获取

FSDataOutputStream:这是HDFS中的输出流,通过由FileSystem的create方法获取

1.配置hadoop信息

我们在application.yml 里面配置hdfs的IP地址和用户名信息

  1. spring:
  2. application:
  3. name: hadoop
  4. server:
  5. port: 9000
  6. hdfs:
  7. path: hdfs://192.168.2.2:9000
  8. username: root

 2.我们封装一个hadoop连接的工具类HadoopUtil用来访问hdfs服务器

  HadoopUtil 工具类完整代码:

  1. /**
  2. * 类或方法的功能描述 :Hadoop工具类
  3. * @date: 2018-11-28 13:59
  4. */
  5. @Component
  6. public class HadoopUtil {
  7. @Value("${hdfs.path}")
  8. private String path;
  9. @Value("${hdfs.username}")
  10. private String username;
  11. private static String hdfsPath;
  12. private static String hdfsName;
  13. /**
  14. * 获取HDFS配置信息
  15. * @return
  16. */
  17. private static Configuration getConfiguration() {
  18. Configuration configuration = new Configuration();
  19. configuration.set("fs.defaultFS", hdfsPath);
  20. return configuration;
  21. }
  22. /**
  23. * 获取HDFS文件系统对象
  24. * @return
  25. * @throws Exception
  26. */
  27. public static FileSystem getFileSystem() throws Exception {
  28. // 客户端去操作hdfs时是有一个用户身份的,默认情况下hdfs客户端api会从jvm中获取一个参数作为自己的用户身份 DHADOOP_USER_NAME=hadoop
  29. // FileSystem hdfs = FileSystem.get(getHdfsConfig()); //默认获取
  30. // 也可以在构造客户端fs对象时,通过参数传递进去
  31. FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
  32. return fileSystem;
  33. }
  34. @PostConstruct
  35. public void getPath() {
  36. hdfsPath = this.path;
  37. }
  38. @PostConstruct
  39. public void getName() {
  40. hdfsName = this.username;
  41. }
  42. public static String getHdfsPath() {
  43. return hdfsPath;
  44. }
  45. public String getUsername() {
  46. return username;
  47. }
  48. }

客户端去操作HDFS时,是有一个用户身份的,默认情况下,HDFS客户端API会从JVM中获取一个参数来作为自己的用户身份:DHADOOP_USER_NAME=hadoop
FileSystem hdfs = FileSystem.get(getHdfsConfig()); //默认获取
也可以在构造客户端fs对象时,通过参数传递进去
FileSystem hdfs = FileSystem.get(new URI(rootPath), getHdfsConfig(), "你的用户名");

最终我们通过调用 HadoopUtil.getFileSystem() 就可以获取到hdfs的文件管理

3.创建文件夹

首先我们查看hdfs文件系统上面只有一个Java文件夹,现在我们来创建一个叫demo的文件夹

我们通过RPC的方式来操作hdfs,首先新建一个 HadoopController 类,创建一个 mkdir 方法用来新建文件夹

  1. /**
  2. * 类或方法的功能描述 :TODO
  3. * @date: 2018-11-28 13:51
  4. */
  5. @RestController
  6. @RequestMapping("/hadoop")
  7. public class HadoopController {
  8. /**
  9. * 创建文件夹
  10. * @param path
  11. * @return
  12. * @throws Exception
  13. */
  14. @PostMapping("/mkdir")
  15. public BaseReturnVO mkdir(@RequestParam("path") String path) throws Exception {
  16. if (StringUtils.isEmpty(path)) {
  17. return new BaseReturnVO("请求参数为空");
  18. }
  19. // 文件对象
  20. FileSystem fs = HadoopUtil.getFileSystem();
  21. // 目标路径
  22. Path newPath = new Path(path);
  23. // 创建空文件夹
  24. boolean isOk = fs.mkdirs(newPath);
  25. fs.close();
  26. if (isOk) {
  27. return new BaseReturnVO("create dir success");
  28. } else {
  29. return new BaseReturnVO("create dir fail");
  30. }
  31. }
  32. }

首先先通过我们封装的 HadoopUtil  跟hdfs文件系统建立连接,获得文件对象,然后根据入参 path 创建一个新的目标路径 newPath,之后调用文件对象的mkdir方法即可创建空文件夹,还是挺简单的。

使用Postman发送请求,参数为新建的目录路径 /demo

 

创建成功后我们去hdfs文件系统查看,发现有了demo这个文件夹了

4.创建文件

在HadoopController中新建一个createFile方法,在刚刚新建的demo文件夹中新建一个文件

  1. /**
  2. * 创建文件
  3. * @param path
  4. * @return
  5. * @throws Exception
  6. */
  7. @PostMapping("/createFile")
  8. public BaseReturnVO createFile(@RequestParam("path") String path, @RequestParam("file") MultipartFile file) throws Exception {
  9. if (StringUtils.isEmpty(path) || null == file.getBytes()) {
  10. return new BaseReturnVO("请求参数为空");
  11. }
  12. String fileName = file.getOriginalFilename();
  13. FileSystem fs = HadoopUtil.getFileSystem();
  14. // 上传时默认当前目录,后面自动拼接文件的目录
  15. Path newPath = new Path(path + "/" + fileName);
  16. // 打开一个输出流
  17. FSDataOutputStream outputStream = fs.create(newPath);
  18. outputStream.write(file.getBytes());
  19. outputStream.close();
  20. fs.close();
  21. return new BaseReturnVO("create file success");
  22. }

上传的时候我们把文件名拼接在newPath后面自动根据文件名称生成文件路径

上传成功了,我们接着多上传几个文件方便后面查询使用

5.读取文件

新建一个readFile方法用来读取我们上传的文件,我们读取1.txt 文件的路径是 /demo/1.txt

  1. /**
  2. * 读取HDFS文件内容
  3. * @param path
  4. * @return
  5. * @throws Exception
  6. */
  7. @PostMapping("/readFile")
  8. public BaseReturnVO readFile(@RequestParam("path") String path) throws Exception {
  9. FileSystem fs = HadoopUtil.getFileSystem();
  10. Path newPath = new Path(path);
  11. InputStream in = null;
  12. try {
  13. in = fs.open(newPath);
  14. // 复制到标准的输出流
  15. IOUtils.copyBytes(in, System.out, 4096);
  16. } finally {
  17. IOUtils.closeStream(in);
  18. fs.close();
  19. }
  20. return new BaseReturnVO("读取成功");
  21. }

我们把文件信息输出到了控制台了

6.读取目录信息

新建一个readPathInfo方法,读取/demo下面的信息

  1. /**
  2. * 读取HDFS目录信息
  3. * @param path
  4. * @return
  5. * @throws Exception
  6. */
  7. @PostMapping("/readPathInfo")
  8. public BaseReturnVO readPathInfo(@RequestParam("path") String path) throws Exception {
  9. FileSystem fs = HadoopUtil.getFileSystem();
  10. Path newPath = new Path(path);
  11. FileStatus[] statusList = fs.listStatus(newPath);
  12. List<Map<String, Object>> list = new ArrayList<>();
  13. if (null != statusList && statusList.length > 0) {
  14. for (FileStatus fileStatus : statusList) {
  15. Map<String, Object> map = new HashMap<>();
  16. map.put("filePath", fileStatus.getPath());
  17. map.put("fileStatus", fileStatus.toString());
  18. list.add(map);
  19. }
  20. return new BaseReturnVO(list);
  21. } else {
  22. return new BaseReturnVO("目录内容为空");
  23. }
  24. }

7.读取文件列表

新建一个方法listFile读取/demo下的所有文件

  1. /**
  2. * 读取文件列表
  3. * @param path
  4. * @return
  5. * @throws Exception
  6. */
  7. @PostMapping("/listFile")
  8. public BaseReturnVO listFile(@RequestParam("path") String path) throws Exception {
  9. if (StringUtils.isEmpty(path)) {
  10. return new BaseReturnVO("请求参数为空");
  11. }
  12. FileSystem fs = HadoopUtil.getFileSystem();
  13. Path newPath = new Path(path);
  14. // 递归找到所有文件
  15. RemoteIterator<LocatedFileStatus> filesList = fs.listFiles(newPath, true);
  16. List<Map<String, String>> returnList = new ArrayList<>();
  17. while (filesList.hasNext()) {
  18. LocatedFileStatus next = filesList.next();
  19. String fileName = next.getPath().getName();
  20. Path filePath = next.getPath();
  21. Map<String, String> map = new HashMap<>();
  22. map.put("fileName", fileName);
  23. map.put("filePath", filePath.toString());
  24. returnList.add(map);
  25. }
  26. fs.close();
  27. return new BaseReturnVO(returnList);
  28. }

8.重命名文件 

新建一个renameFile方法

  1. /**
  2. * 重命名文件
  3. * @param oldName
  4. * @param newName
  5. * @return
  6. * @throws Exception
  7. */
  8. @PostMapping("/renameFile")
  9. public BaseReturnVO renameFile(@RequestParam("oldName") String oldName, @RequestParam("newName") String newName) throws Exception {
  10. if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
  11. return new BaseReturnVO("请求参数为空");
  12. }
  13. FileSystem fs = HadoopUtil.getFileSystem();
  14. Path oldPath = new Path(oldName);
  15. Path newPath = new Path(newName);
  16. boolean isOk = fs.rename(oldPath, newPath);
  17. fs.close();
  18. if (isOk) {
  19. return new BaseReturnVO("rename file success");
  20. } else {
  21. return new BaseReturnVO("rename file fail");
  22. }
  23. }

9.删除文件 

 新建一个deleteFile 方法

  1. /**
  2. * 删除文件
  3. *
  4. * @param path
  5. * @return
  6. * @throws Exception
  7. */
  8. @PostMapping("/deleteFile")
  9. public BaseReturnVO deleteFile(@RequestParam("path") String path) throws Exception {
  10. FileSystem fs = HadoopUtil.getFileSystem();
  11. Path newPath = new Path(path);
  12. boolean isOk = fs.deleteOnExit(newPath);
  13. fs.close();
  14. if (isOk) {
  15. return new BaseReturnVO("delete file success");
  16. } else {
  17. return new BaseReturnVO("delete file fail");
  18. }
  19. }

10.上传本地文件到hdfs

新建一个uploadFile方法,把我本地D盘的hello.txt文件上传上去

  1. /**
  2. * 上传文件
  3. *
  4. * @param path
  5. * @param uploadPath
  6. * @return
  7. * @throws Exception
  8. */
  9. @PostMapping("/uploadFile")
  10. public BaseReturnVO uploadFile(@RequestParam("path") String path, @RequestParam("uploadPath") String uploadPath) throws Exception {
  11. FileSystem fs = HadoopUtil.getFileSystem();
  12. // 上传路径
  13. Path clientPath = new Path(path);
  14. // 目标路径
  15. Path serverPath = new Path(uploadPath);
  16. // 调用文件系统的文件复制方法,第一个参数是否删除原文件true为删除,默认为false
  17. fs.copyFromLocalFile(false, clientPath, serverPath);
  18. fs.close();
  19. return new BaseReturnVO("upload file success");
  20. }

文件上传成功了,我们查看文件系统

11.下载hdfs文件到本地

新建一个download方法下载hdfs上的文件到本地的D盘的hdfs文件夹中

  1. /**
  2. * 下载文件
  3. * @param path
  4. * @param downloadPath
  5. * @return
  6. * @throws Exception
  7. */
  8. @PostMapping("/downloadFile")
  9. public BaseReturnVO downloadFile(@RequestParam("path") String path, @RequestParam("downloadPath") String downloadPath) throws Exception {
  10. FileSystem fs = HadoopUtil.getFileSystem();
  11. // 上传路径
  12. Path clientPath = new Path(path);
  13. // 目标路径
  14. Path serverPath = new Path(downloadPath);
  15. // 调用文件系统的文件复制方法,第一个参数是否删除原文件true为删除,默认为false
  16. fs.copyToLocalFile(false, clientPath, serverPath);
  17. fs.close();
  18. return new BaseReturnVO("download file success");
  19. }

文件下载成功,我们打开本地的D盘

12.hdfs之间文件复制

新建一个copyFile方法把/java/test.txt 文件复制到/demo/test.txt下面

  1. /**
  2. * HDFS文件复制
  3. * @param sourcePath
  4. * @param targetPath
  5. * @return
  6. * @throws Exception
  7. */
  8. @PostMapping("/copyFile")
  9. public BaseReturnVO copyFile(@RequestParam("sourcePath") String sourcePath, @RequestParam("targetPath") String targetPath) throws Exception {
  10. FileSystem fs = HadoopUtil.getFileSystem();
  11. // 原始文件路径
  12. Path oldPath = new Path(sourcePath);
  13. // 目标路径
  14. Path newPath = new Path(targetPath);
  15. FSDataInputStream inputStream = null;
  16. FSDataOutputStream outputStream = null;
  17. try {
  18. inputStream = fs.open(oldPath);
  19. outputStream = fs.create(newPath);
  20. IOUtils.copyBytes(inputStream, outputStream, 1024*1024*64,false);
  21. return new BaseReturnVO("copy file success");
  22. } finally {
  23. inputStream.close();
  24. outputStream.close();
  25. fs.close();
  26. }
  27. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/530171
推荐阅读
相关标签
  

闽ICP备14008679号