赞
踩
Hadoop文件操作之HDFS:创建、删除目录,读写文件,追加写文件
package hadoop.hadoop_demo;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Demo of basic HDFS operations through the Hadoop FileSystem API: create and delete
 * directories, create an empty file, read a file and append to a file.
 *
 * Required dependencies (groupId:artifactId:version):
 *   org.apache.hadoop:hadoop-common:2.7.2
 *   org.apache.hadoop:hadoop-hdfs:2.7.2
 */
public class App {

    /** NameNode URI used by {@link #main(String[])} and by the no-argument {@link #getFileSystem()}. */
    private static final String DEFAULT_HDFS_URI = "hdfs://192.168.1.111:9000";

    /** File printed by {@link #readFile(String)}. */
    private static final String DEFAULT_READ_FILE = "zookeeper_server.pid";

    /**
     * Entry point: deletes one file and appends a line to another on the default cluster.
     * The commented-out calls show how the remaining helpers are invoked.
     *
     * @param args unused
     * @throws Exception if any HDFS operation fails
     */
    public static void main(String[] args) throws Exception {
        System.out.println("Hello World!");
        String hdfs = DEFAULT_HDFS_URI;
        // mkdir(hdfs,"gaojs");
        // touchFile(hdfs,"gjs/1.log");
        rmdir(hdfs, "zookeeper_server.pid");
        appendFile(hdfs, "gjs/1.log");
        // readFile(hdfs);
    }

    /**
     * Appends one timestamped line to an existing file. Append is only supported by newer
     * Hadoop versions.
     *
     * @param hdfs     base URI of the file system, e.g. {@code hdfs://host:9000}
     * @param fullName path of the file relative to the file system root
     * @throws Exception if the file system cannot be reached or the append fails
     */
    private static void appendFile(String hdfs, String fullName)
            throws Exception {
        // Resources are closed in reverse order: the stream first, then the file system.
        try (FileSystem fileSystem = getFileSystem(hdfs);
                OutputStream out = fileSystem.append(toPath(hdfs, fullName))) {
            out.write(("I am gaojs, who are you" + System.currentTimeMillis() + "\r\n")
                    .getBytes("UTF-8"));
        }
    }

    /**
     * Obtains the FileSystem for the default NameNode URI.
     *
     * @return a FileSystem bound to the default URI; the caller is responsible for closing it
     * @throws Exception if the file system cannot be obtained
     */
    public static final FileSystem getFileSystem() throws Exception {
        return getFileSystem(DEFAULT_HDFS_URI);
    }

    /**
     * Obtains the FileSystem for the given base URI.
     *
     * @param hdfs base URI of the file system, e.g. {@code hdfs://host:9000}
     * @return a FileSystem bound to {@code hdfs}; the caller is responsible for closing it
     * @throws Exception if the file system cannot be obtained
     */
    private static FileSystem getFileSystem(String hdfs) throws Exception {
        Configuration conf = new Configuration();
        // Client-side pipeline-recovery settings: never try to swap in a replacement datanode.
        // Without them an append can fail with "Failed to replace a bad datanode on the
        // existing pipeline due to no more good datanodes being available to try".
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
        FileSystem fileSystem = FileSystem.get(URI.create(hdfs), conf);
        System.out.println(fileSystem);
        return fileSystem;
    }

    /**
     * Builds the absolute path of {@code fullName} under the file system root.
     *
     * @param hdfs     base URI of the file system
     * @param fullName path relative to the file system root
     * @return the path {@code hdfs + "/" + fullName}
     */
    private static Path toPath(String hdfs, String fullName) {
        return new Path(hdfs + "/" + fullName);
    }

    /**
     * Copies the content of the default demo file to standard output.
     *
     * @param hdfs base URI of the file system
     * @throws Exception if the file cannot be opened or read
     */
    private static void readFile(String hdfs) throws Exception {
        try (FileSystem fileSystem = getFileSystem(hdfs);
                InputStream in = fileSystem.open(toPath(hdfs, DEFAULT_READ_FILE))) {
            IOUtils.copy(in, System.out);
        }
    }

    /**
     * Creates an empty file and prints whether the creation succeeded.
     *
     * @param hdfs     base URI of the file system
     * @param fullName path of the file relative to the file system root
     * @throws Exception if the file system cannot be reached or the call fails
     */
    private static void touchFile(String hdfs, String fullName)
            throws Exception {
        try (FileSystem fileSystem = getFileSystem(hdfs)) {
            boolean res = fileSystem.createNewFile(toPath(hdfs, fullName));
            if (res) {
                System.out.println("-------create File Success------" + fullName);
            } else {
                System.out.println("-------create File Fail------" + fullName);
            }
        }
    }

    /**
     * Deletes a file or a directory (recursively) and prints whether the deletion succeeded.
     *
     * @param hdfs     base URI of the file system
     * @param fullName path of the file or directory relative to the file system root
     * @throws Exception if the file system cannot be reached or the call fails
     */
    private static void rmdir(String hdfs, String fullName) throws Exception {
        try (FileSystem fileSystem = getFileSystem(hdfs)) {
            // Explicit recursive flag instead of the deprecated single-argument delete(Path).
            boolean res = fileSystem.delete(toPath(hdfs, fullName), true);
            if (res) {
                System.out.println("------rmdir Success------" + fullName);
            } else {
                System.out.println("------rmdir Fail------" + fullName);
            }
        }
    }

    /**
     * Creates a directory (including missing parents) and prints whether it succeeded.
     *
     * @param hdfs     base URI of the file system
     * @param fullName path of the directory relative to the file system root
     * @throws Exception if the file system cannot be reached or the call fails
     */
    private static void mkdir(String hdfs, String fullName) throws Exception {
        try (FileSystem fileSystem = getFileSystem(hdfs)) {
            boolean res = fileSystem.mkdirs(toPath(hdfs, fullName));
            if (res) {
                System.out.println("-------mkdir Success------" + fullName);
            } else {
                System.out.println("-------mkdir Fail------" + fullName);
            }
        }
    }
}
错误及解决方案:
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_242718953_1, ugi=Administrator (auth:SIMPLE)]]
Exception in thread "main" java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[192.168.1.111:50010,DS-c7e4fa47-633d-4d8b-aa09-c50b1e6a411a,DISK]], original=[DatanodeInfoWithStorage[192.168.1.111:50010,DS-c7e4fa47-633d-4d8b-aa09-c50b1e6a411a,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:929)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:992)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1160)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:455)
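解决方案:在创建 FileSystem 之前,对客户端的 Configuration 加入下面两行配置,把失败 DataNode 的替换策略由 DEFAULT 改为 NEVER(集群中没有其他可用的 DataNode 可供替换时,追加写就不会再因为找不到替换节点而失败):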
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。