赞
踩
1.pom文件依赖版本示例:
<!-- 版本配置 -->
<junit.version>4.12</junit.version>
<mysql-connector.version>5.1.46</mysql-connector.version>
<mybatis-spring-boot-starter.version>1.3.1</mybatis-spring-boot-starter.version>
<mybatis-plus.version>2.1.9</mybatis-plus.version>
<mybatis-plus-boot-starter.version>2.1.9</mybatis-plus-boot-starter.version>
<alibaba.druid.version>1.1.9</alibaba.druid.version>
<commons-lang3.version>3.5</commons-lang3.version>
<slf4j-api.version>1.7.25</slf4j-api.version>
<logback.version>1.2.3</logback.version>
<log4j.version>1.2.9</log4j.version>
<fastxml.jackson.version>2.8.10</fastxml.jackson.version>
<commons-codec.version>1.9</commons-codec.version>
<commons-io.version>2.5</commons-io.version>
<hibernate-validator.version>5.3.6.Final</hibernate-validator.version>
<asm.version>3.3.1</asm.version>
<cglib-nodep.version>3.2.5</cglib-nodep.version>
<commons-beanutils.version>1.9.3</commons-beanutils.version>
<fastjson.version>1.2.32</fastjson.version>
<springfox-swagger.version>2.8.0</springfox-swagger.version>
<spring-data-redis.version>1.8.3.RELEASE</spring-data-redis.version>
<shiro.version>1.4.0-RC2</shiro.version>
<poi.version>3.9</poi.version>
<mybatis.version>3.4.4</mybatis.version>
<velocity.version>2.0</velocity.version>
<velocity.engine.version>1.7</velocity.engine.version>
<freemarker.version>2.3.28</freemarker.version>
<tomcat-servlet-api.version>7.0.77</tomcat-servlet-api.version>
<jstl-api.version>1.2-rev-1</jstl-api.version>
<jstl-impl.version>1.2</jstl-impl.version>
<joda-time.version>2.9.3</joda-time.version>
<kaptcha.version>0.0.9</kaptcha.version>
2.SpringBoot整合Hadoop,在application.yml配置 ,hadoop集群的搭建请参考作者的其它文章。
spring:
hadoop:
config:
fs:
defaultFS: hdfs://192.168.2.213:8020 #你的hdfs的路径
fsshell:
enabled: true #开启fsshell支持
mvc:
view:
prefix: /templates/
suffix: .ftl
static-path-pattern: /static/**
datasource:
url: jdbc:mysql://localhost:3306/hadoop?useUnicode=true&characterEncoding=utf-8&useSSL=false #配置数据库为mysql
username: root
password: hwf123456
# 使用druid数据源
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
filters: stat
maxActive: 20
initialSize: 1
maxWait: 60000
minIdle: 1
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: select 'x'
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
maxOpenPreparedStatements: 20
mybatis:
#MyBatis的配置
mapper-locations: classpath:mapping/*.xml
type-aliases-package: com.hdvon.domain
configuration:
location: classpath:mybatis-config.xml
map-underscore-to-camel-case: true
server:
port: 8082
3.这个时候就可以在service里面直接注入FsShell,去做一些简单的操作。
@Autowired
FsShell shell;
比如说,这里只举几个栗子:
(1)返回hdfs路径下的文件信息:
/**
 * Lists every non-directory entry under the given HDFS path (recursive listing
 * via FsShell.lsr) and maps each entry onto an HdfsFile DTO.
 *
 * @param path HDFS path to list
 * @return one HdfsFile per plain file found; directories are skipped
 * @throws Exception if the shell listing fails
 */
