当前位置:   article > 正文

Flink写入Doris的实时应用_flink doris

flink doris

想,全是问题;做,全是办法。

想深入交流doris的私聊我,加微信

引言

做实时数仓的同学对目前比较流行的KFC(Kafka\Flink\ClickHouse)套餐非常熟悉,其实KFD也不错。
大数据组件越来越丰富,但是还没有出现一个兼容OLAP和OLTP的工具,即满足DB和日志的实时存储和复杂查询,又能满足在此基础上的数仓建设,我们尝试过ClickHouse,缺点在于难维护、实时写入效率低,内部碎片合并和数据走zk难以实现大量数据的实时存储;之后使用过impala+kudu,缺点是impala实在是太占用内存,两者结合用起来比较费劲,也是开发了实时同步DB的工具,维护成本太高,也放弃了;最终在参考百度的doris和作业帮的资料下,正式的开始使用Doris,实现了log和DB(包含分表合并)的准实时同步,以及基于doris的数仓建模。
接下来我会就Doris的实时写入部分简单的说一下实现方式,代码和注释为主

表设计

不要把字段设计成"not null",好处在于后期改表(加字段)不会影响正常的数据,其他的暂时不方便透露,之后会慢慢讲

JSONStreamLoad

为什么选择StreamLoad呢?一开始使用的是insert into,insert into是使用的FE资源的,导致FE繁忙,后期数据量上来会出问题,而streamload不存在这个问题,官方是这么说的(0.12的文档):

Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。

用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。

导入的最终结果由 Coordinator BE 返回给用户。

                         ^      +
                         |      |
                         |      | 1A. User submit load to FE
                         |      |
                         |   +--v-----------+
                         |   | FE           |
 - Return result to user |   +--+-----------+
                         |      |
                         |      | 2. Redirect to BE
                         |      |
                         |   +--v-----------+
                         +---+Coordinator BE| 1B. User submit load to BE
                             +-+-----+----+-+
                               |     |    |
                         +-----+     |    +-----+
                         |           |          | 3. Distrbute data
                         |           |          |
                       +-v-+       +-v-+      +-v-+
                       |BE |       |BE |      |BE |
                       +---+       +---+      +---+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

之后参考京东做法,不断的load小文件实现,实时的数据插入。
踩坑:

  • 尽量的数据量要大,避免多次提交,而会出现线程占有的问题
  • load是以DB为单位的,一个DB默认100个线程,控制好load的线程数
  • load是很耗内存的,一是线程,二是数据合并
  • streaming_load_max_batch_size_mb默认是100,根据业务进行更改
  • 如果要同步DB的数据注意多线程执行curl

实现起来比较简单,无非是在flinkSink代码中嵌入一段执行curl的代码

## 原curl
curl --location-trusted -u 用户名:密码 -T /xxx/test -H "format: json" -H "strip_outer_array: true" http://doris_fe:8030/api/{database}/{table}/_stream_load
## -u 不用解释了,用户名和密码
## -T json文件的地址,内容为[json,json,json],就是jsonlist
## -H 指定参数
## http 指定库名和表名
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

步骤:生成临时文件createFile,将数据写入临时文件mappedFile,执行execCurl, 删除临时文件deleteFile (简化版)


    /**
     * 创建临时内存文件
     * @param fileName
     * @throws IOException
     */
    public static void createFile(String fileName) throws IOException {

        File testFile = new File(fileName);
        File fileParent = testFile.getParentFile();

        if (!fileParent.exists()) {
            fileParent.mkdirs();
        }
        if (!testFile.exists())
            testFile.createNewFile();
    }

    /**
     * 删除临时内存文件
     * @param fileName
     * @return
     */
    public static boolean deleteFile(String fileName) {
        boolean flag = false;
        File file = new File(fileName);
        // 路径为文件且不为空则进行删除
        if (file.isFile() && file.exists()) {
            file.delete();
            flag = true;
        }
        return flag;
    }
    
    /**
     * 写入内存文件
     * @param data
     * @param path
     */
    public static void mappedFile(String data, String path) {

        CharBuffer charBuffer = CharBuffer.wrap(data);

        try {
            FileChannel fileChannel = FileChannel.open(Paths.get(path), StandardOpenOption.READ, StandardOpenOption.WRITE,
                    StandardOpenOption.TRUNCATE_EXISTING);

            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, data.getBytes().length*4);

            if (mappedByteBuffer != null) {
                mappedByteBuffer.clear();
                mappedByteBuffer.put(Charset.forName("UTF-8").encode(charBuffer));
            }
            fileChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    /**
     * 执行curl
     * @param curl
     * @return
     */
    public static String execCurl(String[] curl) {

        ProcessBuilder process = new ProcessBuilder(curl);
        Process p;
        try {
            p = process.start();
            BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
            StringBuilder builder = new StringBuilder();
            String line = null;
            while ((line = reader.readLine()) != null) {
                builder.append(line);
                builder.append(System.getProperty("line.separator"));
            }
            return builder.toString();

        } catch (IOException e) {
            System.out.print("error");
            e.printStackTrace();
        }
        return null;

    }
    /**
     * 生成Culr
     * @param filePath
     * @param databases
     * @param table
     * @return
     */
    public static String[] createCurl(String filePath, String databases, String table){
        String[] curl = {"curl","--location-trusted", "-u", "用户名:密码", "-T",filePath, "-H","format: json", "-H", "strip_outer_array: true", "http://doris_fe:8030/api/"+databases+"/"+table+"/_stream_load"};
        
        return curl;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99

flink

实现自定义Sink比较简单,这里就简单的分享一下我的怎么写的(简化版)。

class LogCurlSink(insertTimenterval:Long,
                  insertBatchSize:Int) extends RichSinkFunction[(String, Int, Long, String)] with Serializable{
  private val Logger = LoggerFactory.getLogger(this.getClass)
  private val mesList = new java.util.ArrayList[String]()
  private var lastInsertTime = 0L
  
  override def open(parameters: Configuration): Unit ={
    val path = s"/tmp/doris/{databases}/{table}/{ThreadId}"
    CurlUtils.createFile(path)
    Logger.warn(s"init and create $topic filePath!!!")
  }
  
  	// (topic,partition,offset,jsonstr)
   override def invoke(value: (String, Int, Long, String), context: SinkFunction.Context[_]): Unit = {
    if(mesList.size >= this.insertBatchSize || isTimeToDoInsert){
      //存入
      insertData(mesList)
      //此处可以进行受到维护offset
      mesList.clear()
      this.lastInsertTime = System.currentTimeMillis()
    }
    mesList.add(value._4)
  }
  
  
  override def close(): Unit = {
    val path = s"/tmp/doris/{databases}/{table}/{ThreadId}"
    CurlUtils.deleteFile(path)
    Logger.warn("close and delete filePath!!!")
  }

  /**
    * 执行插入操作
    * @param dataList
    */
  private def insertData(dataList: java.util.ArrayList[String]): Unit ={}
  /**
    * 根据时间判断是否插入数据
    *
    * @return
    */
  private def isTimeToDoInsert = {
    val currTime = System.currentTimeMillis
    currTime - this.lastInsertTime >= this.insertCkTimenterval
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/504480
推荐阅读
相关标签
  

闽ICP备14008679号