赞
踩
(因采用dynamic-datasource-spring-boot-starter动态数据源,所以才是以下配置文件的样式,像redis,druid根据自己情况导入依赖)
这个配置文件的场景是把starrocks当成slave库在用。某些大数据慢查询就走starrocks
就这样配置好后就可把starrocks当mysql用了
- # spring配置
- spring:
- redis:
- host: localhost
- port: 6379
- password:
- datasource:
- druid:
- stat-view-servlet:
- enabled: true
- loginUsername: admin
- loginPassword: 123456
- dynamic:
- druid:
- initial-size: 5
- min-idle: 5
- maxActive: 20
- maxWait: 60000
- timeBetweenEvictionRunsMillis: 60000
- minEvictableIdleTimeMillis: 300000
- validationQuery: SELECT 1
- testWhileIdle: true
- testOnBorrow: false
- testOnReturn: false
- poolPreparedStatements: true
- maxPoolPreparedStatementPerConnectionSize: 20
- filters: stat,slf4j
- connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
- datasource:
- # 主库数据源
- master:
- driver-class-name: com.mysql.cj.jdbc.Driver
- url: jdbc:mysql://xx.xx.xx.xx:3306/inst_ops?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
- username: root
- password: 123456
- # 从库数据源
- slave:
- driver-class-name: com.mysql.cj.jdbc.Driver
- username: root
- password:
- url: jdbc:mysql://xx.xx.xx.xx:9030/inst_ops?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&autoReconnect=true&failOverReadOnly=false
- seata: true
-
- # mybatis配置
- # mybatis:
- # # 搜索指定包别名
- # typeAliasesPackage: com.ruoyi.stream
- # # 配置mapper的扫描,找到所有的mapper.xml映射文件
- # mapperLocations: classpath:mapper/**/*.xml
- # swagger配置
- swagger:
- title: mongodb接口文档
- license: Powered By ruoyi
- licenseUrl: http://localhost:8080/doc.html
重点:采用这种方式有限制,插入几千条starrocks就不让你插入了,只用于demo测试
- package com.ruoyi.starrocks.test;
-
- import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.toolkit.Wrappers;
- import com.ruoyi.starrocks.domain.DevMc;
- import com.ruoyi.starrocks.mapper.DevMcMapper;
- import com.ruoyi.starrocks.service.IDevMcService;
- import lombok.extern.slf4j.Slf4j;
- import org.junit.jupiter.api.Test;
- import org.springframework.boot.test.context.SpringBootTest;
-
- import javax.annotation.Resource;
- import java.util.List;
-
- /**
- *
- * @author yh
- * @date 2023-03-24 10:50
- */
- @SpringBootTest
- @Slf4j
- public class StarrocksTest {
-
- public static final String NAME = "INSERT INTO `inst_ops`.dev_mc (id,sign_id,`year`,oid,ono,prov,city,area,park,tmpl," +
- "dtm,over_dtm,over_state,mc_tag,mc_cid,mc_uid,mc_man,mc_day,mc_sta,mc_end,audt_cid,audt_tag_chk,audt_tag_ok," +
- "audt_dtm,audt_uid,audt_man,audt_inf,submit,version,c_id,to_upd) VALUES " ;
-
- @Resource
- private IDevMcService devMcService;
- @Resource
- private DevMcMapper mapper;
-
- @Test
- public void test1() {
- LambdaQueryWrapper<DevMc> last = Wrappers.lambdaQuery(DevMc.class);
- List<DevMc> list = devMcService.list(last);
- System.err.println("查询完毕");
- DynamicDataSourceContextHolder.push("slave"); // 手动切换
- System.err.println("切换数据源成功---------------");
- StringBuilder sb = new StringBuilder();
- int count = 0;
- for (DevMc devMc : list) {
- count ++;
- sb.append("(");
- sb.append(devMc.getId()).append(",");
- sb.append(devMc.getSignId()).append(",");
- sb.append(devMc.getYear()).append(",");
- sb.append(devMc.getOid()).append(",");
- sb.append("'").append(devMc.getOno()).append("',");
- sb.append(devMc.getProv()).append(",");
- sb.append(devMc.getCity()).append(",");
- sb.append(devMc.getArea()).append(",");
- sb.append(devMc.getPark()).append(",");
- sb.append(devMc.getTmpl()).append(",");
- sb.append("'").append(devMc.getDtm()).append("',");
- sb.append("'").append(devMc.getOverDtm()).append("',");
- sb.append(devMc.getOverState()).append(",");
- sb.append(devMc.getMcTag()).append(",");
- sb.append(devMc.getMcCid()).append(",");
- sb.append("'").append(devMc.getMcUid()).append("',");
- sb.append("'").append(devMc.getMcMan()).append("',");
- sb.append("'").append(devMc.getMcDay()).append("',");
- sb.append("'").append(devMc.getMcSta()).append("',");
- sb.append("'").append(devMc.getMcEnd()).append("',");
- sb.append(devMc.getAudtCid()).append(",");
- sb.append(devMc.getAudtTagChk()).append(",");
- sb.append(devMc.getAudtTagOk()).append(",");
- sb.append("'").append(devMc.getAudtDtm()).append("',");
- sb.append(devMc.getAudtUid()).append(",");
- sb.append("'").append(devMc.getAudtMan()).append("',");
- sb.append("'").append(devMc.getAudtInf()).append("',");
- sb.append(devMc.getSubmit()).append(",");
- sb.append(devMc.getVersion()).append(",");
- sb.append(devMc.getCId()).append(",");
- sb.append(devMc.getToUpd());
- sb.append(")");
- if (count == 5000) {
- sb.append(";");
- StringBuilder insert = new StringBuilder(NAME);
- insert.append(sb.toString());
- mapper.insertBatch(insert.toString());
- count = 0;
- sb.setLength(0);
- } else {
- sb.append(",");
- }
- }
-
- if (sb.length() > 0) {
- StringBuilder insert = new StringBuilder(NAME);
- insert.append(sb.toString().substring(0, sb.length()-1));
- insert.append(";");
- mapper.insertBatch(insert.toString());
- count = 0;
- sb.setLength(0);
- }
-
- }
-
- @Test
- public void test2() {
- DevMc devMc = devMcService.getById(1558);
- System.err.println("查询完毕");
- DynamicDataSourceContextHolder.push("slave"); // 手动切换
- System.err.println("切换数据源成功---------------");
- devMcService.save(devMc);
- }
- }
第3.4章:StarRocks数据导入--Flink Connector与CDC秒级数据同步_flink-connector-starrocks_流木随风的博客-CSDN博客
所需文件:注意!有些jar包在外网没得,只有阿里云仓库有。而且搜索的出来的jar包与博客展示的名称有细微差别。按博客的名称搜根本搜不出来,所以我觉得是他文档好久没更新了
flink版本:
flink-1.13.5-bin-scala_2.11.tgz、
需要jar包:
flink-connector-starrocks-1.1.14_flink-1.13_2.11.jar
flink-sql-connector-mysql-cdc-2.0.2.jar
flink-connector-jdbc_2.11-1.13.5.jar
jar包阿里网盘下载连接:阿里云盘分享
步骤就不用说了,按那个博客进行操作
1、starrocks建库语句
create database inst_ops;
2、starrocks建表语句
- CREATE TABLE `dev_mc` (
- `id` bigint NOT NULL COMMENT '维保流水号',
- `sign_id` bigint NOT NULL COMMENT '签到ID',
- `year` int NOT NULL COMMENT '维保年份',
- `oid` int NOT NULL COMMENT '设备编号',
- `ono` varchar(90) NOT NULL DEFAULT '' COMMENT '设备号',
- `prov` int NOT NULL COMMENT '省份',
- `city` int NOT NULL COMMENT '城市',
- `area` int NOT NULL,
- `park` int NOT NULL,
- `tmpl` int NOT NULL COMMENT '模板编号',
- `dtm` varchar(57) NOT NULL DEFAULT '' COMMENT '时间-维保最迟时间',
- `over_dtm` varchar(57) NOT NULL DEFAULT '' COMMENT '维保超期时间',
- `over_state` tinyint(1) NOT NULL COMMENT '超期是否计算',
- `mc_tag` tinyint(1) NOT NULL COMMENT '是否维保',
- `mc_cid` int NOT NULL COMMENT '维保单位',
- `mc_uid` varchar(90) NOT NULL DEFAULT '0' COMMENT '维保人编号',
- `mc_man` varchar(90) NOT NULL DEFAULT '' COMMENT '维保人姓名',
- `mc_day` varchar(33) NOT NULL DEFAULT '' COMMENT '保养日期(xxxx年xx月xx日)',
- `mc_sta` varchar(57) NOT NULL DEFAULT '' COMMENT '维保开始时间(xx时xx分)',
- `mc_end` varchar(57) NOT NULL DEFAULT '' COMMENT '维保结束时间(xx时xx分)',
- `audt_cid` int NOT NULL COMMENT '审核公司ID',
- `audt_tag_chk` tinyint(1) NOT NULL COMMENT '使用单位是否审核',
- `audt_tag_ok` tinyint(1) NOT NULL COMMENT '使用单位审核结果:通过/不通过',
- `audt_dtm` varchar(57) NOT NULL DEFAULT '' COMMENT '审核日期',
- `audt_uid` int NOT NULL COMMENT '使用单位审核人',
- `audt_man` varchar(90) NOT NULL COMMENT '使用单位审核人',
- `audt_inf` varchar(150) NOT NULL COMMENT '使用单位审核结果',
- `submit` tinyint(1) NOT NULL COMMENT '是否提交物业审核',
- `version` int NOT NULL COMMENT '版本号(1判断审核后是否更新)',
- `c_id` bigint NOT NULL COMMENT '96333返回ID',
- `to_upd` tinyint NULL COMMENT '是否要同步(0否1是)'
- ) PRIMARY KEY (id)
- DISTRIBUTED BY HASH(id) BUCKETS 4
- PROPERTIES("replication_num" = "1",
- "enable_persistent_index" = "true");
3、flink sql——mysql、starrocks的建表映射语句
mysql:
- CREATE TABLE source_mysql_devmc (
- id BIGINT,
- sign_id BIGINT,
- `year` int ,
- `oid` int ,
- `ono` STRING ,
- `prov` int ,
- `city` int,
- `area` int,
- `park` int ,
- `tmpl` int ,
- `dtm` STRING ,
- `over_dtm` STRING ,
- `over_state` BOOLEAN ,
- `mc_tag` BOOLEAN ,
- `mc_cid` int ,
- `mc_uid` STRING ,
- `mc_man` STRING ,
- `mc_day` STRING ,
- `mc_sta` STRING ,
- `mc_end` STRING ,
- `audt_cid` int ,
- `audt_tag_chk` BOOLEAN ,
- `audt_tag_ok` BOOLEAN ,
- `audt_dtm` STRING ,
- `audt_uid` int ,
- `audt_man` STRING ,
- `audt_inf` STRING ,
- `submit` BOOLEAN ,
- `version` int ,
- `c_id` bigint ,
- `to_upd` tinyint ,
- PRIMARY KEY (id) NOT ENFORCED
- )WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://localhost:3306/inst_ops',
- 'table-name' = 'dev_mc',
- 'username' = 'root',
- 'password' = '123456'
- );
starrocks:
- CREATE TABLE sink_starrocks_devmc (
- id BIGINT,
- sign_id BIGINT,
- `year` int ,
- `oid` int ,
- `ono` STRING ,
- `prov` int ,
- `city` int,
- `area` int,
- `park` int ,
- `tmpl` int ,
- `dtm` STRING ,
- `over_dtm` STRING ,
- `over_state` BOOLEAN ,
- `mc_tag` BOOLEAN ,
- `mc_cid` int ,
- `mc_uid` STRING ,
- `mc_man` STRING ,
- `mc_day` STRING ,
- `mc_sta` STRING ,
- `mc_end` STRING ,
- `audt_cid` int ,
- `audt_tag_chk` BOOLEAN ,
- `audt_tag_ok` BOOLEAN ,
- `audt_dtm` STRING ,
- `audt_uid` int ,
- `audt_man` STRING ,
- `audt_inf` STRING ,
- `submit` BOOLEAN ,
- `version` int ,
- `c_id` bigint ,
- `to_upd` tinyint ,
- PRIMARY KEY (id) NOT ENFORCED
- )WITH (
- 'connector' = 'starrocks',
- 'jdbc-url'='jdbc:mysql://localhost:9030',
- 'load-url'='localhost:8030',
- 'database-name' = 'inst_ops',
- 'table-name' = 'dev_mc',
- 'username' = 'root',
- 'password' = '',
- 'sink.buffer-flush.interval-ms' = '5000',
- 'sink.properties.column_separator' = '\x01',
- 'sink.properties.row_delimiter' = '\x02'
- );
需要注意的点:因为我mysql与starrocks都是用docker启动的,所以ip是localhost。用公网ip就会连不上数据库,这是因为docker自身的问题,他们说用红帽版的docker就没有这个问题
4、同步数据语句
insert into sink_starrocks_devmc select * from source_mysql_devmc limit 300000;
5、亚秒级同步(只需要修改mysql映射)
- CREATE TABLE source_mysql_devmc (
- id BIGINT,
- sign_id BIGINT,
- `year` int ,
- `oid` int ,
- `ono` STRING ,
- `prov` int ,
- `city` int,
- `area` int,
- `park` int ,
- `tmpl` int ,
- `dtm` STRING ,
- `over_dtm` STRING ,
- `over_state` BOOLEAN ,
- `mc_tag` BOOLEAN ,
- `mc_cid` int ,
- `mc_uid` STRING ,
- `mc_man` STRING ,
- `mc_day` STRING ,
- `mc_sta` STRING ,
- `mc_end` STRING ,
- `audt_cid` int ,
- `audt_tag_chk` BOOLEAN ,
- `audt_tag_ok` BOOLEAN ,
- `audt_dtm` STRING ,
- `audt_uid` int ,
- `audt_man` STRING ,
- `audt_inf` STRING ,
- `submit` BOOLEAN ,
- `version` int ,
- `c_id` bigint ,
- `to_upd` tinyint ,
- PRIMARY KEY (id) NOT ENFORCED
- )WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = 'localhost',
- 'port' = '3306',
- 'username' = 'root',
- 'password' = '123456',
- 'database-name' = 'inst_ops',
- 'scan.incremental.snapshot.enabled'='false',
- 'table-name' = 'dev_mc'
- );
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。