熟悉HDFS操作常用的Java API。
Java IDE:Eclipse。
(一)编程实现以下功能,并利用Hadoop提供的Shell命令完成相同任务:
# Print the content of an HDFS file (looked up under the HDFS root) if it exists.
read -r -p 'please input the file you want to print:' filename
# Branch directly on the exit status of `-test -e`; quote the path so names
# containing spaces or glob characters are passed through unchanged.
if hdfs dfs -test -e "/$filename"
then
    hdfs dfs -cat "/$filename"
else
    echo 'the file does not exist'
fi
/**
 * Prints the content of an HDFS file to standard output, one line at a time.
 * Prints "the file does not exist" when the path is absent.
 *
 * @param conf           Hadoop configuration used to obtain the file system
 * @param remoteFilePath path of the file to print
 * @throws IOException if the file system cannot be obtained or the file cannot be read
 */
public static void cat(Configuration conf, String remoteFilePath) throws IOException {
    // try-with-resources closes the file system even when reading throws
    try (FileSystem fs = FileSystem.get(conf)) {
        Path remotePath = new Path(remoteFilePath);
        if (!fs.exists(remotePath)) {
            System.out.println("the file does not exist");
            return;
        }
        try (FSDataInputStream in = fs.open(remotePath);
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
# Show the listing (human-readable sizes) of an HDFS path if it exists.
read -r -p 'please input the filename you want to ls:' filename
# Branch directly on the exit status of `-test -e`; quote the path so names
# containing spaces or glob characters are passed through unchanged.
if hdfs dfs -test -e "/$filename"
then
    hdfs dfs -ls -h "/$filename"
else
    echo 'the file does not exist'
fi
/**
 * Prints path, permission, length and modification time for every status
 * entry returned by {@code listStatus} on the given HDFS path.
 * Prints "the file does not exist" when the path is absent.
 *
 * @param conf           Hadoop configuration used to obtain the file system
 * @param remoteFilePath path to list
 * @throws IOException if the file system cannot be obtained or queried
 */
public static void ls(Configuration conf, String remoteFilePath) throws IOException {
    // try-with-resources closes the file system even when listing throws
    try (FileSystem fs = FileSystem.get(conf)) {
        Path path = new Path(remoteFilePath);
        if (!fs.exists(path)) {
            System.out.println("the file does not exist");
            return;
        }
        // One formatter for the whole listing instead of a new one per entry
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (FileStatus s : fs.listStatus(path)) {
            System.out.println("路径: " + s.getPath().toString());
            System.out.println("权限: " + s.getPermission().toString());
            System.out.println("大小: " + s.getLen());
            // The modification time is a timestamp; render it as a date string
            Long timeStamp = s.getModificationTime();
            System.out.println("时间: " + format.format(timeStamp));
        }
    }
}
# Recursively list an HDFS path (human-readable sizes) if it exists.
read -r -p 'please input the filename you want to ls:' filename
# Branch directly on the exit status of `-test -e`; quote the path so names
# containing spaces or glob characters are passed through unchanged.
if hdfs dfs -test -e "/$filename"
then
    hdfs dfs -ls -h -R "/$filename"
else
    echo 'the file does not exist'
fi
public static void lsdir(Configuration conf, String remoteFilePath) throws IOException{ FileSystem fs = FileSystem.get(conf); Path dirpath = new Path(remoteFilePath); if(fs.exists(dirpath)){ /* 递归获取目录下的所有文件 */ RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(dirpath, true); while(remoteIterator.hasNext()){ //FileStatus对象封装了文件的和目录的额元数据,包括文件长度、块大小、权限等信息 FileStatus s = remoteIterator.next(); /* 输出每个文件的信息 */ System.out.println("路径: " + s.getPath().toString()); System.out.println("权限: " + s.getPermission().toString()); System.out.println("大小: " + s.getLen()); /* 返回的是时间戳,转化为时间日期格式 */ Long timeStamp = s.getModificationTime(); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String date = format.format(timeStamp); System.out.println("时间: " + date); } } else System.out.println("the file or path does not exist"); fs.close(); }
#!/bin/bash
# Create an empty file inside an HDFS directory, creating the directory first
# when it is missing. An existing file of the same name is removed and recreated.
read -r -p 'please input the path:' path
# Test the command's exit status directly: `if $(cmd)` would try to execute
# whatever the command printed on stdout.
if hdfs dfs -test -d "/$path"
then
    echo 'the path exists'
else
    echo 'the path does not exist'
    # -p also creates missing parent directories
    hdfs dfs -mkdir -p "/$path"
fi

# From here on both cases are identical, so the logic is written only once.
read -r -p 'please input the file:' filename
if hdfs dfs -test -e "/$path/$filename"
then
    hdfs dfs -rm "/$path/$filename"
fi
hdfs dfs -touchz "/$path/$filename"
/** * 判断路径是否存在 */ public static boolean test(Configuration conf, String path) throws IOException { FileSystem fs = FileSystem.get(conf); return fs.exists(new Path(path)); } /** * 创建目录 */ public static boolean mkdir(Configuration conf, String remoteDir) throws IOException { FileSystem fs = FileSystem.get(conf); Path dirPath = new Path(remoteDir); boolean result = fs.mkdirs(dirPath); fs.close(); return result; } /** * 创建文件 */ public static void touchz(Configuration conf, String remoteFilePath) throws IOException { FileSystem fs = FileSystem.get(conf); Path remotePath = new Path(remoteFilePath); FSDataOutputStream outputStream = fs.create(remotePath); outputStream.close(); fs.close(); } /** * 删除文件 */ public static boolean rm(Configuration conf, String remoteFilePath) throws IOException { FileSystem fs = FileSystem.get(conf); Path remotePath = new Path(remoteFilePath); boolean result = fs.delete(remotePath, false); fs.close(); return result; } /** * 主函数 * @throws IOException */ public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://hadoop1:8020"); Scanner input = new Scanner(System.in); System.out.println("please input the file:"); String path = input.next(); if(HDFSApi.test(conf, path)){//目录存在,选择创建文件or删除文件 System.out.println("please input the file you want to touch now:"); String file = input.next(); path = path + file; if(HDFSApi.test(conf, path)){//文件存在 HDFSApi.rm(conf, path); } else{//文件不存在 HDFSApi.touchz(conf, path); } } else{//目录不存在,先创建目录 HDFSApi.mkdir(conf, path); System.out.println("please input the file you want to touch now:"); String file = input.next(); path = path + file; HDFSApi.touchz(conf, path); } input.close(); }
#!/bin/bash
# Remove an HDFS directory; when it is not empty, ask before deleting it
# recursively. A missing directory is created instead.
read -r -p 'please input the path:' path
# Test the command's exit status directly: `if $(cmd)` would try to execute
# whatever the command printed on stdout.
if hdfs dfs -test -d "/$path"
then
    echo 'the path exists'
    # -rmdir only removes an empty directory, so a failure here is the case
    # in which the user has to be asked about a recursive delete.
    if ! hdfs dfs -rmdir "/$path"
    then
        read -r -p 'Do you want to rmr?:y/n' order
        # Spaces around '=' are required; `[ $order=='y' ]` is a single
        # non-empty word and therefore always true.
        if [ "$order" = 'y' ]
        then
            # `-rm -r` replaces the deprecated `-rmr`
            hdfs dfs -rm -r "/$path"
        fi
    fi
else
    echo 'the path does not exist'
    # -p also creates missing parent directories
    hdfs dfs -mkdir -p "/$path"
fi
/** * 判断路径是否存在 */ public static boolean test(Configuration conf, String path) throws IOException { FileSystem fs = FileSystem.get(conf); return fs.exists(new Path(path)); } /** * 判断目录是否为空 * true: 空,false: 非空 */ public static boolean isDirEmpty(Configuration conf, String remoteDir) throws IOException { FileSystem fs = FileSystem.get(conf); Path dirPath = new Path(remoteDir); RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(dirPath, true); return !remoteIterator.hasNext(); } /** * 创建目录 */ public static boolean mkdir(Configuration conf, String remoteDir) throws IOException { FileSystem fs = FileSystem.get(conf); Path dirPath = new Path(remoteDir); boolean result = fs.mkdirs(dirPath); fs.close(); return result; } /** * 删除目录 */ public static boolean rmDir(Configuration conf, String remoteDir) throws IOException { FileSystem fs = FileSystem.get(conf); Path dirPath = new Path(remoteDir); /* 第二个参数表示是否递归删除所有文件 */ boolean result = fs.delete(dirPath, true); fs.close(); return result; } public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://hadoop1:8020"); Scanner input = new Scanner(System.in); System.out.println("please input the file:"); String path = input.next(); if(HDFSApi.test(conf, path)){//目录存在 if(HDFSApi.isDirEmpty(conf, path)){//目录为空 HDFSApi.rmDir(conf, path); } else{ System.out.println("Directory is not empty,do you want to rmr it? y/n");//是否强制删除 String order = input.next(); if(order.equals("y")){ System.out.println(order); HDFSApi.rmDir(conf, path); } } } else{//目录不存在,创建目录 HDFSApi.mkdir(conf, path); } input.close(); }
# Add the content of a local file to an HDFS file, either at its end or at
# its beginning.
# NOTE(review): assumes the first answer is a LOCAL file and the second an
# HDFS path, as the prompts suggest - confirm against the exercise text.
read -r -p 'please input the file which contains the Additional content:' file1
read -r -p 'please input the file you want to append:' file2
read -r -p 'please input the choice:Append to the end or beginning:' order
# Spaces around '=' are required; `[ $order=='end' ]` is a single non-empty
# word and therefore always true.
if [ "$order" = 'end' ]
then
    hdfs dfs -appendToFile "$file1" "$file2"
else
    # There is no prepend operation: build "new content + old content" in a
    # local temporary file and overwrite the HDFS file with it. The steps are
    # chained with && so the HDFS file is only replaced when every earlier
    # step succeeded.
    tmp=$(mktemp)
    cat "$file1" > "$tmp" &&
        hdfs dfs -cat "$file2" >> "$tmp" &&
        hdfs dfs -put -f "$tmp" "$file2"
    rm -f "$tmp"
fi
hdfs dfs -cat "$file2"
/** * 判断路径是否存在 */ public static boolean test(Configuration conf, String path) throws IOException { FileSystem fs = FileSystem.get(conf); return fs.exists(new Path(path)); } /** * 追加文本内容 */ public static void appendContentToFile(Configuration conf, String content, String remoteFilePath) throws IOException { FileSystem fs = FileSystem.get(conf); Path remotePath = new Path(remoteFilePath); /* 创建一个文件输出流,输出的内容将追加到文件末尾 */ FSDataOutputStream out = fs.append(remotePath); out.write(content.getBytes()); out.close(); fs.close(); } /** * 追加文件内容 */ public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException { FileSystem fs = FileSystem.get(conf); Path remotePath = new Path(remoteFilePath); /* 创建一个文件读入流 */ FileInputStream in = new FileInputStream(localFilePath); /* 创建一个文件输出流,输出的内容将追加到文件末尾 */ FSDataOutputStream out = fs.append(remotePath); /* 读写文件内容 */ byte[] data = new byte[1024]; int read = -1; if(in!=null){ while ( (read = in.read(data)) > 0 ) { out.write(data, 0, read); } } out.close(); in.close(); fs.close(); } /** * 移动文件到本地 * 移动后,删除源文件 */ public static void moveToLocalFile(Configuration conf, String remoteFilePath, String localFilePath) throws IOException { FileSystem fs = FileSystem.get(conf); Path remotePath = new Path(remoteFilePath); Path localPath = new Path(localFilePath); fs.moveToLocalFile(remotePath, localPath); } /** * 创建文件 */ public static void touchz(Configuration conf, String remoteFilePath) throws IOException { FileSystem fs = FileSystem.get(conf); Path remotePath = new Path(remoteFilePath); FSDataOutputStream outputStream = fs.create(remotePath); outputStream.close(); fs.close(); } public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://hadoop1:8020"); Scanner input = new Scanner(System.in); System.out.println("要被追加的文件路径为:"); String path = input.next(); if(!HDFSApi.test(conf, path)){ System.out.println("路径不存在"); } else{ 
System.out.println("请输入要追加的内容:"); String content = input.next(); System.out.println("please input your choice:before or after:"); String choice = input.next(); if(choice.equals("after")){//追加在文件末尾 HDFSApi.appendContentToFile(conf, content, path); } else{ /* 没有相应的 api 可以直接操作,因此先把文件移动到本地*/ /*创建一个新的 HDFS,再按顺序追加内容 */ String localTmpPath = "/user/hadoop/tmp.txt"; // 移动到本地 HDFSApi.moveToLocalFile(conf, path, localTmpPath); // 创建一个新文件 HDFSApi.touchz(conf, path); // 先写入新内容 HDFSApi.appendContentToFile(conf, content,path); // 再写入原来内容 HDFSApi.appendToFile(conf, localTmpPath, path); System.out.println("已追加内容到文件开头: " + path); } } }
# Delete an HDFS file (looked up under the HDFS root) if it exists.
read -r -p 'please input the file you want to delete:' filename
# Test the command's exit status directly: `if $(cmd)` would try to execute
# whatever the command printed on stdout.
if hdfs dfs -test -e "/$filename"
then
    hdfs dfs -rm "/$filename"
else
    echo 'the file does not exist'
fi
/**
 * Deletes a file (non-recursive delete).
 *
 * @param conf           Hadoop configuration used to obtain the file system
 * @param remoteFilePath file to delete
 * @return the result reported by {@code FileSystem.delete}
 * @throws IOException if the file system cannot be obtained or the call fails
 */
public static boolean rm(Configuration conf, String remoteFilePath) throws IOException {
    try (FileSystem fs = FileSystem.get(conf)) {
        return fs.delete(new Path(remoteFilePath), false);
    }
}

/**
 * Checks whether a path exists in HDFS.
 *
 * @param conf Hadoop configuration used to obtain the file system
 * @param path path to check
 * @return {@code true} if the path exists
 * @throws IOException if the file system cannot be obtained or queried
 */
public static boolean test(Configuration conf, String path) throws IOException {
    // Close the file system like rm() does
    try (FileSystem fs = FileSystem.get(conf)) {
        return fs.exists(new Path(path));
    }
}

/**
 * Entry point: reads a path from standard input and deletes that file,
 * reporting whether the file was missing, deleted, or could not be deleted.
 *
 * @param args unused
 * @throws IOException if any HDFS operation fails
 */
public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS","hdfs://hadoop1:8020");
    try (Scanner input = new Scanner(System.in)) {
        System.out.println("要删除的文件路径为:");
        String path = input.next();
        if (!HDFSApi.test(conf, path)) {
            System.out.println("文件不存在");
        } else if (HDFSApi.rm(conf, path)) {
            // Only report success when delete() actually returned true
            System.out.println("删除成功");
        } else {
            System.out.println("删除失败");
        }
    }
}
# Move (rename) a file inside HDFS; both names are taken relative to the HDFS root.
# -r keeps backslashes literal; the quotes keep names containing spaces or
# glob characters intact.
read -r -p 'please input the file you want to mv:' file1
read -r -p 'please input the file you want to mv to:' file2
hdfs dfs -mv "/$file1" "/$file2"
/**
 * Moves a file inside HDFS (via {@code FileSystem.rename}).
 *
 * @param conf             Hadoop configuration used to obtain the file system
 * @param remoteFilePath   source path
 * @param remoteToFilePath destination path
 * @return the result reported by {@code FileSystem.rename}
 * @throws IOException if the file system cannot be obtained or the call fails
 */
public static boolean mv(Configuration conf, String remoteFilePath, String remoteToFilePath) throws IOException {
    // try-with-resources closes the file system even when rename throws
    try (FileSystem fs = FileSystem.get(conf)) {
        return fs.rename(new Path(remoteFilePath), new Path(remoteToFilePath));
    }
}

/**
 * Entry point: reads a source and a destination path from standard input,
 * moves the file and reports whether the move succeeded.
 *
 * @param args unused
 * @throws IOException if the HDFS operation fails
 */
public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS","hdfs://hadoop1:8020");
    // Close the scanner once both paths have been read and the move is done
    try (Scanner input = new Scanner(System.in)) {
        System.out.println("要移动的文件路径为:");
        String file1 = input.next();
        System.out.println("移动到的路径为:");
        String file2 = input.next();
        if (HDFSApi.mv(conf, file1, file2)) {
            System.out.println("移动成功");
        } else {
            System.out.println("移动失败");
        }
    }
}
package HDFSApi;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.*;

/**
 * FSDataInputStream subclass that adds a hand-written line reader and a
 * small "cat" demo built on top of it.
 */
public class MyFSDataInputStream extends FSDataInputStream {
    public MyFSDataInputStream(InputStream in) {
        super(in);
    }

    /**
     * Reads one line, character by character, stopping at "\n" or at the end
     * of the input.
     *
     * <p>Only the characters of the line are returned: no line terminator and
     * no buffer padding. Lines of any length are supported because the
     * characters are collected in a growing StringBuilder rather than a
     * fixed-size array.
     *
     * @param br reader to take the line from; it continues where the previous call stopped
     * @return the line without its trailing "\n", or {@code null} if the end
     *         of the input was reached before any character could be read
     * @throws IOException if reading from {@code br} fails
     */
    public static String readline(BufferedReader br) throws IOException {
        StringBuilder line = new StringBuilder();
        // Distinguishes "empty line" (a lone "\n") from "nothing left to read"
        boolean readAny = false;
        int c;
        while ((c = br.read()) != -1) {
            readAny = true;
            if (c == '\n') {
                break;
            }
            line.append((char) c);
        }
        return readAny ? line.toString() : null;
    }

    /**
     * Prints the content of an HDFS file to standard output using
     * {@link #readline(BufferedReader)}.
     *
     * @param conf           Hadoop configuration used to obtain the file system
     * @param remoteFilePath file to print
     * @throws IOException if the file cannot be opened or read
     */
    public static void cat(Configuration conf, String remoteFilePath) throws IOException {
        // try-with-resources closes everything even when reading throws
        try (FileSystem fs = FileSystem.get(conf);
             FSDataInputStream in = fs.open(new Path(remoteFilePath));
             BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = MyFSDataInputStream.readline(br)) != null) {
                System.out.println(line);
            }
        }
    }

    /**
     * Entry point: prints a fixed HDFS file.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // NOTE(review): the other examples in this file set "fs.defaultFS";
        // confirm which key the targeted Hadoop version expects.
        conf.set("fs.default.name","hdfs://hadoop1:8020");
        String path = "/user/hadoop/text.txt"; // HDFS path
        try {
            MyFSDataInputStream.cat(conf, path);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package HdfsApi;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.*;
import java.net.URL;

/**
 * Demo that prints an HDFS file to standard output by opening it through
 * java.net.URL.
 */
public class HDFSApi {
    static {
        // Install Hadoop's FsUrlStreamHandlerFactory as the URL stream handler factory
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    /**
     * Entry point: copies the content of a fixed HDFS URL to standard output.
     *
     * @param args unused
     * @throws IOException if the URL cannot be opened or the copy fails
     */
    public static void main(String[] args) throws IOException {
        // HDFS file to print; adjust to your own cluster and path
        final String hdfsUrl = "hdfs://hadoop1:8020/output/t.sh";
        printUrl(hdfsUrl);
    }

    /** Opens the given URL and copies its bytes to System.out, closing the stream afterwards. */
    private static void printUrl(String location) throws IOException {
        InputStream source = null;
        try {
            source = new URL(location).openStream();
            // Last argument false: copyBytes must not close the streams itself
            IOUtils.copyBytes(source, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(source);
        }
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。