当前位置:   article > 正文

springboot集成starrocks、以及采用flink实现mysql与starrocks亚秒级同步_spring+starrocks

spring+starrocks

一、application.yml

(因采用dynamic-datasource-spring-boot-starter动态数据源,所以才是以下配置文件的样式,像redis,druid根据自己情况导入依赖)

这个配置文件的场景是把starrocks当成slave库在用。某些大数据慢查询就走starrocks

就这样配置好后就可把starrocks当mysql用了

  # Spring configuration.
  # Layout follows dynamic-datasource-spring-boot-starter: Druid pool defaults live under
  # spring.datasource.dynamic.druid, and each named data source under
  # spring.datasource.dynamic.datasource.<name>.
  spring:
    redis:
      host: localhost
      port: 6379
      password:
    datasource:
      druid:
        # Druid monitoring console
        stat-view-servlet:
          enabled: true
          loginUsername: admin
          loginPassword: 123456
      dynamic:
        # Connection-pool settings applied to the data sources below
        druid:
          initial-size: 5
          min-idle: 5
          maxActive: 20
          maxWait: 60000
          timeBetweenEvictionRunsMillis: 60000
          minEvictableIdleTimeMillis: 300000
          validationQuery: SELECT 1
          testWhileIdle: true
          testOnBorrow: false
          testOnReturn: false
          poolPreparedStatements: true
          maxPoolPreparedStatementPerConnectionSize: 20
          filters: stat,slf4j
          # NOTE(review): the backslashes before '=' are literal characters in a YAML plain
          # scalar (they look like .properties-style escapes) — confirm Druid still picks
          # up mergeSql / slowSqlMillis with this value.
          connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
        datasource:
          # Primary data source (MySQL)
          master:
            driver-class-name: com.mysql.cj.jdbc.Driver
            url: jdbc:mysql://xx.xx.xx.xx:3306/inst_ops?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
            username: root
            password: 123456
          # Secondary data source: StarRocks, reached through the MySQL driver on port 9030
          slave:
            driver-class-name: com.mysql.cj.jdbc.Driver
            username: root
            password:
            url: jdbc:mysql://xx.xx.xx.xx:9030/inst_ops?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&autoReconnect=true&failOverReadOnly=false
        # NOTE(review): placed under spring.datasource.dynamic, where dynamic-datasource
        # reads its seata switch — the original nesting was lost, confirm this level.
        seata: true

  # MyBatis configuration (disabled)
  # mybatis:
  #   # Package scanned for type aliases
  #   typeAliasesPackage: com.ruoyi.stream
  #   # Location pattern used to find every mapper.xml mapping file
  #   mapperLocations: classpath:mapper/**/*.xml

  # Swagger configuration
  swagger:
    title: mongodb接口文档
    license: Powered By ruoyi
    licenseUrl: http://localhost:8080/doc.html

二、实验场景:查询mysql数据,使用insert into批量往starrocks插入数据

