Note the location of the files.
data.txt:
file.txt:
Create a new class, UploadHdfs:
package hdfsFileOperations;

import java.net.URI;
import java.util.Scanner;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class UploadHdfs {
    // Upload a local file to HDFS
    public void uploadFile() throws IOException {
        String src = "/home/zhangjian/file/data.txt";
        String name = "/data.txt";
        String dst = "/user/hadoop/input";
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        Path srcPath = new Path(src);         // source path
        Path dstPath = new Path(dst + name);  // destination path + file name, used to check for an existing file
        if (!fs.exists(dstPath)) {
            // No file with the same name exists: copy directly
            // (first argument: false keeps the local file, true deletes it)
            fs.copyFromLocalFile(false, srcPath, dstPath);
        } else {
            // A file with the same name exists: let the user choose how to proceed
            System.out.println("Note: the file already exists. You can choose one of the following options:");
            System.out.println("1. Overwrite the original file");
            System.out.println("2. Append to the end of the file");
            System.out.println("0. Exit");
            System.out.print("Enter your choice: ");
            Scanner in = new Scanner(System.in);
            int x = in.nextInt();
            if (x == 1) {
                fs.copyFromLocalFile(false, srcPath, dstPath);
            } else if (x == 2) {
                appendFile(src, dst + name);  // append needs the full file path, not just the directory
            } else if (x == 0) {
                return;
            }
        }
        System.out.println("Upload succeeded!");
    }

    // Append the local file's content to the end of the HDFS file
    public void appendFile(String src, String dst) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        Path dstPath = new Path(dst);
        InputStream in = new BufferedInputStream(new FileInputStream(src));
        FSDataOutputStream out = fs.append(dstPath);
        IOUtils.copyBytes(in, out, 4096, true);  // true: close both streams when done
        fs.close();
    }
}
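Note: on a pseudo-distributed deployment with a single DataNode, fs.append() may fail with a "Failed to replace a bad datanode" error. A possible workaround (an assumption about your setup, not something this exercise requires) is to relax the client's datanode-replacement policy in appendFile() before opening the stream:

// Optional workaround for append() on a single-DataNode cluster
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");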
Create the DownloadHdfs class:
package hdfsFileOperations;

import java.net.URI;
import java.util.Scanner;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class DownloadHdfs {
    // Download an HDFS file to the local filesystem
    public void download() throws IOException {
        String oldPath = "/user/hadoop/input/data.txt";
        String name = "data.txt";
        String newPath = "/home/zhangjian/file/";
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        Path ofile = new Path(oldPath);
        File nfile = new File(newPath + name);
        FSDataInputStream in = fs.open(ofile);
        if (!nfile.exists()) {
            // No local file with this name yet: write to the target path
            FileOutputStream out = new FileOutputStream(nfile);
            IOUtils.copyBytes(in, out, 2048, true);
        } else {
            // A local file with the same name exists: ask for a new name
            Scanner sc = new Scanner(System.in);
            System.out.print("The file already exists locally; enter a new file name: ");
            String new2 = sc.next();
            File newfile = new File(newPath + new2);
            FileOutputStream out = new FileOutputStream(newfile);
            IOUtils.copyBytes(in, out, 2048, true);
        }
        System.out.println("File downloaded to the local filesystem!");
    }

    // Rename a file in HDFS
    public void rename(String oldname, String newname) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        boolean flag = fs.rename(new Path(oldname), new Path(newname));
        if (flag) {
            System.out.println("Rename succeeded!");
        } else {
            System.out.println("Rename failed!");
        }
    }
}
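As a side note, if no rename handling were needed, the stream-copy logic inside download() could be replaced by a single FileSystem call; a minimal sketch using the same locals:

// Inside download(): copy the HDFS file straight to the local directory,
// letting the FileSystem API handle the local write.
// First argument false: keep the source file in HDFS.
fs.copyToLocalFile(false, ofile, new Path("/home/zhangjian/file/"));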
Create the OutputHdfsTOTerminal class:
package hdfsFileOperations;

import java.net.URI;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class OutputHdfsTOTerminal {
    // Print the content of an HDFS file to the terminal
    public void displayFile() throws IOException {
        String uri = "/user/hadoop/input/Merge.txt";
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));
            // false: do not close System.out after copying
            IOUtils.copyBytes(in, System.out, 4096, false);
            System.out.println("Output to terminal succeeded!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
Create the DisplayHdfsContent class:
package hdfsFileOperations;

import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DisplayHdfsContent {
    // Show the specified file's size, permissions, creation time, and path
    public void displayHdfsContent() throws IOException {
        String remote = "/user/hadoop/input/Merge.txt";
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        FileStatus[] status = fs.listStatus(new Path(remote));
        for (int i = 0; i < status.length; i++) {
            // HDFS exposes the modification time; for a file that has never been
            // appended to, this is effectively its creation time
            long time = status[i].getModificationTime();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String date = sdf.format(new Date(time));
            System.out.println("Path: " + status[i].getPath()
                    + "  Size: " + status[i].getLen()
                    + "  Permissions: " + status[i].getPermission()
                    + "  Created: " + date);
        }
    }
}
Create the DisplayAllHdfsContent class:
package hdfsFileOperations;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DisplayAllHdfsContent {
    // Show size, permissions, creation time, and path of every file in a directory
    public void displayAllHdfsContent() throws IOException {
        String filePath = "/user/hadoop/input/";
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        list(fs, new Path(filePath));
    }

    // Recursively list a directory, descending into subdirectories
    private void list(FileSystem fs, Path path) throws IOException {
        FileStatus[] status = fs.listStatus(path);
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (int i = 0; i < status.length; i++) {
            String date = format.format(new Date(status[i].getModificationTime()));
            // getLen() is the file length in bytes (getBlockSize() would return
            // the HDFS block size, not the file size)
            System.out.println("Size: " + status[i].getLen()
                    + "  Path: " + status[i].getPath()
                    + "  Permissions: " + status[i].getPermission()
                    + "  Created: " + date);
            if (status[i].isDirectory()) {
                list(fs, status[i].getPath());
            }
        }
    }
}
Create the DeleteOrAddHdfs class:
package hdfsFileOperations;

import java.net.URI;
import java.util.Scanner;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteOrAddHdfs {
    // Create or delete a file, as chosen by the user
    public void deleteOrAddHdfs() throws IOException {
        System.out.println("1. Create a file");
        System.out.println("2. Delete a file");
        System.out.println("3. Exit");
        System.out.print("Enter your choice: ");
        Scanner in = new Scanner(System.in);
        int a = in.nextInt();
        switch (a) {
            case 1: mk(); break;
            case 2: rm(); break;
            case 3: break;
        }
    }

    // Create the file, creating its parent directory first if necessary
    public void mk() throws IOException {
        String upremote = "/user/hadoop/input";
        String remote = "/user/hadoop/input/myfile.txt";
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        Path a = new Path(upremote);
        Path b = new Path(remote);
        if (!fs.exists(a)) {
            System.out.println("The directory does not exist and will be created");
            fs.mkdirs(a);
        } else {
            System.out.println("The directory exists; creating the target file");
        }
        FSDataOutputStream out = fs.create(b);
        out.close();
        System.out.println("File created!");
    }

    // Delete the file; if the target is a non-empty directory, the user
    // decides whether to delete it anyway
    public static void rm() throws IOException {
        String remote = "/user/hadoop/input/myfile.txt";
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        Path a = new Path(remote);
        if (!fs.exists(a)) {
            System.out.println("The target does not exist!");
            return;
        }
        FileStatus target = fs.getFileStatus(a);
        if (target.isDirectory() && fs.listStatus(a).length > 0) {
            System.out.println("The directory is not empty. You can:");
            System.out.println("1. Delete it anyway");
            System.out.println("2. Keep it");
            System.out.print("Enter your choice: ");
            Scanner in = new Scanner(System.in);
            int x = in.nextInt();
            if (x == 1) {
                fs.delete(a, true);  // true: delete recursively
                System.out.println("The directory has been deleted");
            } else if (x == 2) {
                System.out.println("Nothing was done!");
            }
        } else {
            fs.delete(a, true);
            System.out.println("The file has been deleted");
        }
    }
}
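For reference, the equivalent HDFS shell commands for the same paths would be (run from the Hadoop installation directory):

./bin/hdfs dfs -mkdir -p /user/hadoop/input
./bin/hdfs dfs -touchz /user/hadoop/input/myfile.txt
./bin/hdfs dfs -rm /user/hadoop/input/myfile.txt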
Create the MoveHdfs class:
package hdfsFileOperations;

import java.net.URI;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MoveHdfs {
    // Move a file inside HDFS from a source path to a destination path
    public void moveHdfs() throws IOException {
        String prepath = "/user/hadoop/input/Merge.txt";
        String newpath = "/user/hadoop/file/";
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        Path a = new Path(prepath);
        Path b = new Path(newpath);
        // rename() performs the move; renaming onto an existing directory
        // moves the file into that directory
        if (fs.rename(a, b)) {
            System.out.println("Move succeeded");
        } else {
            System.out.println("Move failed");
        }
    }
}
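The same move can be done from the HDFS shell, which is handy for checking the program's result:

./bin/hdfs dfs -mv /user/hadoop/input/Merge.txt /user/hadoop/file/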
Create the ReadLine class:
package hdfsFileOperations;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;

public class ReadLine {

    // Subclass of FSDataInputStream that adds line-oriented reading
    public static class MyFSDataInputStream extends FSDataInputStream {
        private final BufferedReader reader;

        public MyFSDataInputStream(InputStream in) {
            super(in);
            this.reader = new BufferedReader(new InputStreamReader(this));
        }

        // DataInputStream already declares a final readLine(), so this
        // method is named readline(); it returns one line of text, or null
        // once the end of the file is reached
        public String readline() throws IOException {
            return reader.readLine();
        }
    }

    public String readline() throws IOException {
        String remoteFilePath = "/user/hadoop/input/data.txt";
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        Path remotePath = new Path(remoteFilePath);
        MyFSDataInputStream in = new MyFSDataInputStream(fs.open(remotePath));
        String line = in.readline();
        in.close();
        return line;  // null means the end of the file was reached
    }
}
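A short usage sketch (the class name ReadLineDemo is made up for illustration): open the file through MyFSDataInputStream and call readline() in a loop until it returns null at the end of the file.

package hdfsFileOperations;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadLineDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        // Print the file line by line; readline() returns null at end of file
        ReadLine.MyFSDataInputStream in = new ReadLine.MyFSDataInputStream(
                fs.open(new Path("/user/hadoop/input/data.txt")));
        String line;
        while ((line = in.readline()) != null) {
            System.out.println(line);
        }
        in.close();
    }
}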
Create the DisplayTheContent class:
package hdfsFileOperations;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;

public class DisplayTheContent {
    private String remotePath = "/user/hadoop/input/data.txt";

    public void show() {
        try {
            // Register the hdfs:// URL scheme; the JVM allows this call only once
            URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
            InputStream inputStream =
                    new URL("hdfs", "localhost", 9000, remotePath).openStream();
            BufferedReader bufferedReader =
                    new BufferedReader(new InputStreamReader(inputStream));
            String line = null;
            while ((line = bufferedReader.readLine()) != null) {
                System.out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Finally, create the Menu class in the view package; it dispatches the user's choice to the classes above:

package view;

import java.io.IOException;
import java.util.Scanner;
import hdfsFileOperations.DeleteOrAddHdfs;
import hdfsFileOperations.DisplayAllHdfsContent;
import hdfsFileOperations.DisplayHdfsContent;
import hdfsFileOperations.DisplayTheContent;
import hdfsFileOperations.DownloadHdfs;
import hdfsFileOperations.MoveHdfs;
import hdfsFileOperations.OutputHdfsTOTerminal;
import hdfsFileOperations.ReadLine;
import hdfsFileOperations.UploadHdfs;

// Menu of the HDFS file operations
public class Menu {
    public static void main(String[] args) throws IOException {
        UploadHdfs h1 = new UploadHdfs();
        DownloadHdfs h2 = new DownloadHdfs();
        OutputHdfsTOTerminal h3 = new OutputHdfsTOTerminal();
        DisplayHdfsContent h4 = new DisplayHdfsContent();
        DisplayAllHdfsContent h5 = new DisplayAllHdfsContent();
        DeleteOrAddHdfs h6 = new DeleteOrAddHdfs();
        MoveHdfs h7 = new MoveHdfs();
        ReadLine h8 = new ReadLine();
        DisplayTheContent h9 = new DisplayTheContent();
        Scanner input = new Scanner(System.in);
        while (true) {
            System.out.println("********************** HDFS file operations in Java **********************");
            System.out.println("1. Upload a text file to HDFS; if it already exists there, let the user choose between appending to it and overwriting it");
            System.out.println("2. Download a file from HDFS; if a local file with the same name exists, rename the downloaded file");
            System.out.println("3. Print the content of an HDFS file to the terminal");
            System.out.println("4. Show the read/write permissions, size, creation time, and path of an HDFS file");
            System.out.println("5. Given an HDFS directory, show the permissions, size, creation time, and path of every file in it, recursing into subdirectories");
            System.out.println("6. Given an HDFS file path, create or delete the file; if its directory does not exist, create the directory automatically");
            System.out.println("7. Move a file in HDFS from a source path to a destination path");
            System.out.println("8. Read an HDFS file line by line via readline(): returns null at the end of the file, otherwise one line of text");
            System.out.println("9. Use java.net.URL and org.apache.hadoop.fs.FsUrlStreamHandlerFactory to print an HDFS file to the terminal");
            System.out.println("0. Exit");
            System.out.print("Enter your choice: ");
            int a = input.nextInt();
            switch (a) {
                case 1: h1.uploadFile(); break;
                case 2: h2.download(); break;
                case 3: h3.displayFile(); break;
                case 4: h4.displayHdfsContent(); break;
                case 5: h5.displayAllHdfsContent(); break;
                case 6: h6.deleteOrAddHdfs(); break;
                case 7: h7.moveHdfs(); break;
                case 8: System.out.println(h8.readline()); break;
                case 9: h9.show(); break;
                case 0: input.close(); return;  // leave the loop and exit
            }
        }
    }
}
Go to the Hadoop installation directory
cd /usr/local/hadoop
Start HDFS
./sbin/start-dfs.sh
Check whether it started successfully
jps
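If HDFS started correctly, jps should list the NameNode, DataNode, and SecondaryNameNode processes, for example (the PIDs will differ):

2481 NameNode
2605 DataNode
2789 SecondaryNameNode
2890 Jps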
Run the program in Eclipse
(1) Upload a text file to HDFS; if the file already exists in HDFS, the user chooses whether to append to the original file or overwrite it
Run step 1 again, this time overwriting the original file
View the uploaded file:
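For example, with the upload path used in UploadHdfs:

./bin/hdfs dfs -cat /user/hadoop/input/data.txt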
(2) Download a file from HDFS; if a local file with the same name already exists, rename the downloaded file
View the contents of data1.txt
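Assuming the downloaded copy was saved as data1.txt in the local directory used by DownloadHdfs:

cat /home/zhangjian/file/data1.txt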
(3) Print the content of the specified HDFS file to the terminal
(4) Show the read/write permissions, size, creation time, and path of the specified HDFS file
(5) Given an HDFS directory, output the read/write permissions, size, creation time, and path of every file in it; if an entry is itself a directory, recursively output the information for all files under it
(6) Given the path of a file in HDFS, create and delete that file; if the file's directory does not exist, create the directory automatically
Check whether the file was created
./bin/hdfs dfs -ls /user/hadoop/input
Delete the file
Check whether it was deleted:
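Listing the directory again shows the result:

./bin/hdfs dfs -ls /user/hadoop/input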
You can see that myfile.txt has been deleted
(7) Move a file in HDFS from a source path to a destination path
Merge.txt is gone from the input directory; now look at the file directory:
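For example:

./bin/hdfs dfs -ls /user/hadoop/file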
You can see that Merge.txt has moved from the input directory to the file directory
(8) Implement a class MyFSDataInputStream that extends org.apache.hadoop.fs.FSDataInputStream and provides a line-reading method: it returns null when the end of the file is reached, otherwise one line of the file's text
A null return value means the end of the file was reached
(9) Consulting the Java documentation or other resources, use java.net.URL and org.apache.hadoop.fs.FsUrlStreamHandlerFactory to print the text of the specified HDFS file to the terminal
The terminal output matches the file's content, so the output succeeded