当前位置:   article > 正文

大数据综合项目DocCloud之需求分析与功能实现详细(续更)

clouddoc研发组

DocCloud项目需求

项目背景:

在一些大型企事业单位,工作中存在各种各样的工作文档,技术文档,规范等等。这些文档以word,xls,ppt,wps,pdf,txt存在。在此项目之前,文档的分享主要靠单位内部人员的互相发送。没有一个统一的平台对企业现存的各种文档进行统一管理。DocCloud项目提供了统一的文档管理平台。用户可以将文档上传至平台,所有其他用户可以在线查看此文档。同时满足搜索文档,分享,收藏等等一系列需求。在实践中,有百度文库,doc88,豆丁等公网项目。但是没有一个专门为企业用户服务的一个文档管理平台。

项目需求:

1.文档的统一存储

2.文档的检索

3.文档的在线预览

4.文档分享

5.文档推荐

6.文档上传下载

7.用户的注册,登录

8.文档权限管理

项目架构:

HDFS+LibreOffice6.0+solr+nginx+flume+hive+springboot+jpa+js+html+css

文档存储: HDFS

文件存储:1.本地(linux)-web服务器 优缺点:内存小,但是存储方便

2.ftp服务器(搭建一个存储文件的集群)优缺点:文档存储内存够,但是不能容错

3.hdfs (hadoop集群)优缺点:可扩展、容错、分布式存储

文档格式转换: LibreOffice6.0

因为存储的内容不是纯文本,就是传统的io流不能用、需要变成纯文本文件(txt)

doc、docx、ppt---→html---→txt

使用LibreOffice6.0,不适用word的原因:1.没有接口(不开源)2.不能再linux上运行

进程间通信:hadoop ipc

全文检索: solr

日志记录服务器:ngnix

web日志采集:flume

日志分析:hive

webMvc:springboot

持久层框架:jpa

单元测试:junit4

前端:css+html+js+jquery+bootstrap

版本管理:svn

依赖管理:maven

开发环境:idea

部署环境:linux

数据库:mysql


 

项目具体设计:

1.文档的上传下载

a.用户在前端点击上传按钮

b.在本地选择上传文档

c.开始上传

b.服务端校验文件后缀是否符合文档格式。

允许格式:doc,docx,ppt,pptx,xls,xlsx,pdf,txt

目的:避免上传不能转码的文档如:exe,zip,….

e.校验文档大小,允许128兆以下的文档上传。

128M:为了使文档在hdfs是一个块的形式保存。

f.计算文档的md5值,判断文档是否在文库中已经存在,如果存在,告知用户已经存在。

g.不存在,则上传至hdfs,同时数据库中保存用户上传文档信息。

数据保存在hdfs上,元数据保存在数据库mysql

2.上传成功以后需要提交文档转换任务(主要功能如下)

      1>转换成html

       2>转换成pdf提取缩略图,页数

       3>提取文本 建立索引


以下代码没有涉及前端、只是后台测试(使用Postman测试),部分参数都没有从session中获取,都是随机生成的

没有软件的附上资源(下载双击安装就可以):链接:https://pan.baidu.com/s/1SibrDOB4GwkX4L0iw3nYTA 
提取码:bisn

一、功能一:上传文件/2018.10.29

日志配置:

         在类名上添加注解

         @Slf4j

         直接在类中使用log记录日志

文件上传:

         关键注解:

         @RequestParam("file") MultipartFile file

         获取上传文件名:

         file.getOriginalFilename()

 

创建一个java项目DocCloud

在DocCloud中创建model---->doccloudweb(模块名要小写)

在module中选择Spring Initializr

在创建module时,选择pom中的依赖如下,选完之后一路下一步

core下选择DevTools、LomBok

web下选择web

sql下选择JPA、mysql

NoSQL下选solr

 

<dependencies>
    <!--数据相关依赖-持久层-->
    <!--<dependency>-->
        <!--<groupId>org.springframework.boot</groupId>-->
        <!--<artifactId>spring-boot-starter-data-jpa</artifactId>-->
    <!--</dependency>-->
    <!--全文检索-->
    <!--<dependency>-->
        <!--<groupId>org.springframework.boot</groupId>-->
        <!--<artifactId>spring-boot-starter-data-solr</artifactId>-->
    <!--</dependency>-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--热部署-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!--数据库连接-->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!--单元测试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.0.6.RELEASE</version>
    </dependency>                   
</dependencies>

<!--编译、打jar包-->
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

以上是创建项目选择的依赖,请添加以下外加依赖

<!--上传文件到hdfs上-->

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-common</artifactId>

            <version>2.7.5</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-hdfs</artifactId>

            <version>2.7.5</version>

        </dependency>

数据库创建Doc_cloud

Doc:记录文件属性

使用jpa---->java+persistence+api

JPA是Java Persistence API的简称,中文名Java持久层API,是JDK 5.0注解或XML描述对象-关系表的映射关系,并将运行期的实体对象持久化到数据库中

1.配置数据源application.properties

#数据源配置
spring.datasource.name=root
spring.datasource.password=123
#true:表示展示sql语句
spring.jpa.show-sql=true
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/doc_cloud

2..上传文件到hdfs上用到的core-site.xml

<?xml version="1.0" encoding="UTF-8"?>