重点:采用这种方式有限制,插入几千条starrocks就不让你插入了,只用于demo测试

  1. package com.ruoyi.starrocks.test;
  2. import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
  3. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  4. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  5. import com.ruoyi.starrocks.domain.DevMc;
  6. import com.ruoyi.starrocks.mapper.DevMcMapper;
  7. import com.ruoyi.starrocks.service.IDevMcService;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.junit.jupiter.api.Test;
  10. import org.springframework.boot.test.context.SpringBootTest;
  11. import javax.annotation.Resource;
  12. import java.util.List;
  13. /**
  14. *
  15. * @author yh
  16. * @date 2023-03-24 10:50
  17. */
  18. @SpringBootTest
  19. @Slf4j
  20. public class StarrocksTest {
  21. public static final String NAME = "INSERT INTO `inst_ops`.dev_mc (id,sign_id,`year`,oid,ono,prov,city,area,park,tmpl," +
  22. "dtm,over_dtm,over_state,mc_tag,mc_cid,mc_uid,mc_man,mc_day,mc_sta,mc_end,audt_cid,audt_tag_chk,audt_tag_ok," +
  23. "audt_dtm,audt_uid,audt_man,audt_inf,submit,version,c_id,to_upd) VALUES " ;
  24. @Resource
  25. private IDevMcService devMcService;
  26. @Resource
  27. private DevMcMapper mapper;
  28. @Test
  29. public void test1() {
  30. LambdaQueryWrapper<DevMc> last = Wrappers.lambdaQuery(DevMc.class);
  31. List<DevMc> list = devMcService.list(last);
  32. System.err.println("查询完毕");
  33. DynamicDataSourceContextHolder.push("slave"); // 手动切换
  34. System.err.println("切换数据源成功---------------");
  35. StringBuilder sb = new StringBuilder();
  36. int count = 0;
  37. for (DevMc devMc : list) {
  38. count ++;
  39. sb.append("(");
  40. sb.append(devMc.getId()).append(",");
  41. sb.append(devMc.getSignId()).append(",");
  42. sb.append(devMc.getYear()).append(",");
  43. sb.append(devMc.getOid()).append(",");
  44. sb.append("'").append(devMc.getOno()).append("',");
  45. sb.append(devMc.getProv()).append(",");
  46. sb.append(devMc.getCity()).append(",");
  47. sb.append(devMc.getArea()).append(",");
  48. sb.append(devMc.getPark()).append(",");
  49. sb.append(devMc.getTmpl()).append(",");
  50. sb.append("'").append(devMc.getDtm()).append("',");
  51. sb.append("'").append(devMc.getOverDtm()).append("',");
  52. sb.append(devMc.getOverState()).append(",");
  53. sb.append(devMc.getMcTag()).append(",");
  54. sb.append(devMc.getMcCid()).append(",");
  55. sb.append("'").append(devMc.getMcUid()).append("',");
  56. sb.append("'").append(devMc.getMcMan()).append("',");
  57. sb.append("'").append(devMc.getMcDay()).append("',");
  58. sb.append("'").append(devMc.getMcSta()).append("',");
  59. sb.append("'").append(devMc.getMcEnd()).append("',");
  60. sb.append(devMc.getAudtCid()).append(",");
  61. sb.append(devMc.getAudtTagChk()).append(",");
  62. sb.append(devMc.getAudtTagOk()).append(",");
  63. sb.append("'").append(devMc.getAudtDtm()).append("',");
  64. sb.append(devMc.getAudtUid()).append(",");
  65. sb.append("'").append(devMc.getAudtMan()).append("',");
  66. sb.append("'").append(devMc.getAudtInf()).append("',");
  67. sb.append(devMc.getSubmit()).append(",");
  68. sb.append(devMc.getVersion()).append(",");
  69. sb.append(devMc.getCId()).append(",");
  70. sb.append(devMc.getToUpd());
  71. sb.append(")");
  72. if (count == 5000) {
  73. sb.append(";");
  74. StringBuilder insert = new StringBuilder(NAME);
  75. insert.append(sb.toString());
  76. mapper.insertBatch(insert.toString());
  77. count = 0;
  78. sb.setLength(0);
  79. } else {
  80. sb.append(",");
  81. }
  82. }
  83. if (sb.length() > 0) {
  84. StringBuilder insert = new StringBuilder(NAME);
  85. insert.append(sb.toString().substring(0, sb.length()-1));
  86. insert.append(";");
  87. mapper.insertBatch(insert.toString());
  88. count = 0;
  89. sb.setLength(0);
  90. }
  91. }
  92. @Test
  93. public void test2() {
  94. DevMc devMc = devMcService.getById(1558);
  95. System.err.println("查询完毕");
  96. DynamicDataSourceContextHolder.push("slave"); // 手动切换
  97. System.err.println("切换数据源成功---------------");
  98. devMcService.save(devMc);
  99. }
  100. }

三、使用flink往starrocks导入数据,并实现亚秒级同步

第3.4章:StarRocks数据导入--Flink Connector与CDC秒级数据同步_flink-connector-starrocks_流木随风的博客-CSDN博客

所需文件:注意!有些jar包在外网仓库找不到,只有阿里云仓库有。而且搜索出来的jar包名称与博客中展示的名称有细微差别,按博客里的名称根本搜不到,估计是那篇文档很久没有更新了。

flink版本:

flink-1.13.5-bin-scala_2.11.tgz、

需要jar包:

flink-connector-starrocks-1.1.14_flink-1.13_2.11.jar

flink-sql-connector-mysql-cdc-2.0.2.jar

flink-connector-jdbc_2.11-1.13.5.jar

jar包阿里云盘下载链接:阿里云盘分享

步骤就不用说了,按那个博客进行操作

四、相关建表语句(私人保留记录用,跟文章无关)

1、starrocks建库语句

-- Creates the inst_ops database.
CREATE DATABASE inst_ops;

