赞
踩
-- Sink-side table (StarRocks / Doris syntax: UNIQUE KEY model, hash bucketing).
-- Rows with the same id replace each other, so every change event upserts.
CREATE TABLE test (
    -- BIGINT to match the MySQL source column `id` bigint(20); INT would
    -- overflow once the source AUTO_INCREMENT passes 2^31 - 1.
    id          BIGINT NOT NULL,
    -- VARCHAR length here is counted in bytes, while the MySQL source is
    -- varchar(30) characters in utf8 (up to 3 bytes each), so leave headroom.
    student     VARCHAR(128),
    phone       VARCHAR(128),
    -- Logical-delete flag written by the CDC job: 0 = live row, 1 = deleted.
    is_delete   TINYINT,
    -- Time the change was processed by the CDC job; the job must send it as
    -- text that parses as a datetime (e.g. 'yyyy-MM-dd HH:mm:ss').
    update_time DATETIME
)
UNIQUE KEY (id)
DISTRIBUTED BY HASH (id) BUCKETS 3
PROPERTIES ("replication_num" = "1");
-- Source-side table (MySQL syntax: backtick identifiers, AUTO_INCREMENT, InnoDB).
CREATE TABLE `test` (
    `id`      BIGINT(20)  NOT NULL AUTO_INCREMENT,  -- surrogate key, assigned by MySQL
    `student` VARCHAR(30),
    `phone`   VARCHAR(30),
    PRIMARY KEY (`id`) USING BTREE
)
    ENGINE = InnoDB
    DEFAULT CHARSET = utf8
    COLLATE = utf8_bin;
flink版本:1.13
scala版本:2.11
<!-- Flink CDC: MySQL change-data-capture source -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.2.0</version>
</dependency>

<!-- Flink core modules (Scala 2.11 builds, Flink 1.13.6) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.13.6</version>
</dependency>

<!-- StarRocks Flink connector (provides the StarRocksSink used by the job) -->
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <version>1.2.3_flink-1.13_2.11</version>
</dependency>
主类:FlinkToDoris
import com.starrocks.connector.flink.StarRocksSink; import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class FlinkToDoris { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // enable checkpoint env.enableCheckpointing(3000); MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("ip") .port(port) .databaseList("db_name") // set captured database .tableList("db_name.table_name") // set captured table .username("xxx") .password("xxx") // 不需要自定义schema可以直接用JsonDebeziumDeserializationSchema //.deserializer(new JsonDebeziumDeserializationSchema()) .deserializer(new MyDeserializationSchema) // 从最新数据开始 .startupOptions(StartupOptions.latest()) // 从最开始的数据开始 //.startupOptions(StartupOptions.initial()) .build(); env .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql source") .addSink(StarRocksSink.sink( StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx") .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port") .withProperty("username", "xxx") .withProperty("password", "xxx") .withProperty("table-name", "xxx") .withProperty("database-name", "xxx") .withProperty("sink.properties.format", "json") .withProperty("sink.properties.strip_outer_array", "true") // 设置并行度,多并行度情况下需要考虑如何保证数据有序性 .withProperty("sink.parallelism", "1") .build())); env.execute("FlinkToDoris"); } }
自定义Schema:MyDeserializationSchema
import com.alibaba.fastjson.JSONObject; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import java.util.Date; import java.util.List; public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> { /** * 封装为 * {"update_time":"2022-9-19 14:24:26","student":"ccc","phone":"ddd","id":16,"is_delete":0} * 因为doris是不能根据非主键字段做删除操作的,所以当mysql中有删除操作时,这边边采用逻辑删除的方式,将删出字段标记为已删除 * 后期在做数据分析时,可以将这部分数据过滤掉即可。 * is_delete:逻辑删除标志符 0表示正常 1表示删除 * update_time:数据的更新时间 */ @Override public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { JSONObject res = new JSONObject(); Struct value = (Struct) sourceRecord.value(); // c: insert; u: update; d: delete String op = value.getString("op"); if ("d".equals(op)) { // 如果是delete操作,则取before中的数据,将其设置为1 Struct before = value.getStruct("before"); if (before != null) { Schema beforeSchema = before.schema(); List<Field> beforeFields = beforeSchema.fields(); for (Field field : beforeFields) { Object beforeValue = before.get(field); res.put(field.name(), beforeValue); } res.put("is_delete", 1); res.put("update_time", new Date().toLocaleString()); } } else { //如果是insert或者update操作,则取after中的数据,将是否删除设置为0 Struct after = value.getStruct("after"); if (after != null) { Schema afterSchema = after.schema(); List<Field> afterFields = afterSchema.fields(); for (Field field : afterFields) { Object afterValue = after.get(field); res.put(field.name(), afterValue); } res.put("is_delete", 0); res.put("update_time", new Date().toLocaleString()); } } //输出数据 collector.collect(res.toString()); } @Override public TypeInformation<String> getProducedType() { 
return BasicTypeInfo.STRING_TYPE_INFO; } }
插入数据到mysql
-- id is omitted on purpose: MySQL assigns it via AUTO_INCREMENT.
INSERT INTO `test` (
    student,
    phone
)
VALUES (
    'aaa',
    'bbb'
);
DorisDB中查看数据
更新数据到mysql
-- Change both payload columns of a single row, selected by primary key.
UPDATE `test`
SET
    student = 'ddd',
    phone = 'qqq'
WHERE id = 20;
DorisDB中查看数据
从mysql中删除数据
-- Remove the same row (id = 20) that the preceding UPDATE step modified,
-- from the same `test` table used throughout this walkthrough.
DELETE FROM `test`
WHERE id = 20;
DorisDB中查看数据
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。