public List<HdfsFile> listPaths(String path) throws Exception{
    List<HdfsFile> result = new ArrayList<>();
    for (FileStatus status : shell.lsr(path)) {
        if (status.isDirectory()) {
            continue; // only plain files are reported
        }
        Path hdfsPath = status.getPath();
        HdfsFile file = new HdfsFile();
        file.setFileName(hdfsPath.getName());
        file.setFilePath(hdfsPath.toUri().getPath());
        file.setFileOwner(status.getOwner());
        file.setBlockSize(status.getBlockSize());
        result.add(file);
    }
    return result;
}
(2)创建hdfs目录
/**
 * Creates a directory on HDFS at {@code path} via FsShell.
 *
 * @return a SUCCESS result, or an ERROR result carrying the failure cause
 */
public RestResult mkDir(String path){
    try {
        shell.mkdir(path);
        return RestResult.SUCCESS("创建hdfs目录成功" , null);
    } catch (Exception e){
        e.printStackTrace();
        return RestResult.ERROR("创建hdfs目录失败" , e);
    }
}
(3)上传文件
/**
 * Uploads a local file to HDFS via FsShell.put.
 *
 * @param srcPath  local source path
 * @param destPath HDFS destination path
 * @return a SUCCESS result, or an ERROR result carrying the failure cause
 */
public RestResult putFile(String srcPath , String destPath){
    try {
        shell.put(srcPath , destPath);
        return RestResult.SUCCESS("上传文件成功" , null);
    } catch (Exception e){
        e.printStackTrace();
        return RestResult.ERROR("上传文件失败" , e);
    }
}
(4)下载文件
/**
 * Downloads a file from HDFS to the local filesystem via FsShell.get.
 *
 * @param srcPath  HDFS source path
 * @param destPath local destination path
 * @return a SUCCESS result, or an ERROR result carrying the failure cause
 */
public RestResult getFile(String srcPath , String destPath){
    try {
        shell.get(srcPath , destPath);
        return RestResult.SUCCESS("下载文件成功" , null);
    } catch (Exception e){
        e.printStackTrace();
        return RestResult.ERROR("下载文件失败" , e);
    }
}
4.除了用FsShell,另外一种也是很常用的,没错^_^,就是FileSystem,因为FileSystem需要一个
Configuration,所以我们在SpringBoot里面需要创建一个实例出来。
/**
 * Spring configuration that exposes a Hadoop {@link Configuration} bean,
 * so services can inject it and obtain a FileSystem handle from it.
 */
@org.springframework.context.annotation.Configuration
public class HdfsRepository {

    /** Builds a default Hadoop configuration instance. */
    @Bean
    public Configuration getConfiguration(){
        return new Configuration();
    }
}
这个时候我们就可以在service里面注入Configuration,然后获取FileSystem。
比如说:
(1)获取hdfs当前路径下不是目录的文件
/**
 * Collects the plain (non-directory) files under {@code path} — non-recursive,
 * per the {@code false} flag to listFiles — as HdfsFile DTOs.
 *
 * @return the matching files, or null when any error occurs
 */
public List<HdfsFile> getFiles(String path){
    try {
        FileSystem fs = FileSystem.get(configuration);
        List<HdfsFile> result = new ArrayList<>();
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(path) , false);
        while (it.hasNext()){
            LocatedFileStatus status = it.next();
            Path entryPath = status.getPath();
            HdfsFile file = new HdfsFile();
            file.setFileName(entryPath.getName());
            file.setFilePath(entryPath.toUri().getPath());
            file.setFileOwner(status.getOwner());
            file.setBlockSize(status.getBlockSize());
            result.add(file);
        }
        return result;
    } catch (Exception e){
        e.printStackTrace();
        return null; // NOTE(review): callers appear to treat null as the error signal
    }
}
(2)查询当前路径下所有的目录
/**
 * Collects the immediate sub-directories of {@code path} as HdfsFile DTOs.
 *
 * @return the directories one level below {@code path}, or null on any error
 */