<configuration>

    <property>

        <name>fs.defaultFS</name>

        <value>hdfs://master2:9000</value>

    </property>

    <property>

        <name>fs.hdfs.impl</name>

        <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>

    </property>

</configuration>

3.编写controller层-- DocController

import com.zhiyou100.doccloudweb.service.DocService;

import com.zhiyou100.doccloudweb.util.HdfsUtil;

import com.zhiyou100.doccloudweb.util.MD5Util;

import com.zhiyou100.doccloudweb.entity.Doc;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RequestParam;

import org.springframework.web.bind.annotation.ResponseBody;

import org.springframework.web.multipart.MultipartFile;

import java.io.IOException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Optional;

import java.util.Random;

@Controller  //表示是controller层--业务层

@RequestMapping("/doc")

@Slf4j

public class DocController {

    @Autowired

    private DocService docService;

    //定义合法的文件后缀类型

    public static final String[] DOC_SUFFIXS= new String[]{"doc", "docx", "ppt", "pptx", "txt", "xls", "xlsx", "pdf"};

    //定义文件最大大小

    public static final int DOC_MAX_SIZE = 128*1024*1024;

    //定义文件保存到hdfs上的根目录

    public static final String HOME="hdfs://192.168.228.13:9000/doccloud";

    @RequestMapping("/upload")

    @ResponseBody

    public String upload(@RequestParam("file") MultipartFile file){

        //判断是否是文件

        if (file.isEmpty()){

            return "file is empty";

        }

        //获取文件名

        String filename = file.getOriginalFilename();

        //以点分割-获取文件后缀

        String[] strings = filename.split("\\.");

        if (strings.length==1){

            return "file does not has suffix";

        }

        String suffix = strings[1];

        log.info("doc suffix is {}",suffix);

        //1.判断文件后缀是否合法

        boolean flag = isSuffixLegal(suffix);

        if (!flag){

            return "file is illegal";

        }

        try {

            //2.判断文件大小是否合法

            byte[] bytes = file.getBytes();

            log.info("file size is {}",bytes.length);

            if (bytes.length>DOC_MAX_SIZE){

                return "file is large,file Max size:"+DOC_MAX_SIZE;

            }

            //3.计算文档的MD5值

            String md5 = getMD5(bytes);

            log.info("file is md5 {} ",md5);

            //用户上传文件,保存到数据库

            //1.校验数据库中的md5值,判断数据库中是否存在

            Optional<Doc> doc = docService.findByMd5(md5);

            if (doc.isPresent()){

                //2.如果存在,更新

                // 2.1获取文件对象

                Doc docEntity = doc.get();

                //2.2设置文件更新的人

                docEntity.setUserId(new Random().nextInt());

                //2.3保存到数据库

                docService.save(docEntity);

            }else {

                //3.如果不存在,将文件元数据保存到数据库,将数据保存到hdfs

                //3.1保存数据到hdfs

                //3.1.1生成文件保存路径:HOME+当前时间

                String date = getDate();

                String dst = HOME+"/"+date+"/"+file.getOriginalFilename()+"/";

                log.info("file dst {}",dst);

                //3.1.2上传文件

                HdfsUtil.upload(bytes,file.getOriginalFilename(),dst);

                //3.2将元数据保存到数据库

                //3.2.1创建一个文件对象

                Doc docEntity = new Doc();

                //3.2.2设置作者

                docEntity.setUserId(new Random().nextInt());

                //3.2.3设置备注

                docEntity.setDocComment("hadoop");

                //3.2.4设置文件路径

                docEntity.setDocDir(dst);

                //3.2.5设置文件名

                docEntity.setDocName(filename);

                //3.2.6设置文件大小

                docEntity.setDocSize(bytes.length);

                //3.2.7设置文件权限

                docEntity.setDocPermission("1");

                //3.2.8设置文件类型(后缀)

                docEntity.setDocType(suffix);

                //3.2.9设置文件状态

                docEntity.setDocStatus("upload");

                //3.2.10设置文件的md5值--保证文件的唯一性

                docEntity.setMd5(md5);

                //3.2.11设置文件创作时间

                docEntity.setDocCreateTime(new Date());

                //3.2.12保存元数据

                docService.save(docEntity);

            }

        } catch (IOException e) {

            e.printStackTrace();

        }

        return "upload success";

    }

    /**

     * 获取当前是时间,用于文件的保存路径

     * @return

     */

    private String getDate() {

        Date date = new Date();

        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");

        return simpleDateFormat.format(date);

    }

    /**

     * 计算字节数组的MD5值

     * @param bytes

     * @return

     */

    private String getMD5(byte[] bytes) {

        return MD5Util.getMD5String(bytes);

    }

    /**

     * 判断文件后缀是否合法

     * @param suffix

     * @return

     */

    private boolean isSuffixLegal(String suffix) {

        for (String docsuffix :

                DOC_SUFFIXS) {

            if (suffix.equals(docsuffix)){

                return true;

            }

        }

        return false;

    }

}

 

4.编写业务层service层-- DocService

import com.zhiyou100.doccloudweb.dao.DocRepository;

import com.zhiyou100.doccloudweb.entity.Doc;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import java.util.Optional;

@Service

public class DocService {

    @Autowired

    private DocRepository docRepository;

    //通过id获取文件对象

    public Optional<Doc> findById(int id) {

        return docRepository.findById(id);

    }

    //通过MD5获取文件对象

