当前位置:   article > 正文

使用flink将mysql数据入湖delta_mysqlsource.builder()

mysqlsource.builder()

使用flink将mysql数据入湖delta

1.简介

Delta数据湖原来是强绑定于Spark引擎,而近期社区实现了使用Flink引擎将数据入湖,简单写个demo使用以下。

  • Flink 1.13.0
  • delta 1.0.0
  • flink-mysql-cdc 2.1.0

2.Mysql入湖代码

2.1 Flink运行环境

设置下checkpoint的时间大小

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
  • 1
  • 2

2.2 构建MysqlSouce

使用flink-cdc-mysql依赖中的方法,输入ip,表名等直接构建

MySqlSource<String> source = MySqlSource
    .<String>builder()
    .hostname("ip")
    .port(3306)
    .databaseList("database")
    .tableList("database.table")
    .username("username")
    .password("password")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2.3 Mysql表的Schema转变成Flink-RowType

使用flink将数据入湖时,需要将source的Schema转换成Flink的RowType

通过RowType.RowField实现,这里定义三个字段的RowType

public static RowType getMysqlRowType(){
    return new RowType(Arrays.asList(
        new RowType.RowField("id", new BigIntType()),
        new RowType.RowField("name", new VarCharType(VarCharType.MAX_LENGTH)),
        new RowType.RowField("dept_id",new IntType())
    ));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2.4 构建Sink

使用delta-flink依赖中的DeltaSink
.forRowData()方法,指定lakePath,hadoop-conf,rowType,生成Sink

public static org.apache.hadoop.conf.Configuration getHadoopConf() {
    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
    conf.set("parquet.compression", "SNAPPY");
    return conf;
}

public static DeltaSink<RowData> createDeltaSink(String deltaTablePath, RowType rowType) {
    return DeltaSink
        .forRowData(
        new Path(deltaTablePath),
        getHadoopConf(),
        rowType).build();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2.5 String转为RowData

Source端使用String类型,Sink端使用RowData类型,所以需要使用Map函数进行一次转换。

使用fastJson获取每个字段的值,然后变成Flink row类型,最后使用convertor转换为RowData

//存在于flink-table-runtime-blink_2.12依赖中 
public static final DataFormatConverters.DataFormatConverter<RowData, Row> MYSQL_CONVERTER =
            DataFormatConverters.getConverterForDataType(
                    TypeConversions.fromLogicalToDataType(getMysqlRowType())
            );

public static RowData mysqlJsonToRowData(String line){
    String body = JSON.parseObject(line).getString("after");
    Long id = JSON.parseObject(body).getLong("id");
    String name = JSON.parseObject(body).getString("name");
    Integer deptId = JSON.parseObject(body).getInteger("dept_id");
    Row row = Row.of(id,name,deptId);
    return MYSQL_CONVERTER.toInternal(row);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2.6 执行

依次将source,sink放入env中执行即可

env.fromSource(source, WatermarkStrategy.noWatermarks(),"demo-mysql-cdc")
    .setParallelism(2)
    //将json数据转为FlinkRowData
    .map(FlinkDeltaUtil::mysqlJsonToRowData)
    .sinkTo(deltaSink)
    .setParallelism(1);

env.execute("flink-cdc-to-delta");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

3. 源码

仓库地址 (https://gitee.com/zhiling-chen/demo-mysql-flink-delta)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/767853
推荐阅读
相关标签
  

闽ICP备14008679号