当前位置:   article > 正文

Hadoop大数据技术-java操作hdfs_java 操作hdfs

java 操作hdfs

一、实验目的:

1、使用java完成hdfs的相关操作

二、实验要求:

1、正确使用java对hdfs进行相关操作,解决以下问题

三、实验设备:

Windows电脑一台、VMware虚拟机、WindTerm终端模拟器、IntelliJ IDEA

四、实验过程及结果:

一、创建文件

1、代码

  1. //创建文件
  2. @Test
  3. public static void testMkdirFile() throws URISyntaxException, IOException, InterruptedException {
  4. //连接的集群地址
  5. URI uri = new URI("hdfs://hadoop100:8020");
  6. //创建一个配置信息
  7. Configuration configuration = new Configuration();
  8. //用户
  9. String user = "hadoop";
  10. //获取到了客户端对象
  11. FileSystem fs = FileSystem.get(uri, configuration, user);
  12. //创建一个文件夹
  13. fs.mkdirs(new Path("/hadoop/wenjian"));
  14. //关闭资源
  15. fs.close();
  16. }

2、结果

二、删除文件

1、代码

  1. //删除文件
  2. @Test
  3. public static void testDeleteFile() throws Exception {
  4. //连接的集群地址
  5. URI uri = new URI("hdfs://hadoop100:8020");
  6. //创建一个配置信息
  7. Configuration configuration = new Configuration();
  8. //用户
  9. String user = "hadoop";
  10. //获取到了客户端对象
  11. FileSystem fs = FileSystem.get(uri, configuration, user);
  12. //删除一个文件,b:代表是否递归,true会删除指定文件夹及内容,false删除指定文件
  13. fs.delete(new Path("/hadoop/wenjian"), false);
  14. //关闭资源
  15. fs.close();
  16. }

2、结果

三、列出根目录下所有文件

1、代码

  1. //列出目录下文件
  2. @Test
  3. public static void testListFile() throws IOException, URISyntaxException, InterruptedException {
  4. //连接的集群地址
  5. URI uri = new URI("hdfs://hadoop100:8020");
  6. //创建一个配置信息
  7. Configuration configuration = new Configuration();
  8. //用户
  9. String user = "hadoop";
  10. //获取到了客户端对象
  11. FileSystem fs = FileSystem.get(uri, configuration, user);
  12. //获取目录状态
  13. FileStatus[] listStatus = fs.listStatus(new Path("/"));
  14. for (FileStatus list : listStatus) {//增强for循环遍历listStatus
  15. System.out.println(list.getPath().getName());
  16. }
  17. RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
  18. //递归列出该目录下所有文件,不包括文件夹,后面的布尔值为是否递归
  19. while (listFiles.hasNext()) {//如果listfiles里还有东西
  20. LocatedFileStatus next = listFiles.next();//得到下一个并pop出listFiles
  21. System.out.println(next.getPath().getName());//输出
  22. }
  23. }
  24. //列出目录下文件信息
  25. @Test
  26. public static void testDetailFile() throws URISyntaxException, IOException, InterruptedException {
  27. //连接的集群地址
  28. URI uri = new URI("hdfs://hadoop100:8020");
  29. //创建一个配置信息
  30. Configuration configuration = new Configuration();
  31. //用户
  32. String user = "hadoop";
  33. //获取到了客户端对象
  34. FileSystem fs = FileSystem.get(uri, configuration, user);
  35. // 获取迭代器对象("/"表示获取全部目录下的文件)
  36. RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
  37. while (listFiles.hasNext()) {
  38. LocatedFileStatus fileStatus = listFiles.next();
  39. System.out.println("文件权限:" + fileStatus.getPermission());
  40. System.out.println("文件所有者:" + fileStatus.getOwner());
  41. System.out.println("文件路径:" + fileStatus.getPath());
  42. System.out.println("文件内容长度:" + fileStatus.getLen());
  43. System.out.println("修改时间:" + fileStatus.getModificationTime());
  44. System.out.println("备份数:" + fileStatus.getReplication());
  45. System.out.println("文件块大小:" + fileStatus.getBlockSize());
  46. System.out.println("文件名:" + fileStatus.getPath().getName());
  47. // 获取该文件块的信息
  48. BlockLocation[] blockLocations = fileStatus.getBlockLocations();
  49. for (BlockLocation bl : blockLocations) {
  50. // 获取DataNodes的主机名
  51. String[] hosts = bl.getHosts();
  52. for (String host : hosts) {
  53. System.out.println(host);
  54. }
  55. }
  56. System.out.println();
  57. }
  58. }

