当前位置:   article > 正文

springboot集成starrocks、以及采用flink实现mysql与starrocks亚秒级同步_spring+starrocks_spring boot集成starrocks

spring boot集成starrocks
        "dtm,over_dtm,over_state,mc_tag,mc_cid,mc_uid,mc_man,mc_day,mc_sta,mc_end,audt_cid,audt_tag_chk,audt_tag_ok," +
        "audt_dtm,audt_uid,audt_man,audt_inf,submit,version,c_id,to_upd) VALUES " ;

@Resource
private IDevMcService devMcService;
@Resource
private DevMcMapper mapper;

@Test
public void test1() {
    LambdaQueryWrapper<DevMc> last = Wrappers.lambdaQuery(DevMc.class);
    List<DevMc> list = devMcService.list(last);
    System.err.println("查询完毕");
    DynamicDataSourceContextHolder.push("slave"); // 手动切换
    System.err.println("切换数据源成功---------------");
    StringBuilder sb = new StringBuilder();
    int count = 0;
    for (DevMc devMc : list) {
        count ++;
        sb.append("(");
        sb.append(devMc.getId()).append(",");
        sb.append(devMc.getSignId()).append(",");
        sb.append(devMc.getYear()).append(",");
        sb.append(devMc.getOid()).append(",");
        sb.append("'").append(devMc.getOno()).append("',");
        sb.append(devMc.getProv()).append(",");
        sb.append(devMc.getCity()).append(",");
        sb.append(devMc.getArea()).append(",");
        sb.append(devMc.getPark()).append(",");
        sb.append(devMc.getTmpl()).append(",");
        sb.append("'").append(devMc.getDtm()).append("',");
        sb.append("'").append(devMc.getOverDtm()).append("',");
        sb.append(devMc.getOverState()).append(",");
        sb.append(devMc.getMcTag()).append(",");
        sb.append(devMc.getMcCid()).append(",");
        sb.append("'").append(devMc.getMcUid()).append("',");
        sb.append("'").append(devMc.getMcMan()).append("',");
        sb.append("'").append(devMc.getMcDay()).append("',");
        sb.append("'").append(devMc.getMcSta()).append("',");
        sb.append("'").append(devMc.getMcEnd()).append("',");
        sb.append(devMc.getAudtCid()).append(",");
        sb.append(devMc.getAudtTagChk()).append(",");
        sb.append(devMc.getAudtTagOk()).append(",");
        sb.append("'").append(devMc.getAudtDtm()).append("',");
        sb.append(devMc.getAudtUid()).append(",");
        sb.append("'").append(devMc.getAudtMan()).append("',");
        sb.append("'").append(devMc.getAudtInf()).append("',");
        sb.append(devMc.getSubmit()).append(",");
        sb.append(devMc.getVersion()).append(",");
        sb.append(devMc.getCId()).append(",");
        sb.append(devMc.getToUpd());
        sb.append(")");
        if (count == 5000) {
            sb.append(";");
            StringBuilder insert = new StringBuilder(NAME);
            insert.append(sb.toString());
            mapper.insertBatch(insert.toString());
            count = 0;
            sb.setLength(0);
        } else {
            sb.append(",");
        }
    }

    if (sb.length() > 0) {
        StringBuilder insert = new StringBuilder(NAME);
        insert.append(sb.toString().substring(0, sb.length()-1));
        insert.append(";");
        mapper.insertBatch(insert.toString());
        count = 0;
        sb.setLength(0);
    }

}

@Test
public void test2() {
    DevMc devMc = devMcService.getById(1558);
    System.err.println("查询完毕");
    DynamicDataSourceContextHolder.push("slave"); // 手动切换
    System.err.println("切换数据源成功---------------");
    devMcService.save(devMc);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83

}


## 三、使用flink往starrocks导入数据,并实现亚秒级同步


[第3.4章:StarRocks数据导入--Flink Connector与CDC秒级数据同步\_flink-connector-starrocks\_流木随风的博客-CSDN博客]( )


所需文件:注意!有些jar包在外网没得,只有阿里云仓库有。而且搜索的出来的jar包与博客展示的名称有**细微差别。按博客的名称搜根本搜不出来,所以我觉得是他文档好久没更新了**


**flink版本:**


flink-1.13.5-bin-scala\_2.11.tgz、



**需要jar包:**


flink-connector-starrocks-1.1.14\_flink-1.13\_2.11.jar


flink-sql-connector-mysql-cdc-2.0.2.jar


flink-connector-jdbc\_2.11-1.13.5.jar


jar包阿里网盘下载连接:[阿里云盘分享]( )


**步骤就不用说了,按那个博客进行操作**


## 四、相关建表语句(私人保留记录用,跟文章无关)


1、starrocks建库语句



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

create database inst_ops;


2、starrocks建表语句



  • 1
  • 2
  • 3
  • 4
  • 5

