赞
踩
Flink CDC 1.12版本引入了对SQL Server的支持,包括 SqlServerCatalog 和 SqlServerTable。在 SqlServerCatalog 中,你可以根据表名获取对应的字段和字段类型。
SQL Server 从 2008 版本开始支持变更数据捕获(CDC)功能。CDC 会记录对表执行的插入、更新和删除操作,这样你只需查询发生变更的数据,而不必扫描整张表。
软件版本
Flink 1.17.1
数据库版本 Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64)
-- Enable CDC at the database level. Run this while connected to the
-- database that should have CDC switched on.
EXECUTE sys.sp_cdc_enable_db;

-- List the databases together with their CDC-enabled flag.
SELECT
    name,
    is_cdc_enabled
FROM sys.databases;
打开 SQL Server 配置管理器 => 选择"SQL Server 服务" => 右键单击"SQL Server 代理"并选择"启动"(表级别的 CDC 需要先启动 SQL Server 代理)。
-- Enable CDC for a single table. SQL Server Agent has to be started
-- before this statement is executed.
-- NOTE(review): per the sp_cdc_enable_table documentation, net-change support
-- needs a primary key or unique index on the source table; confirm for this table.
EXECUTE sys.sp_cdc_enable_table
    @source_schema        = 'dbo',                    -- schema of the source table
    @source_name          = 'AIR_STATION_HOUR_DATA',  -- name of the source table
    @capture_instance     = NULL,                     -- capture instance left as NULL
    @supports_net_changes = 1,                        -- net-change support switched on
    @role_name            = NULL;                     -- role name left as NULL
-- Verify that CDC was enabled successfully for the table.
EXECUTE sys.sp_cdc_help_change_data_capture;
<properties>
    <!-- Single Flink version referenced by the org.apache.flink artifacts below. -->
    <flink.version>1.17.1</flink.version>
</properties>

<dependencies>
    <!-- SQL Server JDBC driver -->
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <version>9.4.1.jre8</version>
    </dependency>

    <!-- Lombok annotations -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
    </dependency>

    <!-- Flink artifacts, all pinned to ${flink.version} -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- SQL Server CDC connector -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-sqlserver-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
/**
 * Connection settings for the SQL Server instance read by the CDC source.
 */
public class SQLServerConstant {

    /** Database server address. */
    public static final String SQLSERVER_HOST = "0.0.0.0";

    /** Database server port. */
    public static final Integer SQLSERVER_PORT = 1433;

    /** Database name. */
    public static final String SQLSERVER_DATABASE = "HBDC_AQI";

    /** Table(s) to capture, written as schema.table. */
    public static final String SQLSERVER_TABLE_LIST = "dbo.AIR_STATION_HOUR_DATA";

    /** Login user. */
    public static final String SQLSERVER_USER_NAME = "sa";

    /** Login password. */
    public static final String SQLSERVER_PASSWORD = "*******";
}
/**
 * One captured data change: where it happened, when, what kind of change it was,
 * and the row contents before and after the change.
 */
@Data
public class DataChangeInfo implements Serializable {

    /** Database name. */
    private String database;

    /** Table name. */
    private String tableName;

    /** Time of the change. */
    private LocalDateTime changeTime;

    /** Change type: 1 = insert, 2 = update, 3 = delete. */
    private Integer eventType;

    /** Row data before the change. */
    private String beforeData;

    /** Row data after the change. */
    private String afterData;
}
@Slf4j public class SQLServerJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> { public static final String TS_MS = "ts_ms"; public static final String BEFORE = "before"; public static final String AFTER = "after"; public static final String SOURCE = "source"; public static final String CREATE = "CREATE"; public static final String UPDATE = "UPDATE"; @Override public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception { try { String topic = sourceRecord.topic(); String[] fields = topic.split("\\."); String database = fields[1]; String tableName = fields[2]; Struct struct = (Struct) sourceRecord.value(); final Struct source = struct.getStruct(SOURCE); DataChangeInfo dataChangeInfo = new DataChangeInfo(); dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString()); dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString()); // 获取操作类型 CREATE UPDATE DELETE 1新增 2修改 3删除 Envelope.Operation operation = Envelope.operationFor(sourceRecord); String type = operation.toString().toUpperCase(); int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 
2 : 3; dataChangeInfo.setEventType(eventType); dataChangeInfo.setDatabase(database); dataChangeInfo.setTableName(tableName); ZoneId zone = ZoneId.systemDefault(); Long timestamp = Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis); dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone)); //7.输出数据 collector.collect(dataChangeInfo); } catch (Exception e) { log.error("SQLServer消息读取自定义序列化报错:{}", e.getMessage()); } } /** * * 从源数据获取出变更之前或之后的数据 */ private JSONObject getJsonObject(Struct value, String fieldElement) { Struct element = value.getStruct(fieldElement); JSONObject jsonObject = new JSONObject(); if (element != null) { Schema afterSchema = element.schema(); List<Field> fieldList = afterSchema.fields(); for (Field field : fieldList) { Object afterValue = element.get(field); jsonObject.put(field.name(), afterValue); } } return jsonObject; } @Override public TypeInformation<DataChangeInfo> getProducedType() { return TypeInformation.of(DataChangeInfo.class); } }
public class FlinkSourceUtil { /** * 构造SQL Server CDC数据源 */ public static DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() { String[] tables = SQLSERVER_TABLE_LIST.replace(" ", "").split(","); return SqlServerSource.<DataChangeInfo>builder() .hostname(SQLSERVER_HOST) .port(SQLSERVER_PORT) .database(SQLSERVER_DATABASE) // monitor sqlserver database .tableList(tables) // monitor products table .username(SQLSERVER_USER_NAME) .password(SQLSERVER_PASSWORD) /* *initial初始化快照,即全量导入后增量导入(检测更新数据写入) * latest:只进行增量导入(不读取历史变化) */ .startupOptions(com.ververica.cdc.connectors.base.options.StartupOptions.initial()) .deserializer(new SQLServerJsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String .build(); } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource(); DataStream<DataChangeInfo> streamSource = env .addSource(dataChangeInfoMySqlSource, "SQLServer-source") .setParallelism(1); streamSource.print(); env.execute("SQLServer-stream-cdc"); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。