This article first explains what a Flink connector and CDC are, then walks step by step through building a simple Flink connector for GBase8s, and finally puts it to work in a hands-on project: syncing data from MySQL into GBase8s in real time via MySQL CDC and the new connector.
Flink ships with a number of sources and sinks that are available out of the box: files, MySQL, RabbitMQ, Kafka, Elasticsearch and more are supported both as data sources and as output targets.

Put simply, a Flink connector packages up the loading of a data source and the writing of output to it (hence "connector"): by adding the corresponding connector dependency, we can read from and write to that system with very little code.
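As a minimal illustration of that idea (not part of the original project; the class name is made up for the example), the built-in `datagen` and `print` connectors can be wired together with two DDL statements and one INSERT:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BuiltinConnectorDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // source: the built-in 'datagen' connector generates random rows
        tableEnv.executeSql(
            "CREATE TABLE demo_source (id INT, name STRING) WITH (" +
            " 'connector' = 'datagen', 'rows-per-second' = '1')");

        // sink: the built-in 'print' connector writes rows to stdout
        tableEnv.executeSql(
            "CREATE TABLE demo_sink (id INT, name STRING) WITH ('connector' = 'print')");

        // wiring source to sink is a single INSERT statement
        tableEnv.executeSql("INSERT INTO demo_sink SELECT id, name FROM demo_source");
    }
}
```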
First of all, what is CDC? It stands for Change Data Capture. With CDC we can capture committed changes from a database and ship them downstream for other systems to consume. These changes include INSERT, UPDATE and DELETE operations.
Its main application scenarios are:

- data synchronization or backup between heterogeneous databases / building data analytics and computation platforms
- sharing data state between microservices
- updating caches / refreshing CQRS query views
CDC is a fairly broad concept: anything that can capture changed data can be called CDC. In practice there are two main flavors, query-based CDC and log-based CDC; the table below compares their capabilities and differences.
| | Query-based CDC | Log-based CDC |
|---|---|---|
| Concept | Each capture issues a SELECT that scans the whole table and filters out the rows changed since the previous query | Continuously reads the data store's log, e.g. the MySQL binlog |
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | ❌ | ✅ |
| Low latency, no extra load on the database | ❌ | ✅ |
| Non-intrusive to the business (no LastUpdated column needed) | ❌ | ✅ |
| Captures delete events and the old state of records | ❌ | ✅ |
| Captures the old state of records | ❌ | ✅ |
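In Flink, a log-based change stream surfaces as `RowData` records tagged with a `RowKind`. A small sketch (illustrative only, not from the original project) of how downstream code can distinguish the change types:

```java
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

public final class ChangeKinds {

    // Describe a change record based on its RowKind tag.
    public static String describe(RowData row) {
        RowKind kind = row.getRowKind();
        switch (kind) {
            case INSERT:        return "new row inserted";
            case UPDATE_BEFORE: return "old value of an updated row";
            case UPDATE_AFTER:  return "new value of an updated row";
            case DELETE:        return "row deleted";
            default:            return "unknown change kind: " + kind;
        }
    }

    private ChangeKinds() {
    }
}
```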
We could of course hand-write a Sink that pushes the CDC data into the target database directly, but that is not very elegant. Can we instead write the data into GBase8s through Flink SQL? The answer is yes, and next we will implement a simple Flink connector for GBase8s.
1. Build the row converter (RowConverter)
2. Build the dialect (Dialect)
3. Register the dynamic table factory (DynamicTableFactory) and the related sink classes

After these three steps we have a simple connector. Let's see how to implement each one, starting with the row converter:
```java
package wang.datahub.converter;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

/**
 * @author lijiaqi
 */
public class GBasedbtRowConverter extends AbstractJdbcRowConverter {

    public GBasedbtRowConverter(RowType rowType) {
        super(rowType);
    }

    private static final long serialVersionUID = 1L;

    @Override
    public String converterName() {
        return "gbasedbt";
    }

}
```
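For illustration only (not from the original article), here is how such a converter could be instantiated from a table's row type:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;
import wang.datahub.converter.GBasedbtRowConverter;

public class ConverterSketch {
    public static void main(String[] args) {
        // Build a row type matching the sink table: (id INT, name STRING, description STRING).
        RowType rowType = (RowType) DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.INT()),
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("description", DataTypes.STRING()))
            .getLogicalType();

        GBasedbtRowConverter converter = new GBasedbtRowConverter(rowType);
        System.out.println(converter.converterName()); // prints "gbasedbt"
    }
}
```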
Next, the dialect:

```java
package wang.datahub.dialect;

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
import wang.datahub.converter.GBasedbtRowConverter;

import java.util.Optional;

/**
 * @author lijiaqi
 */
public class GBasedbtDialect implements JdbcDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public String dialectName() {
        return "gbasedbt";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:gbasedbt-sqli:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new GBasedbtRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long l) {
        return null;
    }

    @Override
    public void validate(TableSchema schema) throws ValidationException {
        JdbcDialect.super.validate(schema);
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("com.gbasedbt.jdbc.Driver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        // Note: identifiers are wrapped in single quotes here; adjust the quoting
        // strategy if your GBase 8s setup expects something different.
        return "'" + identifier + "'";
    }

    @Override
    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return JdbcDialect.super.getUpsertStatement(tableName, fieldNames, uniqueKeyFields);
    }

    @Override
    public String getRowExistsStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getRowExistsStatement(tableName, conditionFields);
    }

    @Override
    public String getInsertIntoStatement(String tableName, String[] fieldNames) {
        return JdbcDialect.super.getInsertIntoStatement(tableName, fieldNames);
    }

    @Override
    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        return JdbcDialect.super.getUpdateStatement(tableName, fieldNames, conditionFields);
    }

    @Override
    public String getDeleteStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getDeleteStatement(tableName, conditionFields);
    }

    @Override
    public String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
        return JdbcDialect.super.getSelectFromStatement(tableName, selectFields, conditionFields);
    }

}
```
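As a quick sanity check (illustrative, not part of the original article; the class name is made up), the dialect's default statement builders can be exercised directly to eyeball the SQL they produce:

```java
import wang.datahub.dialect.GBasedbtDialect;

public class DialectSketch {
    public static void main(String[] args) {
        GBasedbtDialect dialect = new GBasedbtDialect();

        // The default JdbcDialect implementations build their SQL from
        // quoteIdentifier() and placeholders, so this prints the INSERT
        // template the connector would generate for the sink table.
        String insert = dialect.getInsertIntoStatement(
                "ta", new String[]{"id", "name", "description"});
        System.out.println(insert);

        // true for GBase 8s JDBC URLs, false otherwise
        System.out.println(dialect.canHandle("jdbc:gbasedbt-sqli://localhost:9088/testdb"));
    }
}
```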
We start by creating GBasedbtSinkFunction, which accepts RowData input and sinks it into the configured database.
```java
package wang.datahub.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * @author lijiaqi
 */
public class GBasedbtSinkFunction extends RichSinkFunction<RowData> {

    private static final long serialVersionUID = 1L;

    private final JdbcOptions jdbcOptions;
    private final SerializationSchema<RowData> serializationSchema = null;
    private DataType dateType;

    private Connection conn;
    private Statement stmt;

    public GBasedbtSinkFunction(JdbcOptions jdbcOptions) {
        this.jdbcOptions = jdbcOptions;
    }

    public GBasedbtSinkFunction(JdbcOptions jdbcOptions, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.dateType = dataType;
    }

    @Override
    public void open(Configuration parameters) {
        System.out.println("open connection !!!!!");
        try {
            if (null == conn) {
                Class.forName(jdbcOptions.getDriverName());
                conn = DriverManager.getConnection(jdbcOptions.getDbURL(),
                        jdbcOptions.getUsername().orElse(null),
                        jdbcOptions.getPassword().orElse(null));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(RowData value, Context context) throws Exception {
        // Every incoming RowData is written as an INSERT, regardless of its RowKind.
        try {
            stmt = conn.createStatement();
            String sql = "insert into " + this.jdbcOptions.getTableName() + " values ( ";
            for (int i = 0; i < value.getArity(); i++) {
                // Build the value list by matching on the field's data type
                // (only INT and STRING columns are handled in this demo).
                if (dateType.getChildren().get(i).getConversionClass().equals(Integer.class)) {
                    sql += value.getInt(i) + " ,";
                } else {
                    sql += "'" + value.getString(i) + "' ,";
                }
            }
            sql = sql.substring(0, sql.length() - 1);
            sql += " ); ";

            System.out.println("sql ==>" + sql);

            stmt.execute(sql);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) {
            stmt.close();
        }
        if (conn != null) {
            conn.close();
        }
    }

}
```
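The string-concatenated INSERT above is fine for a demo, but it is fragile (quoting, SQL injection). A sketch of the same write logic using a `PreparedStatement`, under the same assumption that every column is either INT or STRING (the helper class and method names are hypothetical; `invoke` could delegate to `writeRow(conn, jdbcOptions.getTableName(), dateType, value)`):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

public final class PreparedInsertSketch {

    // Writes one RowData row into 'tableName' using a parameterized INSERT.
    public static void writeRow(Connection conn, String tableName,
                                DataType rowDataType, RowData value) throws Exception {
        StringBuilder placeholders = new StringBuilder();
        for (int i = 0; i < value.getArity(); i++) {
            placeholders.append(i == 0 ? "?" : ", ?");
        }
        String sql = "insert into " + tableName + " values (" + placeholders + ")";

        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (int i = 0; i < value.getArity(); i++) {
                // Only INT and STRING columns are handled, as in the original demo.
                if (rowDataType.getChildren().get(i).getConversionClass().equals(Integer.class)) {
                    ps.setInt(i + 1, value.getInt(i));
                } else {
                    ps.setString(i + 1, value.getString(i).toString());
                }
            }
            ps.executeUpdate();
        }
    }

    private PreparedInsertSketch() {
    }
}
```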
Next, build the GBasedbtDynamicTableSink:
```java
package wang.datahub.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

/**
 * @author lijiaqi
 */
public class GBasedbtDynamicTableSink implements DynamicTableSink {

    private final JdbcOptions jdbcOptions;
    private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
    private final DataType dataType;

    public GBasedbtDynamicTableSink(JdbcOptions jdbcOptions, EncodingFormat<SerializationSchema<RowData>> encodingFormat, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.encodingFormat = encodingFormat;
        this.dataType = dataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        System.out.println("SinkRuntimeProvider");
        System.out.println(dataType);
        GBasedbtSinkFunction gbasedbtSinkFunction = new GBasedbtSinkFunction(jdbcOptions, dataType);
        return SinkFunctionProvider.of(gbasedbtSinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new GBasedbtDynamicTableSink(jdbcOptions, encodingFormat, dataType);
    }

    @Override
    public String asSummaryString() {
        return "gbasedbt Table Sink";
    }

}
```
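`getChangelogMode` above simply echoes the requested mode, so the planner may hand the sink every kind of change row. If the sink should only ever receive certain row kinds, the method can declare that explicitly. A sketch of how one might tighten it (an assumption for illustration, not what this article's connector does):

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

public class ChangelogModeSketch {

    // Accept inserts and the "after" image of updates, but not update-before or delete rows.
    public static ChangelogMode upsertLikeMode() {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }

    // Insert-only mode, for sinks that cannot apply updates or deletes at all.
    public static ChangelogMode insertOnlyMode() {
        return ChangelogMode.insertOnly();
    }
}
```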
Then build the GBasedbtDynamicTableFactory:
```java
package wang.datahub.table;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import wang.datahub.dialect.GBasedbtDialect;

import java.util.HashSet;
import java.util.Set;

/**
 * @author lijiaqi
 */
public class GBasedbtDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    public static final String IDENTIFIER = "gbasedbt";

    private static final String DRIVER_NAME = "com.gbasedbt.jdbc.Driver";

    public static final ConfigOption<String> URL = ConfigOptions
            .key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc database url.");

    public static final ConfigOption<String> DRIVER = ConfigOptions
            .key("driver")
            .stringType()
            .defaultValue(DRIVER_NAME)
            .withDescription("the jdbc driver.");

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");

    // public static final ConfigOption<String> FORMAT = ConfigOptions
    //         .key("format")
    //         .stringType()
    //         .noDefaultValue()
    //         .withDescription("the format.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
        // requiredOptions.add(FORMAT);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        final ReadableConfig config = helper.getOptions();

        helper.validate();

        JdbcOptions jdbcOptions = getJdbcOptions(config);

        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

        // source-side table implementation (not shown here)
        return new GBasedbtDynamicTableSource(jdbcOptions, physicalSchema);
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        // final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
        //         SerializationFormatFactory.class,
        //         FactoryUtil.FORMAT);

        final ReadableConfig config = helper.getOptions();

        helper.validate();

        JdbcOptions jdbcOptions = getJdbcOptions(config);

        final DataType dataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();

        return new GBasedbtDynamicTableSink(jdbcOptions, null, dataType);
    }

    private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
        final String url = readableConfig.get(URL);
        final JdbcOptions.Builder builder = JdbcOptions.builder()
                .setDriverName(DRIVER_NAME)
                .setDBUrl(url)
                .setTableName(readableConfig.get(TABLE_NAME))
                .setDialect(new GBasedbtDialect());

        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
        return builder.build();
    }

}
```
Next, register the dynamic table factory via SPI: create the file `resources/META-INF/services/org.apache.flink.table.factories.Factory` with a single line as its content:

`wang.datahub.table.GBasedbtDynamicTableFactory`
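To double-check that the SPI registration is picked up, the factory can be looked up by its identifier (a quick verification sketch, not part of the original article):

```java
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class SpiCheck {
    public static void main(String[] args) {
        // Throws if no factory with identifier "gbasedbt" is found on the classpath.
        DynamicTableSinkFactory factory = FactoryUtil.discoverFactory(
                Thread.currentThread().getContextClassLoader(),
                DynamicTableSinkFactory.class,
                "gbasedbt");
        System.out.println("found factory: " + factory.getClass().getName());
    }
}
```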
With that, our Flink connector is complete. Next, let's use it in a real project.

The overall architecture of the project is simple: Flink CDC captures change data from MySQL, and Flink SQL sinks that data into GBase8s.

Now let's see how to implement the CDC pipeline with Flink SQL; it takes only three SQL statements.
Create the source table:
```java
// source table
String sourceDDL =
        "CREATE TABLE mysql_binlog (\n" +
        " id INT NOT NULL,\n" +
        " name STRING,\n" +
        " description STRING\n" +
        ") WITH (\n" +
        " 'connector' = 'mysql-cdc',\n" +
        " 'hostname' = 'localhost',\n" +
        " 'port' = '3306',\n" +
        " 'username' = 'flinkcdc',\n" +
        " 'password' = '123456',\n" +
        " 'database-name' = 'test',\n" +
        " 'table-name' = 'test_cdc'\n" +
        ")";
```
Create the output table, sinking into GBase8s; here the connector is set to `gbasedbt`:
```java
String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
String userName = "gbasedbt";
String password = "123456";
String gbasedbtSinkTable = "ta";
// sink table
String sinkDDL =
        "CREATE TABLE test_cdc_sink (\n" +
        " id INT NOT NULL,\n" +
        " name STRING,\n" +
        " description STRING,\n" +
        " PRIMARY KEY (id) NOT ENFORCED \n " +
        ") WITH (\n" +
        " 'connector' = 'gbasedbt',\n" +
        // " 'driver' = 'com.gbasedbt.jdbc.Driver',\n" +
        " 'url' = '" + url + "',\n" +
        " 'username' = '" + userName + "',\n" +
        " 'password' = '" + password + "',\n" +
        " 'table-name' = '" + gbasedbtSinkTable + "' \n" +
        ")";
```
Finally, we write the data straight through from source to sink:
```java
String transformSQL =
        "insert into test_cdc_sink select * from mysql_binlog";
```
Full reference code:
```java
package wang.datahub.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToGBasedbtlMain {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'flinkcdc',\n" +
                " 'password' = '123456',\n" +
                " 'database-name' = 'test',\n" +
                " 'table-name' = 'test_cdc'\n" +
                ")";

        String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
        String userName = "gbasedbt";
        String password = "123456";
        String gbasedbtSinkTable = "ta";
        // sink table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING,\n" +
                " PRIMARY KEY (id) NOT ENFORCED \n " +
                ") WITH (\n" +
                " 'connector' = 'gbasedbt',\n" +
                // " 'driver' = 'com.gbasedbt.jdbc.Driver',\n" +
                " 'url' = '" + url + "',\n" +
                " 'username' = '" + userName + "',\n" +
                " 'password' = '" + password + "',\n" +
                " 'table-name' = '" + gbasedbtSinkTable + "' \n" +
                ")";

        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);

        result.print();
        env.execute("sync-flink-cdc");
    }

}
```
Run the job, then check the target database: the data has been written into it.
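To spot-check the sink table from Java, a plain JDBC query can be used (a sketch reusing the URL, user, password and table name from the job above; the GBase 8s JDBC driver must be on the classpath):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckSinkTable {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;"
                + "NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
        Class.forName("com.gbasedbt.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(url, "gbasedbt", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select * from ta")) {
            // Print each synced row: id | name | description
            while (rs.next()) {
                System.out.println(rs.getInt(1) + " | " + rs.getString(2) + " | " + rs.getString(3));
            }
        }
    }
}
```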