当前位置:   article > 正文

Java程序调用Hadoop接口入门_java使用hadoop

java使用hadoop

一、安装Hadoop集群环境

参考http://blog.itpub.net/29485627/viewspace-2137702/

 

二、程序编写

1 整个程序的目录为

 

2 HdfsUtil.java中的代码为

  1. package hadoop.hdfs;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. import java.util.Arrays;
  6. import java.util.List;
  7. import org.apache.hadoop.conf.Configuration;
  8. import org.apache.hadoop.fs.FSDataInputStream;
  9. import org.apache.hadoop.fs.FSDataOutputStream;
  10. import org.apache.hadoop.fs.FileStatus;
  11. import org.apache.hadoop.fs.FileSystem;
  12. import org.apache.hadoop.fs.Path;
  13. import org.apache.hadoop.hdfs.DistributedFileSystem;
  14. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. /**
  18. * @author 作者 : yangyang
  19. * @version 创建时间:2016年4月21日 类说明 :hdfs文件系统操作类
  20. */
  21. public class HdfsUtil {
  22. private static final Logger log = LoggerFactory.getLogger(HdfsUtil.class);
  23. // 初始化
  24. static Configuration conf = new Configuration();
  25. static FileSystem hdfs;
  26. static {
  27. try {
  28. hdfs = FileSystem.get(conf);
  29. } catch (IOException e) {
  30. if (log.isDebugEnabled())
  31. log.debug("初始化hadoop配置失败", e);
  32. }
  33. }
  34. /**
  35. * 创建文件夹
  36. *
  37. * @param dir
  38. * @throws IOException
  39. */
  40. public static boolean mkDirs(String dir) {
  41. try {
  42. Path path = new Path(dir);
  43. return hdfs.mkdirs(path);
  44. } catch (IOException e) {
  45. // TODO Auto-generated catch block
  46. e.printStackTrace();
  47. }
  48. return false;
  49. }
  50. /**
  51. * 本地文件上传到hdfs
  52. *
  53. * @param localSrc
  54. * @param hdfsDst
  55. * @throws IOException
  56. */
  57. public static void uploadFile(String localSrc, String hdfsDst) throws IOException {
  58. Path src = new Path(localSrc);
  59. Path dst = new Path(hdfsDst);
  60. hdfs.copyFromLocalFile(src, dst);
  61. FileStatus files[] = hdfs.listStatus(dst);
  62. System.out.println("Upload to \t" + conf.get("fs.default.name")
  63. + hdfsDst);
  64. for (FileStatus file : files) {
  65. System.out.println(file.getPath());
  66. }
  67. }
  68. /**
  69. * 下载文件到本地
  70. * @param remotePath hdfs文件目录
  71. * @param localPath 本地文件目录
  72. */
  73. public static void downLoadFile(String remotePath, String localPath) {
  74. Path _remotePath = new Path(remotePath);
  75. Path _localPath = new Path(localPath);
  76. try {
  77. hdfs.copyToLocalFile(false,_remotePath, _localPath,true);
  78. } catch (IOException e) {
  79. // TODO Auto-generated catch block
  80. e.printStackTrace();
  81. } finally {
  82. try {
  83. hdfs.close();
  84. } catch (IOException e) {
  85. // TODO Auto-generated catch block
  86. e.printStackTrace();
  87. }
  88. }
  89. }
  90. /**
  91. * 创建文件
  92. *
  93. * @param fileName
  94. * @param fileContent
  95. * @throws IOException
  96. */
  97. public static void createFile(String fileName, String fileContent) {
  98. Path dst = new Path(fileName);
  99. byte[] bytes = fileContent.getBytes();
  100. FSDataOutputStream output;
  101. try {
  102. output = hdfs.create(dst);
  103. output.write(bytes);
  104. } catch (IOException e) {
  105. if (log.isDebugEnabled())
  106. log.debug("创建文件异常:" + fileName, e);
  107. }
  108. System.out.println("new file \t" + conf.get("fs.default.name")
  109. + fileName);
  110. }
  111. /**
  112. * 获取文件内容
  113. * @param fileName 文件名
  114. * @return
  115. */
  116. public static String readFileContent(String fileName){
  117. Path p = new Path(conf.get("fs.default.name")+fileName);
  118. FSDataInputStream in = null;
  119. String content = "";
  120. try {
  121. in = hdfs.open(p);
  122. BufferedReader buff = new BufferedReader(new InputStreamReader(in));
  123. content = buff.readLine();
  124. buff.close();
  125. in.close();
  126. hdfs.close();
  127. } catch (IOException e) {
  128. // TODO Auto-generated catch block
  129. if(log.isDebugEnabled())
  130. log.debug("读取文件:"+fileName+"失败", e);
  131. }
  132. return content;
  133. }
  134. public void listFiles(String dirName) throws IOException {
  135. Path f = new Path(dirName);
  136. FileStatus[] status = hdfs.listStatus(f);
  137. System.out.println(dirName + " has all files:");
  138. for (int i = 0; i < status.length; i++) {
  139. System.out.println(status[i].getPath().toString());
  140. }
  141. }
  142. /**
  143. * 删除文件
  144. *
  145. * @param fileName
  146. * 文件路径
  147. * @throws IOException
  148. */
  149. public static boolean deleteFile(String fileName) throws IOException {
  150. Path f = new Path(fileName);
  151. boolean isExists = hdfs.exists(f);
  152. if (isExists) {
  153. boolean isDel = hdfs.delete(f, true);
  154. return isDel;
  155. } else {
  156. return false;
  157. }
  158. }
  159. /**
  160. * 获取集群上的所有节点名称
  161. *
  162. * @throws IOException
  163. */
  164. public static List<DatanodeInfo> getDateNodeHost() throws IOException {
  165. DistributedFileSystem _hdfs = (DistributedFileSystem) hdfs;
  166. DatanodeInfo[] dataNodeStats = _hdfs.getDataNodeStats();
  167. List<DatanodeInfo> dataNodeLst = Arrays.asList(dataNodeStats);
  168. return dataNodeLst;
  169. }
  170. /**
  171. * 文件重命名
  172. *
  173. * @param fileName
  174. * 文件名
  175. * @param newFileName
  176. * 新文件名
  177. * @throws IOException
  178. */
  179. public static boolean renameFile(String fileName, String newFileName) {
  180. Path path = new Path(fileName);
  181. Path newPath = new Path(newFileName);
  182. boolean b = false;
  183. try {
  184. b = hdfs.rename(path, newPath);
  185. } catch (Exception e) {
  186. if (log.isDebugEnabled())
  187. log.debug("文件:[" + fileName + "]修改为:[" + newFileName + "]失败", e);
  188. }
  189. return b;
  190. }
  191. public static void main(String[] args) throws IOException {
  192. /* System.err.println(mkDirs("/test"));
  193. createFile("/test/my.txt", "I Love Beijing!");
  194. System.out.println(readFileContent("/test/my.txt"));*/
  195. // deleteFile("/test/my.txt");
  196. // uploadFile("F:\\a1.txt", "/test");
  197. downLoadFile("/test/a1.txt", "F:\\a2.txt");
  198. }
  199. }

 

3 src/main/resouces中的四个配置文件从hadoop环境中拷贝,具体位于/usr/hadoop/etc/hadoop中

 

三、验证

1 运行HdfsUtil中的main方法的上三行(下三行先注释起来),结果为

true

new file    hdfs://192.168.121.201:9000/test/my.txt

 

在hadoop环境中验证

# hadoop fs -ls /

# hadoop fs -ls /test

 

2 将main方法中的第四行代码打开,其他行代码都注释,重新跑一次。

进hadoop环境中验证,发现my.txt文件已成功删除


3 建立F:\a1.txt文件,其中的内容随意输入,比如“abcdefg”。

将main方法中的第五行代码打开,其他行代码都注释,重新跑一次。运行结果为:

Uploadto   hdfs://192.168.121.201:9000/test

hdfs://192.168.121.201:9000/test/a1.txt


进hadoop环境中验证,发现a1.txt已经被传到hadoop中



4 将main方法中的第六行代码打开,其他行代码都注释,重新跑一次。发现hadoop中的/test/a1.txt已被下载到F:\a2.txt,其中的内容为“abcdefg”

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/740464?site
推荐阅读
相关标签
  

闽ICP备14008679号