    public Optional<Doc> findByMd5(String md5) {

        return docRepository.findByMd5(md5);

    }

    //保存文件对象到数据库

    public void save(Doc docEntity) {

        docRepository.save(docEntity);

    }

}

    //定义合法的文件后缀类型

    public static final String[] DOC_SUFFIXS= new String[]{"doc", "docx", "ppt", "pptx", "txt", "xls", "xlsx", "pdf"};

    //定义文件最大大小

    public static final int DOC_MAX_SIZE = 128*1024*1024;

    @RequestMapping("/doclist")

    @ResponseBody

    Doc doList(){

        Optional<Doc> id = docService.findById(1);

        return null;

    }

 

5.dao层—持久层--DocRepository

import com.zhiyou100.doccloudweb.entity.Doc;

import org.springframework.data.jpa.repository.JpaRepository;

import org.springframework.stereotype.Repository;

import java.util.Optional;

@Repository

//Doc:表示定义的实体类,Integer:表示主键类型

public interface DocRepository extends JpaRepository<Doc,Integer> {

    //利用反射机制自动识别

    Optional<Doc> findByMd5(String md5);

}

 

6.实体层—Doc

import lombok.Data;

import org.springframework.web.bind.annotation.ResponseBody;

import javax.persistence.*;

import java.util.Date;

/**

 * 文件属性

 */

@Entity

@Table(name = "doc") //映射到数据库中的表

@Data //get/set

public class Doc {

 

    @Id //主键

    //告诉框架id生成策略(怎么生成)GenerationType.IDENTITY:表示自动生成

    @GeneratedValue(strategy = GenerationType.IDENTITY)

    private int id;

    @Column(name = "md5")//如果数据库字段与entity中字段名一样,则不用加此注解

    private String md5;

    @Column(name = "doc_name")

    private String docName;

    @Column(name = "doc_type")

    private String docType;

    @Column(name = "doc_status")

    private String docStatus;

    @Column(name = "doc_size")

    private int docSize;

    @Column(name = "doc_dir")

    private String docDir;

    @Column(name = "user_id")

    private int userId;

    @Column(name = "doc_create_time")

    private Date docCreateTime;

    @Column(name = "doc_comment")

    private String docComment;

    @Column(name = "doc_permission")

    private String docPermission;

}

7.工具类

HdfsUtil 

  1. import com.google.common.io.Resources;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FSDataOutputStream;
  4. import org.apache.hadoop.fs.FileSystem;
  5. import org.apache.hadoop.fs.Path;
  6. import java.io.IOException;
  7. /*
  8. *@ClassName:HdfsUtil
  9. @Description:TODO
  10. @Author:
  11. @Date:2018/10/29 17:17
  12. @Version:v1.0
  13. */
  14. public class HdfsUtil {
  15. //文档上传工具类
  16. public static void upload(byte[] src, String docName, String dst) throws IOException {
  17. //加载配置文件
  18. Configuration coreSiteConf = new Configuration();
  19. coreSiteConf.addResource(Resources.getResource("core-site.xml"));
  20. //获取文件系统客户端对象
  21. FileSystem fileSystem = FileSystem.get(coreSiteConf);
  22. FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(dst + "/" + docName));
  23. fsDataOutputStream.write(src);
  24. fsDataOutputStream.close();
  25. fileSystem.close();
  26. }
  27. }