public List<HdfsFile> getDirectory(String path){
    try{
        FileSystem fs = FileSystem.get(configuration);
        List<HdfsFile> result = new ArrayList<>();
        for (FileStatus status : fs.listStatus(new Path(path))){
            if (!status.isDirectory()){
                continue; // plain files are skipped; only directories are reported
            }
            Path entryPath = status.getPath();
            HdfsFile file = new HdfsFile();
            file.setFileName(entryPath.getName());
            file.setFilePath(entryPath.toUri().getPath());
            file.setFileOwner(status.getOwner());
            file.setBlockSize(status.getBlockSize());
            result.add(file);
        }
        return result;
    } catch (Exception e){
        e.printStackTrace();
        return null; // NOTE(review): callers appear to treat null as the error signal
    }
}
(4)上传文件
/**
 * Copies a local file onto HDFS.
 *
 * @param srcPath  local source path
 * @param destPath HDFS destination path
 * @return true on success, false when any exception occurs
 */
public Boolean uploadFile(String srcPath , String destPath){
    try{
        FileSystem fs = FileSystem.get(configuration);
        fs.copyFromLocalFile(new Path(srcPath) , new Path(destPath));
        return true;
    } catch (Exception e){
        e.printStackTrace();
        return false;
    }
}
(5)使用流上传文件
/**
 * Streams the given input onto an HDFS file at {@code hdfsPath}.
 * The HDFS output stream is always closed; the caller retains ownership
 * of {@code inStream} and must close it.
 *
 * @return true on success, false when any exception occurs
 */
public Boolean uploadFile2(InputStream inStream , String hdfsPath){
    FSDataOutputStream out = null;
    try{
        FileSystem fs = FileSystem.get(configuration);
        out = fs.create(new Path(hdfsPath));
        IOUtils.copyBytes(inStream , out , 4078);
        return true;
    } catch (Exception e){
        e.printStackTrace();
        return false;
    } finally {
        IOUtils.closeStream(out);
    }
}
(6)使用流下载文件
/**
 * Streams an HDFS file down to a local file. Both streams are closed in finally.
 *
 * @param localPath local destination path
 * @param hdfsPath  HDFS source path
 * @return true on success, false when any exception occurs
 */
public Boolean downLoadFile(String localPath , String hdfsPath){
    FSDataInputStream in = null;
    FileOutputStream out = null;
    try{
        FileSystem fs = FileSystem.get(configuration);
        in = fs.open(new Path(hdfsPath));
        out = new FileOutputStream(localPath);
        IOUtils.copyBytes(in , out , 4078);
        return true;
    } catch (Exception e){
        e.printStackTrace();
        return false;
    } finally {
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
    }
}
(7)写文件
/**
 * Copies one HDFS file to another HDFS location by streaming its bytes.
 * Note the destination is created BEFORE the source is opened (preserved from
 * the original: a failed open still leaves an empty destination file).
 *
 * @param inpath  HDFS source path
 * @param outpath HDFS destination path
 * @return true on success, false when any exception occurs
 */
