当前位置:   article > 正文

MapReduce基础开发之五分布式下载ftp文件到本地再迁移到hdfs_mapreduce怎么下载文件下载到本地

mapreduce怎么下载文件下载到本地
为利用Hadoop集群平台的分布存储和计算能力,基于MapReduce将ftp文件分布式下载并上传到HDFS中。

1、文件移动流程:ftp服务器->datanode本地目录->HDFS目录;

2、实现主要基于两个设计思想:
   1)将FTP服务器文件列表作为MapReduce处理的数据对象,按照文件名分布到不同Reduce节点下载和上传到HDFS中;
   2)在每个datanode节点都建立一个本地文件保存目录,最好是统一路径名,这样每个Reduce节点都把FTP服务器文件下载到该目录下;

3、代码主要过程:
   1)驱动类中先读取FTP服务器上要下载的文件列表,并移入到hdfs中,作为Map函数的输入;
   2)Map函数处理文件列表,获取文件名字,作为Reduce函数输入;
   3)Reduce函数根据输入的文件名去下载ftp服务器上对应的文件,并下载到datanode节点的统一本地目录,再将本地目录文件上传到HDFS中;

4、主要技术点:
   1)FTPClient实现ftp文件下载;
   2)hadoop的IOUtils类实现文件从本地上传到HDFS;

5、准备工作
   1)ftp服务器端口、用户名和密码、下载文件目录;
      linux下ftp命令:进入$ftp ip/常用命令:ls/cd/put/get/mput/mget
   2)每个节点统一建立本地目录/tmp/fjs/localftp,保存ftp服务器上下载的文件;
   3)Namenode上建立HDFS保存文件的目录/tmp/fjs/ftp;
   4)Namenode上建立HDFS保存文件列表的目录/tmp/fjs/in,即Map函数的输入数据;

