当前位置:   article > 正文

java 追加写入hdfs_java操作之HDFS-创建-删除目录-读写文件-追加写文件

java hdfs追加

Hadoop文件操作之HDFS:创建、删除目录,读写文件,追加写文件

package hadoop.hadoop_demo;

import java.io.InputStream;

import java.io.OutputStream;

import java.net.URI;

import org.apache.commons.io.IOUtils;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

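/**
 * HDFS demo: create and delete directories, read and write files, append to a file.
 *
 * <p>Required Maven dependencies:
 *
 * <pre>{@code
 * <dependency>
 *   <groupId>org.apache.hadoop</groupId>
 *   <artifactId>hadoop-common</artifactId>
 *   <version>2.7.2</version>
 * </dependency>
 * <dependency>
 *   <groupId>org.apache.hadoop</groupId>
 *   <artifactId>hadoop-hdfs</artifactId>
 *   <version>2.7.2</version>
 * </dependency>
 * }</pre>
 */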

public class App {

    /** NameNode URI of the demo cluster, used by {@link #main} and {@link #getFileSystem()}. */
    private static final String DEFAULT_HDFS_URI = "hdfs://192.168.1.111:9000";

    /**
     * Entry point of the demo: deletes {@code zookeeper_server.pid} and then appends one
     * line to {@code gjs/1.log}. The other operations are left commented out so they can
     * be enabled one at a time.
     *
     * @param args unused
     * @throws Exception if any HDFS operation fails
     */
    public static void main(String[] args) throws Exception {
        System.out.println("Hello World!");
        String hdfs = DEFAULT_HDFS_URI;
        // mkdir(hdfs,"gaojs");
        // touchFile(hdfs,"gjs/1.log");
        rmdir(hdfs, "zookeeper_server.pid");
        appendFile(hdfs, "gjs/1.log");
        // readFile(hdfs);
    }

    /**
     * Appends one UTF-8 encoded, timestamped line to an existing file. Append is only
     * supported by newer Hadoop versions.
     *
     * @param hdfs      base HDFS URI, e.g. {@code hdfs://host:9000}
     * @param fullNasme path of the target file, relative to the HDFS root
     * @throws Exception if the file cannot be opened for append or the write fails
     */
    private static void appendFile(String hdfs, String fullNasme)
            throws Exception {
        // try-with-resources closes the stream first and then the file system,
        // even when the write throws.
        try (FileSystem fileSystem = getFileSystem(hdfs);
                OutputStream out = fileSystem.append(new Path(hdfs + "/" + fullNasme))) {
            out.write(("I am gaojs, who are you" + System.currentTimeMillis() + "\r\n")
                    .getBytes("UTF-8"));
            out.flush();
        }
    }

    /**
     * Obtains a {@link FileSystem} for the demo cluster at the default URI.
     *
     * @return the file system client; the caller is responsible for closing it
     * @throws Exception if the client cannot be created
     */
    public static final FileSystem getFileSystem() throws Exception {
        return getFileSystem(DEFAULT_HDFS_URI);
    }

    /**
     * Obtains a {@link FileSystem} for the given HDFS URI and prints it to standard output.
     *
     * @param hdfs base HDFS URI, e.g. {@code hdfs://host:9000}
     * @return the file system client; the caller is responsible for closing it
     * @throws Exception if the client cannot be created
     */
    private static FileSystem getFileSystem(String hdfs) throws Exception {
        Configuration conf = new Configuration();
        // Workaround for "Failed to replace a bad datanode on the existing pipeline ..."
        // raised while appending: never try to swap in a replacement datanode.
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
        FileSystem fileSystem = FileSystem.get(URI.create(hdfs), conf);
        System.out.println(fileSystem);
        return fileSystem;
    }

    /**
     * Copies the content of {@code zookeeper_server.pid} to standard output.
     *
     * @param hdfs base HDFS URI, e.g. {@code hdfs://host:9000}
     * @throws Exception if the file cannot be opened or read
     */
    private static void readFile(String hdfs) throws Exception {
        try (FileSystem fileSystem = getFileSystem(hdfs);
                InputStream in = fileSystem.open(new Path(hdfs + "/"
                        + "zookeeper_server.pid"))) {
            IOUtils.copy(in, System.out);
        }
    }

    /**
     * Creates an empty file and reports the outcome on standard output.
     *
     * @param hdfs      base HDFS URI, e.g. {@code hdfs://host:9000}
     * @param fullNasme path of the file to create, relative to the HDFS root
     * @throws Exception if the request to HDFS fails
     */
    private static void touchFile(String hdfs, String fullNasme)
            throws Exception {
        try (FileSystem fileSystem = getFileSystem(hdfs)) {
            boolean res = fileSystem
                    .createNewFile(new Path(hdfs + "/" + fullNasme));
            if (res) {
                System.out.println("-------create File Success------" + fullNasme);
            } else {
                System.out.println("-------create File Fail------" + fullNasme);
            }
        }
    }

    /**
     * Deletes a file or a directory (recursively) and reports the outcome on standard
     * output.
     *
     * @param hdfs      base HDFS URI, e.g. {@code hdfs://host:9000}
     * @param fullNasme path of the file or directory to delete, relative to the HDFS root
     * @throws Exception if the request to HDFS fails
     */
    private static void rmdir(String hdfs, String fullNasme) throws Exception {
        try (FileSystem fileSystem = getFileSystem(hdfs)) {
            // Explicit recursive flag replaces the deprecated single-argument delete(Path).
            boolean res = fileSystem.delete(new Path(hdfs + "/" + fullNasme), true);
            if (res) {
                System.out.println("------rmdir Success------" + fullNasme);
            } else {
                System.out.println("------rmdir Fail------" + fullNasme);
            }
        }
    }

    /**
     * Creates a directory, including missing parents, and reports the outcome on standard
     * output.
     *
     * @param hdfs      base HDFS URI, e.g. {@code hdfs://host:9000}
     * @param fullNasme path of the directory to create, relative to the HDFS root
     * @throws Exception if the request to HDFS fails
     */
    private static void mkdir(String hdfs, String fullNasme) throws Exception {
        try (FileSystem fileSystem = getFileSystem(hdfs)) {
            boolean res = fileSystem.mkdirs(new Path(hdfs + "/" + fullNasme));
            if (res) {
                System.out.println("-------mkdir Success------" + fullNasme);
            } else {
                System.out.println("-------mkdir Fail------" + fullNasme);
            }
        }
    }

}

错误解决方案:

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_242718953_1, ugi=Administrator (auth:SIMPLE)]]

Exception in thread "main" java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[192.168.1.111:50010,DS-c7e4fa47-633d-4d8b-aa09-c50b1e6a411a,DISK]], original=[DatanodeInfoWithStorage[192.168.1.111:50010,DS-c7e4fa47-633d-4d8b-aa09-c50b1e6a411a,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.

at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:929)

at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:992)

at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1160)

at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:455)

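解决方案:从报错信息可以看出集群中只有一个 DataNode,写入管道出现故障时没有其他节点可供替换,因此需要在客户端的 Configuration 中把替换策略设置为 NEVER(即上文 getFileSystem 中的配置):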

conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");

conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/592777
推荐阅读
相关标签
  

闽ICP备14008679号