DocCloud项目需求
项目背景:
在一些大型企事业单位,工作中存在各种各样的工作文档,技术文档,规范等等。这些文档以word,xls,ppt,wps,pdf,txt存在。在此项目之前,文档的分享主要靠单位内部人员的互相发送。没有一个统一的平台对企业现存的各种文档进行统一管理。DocCloud项目提供了统一的文档管理平台。用户可以将文档上传至平台,所有其他用户可以在线查看此文档。同时满足搜索文档,分享,收藏等等一系列需求。在实践中,有百度文库,doc88,豆丁等公网项目。但是没有一个专门为企业用户服务的一个文档管理平台。
项目需求:
1.文档的统一存储
2.文档的检索
3.文档的在线预览
4.文档分享
5.文档推荐
6.文档上传下载
7.用户的注册,登录
8.文档权限管理
项目架构:
HDFS+LibreOffice6.0+solr+nginx+flume+hive+springboot+jpa+js+html+css
文档存储: HDFS
文件存储:1.本地(linux)-web服务器 优缺点:内存小,但是存储方便
2.ftp服务器(搭建一个存储文件的集群)优缺点:文档存储内存够,但是不能容错
3.hdfs (hadoop集群)优缺点:可扩展、容错、分布式存储
文档格式转换: LibreOffice6.0
因为存储的内容不是纯文本,就是传统的io流不能用、需要变成纯文本文件(txt)
doc、docx、ppt---→html---→txt
使用LibreOffice6.0,不适用word的原因:1.没有接口(不开源)2.不能再linux上运行
进程间通信:hadoop ipc
全文检索: solr
日志记录服务器:ngnix
web日志采集:flume
日志分析:hive
webMvc:springboot
持久层框架:jpa
单元测试:junit4
前端:css+html+js+jquery+bootstrap
版本管理:svn
依赖管理:maven
开发环境:idea
部署环境:linux
数据库:mysql
项目具体设计:
1.文档的上传下载
a.用户在前端点击上传按钮
b.在本地选择上传文档
c.开始上传
b.服务端校验文件后缀是否符合文档格式。
允许格式:doc,docx,ppt,pptx,xls,xlsx,pdf,txt
目的:避免上传不能转码的文档如:exe,zip,….
e.校验文档大小,允许128兆以下的文档上传。
128M:为了使文档在hdfs是一个块的形式保存。
f.计算文档的md5值,判断文档是否在文库中已经存在,如果存在,告知用户已经存在。
g.不存在,则上传至hdfs,同时数据库中保存用户上传文档信息。
数据保存在hdfs上,元数据保存在数据库mysql
2.上传成功以后需要提交文档转换任务(主要功能如下)
1>转换成html
2>转换成pdf提取缩略图,页数
3>提取文本 建立索引
以下代码没有涉及前端、只是后台测试(使用Postman测试),部分参数都没有从session中获取,都是随机生成的
没有软件的附上资源(下载双击安装就可以):链接:https://pan.baidu.com/s/1SibrDOB4GwkX4L0iw3nYTA
提取码:bisn
一、功能一:上传文件/2018.10.29
日志配置:
在类名上添加注解
@Slf4j
直接在类中使用log记录日志
文件上传:
关键注解:
@RequestParam("file") MultipartFile file
获取上传文件名:
file.getOriginalFilename()
创建一个java项目DocCloud
在DocCloud中创建model---->doccloudweb(模块名要小写)
在module中选择Spring Initializr
在创建module时,选择pom中的依赖如下,选完之后一路下一步
core下选择DevTools、LomBok
web下选择web
sql下选择JPA、mysql
NoSQL下选solr
<dependencies>
<!--数据相关依赖-持久层-->
<!--<dependency>-->
<!--<groupId>org.springframework.boot</groupId>-->
<!--<artifactId>spring-boot-starter-data-jpa</artifactId>-->
<!--</dependency>-->
<!--全文检索-->
<!--<dependency>-->
<!--<groupId>org.springframework.boot</groupId>-->
<!--<artifactId>spring-boot-starter-data-solr</artifactId>-->
<!--</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--热部署-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
</dependency>
<!--数据库连接-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
</dependencies>
<!--编译、打jar包-->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
以上是创建项目选择的依赖,请添加以下外加依赖
<!--上传文件到hdfs上-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.5</version>
</dependency>
数据库创建Doc_cloud
Doc:记录文件属性
使用jpa---->java+persistence+api
JPA是Java Persistence API的简称,中文名Java持久层API,是JDK 5.0注解或XML描述对象-关系表的映射关系,并将运行期的实体对象持久化到数据库中
1.配置数据源application.properties
#数据源配置
spring.datasource.name=root
spring.datasource.password=123
#true:表示展示sql语句
spring.jpa.show-sql=true
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/doc_cloud
2..上传文件到hdfs上用到的core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://master2:9000</value>
</property>
<property>
<name>fs.hdfs.impl</name>
<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
</property>
</configuration>
3.编写controller层-- DocController
import com.zhiyou100.doccloudweb.service.DocService;
import com.zhiyou100.doccloudweb.util.HdfsUtil;
import com.zhiyou100.doccloudweb.util.MD5Util;
import com.zhiyou100.doccloudweb.entity.Doc;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;
import java.util.Random;
@Controller //表示是controller层--业务层
@RequestMapping("/doc")
@Slf4j
public class DocController {
@Autowired
private DocService docService;
//定义合法的文件后缀类型
public static final String[] DOC_SUFFIXS= new String[]{"doc", "docx", "ppt", "pptx", "txt", "xls", "xlsx", "pdf"};
//定义文件最大大小
public static final int DOC_MAX_SIZE = 128*1024*1024;
//定义文件保存到hdfs上的根目录
public static final String HOME="hdfs://192.168.228.13:9000/doccloud";
@RequestMapping("/upload")
@ResponseBody
public String upload(@RequestParam("file") MultipartFile file){
//判断是否是文件
if (file.isEmpty()){
return "file is empty";
}
//获取文件名
String filename = file.getOriginalFilename();
//以点分割-获取文件后缀
String[] strings = filename.split("\\.");
if (strings.length==1){
return "file does not has suffix";
}
String suffix = strings[1];
log.info("doc suffix is {}",suffix);
//1.判断文件后缀是否合法
boolean flag = isSuffixLegal(suffix);
if (!flag){
return "file is illegal";
}
try {
//2.判断文件大小是否合法
byte[] bytes = file.getBytes();
log.info("file size is {}",bytes.length);
if (bytes.length>DOC_MAX_SIZE){
return "file is large,file Max size:"+DOC_MAX_SIZE;
}
//3.计算文档的MD5值
String md5 = getMD5(bytes);
log.info("file is md5 {} ",md5);
//用户上传文件,保存到数据库
//1.校验数据库中的md5值,判断数据库中是否存在
Optional<Doc> doc = docService.findByMd5(md5);
if (doc.isPresent()){
//2.如果存在,更新
// 2.1获取文件对象
Doc docEntity = doc.get();
//2.2设置文件更新的人
docEntity.setUserId(new Random().nextInt());
//2.3保存到数据库
docService.save(docEntity);
}else {
//3.如果不存在,将文件元数据保存到数据库,将数据保存到hdfs
//3.1保存数据到hdfs
//3.1.1生成文件保存路径:HOME+当前时间
String date = getDate();
String dst = HOME+"/"+date+"/"+file.getOriginalFilename()+"/";
log.info("file dst {}",dst);
//3.1.2上传文件
HdfsUtil.upload(bytes,file.getOriginalFilename(),dst);
//3.2将元数据保存到数据库
//3.2.1创建一个文件对象
Doc docEntity = new Doc();
//3.2.2设置作者
docEntity.setUserId(new Random().nextInt());
//3.2.3设置备注
docEntity.setDocComment("hadoop");
//3.2.4设置文件路径
docEntity.setDocDir(dst);
//3.2.5设置文件名
docEntity.setDocName(filename);
//3.2.6设置文件大小
docEntity.setDocSize(bytes.length);
//3.2.7设置文件权限
docEntity.setDocPermission("1");
//3.2.8设置文件类型(后缀)
docEntity.setDocType(suffix);
//3.2.9设置文件状态
docEntity.setDocStatus("upload");
//3.2.10设置文件的md5值--保证文件的唯一性
docEntity.setMd5(md5);
//3.2.11设置文件创作时间
docEntity.setDocCreateTime(new Date());
//3.2.12保存元数据
docService.save(docEntity);
}
} catch (IOException e) {
e.printStackTrace();
}
return "upload success";
}
/**
* 获取当前是时间,用于文件的保存路径
* @return
*/
private String getDate() {
Date date = new Date();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
return simpleDateFormat.format(date);
}
/**
* 计算字节数组的MD5值
* @param bytes
* @return
*/
private String getMD5(byte[] bytes) {
return MD5Util.getMD5String(bytes);
}
/**
* 判断文件后缀是否合法
* @param suffix
* @return
*/
private boolean isSuffixLegal(String suffix) {
for (String docsuffix :
DOC_SUFFIXS) {
if (suffix.equals(docsuffix)){
return true;
}
}
return false;
}
}
4.编写业务层service层-- DocService
import com.zhiyou100.doccloudweb.dao.DocRepository;
import com.zhiyou100.doccloudweb.entity.Doc;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Optional;
@Service
public class DocService {
@Autowired
private DocRepository docRepository;
//通过id获取文件对象
public Optional<Doc> findById(int id) {
return docRepository.findById(id);
}
//通过MD5获取文件对象
public Optional<Doc> findByMd5(String md5) {
return docRepository.findByMd5(md5);
}
//保存文件对象到数据库
public void save(Doc docEntity) {
docRepository.save(docEntity);
}
}
//定义合法的文件后缀类型
public static final String[] DOC_SUFFIXS= new String[]{"doc", "docx", "ppt", "pptx", "txt", "xls", "xlsx", "pdf"};
//定义文件最大大小
public static final int DOC_MAX_SIZE = 128*1024*1024;
@RequestMapping("/doclist")
@ResponseBody
Doc doList(){
Optional<Doc> id = docService.findById(1);
return null;
}
5.dao层—持久层--DocRepository
import com.zhiyou100.doccloudweb.entity.Doc;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import java.util.Optional;
@Repository
//Doc:表示定义的实体类,Integer:表示主键类型
public interface DocRepository extends JpaRepository<Doc,Integer> {
//利用反射机制自动识别
Optional<Doc> findByMd5(String md5);
}
6.实体层—Doc
import lombok.Data;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.persistence.*;
import java.util.Date;
/**
* 文件属性
*/
@Entity
@Table(name = "doc") //映射到数据库中的表
@Data //get/set
public class Doc {
@Id //主键
//告诉框架id生成策略(怎么生成)GenerationType.IDENTITY:表示自动生成
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id;
@Column(name = "md5")//如果数据库字段与entity中字段名一样,则不用加此注解
private String md5;
@Column(name = "doc_name")
private String docName;
@Column(name = "doc_type")
private String docType;
@Column(name = "doc_status")
private String docStatus;
@Column(name = "doc_size")
private int docSize;
@Column(name = "doc_dir")
private String docDir;
@Column(name = "user_id")
private int userId;
@Column(name = "doc_create_time")
private Date docCreateTime;
@Column(name = "doc_comment")
private String docComment;
@Column(name = "doc_permission")
private String docPermission;
}
7.工具类
HdfsUtil
- import com.google.common.io.Resources;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
-
- import java.io.IOException;
-
- /*
- *@ClassName:HdfsUtil
- @Description:TODO
- @Author:
- @Date:2018/10/29 17:17
- @Version:v1.0
- */
- public class HdfsUtil {
- //文档上传工具类
- public static void upload(byte[] src, String docName, String dst) throws IOException {
- //加载配置文件
- Configuration coreSiteConf = new Configuration();
- coreSiteConf.addResource(Resources.getResource("core-site.xml"));
- //获取文件系统客户端对象
- FileSystem fileSystem = FileSystem.get(coreSiteConf);
-
- FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(dst + "/" + docName));
-
- fsDataOutputStream.write(src);
- fsDataOutputStream.close();
- fileSystem.close();
- }
- }
MD5Util
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.nio.MappedByteBuffer;
- import java.nio.channels.FileChannel;
- import java.security.MessageDigest;
- import java.security.NoSuchAlgorithmException;
-
- public class MD5Util {
- protected static char hexDigits[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
- protected static MessageDigest messagedigest = null;
-
- /**
- * MessageDigest初始化
- *
- * @author
- */
- static {
- try {
- messagedigest = MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
- System.err.println("MD5FileUtil messagedigest初始化失败");
- e.printStackTrace();
- }
- }
-
- /**
- * 对文件进行MD5加密
- *
- * @author
- */
- public static String getFileMD5String(File file) throws IOException {
- FileInputStream in = new FileInputStream(file);
- FileChannel ch = in.getChannel();
- MappedByteBuffer byteBuffer = ch.map(FileChannel.MapMode.READ_ONLY, 0, file.length());
- messagedigest.update(byteBuffer);
- return bufferToHex(messagedigest.digest());
- }
-
- /**
- * 对字符串进行MD5加密
- *
- * @author
- */
- public static String getMD5String(String s) {
- return getMD5String(s.getBytes());
- }
-
- /**
- * 对byte类型的数组进行MD5加密
- *
- * @author
- */
- public static String getMD5String(byte[] bytes) {
- messagedigest.update(bytes);
- return bufferToHex(messagedigest.digest());
- }
-
- private static String bufferToHex(byte bytes[]) {
- return bufferToHex(bytes, 0, bytes.length);
- }
-
- private static String bufferToHex(byte bytes[], int m, int n) {
- StringBuffer stringbuffer = new StringBuffer(2 * n);
- int k = m + n;
- for (int l = m; l < k; l++) {
- char c0 = hexDigits[(bytes[l] & 0xf0) >> 4];
- char c1 = hexDigits[bytes[l] & 0xf];
- stringbuffer.append(c0);
- stringbuffer.append(c1);
- }
- return stringbuffer.toString();
- }
- }
二、功能:文档转换
上传成功以后需要提交文档转换任务(主要功能如下)
1>转换成html
2>转换成pdf提取缩略图,页数
3>提取文本 建立索引
1.定义一个docjob对象,用于封装任务信息
2.实现writable接口。因为要通过hadoop ipc序列化实现文档转换守护进程
该进程的作用是完成存放在本节点文档的转换,索引的任务。
1.文档转换成htm!通过runtime.exec执行命令来实现
2.通过hadoop ipc来接受任务
hadoop ipc
hadoop ipc是一套hadoop自带的成熟的rpc框架,性能高,稳定性性强。
server:
a.服务端定义接口b.定义按口的实现类
c用hadoop ipc暴露服务
client;
通过rpc.geproxy来调用服务岗的接口。
下面创建的是服务端的代码:
1.新建模块--docservicedeamon文件转换守护进程
2.pom文件中的依赖
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.5</version>
- </dependency>
- <!--ipc通信模块-->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>2.7.5</version>
- </dependency>
- <!--注解、-->
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.16.20</version>
- </dependency>
- <!--berklydb数据库依赖-->
- <!-- https://mvnrepository.com/artifact/com.sleepycat/je -->
- <dependency>
- <groupId>com.sleepycat</groupId>
- <artifactId>je</artifactId>
- <version>5.0.73</version>
- </dependency>
- <!--hdfs文件上传与下载-->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>2.7.5</version>
- </dependency>
3.配置core-site.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <configuration>
- <property>
- <name>fs.defaultFS</name>
- <value>hdfs://master2:9000</value>
- </property>
- <property>
- <name>fs.hdfs.impl</name>
- <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
- </property>
- </configuration>
4.类一---DocJob
- import lombok.Data;
- import org.apache.hadoop.io.Writable;
-
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import java.io.Serializable;
-
- /**
- *此方法用于封装任务信息
- */
- @Data
- public class DocJob implements Writable,Serializable {
- private static final long serialVersionUID = 12345678L;
- //任务id
- private int id;
- //任务名
- private String name;
- //任务类型
- private DocJobType jobType;
- //提交者
- private int userId;
- //提交时间
- private long submitTime;
- //完成时间
- private long finishTime;
- //任务状态
- private JobStatus jobStatus;
- //任务重试次数
- private int retryTime;
- //文档输入路径
- private String input;
- //任务输出路径
- private String output;
- //文件名
- private String fileName;
- public void write(DataOutput out) throws IOException {
- out.writeInt(id);
- out.writeUTF(name);
- out.writeUTF(jobType.name());
- out.writeInt(userId);
- out.writeLong(finishTime);
- out.writeLong(submitTime);
- out.writeUTF(jobStatus.name());
- out.writeInt(retryTime);
- out.writeUTF(input);
- out.writeUTF(output);
- out.writeUTF(fileName);
- }
-
- public void readFields(DataInput in) throws IOException {
- id= in.readInt();
- name=in.readUTF();
- jobType=DocJobType.valueOf(in.readUTF());
- userId=in.readInt();
- finishTime=in.readLong();
- submitTime=in.readLong();
- jobStatus=JobStatus.valueOf(in.readUTF());
- retryTime=in.readInt();
- input=in.readUTF();
- output=in.readUTF();
- fileName=in.readUTF();
- }
- }
5.类二、DocJobType
- /**
- * 项目的类型:文档转换、定义索引...
- */
- public enum DocJobType {
- DOC_JOB_CONVERT,DOC_JOB_CREATE_INDEX,DOC_JOB_UPDATE_INDEX
- }
6.类三、JobStatus
- /**
- * 文档状态:准备、提交、运行、失败、完成
- */
- public enum JobStatus {
- PREPARE,SUBMIT,RUNNING,FAILED,SUCCEED,
- }
7类四、.JobDaemonService
- /**
- * 服务端
- * 1.定义接口继承VersionedProtocol
- */
- public interface JobDaemonService extends VersionedProtocol {
- //定义通信间的暗号
- long versionID=1L;
- //定义提交方法
- void submitDocJob(DocJob job);
-
- }
8.类五、JobDaemonServiceImpl
- import com.zhiyou100.doccloud.utils.BdbPersistentQueue;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.hadoop.ipc.ProtocolSignature;
-
-
- import java.io.File;
- import java.io.IOException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- /**
- * 2.定义接口的实现类
- * 实现Runnable接口:是为了使用多线程处理
- */
- @Slf4j
- public class JobDaemonServiceImpl implements JobDaemonService,Runnable{
- //定义将hdfs下载到本地的目录的根路径
- private static final String WORK_DIR="/tmp/docjobdaemon/";
- //定义持久化对象
- public BdbPersistentQueue<DocJob> queue;
- //定义线程池--多线程并行处理
- private ExecutorService pool = Executors.newFixedThreadPool(4);
- //定义一个标准-让线程运行
- private boolean flag = true;
-
- //构造方法:用于创建berkly数据库目录,并初始化持久化队列
- public JobDaemonServiceImpl(){
- //创建工作目录--本地保存路径
- File workDir = new File(WORK_DIR + "/" + "bdb/");
- if (!workDir.exists()){
- //如果不存在将创建
- workDir.mkdirs();
- System.out.println(workDir.getAbsolutePath());
- }
- //初始化持久化队列
- queue = new BdbPersistentQueue<DocJob>(WORK_DIR+"/"+"bdb/", "docjob", DocJob.class);
- }
-
- public void submitDocJob(DocJob job) {
- System.out.println(job);
- //将任务保存在序列化队列中,1.保证任务不丢失 2.并发控制,内存溢出
- log.info("receive job {}",job);
- queue.offer(job);
- }
-
- public long getProtocolVersion(String s, long l) throws IOException {
- return versionID;
- }
-
- public ProtocolSignature getProtocolSignature(String s, long l, int i) throws IOException {
- return null;
- }
-
- @Override
- public void run() {
- while (flag){
- //将任务从序列化队列中取出任务,poll:每取出一个就从磁盘中移除一个
- DocJob docJob = queue.poll();
- //判断docjob中否为空
- if (docJob==null){
- //为空,等待5000毫秒
- try {
- Thread.sleep(5000);
- System.out.println("waiting for docjob");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }else {
- pool.submit(new DocJobHandler(docJob));
- }
- }
- }
- }
9.类六、Main
- import com.zhiyou100.doccloud.job.JobDaemonService;
- import com.zhiyou100.doccloud.job.JobDaemonServiceImpl;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.ipc.RPC;
-
- import java.io.IOException;
-
- /**
- * 守护进程--项目的入口类
- * 3.服务端:暴露端口
- */
- public class Main {
- public static void main(String[] args) throws IOException {
- //创建服务端接口实现类对象
- JobDaemonServiceImpl instance = new JobDaemonServiceImpl();
- //开启线程
- new Thread(instance).start();
-
- // 创建一个RPC builder
- RPC.Builder builder = new RPC.Builder(new Configuration());
-
- //指定RPC Server的参数
- builder.setBindAddress("localhost");
- builder.setPort(7788);
-
- //将自己的程序部署到server上
- builder.setProtocol(JobDaemonService.class);
- builder.setInstance(instance);
-
- //创建Server
- RPC.Server server = builder.build();
-
- //启动服务
- server.start();
- }
- }
10.类七--DocJobHandler
- import com.zhiyou100.doccloud.utils.HdfsUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.io.IOUtils;
-
- import java.io.File;
- import java.io.IOException;
- import java.util.UUID;
-
- @Slf4j
- public class DocJobHandler implements Runnable {
- private DocJob docJob;
-
- public DocJobHandler(DocJob docJob) {
- this.docJob = docJob;
- log.info("start to deal job {}",docJob);
- }
-
- /**
- *将文件冲hdfs上下载到本地,再将文件格式转化成HTML,最终上传到hdfs上
- */
- @Override
- public void run() {
- //1.将hdfs上的文件下载到本地
- //1.1获取文件的下载路径(在hdfs上的位置)
- String input = docJob.getInput();
- //1.2创建目标路径(下载到本地的路径)
- String tmpWorkDirPath = "/tmp/docjobdaemon/" + UUID.randomUUID().toString() + "/";
- File tmpWorkDir = new File(tmpWorkDirPath);
- tmpWorkDir.mkdirs();
- System.out.println("tmpWorkDirPath: "+tmpWorkDirPath);
- //1.3下载文件到临时目录
- try {
- HdfsUtil.copyToLocal(input,tmpWorkDirPath);
- log.info("download file to {}",tmpWorkDirPath);
- //step1:将下载到本地的文件格式转化成HTML
- String command = "D:\\soft\\LibreOffice_6.0.6\\program\\soffice --headless --invisible --convert-to html " + docJob.getFileName();
- Process process = Runtime.getRuntime().exec(command, null, tmpWorkDir);
- //结果信息
- System.out.println(IOUtils.toString(process.getInputStream()));
- //错误信息
- System.out.println(IOUtils.toString(process.getErrorStream()));
- //step2 转换成pdf
- //step3 提取页码
- //step4 提取首页缩略图
- //step5 利用solr建立索引
- //step6 上传结果
- //step7 清理临时目录
- //step8 任务成功回调
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
- }
工具类、
将hdfs上的文件下载到本地--HdfsUtil
import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
public class HdfsUtil {
public static final String HOME="hdfs://192.168.228.13:9000/";
//文档上传工具类
public static void upload(byte[] src, String docName, String dst) throws IOException {
//加载配置文件
Configuration coreSiteConf = new Configuration();
coreSiteConf.addResource(Resources.getResource("core-site.xml"));
//获取文件系统客户端对象
FileSystem fileSystem = FileSystem.get(coreSiteConf);
FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(dst + "/" + docName));
fsDataOutputStream.write(src);
fsDataOutputStream.close();
fileSystem.close();
}
/**
* 将集群的问价下载到本地
* @param dst
* @param localPath
* @throws IOException
*/
public static void copyToLocal(String dst,String localPath) throws IOException {
Configuration conf = new Configuration();
conf.addResource(Resources.getResource("core-site.xml"));
FileSystem fs = FileSystem.get(URI.create(dst),conf);
fs.copyToLocalFile(new Path(dst),new Path(localPath));
fs.close();
}
}
持久化队列,基于BDB实现--BdbPersistentQueue&&BdbEnvironment
- import java.io.File;
- import java.io.IOException;
- import java.io.Serializable;
- import java.util.AbstractQueue;
- import java.util.Iterator;
- import java.util.concurrent.atomic.AtomicLong;
-
- import org.apache.commons.io.FileUtils;
-
- import com.sleepycat.bind.EntryBinding;
- import com.sleepycat.bind.serial.SerialBinding;
- import com.sleepycat.bind.serial.StoredClassCatalog;
- import com.sleepycat.bind.tuple.TupleBinding;
- import com.sleepycat.collections.StoredMap;
- import com.sleepycat.collections.StoredSortedMap;
- import com.sleepycat.je.Database;
- import com.sleepycat.je.DatabaseConfig;
- import com.sleepycat.je.DatabaseException;
- import com.sleepycat.je.DatabaseExistsException;
- import com.sleepycat.je.DatabaseNotFoundException;
- import com.sleepycat.je.EnvironmentConfig;
- /**
- * 持久化队列,基于BDB实现,也继承Queue,以及可以序列化.但不等同于Queue的时,不再使用后需要关闭
- * 相比一般的内存Queue,插入和获取值需要多消耗一定的时间
- * 这里为什么是继承AbstractQueue而不是实现Queue接口,是因为只要实现offer,peek,poll几个方法即可,
- * 其他如remove,addAll,AbstractQueue会基于这几个方法去实现
- *
- * @contributor
- * @param <E>
- */
- public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements
- Serializable {
- private static final long serialVersionUID = 3427799316155220967L;
- private transient BdbEnvironment dbEnv; // 数据库环境,无需序列化
- private transient Database queueDb; // 数据库,用于保存值,使得支持队列持久化,无需序列化
- private transient StoredMap<Long,E> queueMap; // 持久化Map,Key为指针位置,Value为值,无需序列化
- private transient String dbDir; // 数据库所在目录
- private transient String dbName; // 数据库名字
- //AtomicLong:元子类型,线程安全
- //i++线程不安全
- private AtomicLong headIndex; // 头部指针
- private AtomicLong tailIndex; // 尾部指针
- private transient E peekItem=null; // 当前获取的值
-
- /**
- * 构造函数,传入BDB数据库
- *
- * @param db
- * @param valueClass
- * @param classCatalog
- */
- public BdbPersistentQueue(Database db,Class<E> valueClass,StoredClassCatalog classCatalog){
- this.queueDb=db;
- this.dbName=db.getDatabaseName();
- headIndex=new AtomicLong(0);
- tailIndex=new AtomicLong(0);
- bindDatabase(queueDb,valueClass,classCatalog);
- }
- /**
- * 构造函数,传入BDB数据库位置和名字,自己创建数据库
- *
- * @param dbDir
- * @param dbName
- * @param valueClass
- */
- public BdbPersistentQueue(String dbDir,String dbName,Class<E> valueClass){
- //headIndex=new AtomicLong(0);
- //tailIndex=new AtomicLong(0);
- this.dbDir=dbDir;
- this.dbName=dbName;
- createAndBindDatabase(dbDir,dbName,valueClass);
- }
- /**
- * 绑定数据库
- *
- * @param db
- * @param valueClass
- * @param classCatalog
- */
- public void bindDatabase(Database db, Class<E> valueClass, StoredClassCatalog classCatalog){
- EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass);
- if(valueBinding == null) {
- valueBinding = new SerialBinding<E>(classCatalog, valueClass); // 序列化绑定
- }
- queueDb = db;
- queueMap = new StoredSortedMap<Long,E>(
- db, // db
- TupleBinding.getPrimitiveBinding(Long.class), //Key 序列化类型
- valueBinding, // Value
- true); // allow write
- //todo
- Long firstKey = ((StoredSortedMap<Long, E>) queueMap).firstKey();
- Long lastKey = ((StoredSortedMap<Long, E>) queueMap).lastKey();
-
- headIndex=new AtomicLong(firstKey == null ? 0 : firstKey);
- tailIndex=new AtomicLong(lastKey==null?0:lastKey+1);
- }
- /**
- * 创建以及绑定数据库
- *
- * @param dbDir
- * @param dbName
- * @param valueClass
- * @throws DatabaseNotFoundException
- * @throws DatabaseExistsException
- * @throws DatabaseException
- * @throws IllegalArgumentException
- */
- private void createAndBindDatabase(String dbDir, String dbName,Class<E> valueClass) throws DatabaseNotFoundException,
- DatabaseExistsException,DatabaseException,IllegalArgumentException{
- File envFile = null;
- EnvironmentConfig envConfig = null;
- DatabaseConfig dbConfig = null;
- Database db=null;
-
- try {
- // 数据库位置
- envFile = new File(dbDir);
-
- // 数据库环境配置
- envConfig = new EnvironmentConfig();
- envConfig.setAllowCreate(true);
- //不支持事务
- envConfig.setTransactional(false);
-
- // 数据库配置
- dbConfig = new DatabaseConfig();
- dbConfig.setAllowCreate(true);
- dbConfig.setTransactional(false);
- //是否要延迟写
- dbConfig.setDeferredWrite(true);
-
- // 创建环境
- dbEnv = new BdbEnvironment(envFile, envConfig);
- // 打开数据库
- db = dbEnv.openDatabase(null, dbName, dbConfig);
- // 绑定数据库
- bindDatabase(db,valueClass,dbEnv.getClassCatalog());
-
- } catch (DatabaseNotFoundException e) {
- throw e;
- } catch (DatabaseExistsException e) {
- throw e;
- } catch (DatabaseException e) {
- throw e;
- } catch (IllegalArgumentException e) {
- throw e;
- }
-
-
- }
-
- /**
- * 值遍历器
- */
- @Override
- public Iterator<E> iterator() {
- return queueMap.values().iterator();
- }
- /**
- * 大小
- */
- @Override
- public int size() {
- synchronized(tailIndex){
- synchronized(headIndex){
- return (int)(tailIndex.get()-headIndex.get());
- }
- }
- }
-
- /**
- * 插入值
- */
- @Override
- public boolean offer(E e) {
- synchronized(tailIndex){
- queueMap.put(tailIndex.getAndIncrement(), e);// 从尾部插入
- //将数据不保存在缓冲区,直接存入磁盘
- dbEnv.sync();
- }
- return true;
- }
-
- /**
- * 获取值,从头部获取
- */
- @Override
- public E peek() {
- synchronized(headIndex){
- if(peekItem!=null){
- return peekItem;
- }
- E headItem=null;
- while(headItem==null&&headIndex.get()<tailIndex.get()){ // 没有超出范围
- headItem=queueMap.get(headIndex.get());
- if(headItem!=null){
- peekItem=headItem;
- continue;
- }
- headIndex.incrementAndGet(); // 头部指针后移
- }
- return headItem;
- }
- }
-
- /**
- * 移出元素,移出头部元素
- */
- @Override
- public E poll() {
- synchronized(headIndex){
- E headItem=peek();
- if(headItem!=null){
- queueMap.remove(headIndex.getAndIncrement());
- //从磁盘上移除
- dbEnv.sync();
- peekItem=null;
- return headItem;
- }
- }
- return null;
- }
-
- /**
- * 关闭,也就是关闭所是用的BDB数据库但不关闭数据库环境
- */
- public void close(){
- try {
- if(queueDb!=null){
- queueDb.sync();
- queueDb.close();
- }
- } catch (DatabaseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (UnsupportedOperationException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- /**
- * 清理,会清空数据库,并且删掉数据库所在目录,慎用.如果想保留数据,请调用close()
- */
- @Override
- public void clear() {
- try {
- close();
- if(dbEnv!=null&&queueDb!=null){
- dbEnv.removeDatabase(null, dbName==null?queueDb.getDatabaseName():dbName);
- dbEnv.close();
- }
- } catch (DatabaseNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (DatabaseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } finally{
- try {
- if(this.dbDir!=null){
- FileUtils.deleteDirectory(new File(this.dbDir));
- }
-
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
-
- }
- import java.io.File;
-
- import com.sleepycat.bind.serial.StoredClassCatalog;
- import com.sleepycat.je.Database;
- import com.sleepycat.je.DatabaseConfig;
- import com.sleepycat.je.DatabaseException;
- import com.sleepycat.je.Environment;
- import com.sleepycat.je.EnvironmentConfig;
- /**
- * BDB数据库环境,可以缓存StoredClassCatalog并共享
- *
- * @contributor
- */
- public class BdbEnvironment extends Environment {
- StoredClassCatalog classCatalog;
- Database classCatalogDB;
-
- /**
- * Constructor
- *
- * @param envHome 数据库环境目录
- * @param envConfig config options 数据库换纪念馆配置
- * @throws DatabaseException
- */
- public BdbEnvironment(File envHome, EnvironmentConfig envConfig) throws DatabaseException {
- super(envHome, envConfig);
- }
-
- /**
- * 返回StoredClassCatalog
- * @return the cached class catalog
- */
- public StoredClassCatalog getClassCatalog() {
- if(classCatalog == null) {
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setAllowCreate(true);
- try {
- //事务、数据库名、配置项
- classCatalogDB = openDatabase(null, "classCatalog", dbConfig);
- classCatalog = new StoredClassCatalog(classCatalogDB);
- } catch (DatabaseException e) {
- // TODO Auto-generated catch block
- throw new RuntimeException(e);
- }
- }
- return classCatalog;
- }
-
- @Override
- public synchronized void close() throws DatabaseException {
- if(classCatalogDB!=null) {
- classCatalogDB.close();
- }
- super.close();
- }
-
- }
客户端--doccloudweb
1.将上面客户端的DocJob、DocJobType、JobDeamonService、JobStatus类复制到客户端
2.将DocController中接着添加
- //上传成功以后需要提交文档转换任务
- //转换成html,
- submitDocJob(docEntity,new Random().nextInt());
- /**
- * 提交任务到集群上运行--文档转换任务
- * @param docEntity
- * @param userId
- */
- private void submitDocJob(Doc docEntity, int userId) throws IOException {
- //创建一个文档转换任务对象
- DocJob docJob = new DocJob();
- //1.设置提交者
- docJob.setUserId(userId);
- //2.设置任务名
- docJob.setName("doc convent");
- //3.任务的状态
- docJob.setJobStatus(JobStatus.SUBMIT);
- //4.设置任务类型
- docJob.setJobType(DocJobType.DOC_JOB_CONVERT);
- //5.设置提交时间
- docJob.setSubmitTime(System.nanoTime());
- //6.设置输入路径
- docJob.setInput(docEntity.getDocDir()+"/"+docEntity.getDocName());
- //7.设置输出路径
- docJob.setOutput(docEntity.getDocDir());
- //8.设置重试次数
- docJob.setRetryTime(4);
- //9.设置文件名
- docJob.setFileName(docEntity.getDocName());
- //todo 将job元数据保存到数据库
- //获取动态代理对象
- JobDaemonService jobDaemonService = RPC.getProxy(JobDaemonService.class, 1L, new InetSocketAddress("localhost", 7788), new Configuration());
- //提交任务到服务器(hdfs上)
- log.info("submit job:{}",docJob);
- jobDaemonService.submitDocJob(docJob);
-
- }
将上传到hdfs上的文件下载到本地,将下载的文件转化为HTML(通过runtime调用exec来执行命令)并保存到本地(客户端提交任务到服务器)通过hadoop IPC来接受任务。将任务保存在序列化队列中,1.保证任务不丢失 2.并发控制,内存溢出
---------------------------<待更>-------------------------