CREATE TABLE dev_mc (
id bigint NOT NULL COMMENT ‘维保流水号’,
sign_id bigint NOT NULL COMMENT ‘签到ID’,
year int NOT NULL COMMENT ‘维保年份’,
oid int NOT NULL COMMENT ‘设备编号’,
ono varchar(90) NOT NULL DEFAULT ‘’ COMMENT ‘设备号’,
prov int NOT NULL COMMENT ‘省份’,
city int NOT NULL COMMENT ‘城市’,
area int NOT NULL,
park int NOT NULL,
tmpl int NOT NULL COMMENT ‘模板编号’,
dtm varchar(57) NOT NULL DEFAULT ‘’ COMMENT ‘时间-维保最迟时间’,
over_dtm varchar(57) NOT NULL DEFAULT ‘’ COMMENT ‘维保超期时间’,
over_state tinyint(1) NOT NULL COMMENT ‘超期是否计算’,
mc_tag tinyint(1) NOT NULL COMMENT ‘是否维保’,
mc_cid int NOT NULL COMMENT ‘维保单位’,
mc_uid varchar(90) NOT NULL DEFAULT ‘0’ COMMENT ‘维保人编号’,
mc_man varchar(90) NOT NULL DEFAULT ‘’ COMMENT ‘维保人姓名’,
mc_day varchar(33) NOT NULL DEFAULT ‘’ COMMENT ‘保养日期(xxxx年xx月xx日)’,
mc_sta varchar(57) NOT NULL DEFAULT ‘’ COMMENT ‘维保开始时间(xx时xx分)’,
mc_end varchar(57) NOT NULL DEFAULT ‘’ COMMENT ‘维保结束时间(xx时xx分)’,
audt_cid int NOT NULL COMMENT ‘审核公司ID’,
audt_tag_chk tinyint(1) NOT NULL COMMENT ‘使用单位是否审核’,
audt_tag_ok tinyint(1) NOT NULL COMMENT ‘使用单位审核结果:通过/不通过’,
audt_dtm varchar(57) NOT NULL DEFAULT ‘’ COMMENT ‘审核日期’,
audt_uid int NOT NULL COMMENT ‘使用单位审核人’,
audt_man varchar(90) NOT NULL COMMENT ‘使用单位审核人’,
audt_inf varchar(150) NOT NULL COMMENT ‘使用单位审核结果’,
submit tinyint(1) NOT NULL COMMENT ‘是否提交物业审核’,
version int NOT NULL COMMENT ‘版本号(1判断审核后是否更新)’,
c_id bigint NOT NULL COMMENT ‘96333返回ID’,
to_upd tinyint NULL COMMENT ‘是否要同步(0否1是)’
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES(“replication_num” = “1”,
“enable_persistent_index” = “true”);


3、flink sql——mysql、starrocks的建表映射语句


mysql:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

CREATE TABLE source_mysql_devmc (
id BIGINT,
sign_id BIGINT,
year int ,
oid int ,
ono STRING ,
prov int ,
city int,
area int,
park int ,
tmpl int ,
dtm STRING ,
over_dtm STRING ,
over_state BOOLEAN ,
mc_tag BOOLEAN ,
mc_cid int ,
mc_uid STRING ,
mc_man STRING ,
mc_day STRING ,
mc_sta STRING ,
mc_end STRING ,
audt_cid int ,
audt_tag_chk BOOLEAN ,
audt_tag_ok BOOLEAN ,
audt_dtm STRING ,
audt_uid int ,
audt_man STRING ,
audt_inf STRING ,
submit BOOLEAN ,
version int ,
c_id bigint ,
to_upd tinyint ,
PRIMARY KEY (id) NOT ENFORCED
)WITH (
‘connector’ = ‘jdbc’,
‘url’ = ‘jdbc:mysql://localhost:3306/inst_ops’,
‘table-name’ = ‘dev_mc’,
‘username’ = ‘root’,
‘password’ = ‘123456’
);


starrocks:



  • 1
  • 2
  • 3
  • 4
  • 5

CREATE TABLE sink_starrocks_devmc (
id BIGINT,
sign_id BIGINT,
year int ,
oid int ,
ono STRING ,
prov int ,
city int,
area int,
park int ,
tmpl int ,
dtm STRING ,
over_dtm STRING ,
over_state BOOLEAN ,
mc_tag BOOLEAN ,
mc_cid int ,
mc_uid STRING ,
mc_man STRING ,
mc_day STRING ,
mc_sta STRING ,
mc_end STRING ,
audt_cid int ,
audt_tag_chk BOOLEAN ,
audt_tag_ok BOOLEAN ,
audt_dtm STRING ,
audt_uid int ,
audt_man STRING ,
audt_inf STRING ,
submit BOOLEAN ,
version int ,
c_id bigint ,
to_upd tinyint ,
PRIMARY KEY (id) NOT ENFORCED
)WITH (
‘connector’ = ‘starrocks’,
‘jdbc-url’=‘jdbc:mysql://localhost:9030’,
‘load-url’=‘localhost:8030’,
‘database-name’ = ‘inst_ops’,
‘table-name’ = ‘dev_mc’,
‘username’ = ‘root’,

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
img
img
img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
img

一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

大数据开发知识点,真正体系化!**

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
[外链图片转存中…(img-F7Xttsnw-1712990233304)]

一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/949329
推荐阅读
相关标签
  

闽ICP备14008679号