MD5Util 

  1. import java.io.File;
  2. import java.io.FileInputStream;
  3. import java.io.IOException;
  4. import java.nio.MappedByteBuffer;
  5. import java.nio.channels.FileChannel;
  6. import java.security.MessageDigest;
  7. import java.security.NoSuchAlgorithmException;
  8. public class MD5Util {
  9. protected static char hexDigits[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
  10. protected static MessageDigest messagedigest = null;
  11. /**
  12. * MessageDigest初始化
  13. *
  14. * @author
  15. */
  16. static {
  17. try {
  18. messagedigest = MessageDigest.getInstance("MD5");
  19. } catch (NoSuchAlgorithmException e) {
  20. System.err.println("MD5FileUtil messagedigest初始化失败");
  21. e.printStackTrace();
  22. }
  23. }
  24. /**
  25. * 对文件进行MD5加密
  26. *
  27. * @author
  28. */
  29. public static String getFileMD5String(File file) throws IOException {
  30. FileInputStream in = new FileInputStream(file);
  31. FileChannel ch = in.getChannel();
  32. MappedByteBuffer byteBuffer = ch.map(FileChannel.MapMode.READ_ONLY, 0, file.length());
  33. messagedigest.update(byteBuffer);
  34. return bufferToHex(messagedigest.digest());
  35. }
  36. /**
  37. * 对字符串进行MD5加密
  38. *
  39. * @author
  40. */
  41. public static String getMD5String(String s) {
  42. return getMD5String(s.getBytes());
  43. }
  44. /**
  45. * 对byte类型的数组进行MD5加密
  46. *
  47. * @author
  48. */
  49. public static String getMD5String(byte[] bytes) {
  50. messagedigest.update(bytes);
  51. return bufferToHex(messagedigest.digest());
  52. }
  53. private static String bufferToHex(byte bytes[]) {
  54. return bufferToHex(bytes, 0, bytes.length);
  55. }
  56. private static String bufferToHex(byte bytes[], int m, int n) {
  57. StringBuffer stringbuffer = new StringBuffer(2 * n);
  58. int k = m + n;
  59. for (int l = m; l < k; l++) {
  60. char c0 = hexDigits[(bytes[l] & 0xf0) >> 4];
  61. char c1 = hexDigits[bytes[l] & 0xf];
  62. stringbuffer.append(c0);
  63. stringbuffer.append(c1);
  64. }
  65. return stringbuffer.toString();
  66. }
  67. }

二、功能:文档转换

上传成功以后需要提交文档转换任务(主要功能如下)

      1>转换成html

       2>转换成pdf提取缩略图,页数

       3>提取文本 建立索引

1.定义一个docjob对象,用于封装任务信息

2.实现writable接口。因为要通过hadoop ipc序列化实现文档转换守护进程
该进程的作用是完成存放在本节点文档的转换,索引的任务。

1.文档转换成htm!通过runtime.exec执行命令来实现

2.通过hadoop ipc来接受任务

hadoop ipc
hadoop ipc是一套hadoop自带的成熟的rpc框架,性能高,稳定性性强。

server:
a.服务端定义接口b.定义按口的实现类
c用hadoop ipc暴露服务

client;
通过rpc.geproxy来调用服务岗的接口。


下面创建的是服务端的代码: 

1.新建模块--docservicedeamon文件转换守护进程

2.pom文件中的依赖

  1. <dependency>
  2. <groupId>commons-io</groupId>
  3. <artifactId>commons-io</artifactId>
  4. <version>2.5</version>
  5. </dependency>
  6. <!--ipc通信模块-->
  7. <dependency>
  8. <groupId>org.apache.hadoop</groupId>
  9. <artifactId>hadoop-common</artifactId>
  10. <version>2.7.5</version>
  11. </dependency>
  12. <!--注解、-->
  13. <dependency>
  14. <groupId>org.projectlombok</groupId>
  15. <artifactId>lombok</artifactId>
  16. <version>1.16.20</version>
  17. </dependency>
  18. <!--berklydb数据库依赖-->
  19. <!-- https://mvnrepository.com/artifact/com.sleepycat/je -->
  20. <dependency>
  21. <groupId>com.sleepycat</groupId>
  22. <artifactId>je</artifactId>
  23. <version>5.0.73</version>
  24. </dependency>
  25. <!--hdfs文件上传与下载-->
  26. <dependency>
  27. <groupId>org.apache.hadoop</groupId>
  28. <artifactId>hadoop-hdfs</artifactId>
  29. <version>2.7.5</version>
  30. </dependency>

 

3.配置core-site.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <configuration>
  3. <property>
  4. <name>fs.defaultFS</name>
  5. <value>hdfs://master2:9000</value>
  6. </property>
  7. <property>
  8. <name>fs.hdfs.impl</name>
  9. <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
  10. </property>
  11. </configuration>

4.类一---DocJob

  1. import lombok.Data;
  2. import org.apache.hadoop.io.Writable;
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. import java.io.Serializable;
  7. /**
  8. *此方法用于封装任务信息
  9. */
  10. @Data
  11. public class DocJob implements Writable,Serializable {
  12. private static final long serialVersionUID = 12345678L;
  13. //任务id
  14. private int id;
  15. //任务名
  16. private String name;
  17. //任务类型
  18. private DocJobType jobType;
  19. //提交者
  20. private int userId;
  21. //提交时间
  22. private long submitTime;
  23. //完成时间
  24. private long finishTime;
  25. //任务状态
  26. private JobStatus jobStatus;
  27. //任务重试次数
  28. private int retryTime;
  29. //文档输入路径
  30. private String input;
  31. //任务输出路径
  32. private String output;
  33. //文件名
  34. private String fileName;
  35. public void write(DataOutput out) throws IOException {
  36. out.writeInt(id);
  37. out.writeUTF(name);
  38. out.writeUTF(jobType.name());
  39. out.writeInt(userId);
  40. out.writeLong(finishTime);
  41. out.writeLong(submitTime);
  42. out.writeUTF(jobStatus.name());
  43. out.writeInt(retryTime);
  44. out.writeUTF(input);
  45. out.writeUTF(output);
  46. out.writeUTF(fileName);
  47. }
  48. public void readFields(DataInput in) throws IOException {
  49. id= in.readInt();
  50. name=in.readUTF();
  51. jobType=DocJobType.valueOf(in.readUTF());
  52. userId=in.readInt();
  53. finishTime=in.readLong();
  54. submitTime=in.readLong();
  55. jobStatus=JobStatus.valueOf(in.readUTF());
  56. retryTime=in.readInt();
  57. input=in.readUTF();
  58. output=in.readUTF();
  59. fileName=in.readUTF();
  60. }
  61. }

5.类二、DocJobType

  1. /**
  2. * 项目的类型:文档转换、定义索引...
  3. */
  4. public enum DocJobType {
  5. DOC_JOB_CONVERT,DOC_JOB_CREATE_INDEX,DOC_JOB_UPDATE_INDEX
  6. }

6.类三、JobStatus

  1. /**
  2. * 文档状态:准备、提交、运行、失败、完成
  3. */
  4. public enum JobStatus {
  5. PREPARE,SUBMIT,RUNNING,FAILED,SUCCEED,
  6. }

7类四、.JobDaemonService

  1. /**
  2. * 服务端
  3. * 1.定义接口继承VersionedProtocol
  4. */
  5. public interface JobDaemonService extends VersionedProtocol {
  6. //定义通信间的暗号
  7. long versionID=1L;
  8. //定义提交方法
  9. void submitDocJob(DocJob job);
  10. }

8.类五、JobDaemonServiceImpl

  1. import com.zhiyou100.doccloud.utils.BdbPersistentQueue;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.hadoop.ipc.ProtocolSignature;
  4. import java.io.File;
  5. import java.io.IOException;
  6. import java.util.concurrent.ExecutorService;
  7. import java.util.concurrent.Executors;
  8. /**
  9. * 2.定义接口的实现类
  10. * 实现Runnable接口:是为了使用多线程处理
  11. */
  12. @Slf4j
  13. public class JobDaemonServiceImpl implements JobDaemonService,Runnable{
  14. //定义将hdfs下载到本地的目录的根路径
  15. private static final String WORK_DIR="/tmp/docjobdaemon/";
  16. //定义持久化对象
  17. public BdbPersistentQueue<DocJob> queue;
  18. //定义线程池--多线程并行处理
  19. private ExecutorService pool = Executors.newFixedThreadPool(4);
  20. //定义一个标准-让线程运行
  21. private boolean flag = true;
  22. //构造方法:用于创建berkly数据库目录,并初始化持久化队列
  23. public JobDaemonServiceImpl(){
  24. //创建工作目录--本地保存路径
  25. File workDir = new File(WORK_DIR + "/" + "bdb/");
  26. if (!workDir.exists()){
  27. //如果不存在将创建
  28. workDir.mkdirs();
  29. System.out.println(workDir.getAbsolutePath());
  30. }
  31. //初始化持久化队列
  32. queue = new BdbPersistentQueue<DocJob>(WORK_DIR+"/"+"bdb/", "docjob", DocJob.class);
  33. }
  34. public void submitDocJob(DocJob job) {
  35. System.out.println(job);
  36. //将任务保存在序列化队列中,1.保证任务不丢失 2.并发控制,内存溢出
  37. log.info("receive job {}",job);
  38. queue.offer(job);
  39. }
  40. public long getProtocolVersion(String s, long l) throws IOException {
  41. return versionID;
  42. }
  43. public ProtocolSignature getProtocolSignature(String s, long l, int i) throws IOException {
  44. return null;
  45. }
  46. @Override
  47. public void run() {
  48. while (flag){
  49. //将任务从序列化队列中取出任务,poll:每取出一个就从磁盘中移除一个
  50. DocJob docJob = queue.poll();
  51. //判断docjob中否为空
  52. if (docJob==null){
  53. //为空,等待5000毫秒
  54. try {
  55. Thread.sleep(5000);
  56. System.out.println("waiting for docjob");
  57. } catch (InterruptedException e) {
  58. e.printStackTrace();
  59. }
  60. }else {
  61. pool.submit(new DocJobHandler(docJob));
  62. }
  63. }
  64. }
  65. }

9.类六、Main

  1. import com.zhiyou100.doccloud.job.JobDaemonService;
  2. import com.zhiyou100.doccloud.job.JobDaemonServiceImpl;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.ipc.RPC;
  5. import java.io.IOException;
  6. /**
  7. * 守护进程--项目的入口类
  8. * 3.服务端:暴露端口
  9. */
  10. public class Main {
  11. public static void main(String[] args) throws IOException {
  12. //创建服务端接口实现类对象
  13. JobDaemonServiceImpl instance = new JobDaemonServiceImpl();
  14. //开启线程
  15. new Thread(instance).start();
  16. // 创建一个RPC builder
  17. RPC.Builder builder = new RPC.Builder(new Configuration());
  18. //指定RPC Server的参数
  19. builder.setBindAddress("localhost");
  20. builder.setPort(7788);
  21. //将自己的程序部署到server上
  22. builder.setProtocol(JobDaemonService.class);
  23. builder.setInstance(instance);
  24. //创建Server
  25. RPC.Server server = builder.build();
  26. //启动服务
  27. server.start();
  28. }
  29. }

10.类七--DocJobHandler

  1. import com.zhiyou100.doccloud.utils.HdfsUtil;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.commons.io.IOUtils;
  4. import java.io.File;
  5. import java.io.IOException;
  6. import java.util.UUID;
  7. @Slf4j
  8. public class DocJobHandler implements Runnable {
  9. private DocJob docJob;
  10. public DocJobHandler(DocJob docJob) {
  11. this.docJob = docJob;
  12. log.info("start to deal job {}",docJob);
  13. }
  14. /**
  15. *将文件冲hdfs上下载到本地,再将文件格式转化成HTML,最终上传到hdfs上
  16. */
  17. @Override
  18. public void run() {
  19. //1.将hdfs上的文件下载到本地
  20. //1.1获取文件的下载路径(在hdfs上的位置)
  21. String input = docJob.getInput();
  22. //1.2创建目标路径(下载到本地的路径)
  23. String tmpWorkDirPath = "/tmp/docjobdaemon/" + UUID.randomUUID().toString() + "/";
  24. File tmpWorkDir = new File(tmpWorkDirPath);
  25. tmpWorkDir.mkdirs();
  26. System.out.println("tmpWorkDirPath: "+tmpWorkDirPath);
  27. //1.3下载文件到临时目录
  28. try {
  29. HdfsUtil.copyToLocal(input,tmpWorkDirPath);
  30. log.info("download file to {}",tmpWorkDirPath);
  31. //step1:将下载到本地的文件格式转化成HTML
  32. String command = "D:\\soft\\LibreOffice_6.0.6\\program\\soffice --headless --invisible --convert-to html " + docJob.getFileName();
  33. Process process = Runtime.getRuntime().exec(command, null, tmpWorkDir);
  34. //结果信息
  35. System.out.println(IOUtils.toString(process.getInputStream()));
  36. //错误信息
  37. System.out.println(IOUtils.toString(process.getErrorStream()));
  38. //step2 转换成pdf
  39. //step3 提取页码
  40. //step4 提取首页缩略图
  41. //step5 利用solr建立索引
  42. //step6 上传结果
  43. //step7 清理临时目录
  44. //step8 任务成功回调
  45. } catch (IOException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. }

工具类、

将hdfs上的文件下载到本地--HdfsUtil

import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;

public class HdfsUtil {
    public static final String HOME="hdfs://192.168.228.13:9000/";
    //文档上传工具类
    public static void upload(byte[] src, String docName, String dst) throws IOException {
        //加载配置文件
        Configuration coreSiteConf = new Configuration();
        coreSiteConf.addResource(Resources.getResource("core-site.xml"));
        //获取文件系统客户端对象
        FileSystem fileSystem = FileSystem.get(coreSiteConf);

        FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(dst + "/" + docName));

        fsDataOutputStream.write(src);
        fsDataOutputStream.close();
        fileSystem.close();
    }

    /**
     * 将集群的问价下载到本地
     * @param dst
     * @param localPath
     * @throws IOException
     */
    public static void copyToLocal(String dst,String localPath) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(Resources.getResource("core-site.xml"));
        FileSystem fs = FileSystem.get(URI.create(dst),conf);
        fs.copyToLocalFile(new Path(dst),new Path(localPath));
        fs.close();
    }
}

持久化队列,基于BDB实现--BdbPersistentQueue&&BdbEnvironment

  1. import java.io.File;
  2. import java.io.IOException;
  3. import java.io.Serializable;
  4. import java.util.AbstractQueue;
  5. import java.util.Iterator;
  6. import java.util.concurrent.atomic.AtomicLong;
  7. import org.apache.commons.io.FileUtils;
  8. import com.sleepycat.bind.EntryBinding;
  9. import com.sleepycat.bind.serial.SerialBinding;
  10. import com.sleepycat.bind.serial.StoredClassCatalog;
  11. import com.sleepycat.bind.tuple.TupleBinding;
  12. import com.sleepycat.collections.StoredMap;
  13. import com.sleepycat.collections.StoredSortedMap;
  14. import com.sleepycat.je.Database;
  15. import com.sleepycat.je.DatabaseConfig;
  16. import com.sleepycat.je.DatabaseException;
  17. import com.sleepycat.je.DatabaseExistsException;
  18. import com.sleepycat.je.DatabaseNotFoundException;
  19. import com.sleepycat.je.EnvironmentConfig;
  20. /**
  21. * 持久化队列,基于BDB实现,也继承Queue,以及可以序列化.但不等同于Queue的时,不再使用后需要关闭
  22. * 相比一般的内存Queue,插入和获取值需要多消耗一定的时间
  23. * 这里为什么是继承AbstractQueue而不是实现Queue接口,是因为只要实现offer,peek,poll几个方法即可,
  24. * 其他如remove,addAll,AbstractQueue会基于这几个方法去实现
  25. *
  26. * @contributor
  27. * @param <E>
  28. */
  29. public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements
  30. Serializable {
  31. private static final long serialVersionUID = 3427799316155220967L;
  32. private transient BdbEnvironment dbEnv; // 数据库环境,无需序列化
  33. private transient Database queueDb; // 数据库,用于保存值,使得支持队列持久化,无需序列化
  34. private transient StoredMap<Long,E> queueMap; // 持久化Map,Key为指针位置,Value为值,无需序列化
  35. private transient String dbDir; // 数据库所在目录
  36. private transient String dbName; // 数据库名字
  37. //AtomicLong:元子类型,线程安全
  38. //i++线程不安全
  39. private AtomicLong headIndex; // 头部指针
  40. private AtomicLong tailIndex; // 尾部指针
  41. private transient E peekItem=null; // 当前获取的值
  42. /**
  43. * 构造函数,传入BDB数据库
  44. *
  45. * @param db
  46. * @param valueClass
  47. * @param classCatalog
  48. */
  49. public BdbPersistentQueue(Database db,Class<E> valueClass,StoredClassCatalog classCatalog){
  50. this.queueDb=db;
  51. this.dbName=db.getDatabaseName();
  52. headIndex=new AtomicLong(0);
  53. tailIndex=new AtomicLong(0);
  54. bindDatabase(queueDb,valueClass,classCatalog);
  55. }
  56. /**
  57. * 构造函数,传入BDB数据库位置和名字,自己创建数据库
  58. *
  59. * @param dbDir
  60. * @param dbName
  61. * @param valueClass
  62. */
  63. public BdbPersistentQueue(String dbDir,String dbName,Class<E> valueClass){
  64. //headIndex=new AtomicLong(0);
  65. //tailIndex=new AtomicLong(0);
  66. this.dbDir=dbDir;
  67. this.dbName=dbName;
  68. createAndBindDatabase(dbDir,dbName,valueClass);
  69. }
  70. /**
  71. * 绑定数据库
  72. *
  73. * @param db
  74. * @param valueClass
  75. * @param classCatalog
  76. */
  77. public void bindDatabase(Database db, Class<E> valueClass, StoredClassCatalog classCatalog){
  78. EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass);
  79. if(valueBinding == null) {
  80. valueBinding = new SerialBinding<E>(classCatalog, valueClass); // 序列化绑定
  81. }
  82. queueDb = db;
  83. queueMap = new StoredSortedMap<Long,E>(
  84. db, // db
  85. TupleBinding.getPrimitiveBinding(Long.class), //Key 序列化类型
  86. valueBinding, // Value
  87. true); // allow write
  88. //todo
  89. Long firstKey = ((StoredSortedMap<Long, E>) queueMap).firstKey();
  90. Long lastKey = ((StoredSortedMap<Long, E>) queueMap).lastKey();
  91. headIndex=new AtomicLong(firstKey == null ? 0 : firstKey);
  92. tailIndex=new AtomicLong(lastKey==null?0:lastKey+1);
  93. }
  94. /**
  95. * 创建以及绑定数据库
  96. *
  97. * @param dbDir
  98. * @param dbName
  99. * @param valueClass
  100. * @throws DatabaseNotFoundException
  101. * @throws DatabaseExistsException
  102. * @throws DatabaseException
  103. * @throws IllegalArgumentException
  104. */
  105. private void createAndBindDatabase(String dbDir, String dbName,Class<E> valueClass) throws DatabaseNotFoundException,
  106. DatabaseExistsException,DatabaseException,IllegalArgumentException{
  107. File envFile = null;
  108. EnvironmentConfig envConfig = null;
  109. DatabaseConfig dbConfig = null;
  110. Database db=null;
  111. try {
  112. // 数据库位置
  113. envFile = new File(dbDir);
  114. // 数据库环境配置
  115. envConfig = new EnvironmentConfig();
  116. envConfig.setAllowCreate(true);
  117. //不支持事务
  118. envConfig.setTransactional(false);
  119. // 数据库配置
  120. dbConfig = new DatabaseConfig();
  121. dbConfig.setAllowCreate(true);
  122. dbConfig.setTransactional(false);
  123. //是否要延迟写
  124. dbConfig.setDeferredWrite(true);
  125. // 创建环境
  126. dbEnv = new BdbEnvironment(envFile, envConfig);
  127. // 打开数据库
  128. db = dbEnv.openDatabase(null, dbName, dbConfig);
  129. // 绑定数据库
  130. bindDatabase(db,valueClass,dbEnv.getClassCatalog());
  131. } catch (DatabaseNotFoundException e) {
  132. throw e;
  133. } catch (DatabaseExistsException e) {
  134. throw e;
  135. } catch (DatabaseException e) {
  136. throw e;
  137. } catch (IllegalArgumentException e) {
  138. throw e;
  139. }
  140. }
  141. /**
  142. * 值遍历器
  143. */
  144. @Override
  145. public Iterator<E> iterator() {
  146. return queueMap.values().iterator();
  147. }
  148. /**
  149. * 大小
  150. */
  151. @Override
  152. public int size() {
  153. synchronized(tailIndex){
  154. synchronized(headIndex){
  155. return (int)(tailIndex.get()-headIndex.get());
  156. }
  157. }
  158. }
  159. /**
  160. * 插入值
  161. */
  162. @Override
  163. public boolean offer(E e) {
  164. synchronized(tailIndex){
  165. queueMap.put(tailIndex.getAndIncrement(), e);// 从尾部插入
  166. //将数据不保存在缓冲区,直接存入磁盘
  167. dbEnv.sync();
  168. }
  169. return true;
  170. }
  171. /**
  172. * 获取值,从头部获取
  173. */
  174. @Override
  175. public E peek() {
  176. synchronized(headIndex){
  177. if(peekItem!=null){
  178. return peekItem;
  179. }
  180. E headItem=null;
  181. while(headItem==null&&headIndex.get()<tailIndex.get()){ // 没有超出范围
  182. headItem=queueMap.get(headIndex.get());
  183. if(headItem!=null){
  184. peekItem=headItem;
  185. continue;
  186. }
  187. headIndex.incrementAndGet(); // 头部指针后移
  188. }
  189. return headItem;
  190. }
  191. }
  192. /**
  193. * 移出元素,移出头部元素
  194. */
  195. @Override
  196. public E poll() {
  197. synchronized(headIndex){
  198. E headItem=peek();
  199. if(headItem!=null){
  200. queueMap.remove(headIndex.getAndIncrement());
  201. //从磁盘上移除
  202. dbEnv.sync();
  203. peekItem=null;
  204. return headItem;
  205. }
  206. }
  207. return null;
  208. }
  209. /**
  210. * 关闭,也就是关闭所是用的BDB数据库但不关闭数据库环境
  211. */
  212. public void close(){
  213. try {
  214. if(queueDb!=null){
  215. queueDb.sync();
  216. queueDb.close();
  217. }
  218. } catch (DatabaseException e) {
  219. // TODO Auto-generated catch block
  220. e.printStackTrace();
  221. } catch (UnsupportedOperationException e) {
  222. // TODO Auto-generated catch block
  223. e.printStackTrace();
  224. }
  225. }
  226. /**
  227. * 清理,会清空数据库,并且删掉数据库所在目录,慎用.如果想保留数据,请调用close()
  228. */
  229. @Override
  230. public void clear() {
  231. try {
  232. close();
  233. if(dbEnv!=null&&queueDb!=null){
  234. dbEnv.removeDatabase(null, dbName==null?queueDb.getDatabaseName():dbName);
  235. dbEnv.close();
  236. }
  237. } catch (DatabaseNotFoundException e) {
  238. // TODO Auto-generated catch block
  239. e.printStackTrace();
  240. } catch (DatabaseException e) {
  241. // TODO Auto-generated catch block
  242. e.printStackTrace();
  243. } finally{
  244. try {
  245. if(this.dbDir!=null){
  246. FileUtils.deleteDirectory(new File(this.dbDir));
  247. }
  248. } catch (IOException e) {
  249. // TODO Auto-generated catch block
  250. e.printStackTrace();
  251. }
  252. }
  253. }
  254. }

 

  1. import java.io.File;
  2. import com.sleepycat.bind.serial.StoredClassCatalog;
  3. import com.sleepycat.je.Database;
  4. import com.sleepycat.je.DatabaseConfig;
  5. import com.sleepycat.je.DatabaseException;
  6. import com.sleepycat.je.Environment;
  7. import com.sleepycat.je.EnvironmentConfig;
  8. /**
  9. * BDB数据库环境,可以缓存StoredClassCatalog并共享
  10. *
  11. * @contributor
  12. */
  13. public class BdbEnvironment extends Environment {
  14. StoredClassCatalog classCatalog;
  15. Database classCatalogDB;
  16. /**
  17. * Constructor
  18. *
  19. * @param envHome 数据库环境目录
  20. * @param envConfig config options 数据库换纪念馆配置
  21. * @throws DatabaseException
  22. */
  23. public BdbEnvironment(File envHome, EnvironmentConfig envConfig) throws DatabaseException {
  24. super(envHome, envConfig);
  25. }
  26. /**
  27. * 返回StoredClassCatalog
  28. * @return the cached class catalog
  29. */
  30. public StoredClassCatalog getClassCatalog() {
  31. if(classCatalog == null) {
  32. DatabaseConfig dbConfig = new DatabaseConfig();
  33. dbConfig.setAllowCreate(true);
  34. try {
  35. //事务、数据库名、配置项
  36. classCatalogDB = openDatabase(null, "classCatalog", dbConfig);
  37. classCatalog = new StoredClassCatalog(classCatalogDB);
  38. } catch (DatabaseException e) {
  39. // TODO Auto-generated catch block
  40. throw new RuntimeException(e);
  41. }
  42. }
  43. return classCatalog;
  44. }
  45. @Override
  46. public synchronized void close() throws DatabaseException {
  47. if(classCatalogDB!=null) {
  48. classCatalogDB.close();
  49. }
  50. super.close();
  51. }
  52. }

 