2、结果


四、上传文件

1、代码

  1. //上传文件
  2. @Test
  3. public static void testCopyFromLocalFile() throws Exception {
  4. //连接的集群地址
  5. URI uri = new URI("hdfs://hadoop100:8020");
  6. //创建一个配置信息
  7. Configuration configuration = new Configuration();
  8. //用户
  9. String user = "hadoop";
  10. //获取到了客户端对象
  11. FileSystem fs = FileSystem.get(uri, configuration, user);
  12. //上传一个文件
  13. fs.copyFromLocalFile(new Path("D:/hadoop.txt"), new Path("/hadoop/wenjian"));
  14. //关闭资源
  15. fs.close();
  16. }

2、结果

五、下载文件

1、代码

  1. //下载文件
  2. @Test
  3. public static void testCopyToLocalFile() throws Exception {
  4. //连接的集群地址
  5. URI uri = new URI("hdfs://hadoop100:8020");
  6. //创建一个配置信息
  7. Configuration configuration = new Configuration();
  8. //用户
  9. String user = "hadoop";
  10. //获取到了客户端对象
  11. FileSystem fs = FileSystem.get(uri, configuration, user);
  12. //下载一个文件
  13. fs.copyToLocalFile(new Path("/hadoop/wenjian/hadoop.txt"), new Path("D:/test.txt"));
  14. //关闭资源
  15. fs.close();
  16. }

2、结果


六、追加文件

1、代码

  1. //追加文件
  2. @Test
  3. public static void testAppendFile() throws URISyntaxException, IOException, InterruptedException {
  4. // 相关配置
  5. Configuration conf = new Configuration();//加载配置文件,将配置文件放在src下
  6. {
  7. conf.setBoolean("dfs.support.append", true);
  8. conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
  9. conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
  10. }
  11. FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100"), conf, "hadoop100");//获取文件系统实例
  12. ObjectMapper objectMapper = new ObjectMapper();
  13. FSDataOutputStream output = null;
  14. output = fs.append(new Path("/hadoop/wenjian/hadoop.txt"));
  15. // MessageHeader listContent = null;
  16. // System.out.println(listContent.toString());
  17. // output.write(objectMapper.writeValueAsString(listContent).getBytes("UTF-8"));
  18. output.write(("\n"+"hello hbase").getBytes("UTF-8"));
  19. // output.write("\n".getBytes("UTF-8"));//换行
  20. }

2、结果

运行读文件


七、读文件

1、代码

  1. //读文件
  2. @Test
  3. public static void testReadFile() throws Exception {
  4. //连接的集群地址
  5. URI uri = new URI("hdfs://hadoop100:8020");
  6. //创建一个配置信息
  7. Configuration configuration = new Configuration();
  8. //用户
  9. String user = "hadoop";
  10. //获取到了客户端对象
  11. FileSystem fs = FileSystem.get(uri, configuration, user);
  12. // 创建路径对象(指向目录或文件)
  13. Path path = new Path("/hadoop/wenjian/hadoop.txt");
  14. // 创建文件系统数据字节输入流对象
  15. FSDataInputStream in = fs.open(path);
  16. // 创建缓冲字符输入流对象,提高读取效率(字节流-->字符流-->缓冲流)
  17. BufferedReader br = new BufferedReader(new InputStreamReader(in));
  18. // 定义行字符串
  19. String nextLine = "";
  20. // 通过循环读取缓冲字符输入流
  21. while ((nextLine = br.readLine()) != null) {
  22. // 在控制台输出读取的行内容
  23. System.out.println(nextLine);
  24. }
  25. // 关闭缓冲字符输入流
  26. br.close();
  27. // 关闭文件系统数据字节输入流
  28. in.close();
  29. // 关闭文件系统
  30. fs.close();
  31. }

2、结果

