当前位置:   article > 正文

Apache Doris Binlog Load使用方法及示例_doris jdbc连接 batch update

doris jdbc连接 batch update

 Apache Doris 代码仓库地址:apache/incubator-doris 欢迎大家关注加星
 


Binlog Load提供了一种使Doris增量同步用户在Mysql数据库的对数据更新操作的CDC(Change Data Capture)功能,使用户更方面的完成Mysql数据的导入

注意:
该功能需要在0.15及以后的版本里使用

1. 安装配置 Mysql

  1. 安装Mysql
    快速使用Docker安装配置Mysql,具体参照下面的连接
    https://segmentfault.com/a/1190000021523570
    如果是在物理机上安装可以参考下面的连接:
    在 CentOS 7 中安装 MySQL 8 的教程详解
  2. 开启Mysql binlog
    进入 Docker 容器或者物理机上修改/etc/my.cnf 文件,在 [mysqld] 下面添加以下内容,
    log_bin=mysql_bin
    binlog-format=Row
    server-id=1
    然后重启Mysql
    systemctl restart mysqld
  3. 创建 Mysql 表
    create database demo;

    CREATE TABLE `test_cdc` (
    `id` int NOT NULL AUTO_INCREMENT,
    `sex` TINYINT(1) DEFAULT NULL,
    `name` varchar(20) DEFAULT NULL,
    `address` varchar(255) DEFAULT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB

2. 安装配置Canal

下载canal-1.1.5: https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

  1. 解压Canal到指定目录:
    tar zxvf canal.deployer-1.1.5.tar.gz -C ./canal
  2. 在conf文件夹下新建目录并重命名,作为instance的根目录,目录名你可以自己命名便于识别即可
    例如我这里的命名是和我的数据库库名一致:demo
    vi conf/demo/instance.properties
    下面给出的是一个我的示例配置:
    这里面的参数说明请参考Canal官方文档:QuickStart
  1. #################################################
  2. ## mysql serverId , v1.0.26+ will autoGen
  3. canal.instance.mysql.slaveId=12115
  4. # enable gtid use true/false
  5. canal.instance.gtidon=false
  6. # position info
  7. canal.instance.master.address=10.220.146.11:3306
  8. canal.instance.master.journal.name=
  9. canal.instance.master.position=
  10. canal.instance.master.timestamp=
  11. canal.instance.master.gtid=
  12. # rds oss binlog
  13. canal.instance.rds.accesskey=
  14. canal.instance.rds.secretkey=
  15. canal.instance.rds.instanceId=
  16. # table meta tsdb info
  17. canal.instance.tsdb.enable=true
  18. #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
  19. #canal.instance.tsdb.dbUsername=canal
  20. #canal.instance.tsdb.dbPassword=canal
  21. #canal.instance.standby.address =
  22. #canal.instance.standby.journal.name =
  23. #canal.instance.standby.position =
  24. #canal.instance.standby.timestamp =
  25. #canal.instance.standby.gtid=
  26. # username/password
  27. canal.instance.dbUsername=zhangfeng
  28. canal.instance.dbPassword=zhangfeng800729)(*Q
  29. canal.instance.connectionCharset = UTF-8
  30. # enable druid Decrypt database password
  31. canal.instance.enableDruid=false
  32. #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
  33. # table regex
  34. canal.instance.filter.regex=demo\\..*
  35. # table black regex
  36. canal.instance.filter.black.regex=
  37. # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
  38. #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
  39. # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
  40. #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
  41. # mq config
  42. #canal.mq.topic=
  43. # dynamic topic route by schema or table regex
  44. #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
  45. #canal.mq.partition=0
  46. # hash partition config
  47. #canal.mq.partitionsNum=3
  48. #canal.mq.partitionHash=test.table:id^name,.*\\..*
  49. #################################################
  1. 启动Canal
    sh bin/startup.sh
注意:canal instance user/passwd
1.1.5 版本,在canal.properties里加上这两个配置
canal.user = canal canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
默认密码为canal/canal,canal.passwd的密码值可以通过select password("xxx") 来获取
  1. 验证是否启动成功
 tail -200f logs/demo/demo.log

3.开始同步数据

3.1 创建Doris目标表

用户需要先在Doris端创建好与Mysql端对应的目标表

Binlog Load只能支持Unique类型的目标表,且必须激活目标表的Batch Delete功能。

开启Batch Delete的方法可以参考help alter table中的批量删除功能。

  1. CREATE TABLE `doris_mysql_binlog_demo` (
  2. `id` int NOT NULL,
  3. `sex` TINYINT(1),
  4. `name` varchar(20),
  5. `address` varchar(255)
  6. ) ENGINE=OLAP
  7. UNIQUE KEY(`id`,sex)
  8. COMMENT "OLAP"
  9. DISTRIBUTED BY HASH(`sex`) BUCKETS 1
  10. PROPERTIES (
  11. "replication_allocation" = "tag.location.default: 3",
  12. "in_memory" = "false",
  13. "storage_format" = "V2"
  14. );
  15. -- enable batch delete
  16. ALTER TABLE test_2.doris_mysql_binlog_demo ENABLE FEATURE "BATCH_DELETE";

3.1 创建同步作业

3.1.1 Create Sync Job 语法说明

Name: 'CREATE SYNC JOB' Description:

数据同步(Sync Job)功能,支持用户提交一个常驻的数据同步作业,通过从指定的远端地址读取Binlog日志,增量同步用户在Mysql数据库的对数据更新操作的CDC(Change Data Capture)功能。 目前数据同步作业只支持对接Canal,从Canal Server上获取解析好的Binlog数据,导入到Doris内。 用户可通过 SHOW SYNC JOB 查看数据同步作业状态。 语法:

  1. CREATE SYNC [db.]job_name
  2. (
  3. channel_desc,
  4. channel_desc
  5. ...
  6. )
  7. binlog_desc
  1. job_name
    同步作业名称,是作业在当前数据库内的唯一标识,相同job_name的作业只能有一个在运行。
  2. channel_desc
    作业下的数据通道,用来描述mysql源表到doris目标表的映射关系。
    语法:
    FROM mysql_db.src_tbl INTO des_tbl
    [partitions]
    [columns_mapping]
    1. mysql_db.src_tbl
      指定mysql端的数据库和源表。
    2. des_tbl
      指定doris端的目标表,只支持Unique表,且需开启表的batch delete功能(开启方法请看help alter table的'批量删除功能')。
    3. partitions
      指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。
      示例:
      PARTITION(p1, p2, p3)
    4. column_mapping
      指定mysql源表和doris目标表的列之间的映射关系。如果不指定,FE会默认源表和目标表的列按顺序一一对应。
      不支持 col_name = expr 的形式表示列。
      示例:
      假设目标表列为(k1, k2, v1),

      改变列k1和k2的顺序
      COLUMNS(k2, k1, v1)

      忽略源数据的第四列
      COLUMNS(k2, k1, v1, dummy_column)

  1. binlog_desc
    用来描述远端数据源,目前仅支持canal一种。
    语法:
    FROM BINLOG
    (
    "key1" = "value1",
    "key2" = "value2"
    )
    1. Canal 数据源对应的属性,以canal.为前缀
      1. canal.server.ip: canal server的地址
      2. canal.server.port: canal server的端口
      3. canal.destination: instance的标识
      4. canal.batchSize: 获取的batch大小的最大值,默认8192
      5. canal.username: instance的用户名
      6. canal.password: instance的密码
      7. canal.debug: 可选,设置为true时,会将batch和每一行数据的详细信息都打印出来 Examples:

  1. 简单为 test_db 的 test_tbl 创建一个名为 job1 的数据同步作业,连接本地的Canal服务器,对应Mysql源表 mysql_db1.tbl1
    CREATE SYNC `test_db`.`job1`
    (
    FROM `mysql_db1`.`tbl1` INTO `test_tbl `
    )
    FROM BINLOG
    (
    "type" = "canal",
    "canal.server.ip" = "127.0.0.1",
    "canal.server.port" = "11111",
    "canal.destination" = "example",
    "canal.username" = "",
    "canal.password" = ""
    );
  2. 为 test_db 的多张表创建一个名为 job1 的数据同步作业,一一对应多张Mysql源表,并显式的指定列映射。
    CREATE SYNC `test_db`.`job1`
    (
    FROM `mysql_db`.`t1` INTO `test1` COLUMNS(k1, k2, v1) PARTITIONS (p1, p2),
    FROM `mysql_db`.`t2` INTO `test2` COLUMNS(k3, k4, v2) PARTITION p1
    )
    FROM BINLOG
    (
    "type" = "canal",
    "canal.server.ip" = "xx.xxx.xxx.xx",
    "canal.server.port" = "12111",
    "canal.destination" = "example",
    "canal.username" = "username",
    "canal.password" = "password"
    );

3.1.2 开始同步mysql表里数据到Doris

注意:
创建同步任务之前,首先要在fe.conf里配置enable_create_sync_job=true,这个默认是false不启用,否则就不能创建同步任务
  1. CREATE SYNC test_2.doris_mysql_binlog_demo_job
  2. (
  3. FROM demo.test_cdc INTO doris_mysql_binlog_demo
  4. )
  5. FROM BINLOG
  6. (
  7. "type" = "canal",
  8. "canal.server.ip" = "10.220.146.10",
  9. "canal.server.port" = "11111",
  10. "canal.destination" = "demo",
  11. "canal.username" = "canal",
  12. "canal.password" = "canal"
  13. );

3.1.3 查看同步任务

 SHOW SYNC JOB from test_2;

3.1.4 查看表里的数据

 select * from doris_mysql_binlog_demo;

3.1.5 删除数据

我们在Mysql 数据表里删除数据,然后看Doris表里的变化

 delete from test_cdc where id in (12,13)

我们在去看Doris表里,id是12,13这两条数据已经被删除

3.1.6 多表同步

多表同步只需要像下面这样写法就可以了

  1. CREATE SYNC test_2.doris_mysql_binlog_demo_job
  2. (
  3. FROM demo.test_cdc INTO doris_mysql_binlog_demo,
  4. FROM demo.test_cdc_1 INTO doris_mysql_binlog_demo,
  5. FROM demo.test_cdc_2 INTO doris_mysql_binlog_demo,
  6. FROM demo.test_cdc_3 INTO doris_mysql_binlog_demo
  7. )

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/625274
推荐阅读
相关标签
  

闽ICP备14008679号