赞
踩
1、使用java完成hdfs的相关操作
1、正确使用java对hdfs进行相关操作,解决以下问题
Windows电脑一台、VMware虚拟机、WindTerm终端模拟器、IntelliJ IDEA
1、代码
- //创建文件
- @Test
- public static void testMkdirFile() throws URISyntaxException, IOException, InterruptedException {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- //创建一个文件夹
- fs.mkdirs(new Path("/hadoop/wenjian"));
- //关闭资源
- fs.close();
- }

2、结果
1、代码
- //删除文件
- @Test
- public static void testDeleteFile() throws Exception {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- //删除一个文件,b:代表是否递归,true会删除指定文件夹及内容,false删除指定文件
- fs.delete(new Path("/hadoop/wenjian"), false);
- //关闭资源
- fs.close();
- }

2、结果
1、代码
- //列出目录下文件
- @Test
- public static void testListFile() throws IOException, URISyntaxException, InterruptedException {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- //获取目录状态
- FileStatus[] listStatus = fs.listStatus(new Path("/"));
- for (FileStatus list : listStatus) {//增强for循环遍历listStatus
- System.out.println(list.getPath().getName());
- }
- RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
- //递归列出该目录下所有文件,不包括文件夹,后面的布尔值为是否递归
- while (listFiles.hasNext()) {//如果listfiles里还有东西
- LocatedFileStatus next = listFiles.next();//得到下一个并pop出listFiles
- System.out.println(next.getPath().getName());//输出
- }
- }
-
- //列出目录下文件信息
- @Test
- public static void testDetailFile() throws URISyntaxException, IOException, InterruptedException {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- // 获取迭代器对象("/"表示获取全部目录下的文件)
- RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
- while (listFiles.hasNext()) {
- LocatedFileStatus fileStatus = listFiles.next();
- System.out.println("文件权限:" + fileStatus.getPermission());
- System.out.println("文件所有者:" + fileStatus.getOwner());
- System.out.println("文件路径:" + fileStatus.getPath());
- System.out.println("文件内容长度:" + fileStatus.getLen());
- System.out.println("修改时间:" + fileStatus.getModificationTime());
- System.out.println("备份数:" + fileStatus.getReplication());
- System.out.println("文件块大小:" + fileStatus.getBlockSize());
- System.out.println("文件名:" + fileStatus.getPath().getName());
- // 获取该文件块的信息
- BlockLocation[] blockLocations = fileStatus.getBlockLocations();
- for (BlockLocation bl : blockLocations) {
- // 获取DataNodes的主机名
- String[] hosts = bl.getHosts();
- for (String host : hosts) {
- System.out.println(host);
- }
- }
- System.out.println();
- }
- }

2、结果
1、代码
- //上传文件
- @Test
- public static void testCopyFromLocalFile() throws Exception {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- //上传一个文件
- fs.copyFromLocalFile(new Path("D:/hadoop.txt"), new Path("/hadoop/wenjian"));
- //关闭资源
- fs.close();
- }

2、结果
1、代码
- //下载文件
- @Test
- public static void testCopyToLocalFile() throws Exception {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- //下载一个文件
- fs.copyToLocalFile(new Path("/hadoop/wenjian/hadoop.txt"), new Path("D:/test.txt"));
- //关闭资源
- fs.close();
- }

2、结果
1、代码
- //追加文件
- @Test
- public static void testAppendFile() throws URISyntaxException, IOException, InterruptedException {
- // 相关配置
- Configuration conf = new Configuration();//加载配置文件,将配置文件放在src下
- {
- conf.setBoolean("dfs.support.append", true);
- conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
- conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
- }
- FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100"), conf, "hadoop100");//获取文件系统实例
- ObjectMapper objectMapper = new ObjectMapper();
- FSDataOutputStream output = null;
- output = fs.append(new Path("/hadoop/wenjian/hadoop.txt"));
- // MessageHeader listContent = null;
- // System.out.println(listContent.toString());
- // output.write(objectMapper.writeValueAsString(listContent).getBytes("UTF-8"));
- output.write(("\n"+"hello hbase").getBytes("UTF-8"));
- // output.write("\n".getBytes("UTF-8"));//换行
- }

2、结果
运行读文件
1、代码
- //读文件
- @Test
- public static void testReadFile() throws Exception {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- // 创建路径对象(指向目录或文件)
- Path path = new Path("/hadoop/wenjian/hadoop.txt");
- // 创建文件系统数据字节输入流对象
- FSDataInputStream in = fs.open(path);
- // 创建缓冲字符输入流对象,提高读取效率(字节流-->字符流-->缓冲流)
- BufferedReader br = new BufferedReader(new InputStreamReader(in));
- // 定义行字符串
- String nextLine = "";
- // 通过循环读取缓冲字符输入流
- while ((nextLine = br.readLine()) != null) {
- // 在控制台输出读取的行内容
- System.out.println(nextLine);
- }
- // 关闭缓冲字符输入流
- br.close();
- // 关闭文件系统数据字节输入流
- in.close();
- // 关闭文件系统
- fs.close();
- }