6、具体代码:

   1)主类FtpMR:驱动类加MapReduce类;

  1. package ct.gd;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.Mapper;
  8. import org.apache.hadoop.mapreduce.Reducer;
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  10. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  11. import org.apache.hadoop.util.GenericOptionsParser;
  12. public class FtpMR {
  13. public static class FtpMap extends Mapper<Object,Text,Text,Text>{
  14. private Text _key = new Text();
  15. private Text _value = new Text();
  16. public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
  17. String line = value.toString();
  18. //tag是随机值,目的是将文件均匀分到各节点下载,随机范围根据集群节点数,这里是0-100内
  19. //假设下载文件有1000个,100随机范围,集群有100个节点,那每个节点均匀可能获得10个文件下载,
  20. //map输出的<key,value>,输入reduce时,key值相同的会形成value list,因此设计该随机key值
  21. String tag = ComMethod.getRandomNbr();
  22. _key.set(tag);
  23. _value.set(line);
  24. context.write(_key,_value);
  25. }
  26. }
  27. public static class FtpReduce extends Reducer<Text,Text,Text,Text>{
  28. public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
  29. String ftpSrv=context.getConfiguration().get("ftpSrv");//获取ftp服务器连接信息
  30. String outputPath=context.getConfiguration().get("outputPath");//获取hdfs存放文件的目录
  31. FtpUtil fu=new FtpUtil();
  32. for(Text value:values){
  33. String filename=value.toString();//输入的value是ftp服务器上的文件名
  34. String localFile=fu.DownFileToLocal(ftpSrv,filename);//下载文件到本地目录,并返回文件保存的路径
  35. if (localFile!=null) fu.WriteFileToHDFS(localFile,ComMethod.changeToDir(outputPath),filename);//本地文件上传到hdfs中
  36. }
  37. }
  38. }
  39. public static void main(String[] args) throws Exception {
  40. Configuration conf = new Configuration();
  41. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  42. if (otherArgs.length != 2) {
  43. System.err.println("Usage: FtpMR <in> <out>");
  44. System.exit(2);
  45. }
  46. String inputPath=otherArgs[0];//FTP服务器保存文件列表的文件目录
  47. String outputPath=otherArgs[1];//下载的ftp文件保存在hdfs中的目录
  48. FtpUtil fu=new FtpUtil();
  49. //ftp服务器字符串格式:IP|port|username|password|file directory
  50. String strFtpSrv="IP|port|name|password|directory";
  51. //获取ftp服务器上文件列表,保存到hdfs的inputPath目录下
  52. if(!fu.getFtpFileList(strFtpSrv,inputPath)){
  53. System.err.println("下载ftp服务器文件列表失败");
  54. System.exit(2);
  55. }
  56. //将ftp服务器的参数作为参数传递到Reduce中
  57. conf.set("ftpSrv", strFtpSrv);
  58. //将hdfs上保存下载文件的目录传递到Reduce中
  59. conf.set("outputPath", outputPath);
  60. Job job = new Job(conf, "FtpToHdfs");
  61. job.setJarByClass(FtpMR.class);
  62. //job.setNumReduceTasks(1);//设置reduce输入文件一个,方便查看结果
  63. job.setMapperClass(FtpMap.class);
  64. job.setReducerClass(FtpReduce.class);
  65. job.setOutputKeyClass(Text.class);
  66. job.setOutputValueClass(Text.class);
  67. FileInputFormat.addInputPath(job, new Path(inputPath));
  68. FileOutputFormat.setOutputPath(job, new Path(outputPath));
  69. System.exit(job.waitForCompletion(true) ? 0 : 1);
  70. }
  71. }

   2)接口类FtpUtil:主要处理ftp文件下载和写入hdfs中;

  1. package ct.gd;
  2. import java.io.BufferedInputStream;
  3. import java.io.File;
  4. import java.io.FileInputStream;
  5. import java.io.FileOutputStream;
  6. import java.io.IOException;
  7. import java.io.InputStream;
  8. import java.io.OutputStream;
  9. import org.apache.commons.net.ftp.FTP;
  10. import org.apache.commons.net.ftp.FTPClient;
  11. import org.apache.commons.net.ftp.FTPFile;
  12. import org.apache.commons.net.ftp.FTPReply;
  13. import org.apache.hadoop.conf.Configuration;
  14. import org.apache.hadoop.fs.FSDataOutputStream;
  15. import org.apache.hadoop.fs.FileSystem;
  16. import org.apache.hadoop.fs.Path;
  17. import org.apache.hadoop.io.IOUtils;
  18. public class FtpUtil {
  19. /*下载文件列表处理函数,开始*/
  20. public boolean getFtpFileList(String strFtpSrv,String inputPath){//从ftp服务器上读取文件列表
  21. String[] FtpSrvConn=strFtpSrv.split("\\|");//截取ftp服务器连接信息
  22. FTPClient ftp = new FTPClient();
  23. try {
  24. ftp.connect(FtpSrvConn[0], Integer.parseInt(FtpSrvConn[1])); //url和port
  25. ftp.login(FtpSrvConn[2], FtpSrvConn[3]); //name和password
  26. int reply = ftp.getReplyCode();
  27. if (!FTPReply.isPositiveCompletion(reply)) {
  28. ftp.disconnect();
  29. return false;
  30. }
  31. String remotePath=FtpSrvConn[4];//Ftp服务器上文件目录
  32. ftp.changeWorkingDirectory(remotePath);
  33. FTPFile[] fs = ftp.listFiles(remotePath);
  34. StringBuffer buffer = new StringBuffer();
  35. for(FTPFile ff:fs){
  36. String fileName = ff.getName();
  37. buffer.append(fileName+"\n");
  38. }
  39. if(writeBufferToHDFSFile(buffer, inputPath)){
  40. ftp.logout();
  41. return true;
  42. }
  43. ftp.logout();
  44. } catch (IOException e) {System.out.println(e.getMessage());}
  45. finally {
  46. if (ftp.isConnected()) {
  47. try { ftp.disconnect(); } catch (IOException ioe) { System.out.println(ioe.getMessage()); }
  48. }
  49. }
  50. return false;
  51. }
  52. private boolean writeBufferToHDFSFile(StringBuffer buffer, String inputPath){//将文件列表写到hdfs中
  53. Configuration conf = new Configuration();
  54. FileSystem fs = null;
  55. String fileName="fileLists.txt";
  56. try {
  57. fs = FileSystem.get(conf);
  58. inputPath = ComMethod.changeToDir(inputPath) + fileName;
  59. Path fsInputPath=new Path(inputPath);
  60. FSDataOutputStream outputStream = fs.create(fsInputPath);
  61. outputStream.write(buffer.toString().getBytes("UTF-8"));
  62. outputStream.flush();
  63. outputStream.sync();
  64. outputStream.close();
  65. return true;
  66. } catch (IOException e) {System.out.println(e.getMessage());}
  67. return false;
  68. }
  69. /*下载文件列表处理函数,结束*/
  70. /*下载文件处理函数,开始*/
  71. public String DownFileToLocal(String ftpSrv,String filename){
  72. //在节点上创建本地保存下载文件的目录
  73. String localPath="/tmp/fjs/localftp";
  74. File localDir = new File(localPath);//如果不存在就创建
  75. if(!localDir.exists()){
  76. localDir.mkdirs();
  77. }
  78. FTPClient ftp = new FTPClient();
  79. String[] FtpSrvConn=ftpSrv.split("\\|");//截取ftp服务器连接信息
  80. try {
  81. ftp.connect(FtpSrvConn[0], Integer.parseInt(FtpSrvConn[1])); //url和port
  82. ftp.login(FtpSrvConn[2], FtpSrvConn[3]); //name和password
  83. int reply = ftp.getReplyCode();
  84. if (!FTPReply.isPositiveCompletion(reply)) {
  85. ftp.disconnect();
  86. return null;
  87. }
  88. String remotePath=FtpSrvConn[4];//Ftp服务器上文件目录
  89. ftp.changeWorkingDirectory(remotePath);
  90. String localFilePath = ComMethod.changeToDir(localPath) + filename;
  91. File localFile = new File(localFilePath);
  92. OutputStream is = new FileOutputStream(localFile);
  93. ftp.retrieveFile(filename, is);//下载
  94. is.close();
  95. ftp.logout();
  96. return localFilePath;
  97. } catch (IOException e) {
  98. System.err.println(e.getMessage());
  99. } finally {
  100. if (ftp.isConnected()) {
  101. try { ftp.disconnect(); } catch (IOException ioe) { }
  102. }
  103. }
  104. return null;
  105. }
  106. /*下载文件处理函数,结束*/
  107. /*上传文件到hdfs处理函数,开始*/
  108. public void WriteFileToHDFS(String localFile,String outputPath,String filename){
  109. Configuration conf = new Configuration();
  110. FileSystem fs = null;
  111. try {
  112. fs=FileSystem.get(conf);
  113. InputStream in = new BufferedInputStream(new FileInputStream(localFile));
  114. String ouputFile = outputPath + filename;//hdfs存放文件路劲和名字
  115. OutputStream out = fs.create(new Path(ouputFile));
  116. IOUtils.copyBytes(in, out, 1024*1024,true);//迁移
  117. out.flush();
  118. if(out!=null) out.close();
  119. if(in!=null) in.close();
  120. //删除本地文件
  121. File _outputFileName = new File(localFile);
  122. if(_outputFileName.exists()) _outputFileName.delete();
  123. } catch (IOException e) {e.printStackTrace();}
  124. }
  125. /*上传文件到hdfs处理函数,结束*/
  126. public static void main(String[] args) throws Exception {
  127. }
  128. }

   3)通用函数类ComMethod:主要是一些通用字符处理函数;

  1. package ct.gd;
  2. import java.util.Random;
  3. public class ComMethod {
  4. public static String changeToDir(String dirPath){//目录最后是否有/
  5. if(dirPath.charAt(dirPath.length()-1)!='/'){
  6. dirPath = dirPath + "/";
  7. }
  8. return dirPath;
  9. }
  10. public static String getRandomNbr(){//获取随机数
  11. Random rand = new Random();
  12. String nbr = String.valueOf(rand.nextInt(100));
  13. return nbr;
  14. }
  15. }



7、执行结果
   1)执行命令:yarn jar /mnt/mr.jar /tmp/fjs/in /tmp/fjs/ftp
   2)hadoop fs -ls /tmp/fjs/in 可以看到文件列表文件
   3)hadoop fs -ls /tmp/fjs/ftp 可以看到下载的文件
   4)每个节点ls -l /tmp/fjs/localftp,如果文件都迁入hdfs,应该为空

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/677431
推荐阅读
相关标签
  

闽ICP备14008679号