客户端--doccloudweb

1.将上面客户端的DocJob、DocJobType、JobDeamonService、JobStatus类复制到客户端

2.将DocController中接着添加

  1. //上传成功以后需要提交文档转换任务
  2. //转换成html,
  3. submitDocJob(docEntity,new Random().nextInt());
  1. /**
  2. * 提交任务到集群上运行--文档转换任务
  3. * @param docEntity
  4. * @param userId
  5. */
  6. private void submitDocJob(Doc docEntity, int userId) throws IOException {
  7. //创建一个文档转换任务对象
  8. DocJob docJob = new DocJob();
  9. //1.设置提交者
  10. docJob.setUserId(userId);
  11. //2.设置任务名
  12. docJob.setName("doc convent");
  13. //3.任务的状态
  14. docJob.setJobStatus(JobStatus.SUBMIT);
  15. //4.设置任务类型
  16. docJob.setJobType(DocJobType.DOC_JOB_CONVERT);
  17. //5.设置提交时间
  18. docJob.setSubmitTime(System.nanoTime());
  19. //6.设置输入路径
  20. docJob.setInput(docEntity.getDocDir()+"/"+docEntity.getDocName());
  21. //7.设置输出路径
  22. docJob.setOutput(docEntity.getDocDir());
  23. //8.设置重试次数
  24. docJob.setRetryTime(4);
  25. //9.设置文件名
  26. docJob.setFileName(docEntity.getDocName());
  27. //todo 将job元数据保存到数据库
  28. //获取动态代理对象
  29. JobDaemonService jobDaemonService = RPC.getProxy(JobDaemonService.class, 1L, new InetSocketAddress("localhost", 7788), new Configuration());
  30. //提交任务到服务器(hdfs上)
  31. log.info("submit job:{}",docJob);
  32. jobDaemonService.submitDocJob(docJob);
  33. }

 

将上传到hdfs上的文件下载到本地,将下载的文件转化为HTML(通过runtime调用exec来执行命令)并保存到本地(客户端提交任务到服务器)通过hadoop IPC来接受任务。将任务保存在序列化队列中,1.保证任务不丢失 2.并发控制,内存溢出

---------------------------<待更>-------------------------


 

 


 


 


 


 


 


 


 


 

 


 

 

转载于:https://www.cnblogs.com/pigdata/p/10305583.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/375837
推荐阅读
相关标签
  

闽ICP备14008679号