当前位置:   article > 正文

Flink1.15.4+flinkcdc+Dinky0.7.3+Mysql5.7+Doris1.2.1实现实时同步_dinky cdcsource 整库同步mysql数据到doris

dinky cdcsource 整库同步mysql数据到doris

一、下载软件

Flink1.15.1: https://download.csdn.net/download/qq_37247664/88192279

Dinky0.7.3:https://download.csdn.net/download/qq_37247664/88192354

flinkcdc+doris连接器:https://download.csdn.net/download/qq_37247664/88192362

二、软件安装

flink安装:下载flink后解压文件 tar -zxvf 文件命名

                  将下载的flincdc和doris连接器jar包放入flink的lib包下

                  修改conf下的flink的yaml文件中rest.bind-address:0.0.0.0

                   启动flink bin/start-cluster.sh

此时flink启动完毕,可以通过ip:8081访问flink页面

 

Dinky安装:下载Dinky后解压文件 tar -zxvf 文件名

                     修改config下的application.yml,主要修改数据库连接

                     

                        将dinky下的sql同步到dinky库中。

                        将flink1.15下的lib中jar包放到dinky下plugins中flink1.15下

                      需要将dinky的dlink-common-0.7.3.jar,dlink-client-1.15-0.7.3.jar,dlink-client-base-0.7.3.jar 这三个jar包放入flink的lib下

                       

                    启动dinky ./auto.sh start 1.15 使用1.15版本

                    访问:ip:8888

三、使用dinky创建job

注册flink集群选择standalone

 数据开发

 

测试脚本

CREATE TABLE t_migrate_way(

  id STRING,

  created_user STRING,

  created_date timestamp NOT NULL,

  updated_user STRING,

  updated_date timestamp NOT NULL,

  `date` STRING,

  bus STRING,

  bus_num INT,

  airplane STRING,

  airplane_num INT,

  train STRING,

  train_num INT,

  car STRING,

  car_num INT,

  block_name STRING,

  PRIMARY KEY (id) NOT ENFORCED

)WITH(

     'connector' = 'mysql-cdc',

     'hostname' = '127.0.0.1',

     'port' = '3306',

     'username' = 'zc',

     'password' = '123456',

     'database-name' = '库名',

     'table-name' = '表名'

);

CREATE TABLE t_migrate_way_sink(

  id STRING,

  created_user STRING,

  created_date timestamp NOT NULL,

  updated_user STRING,

  updated_date timestamp NOT NULL,

  `date` STRING,

  bus STRING,

  bus_num INT,

  airplane STRING,

  airplane_num INT,

  train STRING,

  train_num INT,

  car STRING,

  car_num INT,

  block_name STRING,

  PRIMARY KEY (id) NOT ENFORCED

)WITH(

      'connector' = 'doris',

      'fenodes' = 'localhost:8030',

      'table.identifier' = 'bigdata.t_migrate_way',

      'username' = '账号',

      'password' = '密码',

      'sink.label-prefix' = '唯一'

);

INSERT INTO t_migrate_way_sink select id,created_user,created_date,updated_user,updated_date,`date`,bus,bus_num,airplane,airplane_num,train,train_num,car,car_num,block_name from t_migrate_way;

四、效果

 

 其中flink中的job等核心参数需自己设置

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/398661
推荐阅读
相关标签
  

闽ICP备14008679号