赞
踩
之前已经介绍了如何搭建CentOS虚拟机并且安装Hadoop,使用命令成功访问操作Hadoop的hdfs,接下来介绍如果使用java 代码操作Hadoop的hdfs.
1.CentOS7
2.Hadoop3.1.1
3.SpringBoot2.1.0
说明:因为后面设置了虚拟机固定IP为192.168.2.2 替换掉之前的地址即可,rpc访问hdfs地址增加了一层为/hadoop/hdfs/xx
1.使用IDEA新建一个SpringBoot项目,这个很简单不懂的可以看我之前发的文章或者上网搜一下就可以了,这个是我的项目。
2.pom添加hadoop所依赖的jar
在刚建的SpringBoot项目的pom.xml文件里添加hadoop的依赖包hadoop-common, hadoop-client, hadoop-hdfs
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>3.1.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>3.1.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>3.1.1</version>
- </dependency>
3.启动hadoop服务
怎么启动hadoop的hdfs服务可以看我之前文章 hdfs启动
首先我们进入linux的hadoop安装目录,然后进入sbin目录下,执行start-dfs.sh即可
- cd /usr/local/hadoop/hadoop-3.1.1
- /usr/local/hadoop/hadoop-3.1.1
启动后我们就可以访问下hdfs的文件管理系统,地址就是hdfs-site.xml中配置的地址,我的linux虚拟机IP是192.168.2.2,所以我的访问地址是 http://192.168.2.2:50070/explorer.html#/ 就可以看到我们的文件管理了。
对HDFS操作设计以下几个主要的类:
Configuration:封装了客户端或者服务器的配置信息
FileSystem:此类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作通过FileSystem的静态方法get获得该对象,例:FileSystem hdfs = FileSystem.get(conf);
FSDataInputStream:这是HDFS中的输入流,通过由FileSystem的open方法获取
FSDataOutputStream:这是HDFS中的输出流,通过由FileSystem的create方法获取
1.配置hadoop信息
我们在application.yml 里面配置hdfs的IP地址和用户名信息
- spring:
- application:
- name: hadoop
- server:
- port: 9000
-
-
- hdfs:
- path: hdfs://192.168.2.2:9000
- username: root
2.我们封装一个hadoop连接的工具类HadoopUtil用来访问hdfs服务器
HadoopUtil 工具类完整代码:
- /**
- * 类或方法的功能描述 :Hadoop工具类
- * @date: 2018-11-28 13:59
- */
- @Component
- public class HadoopUtil {
- @Value("${hdfs.path}")
- private String path;
- @Value("${hdfs.username}")
- private String username;
-
- private static String hdfsPath;
- private static String hdfsName;
-
- /**
- * 获取HDFS配置信息
- * @return
- */
- private static Configuration getConfiguration() {
-
- Configuration configuration = new Configuration();
- configuration.set("fs.defaultFS", hdfsPath);
- return configuration;
- }
-
- /**
- * 获取HDFS文件系统对象
- * @return
- * @throws Exception
- */
- public static FileSystem getFileSystem() throws Exception {
- // 客户端去操作hdfs时是有一个用户身份的,默认情况下hdfs客户端api会从jvm中获取一个参数作为自己的用户身份 DHADOOP_USER_NAME=hadoop
- // FileSystem hdfs = FileSystem.get(getHdfsConfig()); //默认获取
- // 也可以在构造客户端fs对象时,通过参数传递进去
- FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
- return fileSystem;
- }
-
- @PostConstruct
- public void getPath() {
- hdfsPath = this.path;
- }
- @PostConstruct
- public void getName() {
- hdfsName = this.username;
- }
-
- public static String getHdfsPath() {
- return hdfsPath;
- }
-
- public String getUsername() {
- return username;
- }
- }
客户端去操作HDFS时,是有一个用户身份的,默认情况下,HDFS客户端API会从JVM中获取一个参数来作为自己的用户身份:DHADOOP_USER_NAME=hadoop
FileSystem hdfs = FileSystem.get(getHdfsConfig()); //默认获取
也可以在构造客户端fs对象时,通过参数传递进去
FileSystem hdfs = FileSystem.get(new URI(rootPath), getHdfsConfig(), "你的用户名");
最终我们通过调用 HadoopUtil.getFileSystem() 就可以获取到hdfs的文件管理
3.创建文件夹
首先我们查看hdfs文件系统上面只有一个Java文件夹,现在我们来创建一个叫demo的文件夹
我们通过RPC的方式来操作hdfs,首先新建一个 HadoopController 类,创建一个 mkdir 方法用来新建文件夹
- /**
- * 类或方法的功能描述 :TODO
- * @date: 2018-11-28 13:51
- */
- @RestController
- @RequestMapping("/hadoop")
- public class HadoopController {
-
- /**
- * 创建文件夹
- * @param path
- * @return
- * @throws Exception
- */
- @PostMapping("/mkdir")
- public BaseReturnVO mkdir(@RequestParam("path") String path) throws Exception {
- if (StringUtils.isEmpty(path)) {
- return new BaseReturnVO("请求参数为空");
- }
- // 文件对象
- FileSystem fs = HadoopUtil.getFileSystem();
- // 目标路径
- Path newPath = new Path(path);
- // 创建空文件夹
- boolean isOk = fs.mkdirs(newPath);
- fs.close();
- if (isOk) {
- return new BaseReturnVO("create dir success");
- } else {
- return new BaseReturnVO("create dir fail");
- }
- }
- }
首先先通过我们封装的 HadoopUtil 跟hdfs文件系统建立连接,获得文件对象,然后根据入参 path 创建一个新的目标路径 newPath,之后调用文件对象的mkdir方法即可创建空文件夹,还是挺简单的。
使用Postman发送请求,参数为新建的目录路径 /demo
创建成功后我们去hdfs文件系统查看,发现有了demo这个文件夹了
4.创建文件
在HadoopController中新建一个createFile方法,在刚刚新建的demo文件夹中新建一个文件
- /**
- * 创建文件
- * @param path
- * @return
- * @throws Exception
- */
- @PostMapping("/createFile")
- public BaseReturnVO createFile(@RequestParam("path") String path, @RequestParam("file") MultipartFile file) throws Exception {
- if (StringUtils.isEmpty(path) || null == file.getBytes()) {
- return new BaseReturnVO("请求参数为空");
- }
- String fileName = file.getOriginalFilename();
- FileSystem fs = HadoopUtil.getFileSystem();
- // 上传时默认当前目录,后面自动拼接文件的目录
- Path newPath = new Path(path + "/" + fileName);
- // 打开一个输出流
- FSDataOutputStream outputStream = fs.create(newPath);
- outputStream.write(file.getBytes());
- outputStream.close();
- fs.close();
- return new BaseReturnVO("create file success");
- }
上传的时候我们把文件名拼接在newPath后面自动根据文件名称生成文件路径
上传成功了,我们接着多上传几个文件方便后面查询使用
5.读取文件
新建一个readFile方法用来读取我们上传的文件,我们读取1.txt 文件的路径是 /demo/1.txt
- /**
- * 读取HDFS文件内容
- * @param path
- * @return
- * @throws Exception
- */
- @PostMapping("/readFile")
- public BaseReturnVO readFile(@RequestParam("path") String path) throws Exception {
- FileSystem fs = HadoopUtil.getFileSystem();
- Path newPath = new Path(path);
- InputStream in = null;
- try {
- in = fs.open(newPath);
- // 复制到标准的输出流
- IOUtils.copyBytes(in, System.out, 4096);
- } finally {
- IOUtils.closeStream(in);
- fs.close();
- }
- return new BaseReturnVO("读取成功");
- }
我们把文件信息输出到了控制台了
6.读取目录信息
新建一个readPathInfo方法,读取/demo下面的信息
- /**
- * 读取HDFS目录信息
- * @param path
- * @return
- * @throws Exception
- */
- @PostMapping("/readPathInfo")
- public BaseReturnVO readPathInfo(@RequestParam("path") String path) throws Exception {
- FileSystem fs = HadoopUtil.getFileSystem();
- Path newPath = new Path(path);
- FileStatus[] statusList = fs.listStatus(newPath);
- List<Map<String, Object>> list = new ArrayList<>();
- if (null != statusList && statusList.length > 0) {
- for (FileStatus fileStatus : statusList) {
- Map<String, Object> map = new HashMap<>();
- map.put("filePath", fileStatus.getPath());
- map.put("fileStatus", fileStatus.toString());
- list.add(map);
- }
- return new BaseReturnVO(list);
- } else {
- return new BaseReturnVO("目录内容为空");
- }
- }
7.读取文件列表
新建一个方法listFile读取/demo下的所有文件
- /**
- * 读取文件列表
- * @param path
- * @return
- * @throws Exception
- */
- @PostMapping("/listFile")
- public BaseReturnVO listFile(@RequestParam("path") String path) throws Exception {
- if (StringUtils.isEmpty(path)) {
- return new BaseReturnVO("请求参数为空");
- }
- FileSystem fs = HadoopUtil.getFileSystem();
- Path newPath = new Path(path);
- // 递归找到所有文件
- RemoteIterator<LocatedFileStatus> filesList = fs.listFiles(newPath, true);
- List<Map<String, String>> returnList = new ArrayList<>();
- while (filesList.hasNext()) {
- LocatedFileStatus next = filesList.next();
- String fileName = next.getPath().getName();
- Path filePath = next.getPath();
- Map<String, String> map = new HashMap<>();
- map.put("fileName", fileName);
- map.put("filePath", filePath.toString());
- returnList.add(map);
- }
- fs.close();
- return new BaseReturnVO(returnList);
- }
8.重命名文件
新建一个renameFile方法
- /**
- * 重命名文件
- * @param oldName
- * @param newName
- * @return
- * @throws Exception
- */
- @PostMapping("/renameFile")
- public BaseReturnVO renameFile(@RequestParam("oldName") String oldName, @RequestParam("newName") String newName) throws Exception {
- if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
- return new BaseReturnVO("请求参数为空");
- }
- FileSystem fs = HadoopUtil.getFileSystem();
- Path oldPath = new Path(oldName);
- Path newPath = new Path(newName);
- boolean isOk = fs.rename(oldPath, newPath);
- fs.close();
- if (isOk) {
- return new BaseReturnVO("rename file success");
- } else {
- return new BaseReturnVO("rename file fail");
- }
- }
9.删除文件
新建一个deleteFile 方法
- /**
- * 删除文件
- *
- * @param path
- * @return
- * @throws Exception
- */
- @PostMapping("/deleteFile")
- public BaseReturnVO deleteFile(@RequestParam("path") String path) throws Exception {
- FileSystem fs = HadoopUtil.getFileSystem();
- Path newPath = new Path(path);
- boolean isOk = fs.deleteOnExit(newPath);
- fs.close();
- if (isOk) {
- return new BaseReturnVO("delete file success");
- } else {
- return new BaseReturnVO("delete file fail");
- }
- }
10.上传本地文件到hdfs
新建一个uploadFile方法,把我本地D盘的hello.txt文件上传上去
- /**
- * 上传文件
- *
- * @param path
- * @param uploadPath
- * @return
- * @throws Exception
- */
- @PostMapping("/uploadFile")
- public BaseReturnVO uploadFile(@RequestParam("path") String path, @RequestParam("uploadPath") String uploadPath) throws Exception {
- FileSystem fs = HadoopUtil.getFileSystem();
- // 上传路径
- Path clientPath = new Path(path);
- // 目标路径
- Path serverPath = new Path(uploadPath);
-
- // 调用文件系统的文件复制方法,第一个参数是否删除原文件true为删除,默认为false
- fs.copyFromLocalFile(false, clientPath, serverPath);
- fs.close();
- return new BaseReturnVO("upload file success");
- }
文件上传成功了,我们查看文件系统
11.下载hdfs文件到本地
新建一个download方法下载hdfs上的文件到本地的D盘的hdfs文件夹中
- /**
- * 下载文件
- * @param path
- * @param downloadPath
- * @return
- * @throws Exception
- */
- @PostMapping("/downloadFile")
- public BaseReturnVO downloadFile(@RequestParam("path") String path, @RequestParam("downloadPath") String downloadPath) throws Exception {
- FileSystem fs = HadoopUtil.getFileSystem();
- // 上传路径
- Path clientPath = new Path(path);
- // 目标路径
- Path serverPath = new Path(downloadPath);
-
- // 调用文件系统的文件复制方法,第一个参数是否删除原文件true为删除,默认为false
- fs.copyToLocalFile(false, clientPath, serverPath);
- fs.close();
- return new BaseReturnVO("download file success");
- }
文件下载成功,我们打开本地的D盘
12.hdfs之间文件复制
新建一个copyFile方法把/java/test.txt 文件复制到/demo/test.txt下面
- /**
- * HDFS文件复制
- * @param sourcePath
- * @param targetPath
- * @return
- * @throws Exception
- */
- @PostMapping("/copyFile")
- public BaseReturnVO copyFile(@RequestParam("sourcePath") String sourcePath, @RequestParam("targetPath") String targetPath) throws Exception {
- FileSystem fs = HadoopUtil.getFileSystem();
- // 原始文件路径
- Path oldPath = new Path(sourcePath);
- // 目标路径
- Path newPath = new Path(targetPath);
-
- FSDataInputStream inputStream = null;
- FSDataOutputStream outputStream = null;
- try {
- inputStream = fs.open(oldPath);
- outputStream = fs.create(newPath);
-
- IOUtils.copyBytes(inputStream, outputStream, 1024*1024*64,false);
- return new BaseReturnVO("copy file success");
- } finally {
- inputStream.close();
- outputStream.close();
- fs.close();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。