2、starrocks建表语句

  -- dev_mc: primary-key table keyed on id, hash-distributed on id into 4 buckets,
  -- stored with a single replica and a persistent primary-key index.
  CREATE TABLE `dev_mc` (
      `id`            BIGINT       NOT NULL             COMMENT '维保流水号',
      `sign_id`       BIGINT       NOT NULL             COMMENT '签到ID',
      `year`          INT          NOT NULL             COMMENT '维保年份',
      `oid`           INT          NOT NULL             COMMENT '设备编号',
      `ono`           VARCHAR(90)  NOT NULL DEFAULT ''  COMMENT '设备号',
      `prov`          INT          NOT NULL             COMMENT '省份',
      `city`          INT          NOT NULL             COMMENT '城市',
      `area`          INT          NOT NULL,
      `park`          INT          NOT NULL,
      `tmpl`          INT          NOT NULL             COMMENT '模板编号',
      `dtm`           VARCHAR(57)  NOT NULL DEFAULT ''  COMMENT '时间-维保最迟时间',
      `over_dtm`      VARCHAR(57)  NOT NULL DEFAULT ''  COMMENT '维保超期时间',
      `over_state`    TINYINT(1)   NOT NULL             COMMENT '超期是否计算',
      `mc_tag`        TINYINT(1)   NOT NULL             COMMENT '是否维保',
      `mc_cid`        INT          NOT NULL             COMMENT '维保单位',
      `mc_uid`        VARCHAR(90)  NOT NULL DEFAULT '0' COMMENT '维保人编号',
      `mc_man`        VARCHAR(90)  NOT NULL DEFAULT ''  COMMENT '维保人姓名',
      `mc_day`        VARCHAR(33)  NOT NULL DEFAULT ''  COMMENT '保养日期(xxxx年xx月xx日)',
      `mc_sta`        VARCHAR(57)  NOT NULL DEFAULT ''  COMMENT '维保开始时间(xx时xx分)',
      `mc_end`        VARCHAR(57)  NOT NULL DEFAULT ''  COMMENT '维保结束时间(xx时xx分)',
      `audt_cid`      INT          NOT NULL             COMMENT '审核公司ID',
      `audt_tag_chk`  TINYINT(1)   NOT NULL             COMMENT '使用单位是否审核',
      `audt_tag_ok`   TINYINT(1)   NOT NULL             COMMENT '使用单位审核结果:通过/不通过',
      `audt_dtm`      VARCHAR(57)  NOT NULL DEFAULT ''  COMMENT '审核日期',
      `audt_uid`      INT          NOT NULL             COMMENT '使用单位审核人',
      `audt_man`      VARCHAR(90)  NOT NULL             COMMENT '使用单位审核人',
      `audt_inf`      VARCHAR(150) NOT NULL             COMMENT '使用单位审核结果',
      `submit`        TINYINT(1)   NOT NULL             COMMENT '是否提交物业审核',
      `version`       INT          NOT NULL             COMMENT '版本号(1判断审核后是否更新)',
      `c_id`          BIGINT       NOT NULL             COMMENT '96333返回ID',
      `to_upd`        TINYINT      NULL                 COMMENT '是否要同步(0否1是)'
  )
  PRIMARY KEY (`id`)
  DISTRIBUTED BY HASH(`id`) BUCKETS 4
  PROPERTIES (
      "replication_num" = "1",
      "enable_persistent_index" = "true"
  );

3、flink sql——mysql、starrocks的建表映射语句

mysql:

  -- Flink SQL source table: reads inst_ops.dev_mc from MySQL through the JDBC connector.
  CREATE TABLE source_mysql_devmc (
      `id`            BIGINT,
      `sign_id`       BIGINT,
      `year`          INT,
      `oid`           INT,
      `ono`           STRING,
      `prov`          INT,
      `city`          INT,
      `area`          INT,
      `park`          INT,
      `tmpl`          INT,
      `dtm`           STRING,
      `over_dtm`      STRING,
      `over_state`    BOOLEAN,
      `mc_tag`        BOOLEAN,
      `mc_cid`        INT,
      `mc_uid`        STRING,
      `mc_man`        STRING,
      `mc_day`        STRING,
      `mc_sta`        STRING,
      `mc_end`        STRING,
      `audt_cid`      INT,
      `audt_tag_chk`  BOOLEAN,
      `audt_tag_ok`   BOOLEAN,
      `audt_dtm`      STRING,
      `audt_uid`      INT,
      `audt_man`      STRING,
      `audt_inf`      STRING,
      `submit`        BOOLEAN,
      `version`       INT,
      `c_id`          BIGINT,
      `to_upd`        TINYINT,
      PRIMARY KEY (`id`) NOT ENFORCED
  ) WITH (
      'connector'  = 'jdbc',
      'url'        = 'jdbc:mysql://localhost:3306/inst_ops',
      'table-name' = 'dev_mc',
      'username'   = 'root',
      'password'   = '123456'
  );