public Boolean writeFile(String inpath , String outpath){
    FSDataOutputStream out = null;
    FSDataInputStream in = null;
    try{
        FileSystem fs = FileSystem.get(configuration);
        out = fs.create(new Path(outpath));
        in = fs.open(new Path(inpath));
        IOUtils.copyBytes(in , out , 1024);
        return true;
    } catch (Exception e){
        e.printStackTrace();
        return false;
    } finally {
        IOUtils.closeStream(in);
        IOUtils.closeStream(out);
    }
}
(8)修改文件名
public boolean renameFile(String oldPath, String newPath){
try{
FileSystem fs = FileSystem.get(configuration);
boolean flag = fs.rename(new Path(oldPath) , new Path(newPath));
return flag;
} catch (Exception e){
e.printStackTrace();
return false;
}
(9)移动hdfs上面的文件
/**
 * Moves a single file within HDFS by streaming its bytes to the new location
 * and then deleting the original (delete only runs when the copy succeeded).
 *
 * @return true on success, false when any step fails
 */
public boolean moveFile(String oldPath, String newPath){
    FSDataInputStream in = null;
    FSDataOutputStream out = null;
    try{
        FileSystem fs = FileSystem.get(configuration);
        in = fs.open(new Path(oldPath));
        out = fs.create(new Path(newPath));
        IOUtils.copyBytes(in , out , 4096);
        fs.delete(new Path(oldPath) , true);
        return true;
    } catch (Exception e){
        e.printStackTrace();
        return false;
    } finally {
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
    }
}
(10)hdfs移动目录,请小伙伴们注意咯:FileStatus只能读到当前一级文件或者目录,所以需要递归遍历才能读到所有的子文件
/**
 * Moves a whole directory tree within HDFS: recreates the tree under
 * {@code newPath} via the recursive helper, then deletes the source.
 * FileStatus/listStatus only covers one level, so children are walked
 * recursively by recuerFile.
 *
 * Fix: the original declared FSDataInputStream/FSDataOutputStream locals that
 * were never assigned — the finally block closed perpetual nulls, falsely
 * suggesting that streams opened inside recuerFile were being closed here.
 * Passing nulls is behaviorally identical (the helper reassigns its parameters).
 *
 * @return true on success, false when any step fails
 */
public boolean moveDir(String oldPath, String newPath){
    try{
        FileSystem fs = FileSystem.get(configuration);
        FileStatus[] fileStatuses = fs.listStatus(new Path(oldPath));
        String basePath = newPath+"/";
        // create the destination root directory first
        fs.mkdirs(new Path(basePath));
        for (FileStatus fileStatus : fileStatuses){
            // nulls merely satisfy the helper's signature; it opens its own streams
            recuerFile(null , null , fs, fileStatus , basePath);
        }
        fs.delete(new Path(oldPath) , true);
    } catch (Exception e){
        e.printStackTrace();
        return false;
    }
    return true;
}
/**
 * Recursively copies one entry (file or directory) into {@code basePath}.
 *
 * Fixes over the original:
 *  - streams opened for a plain file were never closed (resource leak);
 *    they are now closed in a finally block scoped to this call
 *  - listStatus() is only issued for directories instead of for every entry
 *  - the fdis/fos parameters were never effectively used (reassigning a
 *    parameter cannot propagate back to the caller); they are retained only
 *    for signature compatibility with existing callers
 */
public void recuerFile(FSDataInputStream fdis, FSDataOutputStream fos , FileSystem fs , FileStatus fileStatus , String basePath) {
    try{
        Path path = fileStatus.getPath();
        String fileName = path.getName();
        if (fileStatus.isDirectory()){
            fs.mkdirs(new Path(basePath+fileName));
            // descend with the new base; the original kept a redundant "/" here,
            // preserved so the produced paths stay identical
            String childBase = basePath+"/"+fileName+"/";
            for (FileStatus child : fs.listStatus(path)){
                recuerFile(fdis , fos , fs, child , childBase);
            }
        } else {
            FSDataInputStream in = null;
            FSDataOutputStream out = null;
            try {
                in = fs.open(path);
                out = fs.create(new Path(basePath+fileName));
                IOUtils.copyBytes(in , out , 4096);
            } finally {
                // leak fix: the original never closed these streams
                IOUtils.closeStream(out);
                IOUtils.closeStream(in);
            }
        }
    } catch (Exception e){
        // preserved best-effort semantics: one failing entry does not abort the walk
        e.printStackTrace();
    }
}
(11)hdfs移动目录 ,使用callback减少业务代码入侵我们的方法!!
1.首先创建一个回调接口Callback。里面就只有一个方法 void call(T obj);
2.移动目录的逻辑
/**
 * Moves a directory tree using the Callback abstraction: recuerFile0 walks the
 * tree and invokes this callback for every entry, keeping traversal separate
 * from the copy logic. On success the source tree is deleted recursively.
 *
 * NOTE(review): the destination is built as common_path + path.toUri().getPath(),
 * i.e. newPath + the entry's ABSOLUTE source path — so the full source hierarchy
 * (including oldPath's own parent directories) is recreated under newPath.
 * Confirm this mirroring is intended rather than newPath + a relative path.
 *
 * @return true on success, false when listing/creating/deleting throws
 */
public boolean moveDir0(String oldPath, String newPath){
try{
FileSystem fs = FileSystem.get(configuration);
Path pathdir = new Path(oldPath);
FileStatus[] fileStatuses = fs.listStatus(pathdir);
final String basePath = newPath+"/";
// create the destination root directory first
fs.mkdirs(new Path(basePath));
for (FileStatus fileStatus : fileStatuses){
// recursively walk the tree, running the per-entry copy logic via the callback
recuerFile0(fs, fileStatus, new Callback<FileStatus>() {
String common_path = basePath;
@Override
public void call(FileStatus fileStatus) {
boolean dir = fileStatus.isDirectory();
Path path = fileStatus.getPath();
String fileName = path.getName(); // unused — retained from the original
FSDataInputStream fis = null;
FSDataOutputStream fos = null;
try{
if (dir){
// directories are just re-created at the mirrored location
fs.mkdirs(new Path(common_path+path.toUri().getPath()));
} else {
// plain files are byte-copied to the mirrored location
fis = fs.open(path);
fos = fs.create(new Path(common_path+path.toUri().getPath()));
IOUtils.copyBytes(fis , fos , 4096);
}
} catch (Exception e){
// best-effort: a failing entry is logged and skipped, the walk continues
e.printStackTrace();
} finally {
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
}
}
});
}
// remove the source tree only after the whole walk completed without throwing here
fs.delete(new Path(oldPath) , true);
} catch (Exception e){
e.printStackTrace();
return false;
}
return true;
}
3.递归方法改版,删掉业务代码,就很清楚了^_^
/**
 * Depth-first traversal of an HDFS subtree: invokes the callback on the given
 * entry first, then recurses into its children when the entry is a directory.
 * Exceptions are logged and swallowed so one failing entry does not stop the walk.
 */
public void recuerFile0(FileSystem fs , FileStatus fileStatus , Callback callback) {
    try{
        callback.call(fileStatus);
        if (!fileStatus.isDirectory()){
            return; // plain file: nothing to descend into
        }
        for (FileStatus child : fs.listStatus(fileStatus.getPath())){
            recuerFile0(fs, child ,callback);
        }
    } catch (Exception e){
        e.printStackTrace();
    }
}
(12)hdfs删除文件,如果是用hdfs的API的话,默认删除的文件不会放入到回收站,需要自己去实现这个功能!!!
/**
 * Deletes an HDFS path, recursively (second argument true). The raw
 * FileSystem API bypasses the trash, so removal is immediate.
 *
 * @return the result of FileSystem.delete, or false when an exception occurs
 */
public boolean deleteFile(String path){
    try{
        FileSystem fs = FileSystem.get(configuration);
        return fs.delete(new Path(path) , true);
    } catch (Exception e){
        e.printStackTrace();
        return false;
    }
}
(13)删除文件,并且将文件放入到回收站
/**
 * Deletes an HDFS path by moving it into the trash instead of removing it
 * outright, so it remains recoverable until the trash is purged.
 *
 * @return the result of Trash.moveToTrash, or false when an exception occurs
 */
public boolean deleteFile2Trash(String path){
    try{
        Trash trash = new Trash(configuration);
        return trash.moveToTrash(new Path(path));
    } catch (Exception e){
        e.printStackTrace();
        return false;
    }
}
当然整合的时候如果出现权限的问题,请参考作者的另外一篇博客:spring、springboot整合hadoop的时候出现的权限问题_太白酒仙的博客-CSDN博客_springboot整合hadoop
搭建hadoop集群请参考作者的:hadoop分布式环境搭建_太白酒仙的博客-CSDN博客
最后,欢迎热爱技术的小伙伴加入我们的聊天群qq:852278138,大家一起交流^_^
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。