八、完整代码

  1. package edu.sugon;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.*;
  4. import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
  5. import org.junit.Test;
  6. import java.io.*;
  7. import java.net.URI;
  8. import java.net.URISyntaxException;
  9. public class HDFSOpt {
  10. //创建文件
  11. @Test
  12. public static void testMkdirFile() throws URISyntaxException, IOException, InterruptedException {
  13. //连接的集群地址
  14. URI uri = new URI("hdfs://hadoop100:8020");
  15. //创建一个配置信息
  16. Configuration configuration = new Configuration();
  17. //用户
  18. String user = "hadoop";
  19. //获取到了客户端对象
  20. FileSystem fs = FileSystem.get(uri, configuration, user);
  21. //创建一个文件夹
  22. fs.mkdirs(new Path("/hadoop/wenjian"));
  23. //关闭资源
  24. fs.close();
  25. }
  26. //删除文件
  27. @Test
  28. public static void testDeleteFile() throws Exception {
  29. //连接的集群地址
  30. URI uri = new URI("hdfs://hadoop100:8020");
  31. //创建一个配置信息
  32. Configuration configuration = new Configuration();
  33. //用户
  34. String user = "hadoop";
  35. //获取到了客户端对象
  36. FileSystem fs = FileSystem.get(uri, configuration, user);
  37. //删除一个文件,b:代表是否递归,true会删除指定文件夹及内容,false删除指定文件
  38. fs.delete(new Path("/hadoop/wenjian"), false);
  39. //关闭资源
  40. fs.close();
  41. }
  42. //列出目录下文件
  43. @Test
  44. public static void testListFile() throws IOException, URISyntaxException, InterruptedException {
  45. //连接的集群地址
  46. URI uri = new URI("hdfs://hadoop100:8020");
  47. //创建一个配置信息
  48. Configuration configuration = new Configuration();
  49. //用户
  50. String user = "hadoop";
  51. //获取到了客户端对象
  52. FileSystem fs = FileSystem.get(uri, configuration, user);
  53. //获取目录状态
  54. FileStatus[] listStatus = fs.listStatus(new Path("/"));
  55. for (FileStatus list : listStatus) {//增强for循环遍历listStatus
  56. System.out.println(list.getPath().getName());
  57. }
  58. RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
  59. //递归列出该目录下所有文件,不包括文件夹,后面的布尔值为是否递归
  60. while (listFiles.hasNext()) {//如果listfiles里还有东西
  61. LocatedFileStatus next = listFiles.next();//得到下一个并pop出listFiles
  62. System.out.println(next.getPath().getName());//输出
  63. }
  64. }
  65. //列出目录下文件信息
  66. @Test
  67. public static void testDetailFile() throws URISyntaxException, IOException, InterruptedException {
  68. //连接的集群地址
  69. URI uri = new URI("hdfs://hadoop100:8020");
  70. //创建一个配置信息
  71. Configuration configuration = new Configuration();
  72. //用户
  73. String user = "hadoop";
  74. //获取到了客户端对象
  75. FileSystem fs = FileSystem.get(uri, configuration, user);
  76. // 获取迭代器对象("/"表示获取全部目录下的文件)
  77. RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
  78. while (listFiles.hasNext()) {
  79. LocatedFileStatus fileStatus = listFiles.next();
  80. System.out.println("文件权限:" + fileStatus.getPermission());
  81. System.out.println("文件所有者:" + fileStatus.getOwner());
  82. System.out.println("文件路径:" + fileStatus.getPath());
  83. System.out.println("文件内容长度:" + fileStatus.getLen());
  84. System.out.println("修改时间:" + fileStatus.getModificationTime());
  85. System.out.println("备份数:" + fileStatus.getReplication());
  86. System.out.println("文件块大小:" + fileStatus.getBlockSize());
  87. System.out.println("文件名:" + fileStatus.getPath().getName());
  88. // 获取该文件块的信息
  89. BlockLocation[] blockLocations = fileStatus.getBlockLocations();
  90. for (BlockLocation bl : blockLocations) {
  91. // 获取DataNodes的主机名
  92. String[] hosts = bl.getHosts();
  93. for (String host : hosts) {
  94. System.out.println(host);
  95. }
  96. }
  97. System.out.println();
  98. }
  99. }
  100. //上传文件
  101. @Test
  102. public static void testCopyFromLocalFile() throws Exception {
  103. //连接的集群地址
  104. URI uri = new URI("hdfs://hadoop100:8020");
  105. //创建一个配置信息
  106. Configuration configuration = new Configuration();
  107. //用户
  108. String user = "hadoop";
  109. //获取到了客户端对象
  110. FileSystem fs = FileSystem.get(uri, configuration, user);
  111. //上传一个文件
  112. fs.copyFromLocalFile(new Path("D:/hadoop.txt"), new Path("/hadoop/wenjian"));
  113. //关闭资源
  114. fs.close();
  115. }
  116. //下载文件
  117. @Test
  118. public static void testCopyToLocalFile() throws Exception {
  119. //连接的集群地址
  120. URI uri = new URI("hdfs://hadoop100:8020");
  121. //创建一个配置信息
  122. Configuration configuration = new Configuration();
  123. //用户
  124. String user = "hadoop";
  125. //获取到了客户端对象
  126. FileSystem fs = FileSystem.get(uri, configuration, user);
  127. //下载一个文件
  128. fs.copyToLocalFile(new Path("/hadoop/wenjian/hadoop.txt"), new Path("D:/test.txt"));
  129. //关闭资源
  130. fs.close();
  131. }
  132. //读文件
  133. @Test
  134. public static void testReadFile() throws Exception {
  135. //连接的集群地址
  136. URI uri = new URI("hdfs://hadoop100:8020");
  137. //创建一个配置信息
  138. Configuration configuration = new Configuration();
  139. //用户
  140. String user = "hadoop";
  141. //获取到了客户端对象
  142. FileSystem fs = FileSystem.get(uri, configuration, user);
  143. // 创建路径对象(指向目录或文件)
  144. Path path = new Path("/hadoop/wenjian/hadoop.txt");
  145. // 创建文件系统数据字节输入流对象
  146. FSDataInputStream in = fs.open(path);
  147. // 创建缓冲字符输入流对象,提高读取效率(字节流-->字符流-->缓冲流)
  148. BufferedReader br = new BufferedReader(new InputStreamReader(in));
  149. // 定义行字符串
  150. String nextLine = "";
  151. // 通过循环读取缓冲字符输入流
  152. while ((nextLine = br.readLine()) != null) {
  153. // 在控制台输出读取的行内容
  154. System.out.println(nextLine);
  155. }
  156. // 关闭缓冲字符输入流
  157. br.close();
  158. // 关闭文件系统数据字节输入流
  159. in.close();
  160. // 关闭文件系统
  161. fs.close();
  162. }
  163. //追加文件
  164. @Test
  165. public static void testAppendFile() throws URISyntaxException, IOException, InterruptedException {
  166. // 相关配置
  167. Configuration conf = new Configuration();//加载配置文件,将配置文件放在src下
  168. {
  169. conf.setBoolean("dfs.support.append", true);
  170. conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
  171. conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
  172. }
  173. FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100"), conf, "hadoop100");//获取文件系统实例
  174. ObjectMapper objectMapper = new ObjectMapper();
  175. FSDataOutputStream output = null;
  176. output = fs.append(new Path("/hadoop/wenjian/hadoop.txt"));
  177. // MessageHeader listContent = null;
  178. // System.out.println(listContent.toString());
  179. // output.write(objectMapper.writeValueAsString(listContent).getBytes("UTF-8"));
  180. output.write(("\n"+"hello hbase").getBytes("UTF-8"));
  181. // output.write("\n".getBytes("UTF-8"));//换行
  182. }
  183. public static void main( String[] args ) throws Exception {
  184. //创建文件
  185. // testMkdirFile();
  186. //删除文件
  187. // testDeleteFile();
  188. //列出目录下文件
  189. // testListFile();
  190. //列出目录下文件信息
  191. // testDetailFile();
  192. //上传文件
  193. // testCopyFromLocalFile();
  194. //下载文件
  195. // testCopyToLocalFile();
  196. //读文件
  197. testReadFile();
  198. //追加文件
  199. // testAppendFile();
  200. }
  201. }

五、实验心得:

该实验通过使用JavaApi对hdfs中的文件进行了创建、删除、上传、下载、读取、追加以及列出目录下的文件的基本操作,使我们对其有了进一步的了解。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/480510
推荐阅读
相关标签
  

闽ICP备14008679号