starrocks:

  -- Flink SQL sink table: writes to inst_ops.dev_mc in StarRocks through the starrocks
  -- connector, flushing buffered rows every 5000 ms.
  CREATE TABLE sink_starrocks_devmc (
      `id`            BIGINT,
      `sign_id`       BIGINT,
      `year`          INT,
      `oid`           INT,
      `ono`           STRING,
      `prov`          INT,
      `city`          INT,
      `area`          INT,
      `park`          INT,
      `tmpl`          INT,
      `dtm`           STRING,
      `over_dtm`      STRING,
      `over_state`    BOOLEAN,
      `mc_tag`        BOOLEAN,
      `mc_cid`        INT,
      `mc_uid`        STRING,
      `mc_man`        STRING,
      `mc_day`        STRING,
      `mc_sta`        STRING,
      `mc_end`        STRING,
      `audt_cid`      INT,
      `audt_tag_chk`  BOOLEAN,
      `audt_tag_ok`   BOOLEAN,
      `audt_dtm`      STRING,
      `audt_uid`      INT,
      `audt_man`      STRING,
      `audt_inf`      STRING,
      `submit`        BOOLEAN,
      `version`       INT,
      `c_id`          BIGINT,
      `to_upd`        TINYINT,
      PRIMARY KEY (`id`) NOT ENFORCED
  ) WITH (
      'connector'     = 'starrocks',
      'jdbc-url'      = 'jdbc:mysql://localhost:9030',
      'load-url'      = 'localhost:8030',
      'database-name' = 'inst_ops',
      'table-name'    = 'dev_mc',
      'username'      = 'root',
      'password'      = '',
      'sink.buffer-flush.interval-ms'    = '5000',
      -- Non-printable separators for the load payload
      'sink.properties.column_separator' = '\x01',
      'sink.properties.row_delimiter'    = '\x02'
  );

需要注意的点:因为我mysql与starrocks都是用docker启动的,所以ip是localhost。用公网ip就会连不上数据库,这是因为docker自身的问题,他们说用红帽版的docker就没有这个问题

4、同步数据语句

-- Copies up to 300000 rows from the MySQL source table into the StarRocks sink table.
INSERT INTO sink_starrocks_devmc
SELECT * FROM source_mysql_devmc LIMIT 300000;

5、亚秒级同步(只需要修改mysql映射)

  -- Flink SQL source table: reads inst_ops.dev_mc from MySQL through the mysql-cdc
  -- connector, with incremental snapshot reading switched off.
  CREATE TABLE source_mysql_devmc (
      `id`            BIGINT,
      `sign_id`       BIGINT,
      `year`          INT,
      `oid`           INT,
      `ono`           STRING,
      `prov`          INT,
      `city`          INT,
      `area`          INT,
      `park`          INT,
      `tmpl`          INT,
      `dtm`           STRING,
      `over_dtm`      STRING,
      `over_state`    BOOLEAN,
      `mc_tag`        BOOLEAN,
      `mc_cid`        INT,
      `mc_uid`        STRING,
      `mc_man`        STRING,
      `mc_day`        STRING,
      `mc_sta`        STRING,
      `mc_end`        STRING,
      `audt_cid`      INT,
      `audt_tag_chk`  BOOLEAN,
      `audt_tag_ok`   BOOLEAN,
      `audt_dtm`      STRING,
      `audt_uid`      INT,
      `audt_man`      STRING,
      `audt_inf`      STRING,
      `submit`        BOOLEAN,
      `version`       INT,
      `c_id`          BIGINT,
      `to_upd`        TINYINT,
      PRIMARY KEY (`id`) NOT ENFORCED
  ) WITH (
      'connector'     = 'mysql-cdc',
      'hostname'      = 'localhost',
      'port'          = '3306',
      'username'      = 'root',
      'password'      = '123456',
      'database-name' = 'inst_ops',
      'scan.incremental.snapshot.enabled' = 'false',
      'table-name'    = 'dev_mc'
  );

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/381301
推荐阅读
相关标签
  

闽ICP备14008679号