2、结果
- package edu.sugon;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.*;
- import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
- import org.junit.Test;
- import java.io.*;
- import java.net.URI;
- import java.net.URISyntaxException;
-
- public class HDFSOpt {
- //创建文件
- @Test
- public static void testMkdirFile() throws URISyntaxException, IOException, InterruptedException {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- //创建一个文件夹
- fs.mkdirs(new Path("/hadoop/wenjian"));
- //关闭资源
- fs.close();
- }
-
- //删除文件
- @Test
- public static void testDeleteFile() throws Exception {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- //删除一个文件,b:代表是否递归,true会删除指定文件夹及内容,false删除指定文件
- fs.delete(new Path("/hadoop/wenjian"), false);
- //关闭资源
- fs.close();
- }
-
- //列出目录下文件
- @Test
- public static void testListFile() throws IOException, URISyntaxException, InterruptedException {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- //获取目录状态
- FileStatus[] listStatus = fs.listStatus(new Path("/"));
- for (FileStatus list : listStatus) {//增强for循环遍历listStatus
- System.out.println(list.getPath().getName());
- }
- RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
- //递归列出该目录下所有文件,不包括文件夹,后面的布尔值为是否递归
- while (listFiles.hasNext()) {//如果listfiles里还有东西
- LocatedFileStatus next = listFiles.next();//得到下一个并pop出listFiles
- System.out.println(next.getPath().getName());//输出
- }
- }
-
- //列出目录下文件信息
- @Test
- public static void testDetailFile() throws URISyntaxException, IOException, InterruptedException {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- // 获取迭代器对象("/"表示获取全部目录下的文件)
- RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
- while (listFiles.hasNext()) {
- LocatedFileStatus fileStatus = listFiles.next();
- System.out.println("文件权限:" + fileStatus.getPermission());
- System.out.println("文件所有者:" + fileStatus.getOwner());
- System.out.println("文件路径:" + fileStatus.getPath());
- System.out.println("文件内容长度:" + fileStatus.getLen());
- System.out.println("修改时间:" + fileStatus.getModificationTime());
- System.out.println("备份数:" + fileStatus.getReplication());
- System.out.println("文件块大小:" + fileStatus.getBlockSize());
- System.out.println("文件名:" + fileStatus.getPath().getName());
- // 获取该文件块的信息
- BlockLocation[] blockLocations = fileStatus.getBlockLocations();
- for (BlockLocation bl : blockLocations) {
- // 获取DataNodes的主机名
- String[] hosts = bl.getHosts();
- for (String host : hosts) {
- System.out.println(host);
- }
- }
- System.out.println();
- }
- }
-
- //上传文件
- @Test
- public static void testCopyFromLocalFile() throws Exception {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- //上传一个文件
- fs.copyFromLocalFile(new Path("D:/hadoop.txt"), new Path("/hadoop/wenjian"));
- //关闭资源
- fs.close();
- }
-
- //下载文件
- @Test
- public static void testCopyToLocalFile() throws Exception {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- //下载一个文件
- fs.copyToLocalFile(new Path("/hadoop/wenjian/hadoop.txt"), new Path("D:/test.txt"));
- //关闭资源
- fs.close();
- }
-
- //读文件
- @Test
- public static void testReadFile() throws Exception {
- //连接的集群地址
- URI uri = new URI("hdfs://hadoop100:8020");
- //创建一个配置信息
- Configuration configuration = new Configuration();
- //用户
- String user = "hadoop";
- //获取到了客户端对象
- FileSystem fs = FileSystem.get(uri, configuration, user);
- // 创建路径对象(指向目录或文件)
- Path path = new Path("/hadoop/wenjian/hadoop.txt");
- // 创建文件系统数据字节输入流对象
- FSDataInputStream in = fs.open(path);
- // 创建缓冲字符输入流对象,提高读取效率(字节流-->字符流-->缓冲流)
- BufferedReader br = new BufferedReader(new InputStreamReader(in));
- // 定义行字符串
- String nextLine = "";
- // 通过循环读取缓冲字符输入流
- while ((nextLine = br.readLine()) != null) {
- // 在控制台输出读取的行内容
- System.out.println(nextLine);
- }
- // 关闭缓冲字符输入流
- br.close();
- // 关闭文件系统数据字节输入流
- in.close();
- // 关闭文件系统
- fs.close();
- }
-
- //追加文件
- @Test
- public static void testAppendFile() throws URISyntaxException, IOException, InterruptedException {
- // 相关配置
- Configuration conf = new Configuration();//加载配置文件,将配置文件放在src下
- {
- conf.setBoolean("dfs.support.append", true);
- conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
- conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
- }
- FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100"), conf, "hadoop100");//获取文件系统实例
- ObjectMapper objectMapper = new ObjectMapper();
- FSDataOutputStream output = null;
- output = fs.append(new Path("/hadoop/wenjian/hadoop.txt"));
- // MessageHeader listContent = null;
- // System.out.println(listContent.toString());
- // output.write(objectMapper.writeValueAsString(listContent).getBytes("UTF-8"));
- output.write(("\n"+"hello hbase").getBytes("UTF-8"));
- // output.write("\n".getBytes("UTF-8"));//换行
- }
-
- public static void main( String[] args ) throws Exception {
- //创建文件
- // testMkdirFile();
- //删除文件
- // testDeleteFile();
- //列出目录下文件
- // testListFile();
- //列出目录下文件信息
- // testDetailFile();
- //上传文件
- // testCopyFromLocalFile();
- //下载文件
- // testCopyToLocalFile();
- //读文件
- testReadFile();
- //追加文件
- // testAppendFile();
- }
- }

该实验通过使用JavaApi对hdfs中的文件进行了创建、删除、上传、下载、读取、追加以及列出目录下的文件的基本操作,使我们对其有了进一步的了解。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。