Issue: pay attention to the CDC connector version and the Flink version you are using.
There is not yet a CDC release that is fully compatible with Flink 1.15.2; if you are able to, you can build one yourself. As of 2023-04-25 I have been running Flink 1.15.3 in production without problems; see the version-compatible setup further below.
StarRocks officially recommends the Flink SQL approach (it supports real-time synchronization of inserts, updates, and deletes). If updates or deletes are not being applied, check your Flink version, your CDC connector version, and your StarRocks sink connector version.
Note: this example is based on Flink 1.13. The 1.15.x line was not recommended at first; as of 2023-04-25, Flink 1.15.3 is the recommended version.
1. Pay attention to the comments about the Scala-versioned jars (the _2.12 suffix).
2. Set scope to provided when the job is packaged and handed over to a Flink cluster, which already provides those jars at runtime; see the snippet below.
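For example, on a standalone or session cluster the Flink runtime jars are already on the classpath, so the core dependencies can be marked as provided and are then left out of the fat jar built by the shade plugin. A minimal sketch based on the pom.xml below; adjust to your own deployment:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
        <!-- provided: supplied by the Flink cluster at runtime, not bundled into the job jar -->
        <scope>provided</scope>
    </dependency>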
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.txlc</groupId> <artifactId>dwh-cdc</artifactId> <version>1.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.13.6</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>${flink.version}</version> <!-- <scope>test</scope>--> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java 1.15版本以上 flink-streaming-java 以下需要加上scala版本_2.12 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.12</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.12</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.12</artifactId> <version>${flink.version}</version> <!-- <scope>test</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils_2.12</artifactId> <version>${flink.version}</version> <!-- <scope>test</scope>--> </dependency> <!--<!–1.14*–> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime_2.12</artifactId> <version>${flink.version}</version> <!– <scope>provided</scope>–> </dependency> <!–1.15*–> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader_2.12</artifactId> <version>${flink.version}</version> <!– <scope>provided</scope>–> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-test-utils_2.12</artifactId> <version>${flink.version}</version> <!– <scope>test</scope>–> </dependency>--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-guava</artifactId> <!-- <version>30.1.1-jre-15.0</version>--> <version>18.0-13.0</version> <!-- <version>30.1.1-jre-14.0</version>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.12</artifactId> <version>1.10.3</version> <!-- 
<scope>provided</scope>--> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.21</version> <!-- <version>5.1.49</version>--> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.4</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.3.4</version> <!-- <scope>test</scope>--> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.24</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.20.graal</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-api</artifactId> <version>2.7.1</version> </dependency> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>2.2.0</version> <!-- <scope>provided</scope>--> </dependency> <!-- 1.30flink SR官方推荐的 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>1.4.0</version> </dependency> <dependency> <groupId>com.starrocks</groupId> <artifactId>flink-connector-starrocks</artifactId> <!-- <version>1.2.4_flink-1.15</version>--> <!-- <version>1.2.4_flink-1.13_2.12</version>--> <version>1.2.3_flink-1.13_2.11</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-eclipse-plugin</artifactId> <version>2.10</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.4.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> <projectName>Apache Flink</projectName> <encoding>UTF-8</encoding> </transformer> </transformers> </configuration> </execution> </executions> </plugin> <!--<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin>--> </plugins> </build> </project>
Notes:
1. The format produced by the MySQL CDC source cannot be consumed directly by the StarRocks sink: you have to extract the JSON data from the before or after field, and if you want updates and deletes to be applied you also have to add an __op field (see the example after this list).
2. There is one small catch: date and time values need extra handling before they synchronize cleanly into StarRocks. See my separate article on date formatting and the MySqlDateTimeConverter shown further below.
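For example, for a source table with columns id and name (illustrative names only), the deserializer below turns each change event into a flat JSON object that the StarRocks sink can load directly:

    Delete: {"id": 1, "name": "foo", "__op": 1}
    Upsert (insert/update): {"id": 1, "name": "bar", "__op": 0}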
The custom deserialization schema used with the Flink 1.13 setup:

    package *;

    import com.alibaba.fastjson2.JSONObject;
    import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
    import io.debezium.data.Envelope;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.data.Field;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;

    import java.text.SimpleDateFormat;
    import java.util.Objects;

    /**
     * Custom deserialization schema.
     *
     * @author JGMa
     */
    public class TxlcCustomerSchema implements DebeziumDeserializationSchema<String> {
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            String topic = sourceRecord.topic();
            String[] strings = topic.split("\\.");
            // String database = strings[1];
            // String table = strings[2];
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Struct value = (Struct) sourceRecord.value();
            // JSONObject data = new JSONObject();

            // Collect the "before" image of the row
            Struct before = value.getStruct("before");
            JSONObject beforeData = new JSONObject();
            if (before != null) {
                for (Field field : before.schema().fields()) {
                    Object o = before.get(field);
                    beforeData.put(field.name(), o);
                }
            }

            // Collect the "after" image of the row
            Struct after = value.getStruct("after");
            JSONObject afterData = new JSONObject();
            if (after != null) {
                for (Field field : after.schema().fields()) {
                    Object o = after.get(field);
                    afterData.put(field.name(), o);
                }
            }

            Envelope.Operation op = Envelope.operationFor(sourceRecord);
            // Debug output
            System.out.println("->" + value.toString());
            System.out.println("===" + beforeData.toJSONString());
            System.out.println(">>>" + afterData.toJSONString());
            // JSONObject object = new JSONObject();
            // object.put("database", database);
            // object.put("table", table);

            if (Objects.equals(op, Envelope.Operation.DELETE)) {
                // The StarRocks table must use the primary key model; in the JSON,
                // {"__op":1} marks a delete and {"__op":0} marks an upsert
                beforeData.put("__op", 1);
                collector.collect(beforeData.toJSONString());
            } else {
                if (Objects.equals(op, Envelope.Operation.UPDATE)) {
                    afterData.put("__op", 0);
                }
                collector.collect(afterData.toJSONString());
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
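As the comment in the code says, __op-based upserts and deletes only take effect if the target StarRocks table uses the primary key model. A minimal sketch of such a table (the column names are hypothetical; match them to your MySQL source table and adjust bucketing/replication to your cluster):

    CREATE TABLE data_center.temp_flink (
        id BIGINT NOT NULL,
        name VARCHAR(255),
        create_time DATETIME
    ) PRIMARY KEY (id)
    DISTRIBUTED BY HASH (id) BUCKETS 4;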
The job that wires the MySQL CDC source to the StarRocks sink:

    package *;

    import com.starrocks.connector.flink.StarRocksSink;
    import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
    import com.txlc.cdc.execute.core.TxlcCustomerSchema;
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * Full capture of a MySQL table synchronized into StarRocks (MySQL Snapshot + Binlog).
     * <p>
     * Warning: the StarRocks table columns must be large enough, otherwise NULL will be inserted.
     *
     * @author JGMa
     */
    public class FlinkMysqlCDCStarrocks {
        private static final Logger log = LoggerFactory.getLogger(FlinkMysqlCDCStarrocks.class);

        public static void main(String[] args) throws Exception {
            ParameterTool paramTool = ParameterTool.fromArgs(args);
            // String tableName = paramTool.get("table");
            // String srcHost = paramTool.get("srcHost");
            // String srcDatabase = paramTool.get("srcDatabase");
            // String srcUsername = paramTool.get("srcUsername");
            // String srcPassword = paramTool.get("srcPassword");
            String tableName = "temp_flink";
            String srcHost = "192.168.10.14";
            String srcDatabase = "xcode";
            String srcUsername = "root";
            String srcPassword = "123456";

            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 2. For resumable sync, Flink CDC only needs the binlog position to be stored as state in checkpoints.
            // (1) Enable checkpointing every 5 s with exactly-once semantics
            // env.enableCheckpointing(5000);
            // CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            // checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // (2) Decide what happens to the last checkpoint when the job is shut down
            // checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
            // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
            // Timeout for a single checkpoint; a checkpoint that takes longer is discarded
            // checkpointConfig.setCheckpointTimeout(600000);
            // Minimum pause between two checkpoints
            // checkpointConfig.setMinPauseBetweenCheckpoints(500);
            // Number of concurrent checkpoints
            // checkpointConfig.setMaxConcurrentCheckpoints(1);

            // With a bounded stream, data would be processed in batch mode
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
            // Externalized checkpoints can be kept when the job is cancelled.
            // The number of retained checkpoints cannot be set in code (the default keeps one);
            // to keep 3, configure state.checkpoints.num-retained: 3 in flink-conf.yaml
            // env.setStateBackend();

            // 3. Create the MySQL CDC source
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname(srcHost)
                    .port(3306)
                    .databaseList(srcDatabase)
                    .tableList(srcDatabase + "." + tableName)
                    .username(srcUsername)
                    .password(srcPassword)
                    // converts SourceRecord to JSON String
                    .deserializer(new TxlcCustomerSchema())
                    .build();

            // 4. Add the source and attach the StarRocks sink
            DataStreamSource<String> streamSource = env
                    .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "[MySQL Source]")
                    .setParallelism(1);

            streamSource.addSink(StarRocksSink.sink(
                    StarRocksSinkOptions.builder()
                            .withProperty("connector", "starrocks")
                            .withProperty("jdbc-url", "jdbc:mysql://192.168.10.245:9030?characterEncoding=utf-8&useSSL=false")
                            .withProperty("load-url", "192.168.10.11:8030")
                            .withProperty("username", "root")
                            .withProperty("password", "123456")
                            .withProperty("table-name", tableName)
                            .withProperty("database-name", "data_center")
                            .withProperty("sink.buffer-flush.interval-ms", "10000")
                            .withProperty("sink.properties.format", "json")
                            .withProperty("sink.properties.strip_outer_array", "true")
                            // .withProperty("sink.properties.column_separator", "\\x01")
                            // .withProperty("sink.properties.row_delimiter", "\\x02")
                            .withProperty("sink.parallelism", "1")
                            .build()
            )).name(">>>StarRocks Sink<<<");

            env.execute("mysql sync StarRocks table: " + tableName);
        }
    }
For Flink 1.15.3, the version-compatible pom.xml I use looks like this:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.15.3</flink.version>
        <flink.connector.sr>1.2.5_flink-1.15</flink.connector.sr>
        <flink.connector.mysql>2.3.0</flink.connector.mysql>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>com.esotericsoftware.kryo</groupId>
                    <artifactId>kryo</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>com.esotericsoftware.kryo</groupId>
                    <artifactId>kryo</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <version>${flink.connector.sr}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink.connector.mysql}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>com.esotericsoftware.kryo</groupId>
                    <artifactId>kryo</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.4</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.3.4</version>
            <!-- <scope>test</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>2.7.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2</artifactId>
            <version>2.8.3-10.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.5.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <!-- <scope>provided</scope> -->
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.10</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.txlc.flink.job.CommonFlinkSingleTableStreamJob</mainClass>
                                </transformer>
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
                                    <projectName>TXLC-Flink-CDC</projectName>
                                    <encoding>UTF-8</encoding>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
The custom deserialization schema for the 1.15 setup, this time using Jackson's ObjectMapper:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
    import io.debezium.data.Envelope;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.data.Field;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;

    import java.util.Map;
    import java.util.Objects;
    import java.util.stream.Collectors;

    /**
     * @author JGMa
     */
    @Slf4j
    public class JsonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

        @Override
        public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
            Envelope.Operation op = Envelope.operationFor(record);
            Struct value = (Struct) record.value();
            Schema valueSchema = record.valueSchema();
            if (Objects.equals(op, Envelope.Operation.DELETE)) {
                // Delete: emit the "before" image with __op = 1
                Map<String, Object> beforeRow = extractBeforeRow(value, valueSchema);
                beforeRow.put("__op", 1);
                ObjectMapper objectMapper = new ObjectMapper();
                String delete = objectMapper.writeValueAsString(beforeRow);
                log.debug("\n====>DELETE record info:{}", delete);
                out.collect(delete);
            } else if (Objects.equals(op, Envelope.Operation.UPDATE)) {
                // Update: emit the "after" image with __op = 0 (upsert)
                Map<String, Object> afterRow = extractAfterRow(value, valueSchema);
                afterRow.put("__op", 0);
                ObjectMapper objectMapper = new ObjectMapper();
                String update = objectMapper.writeValueAsString(afterRow);
                log.debug("\n====>UPDATE record info:{}", update);
                out.collect(update);
            } else {
                // Snapshot read / insert: emit the "after" image as-is
                Map<String, Object> row = extractAfterRow(value, valueSchema);
                ObjectMapper objectMapper = new ObjectMapper();
                String res = objectMapper.writeValueAsString(row);
                log.debug("\n====>record info:{}", res);
                out.collect(res);
            }
        }

        private Map<String, Object> getRowMap(Struct struct) {
            // Collectors.toMap does not accept null values, so replace null with an empty string
            return struct.schema().fields().stream()
                    .collect(Collectors.toMap(Field::name, f -> struct.get(f.name()) == null ? "" : struct.get(f.name())));
        }

        private Map<String, Object> extractAfterRow(Struct value, Schema valueSchema) throws Exception {
            Struct after = value.getStruct(Envelope.FieldName.AFTER);
            return getRowMap(after);
        }

        private Map<String, Object> extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
            Struct before = value.getStruct(Envelope.FieldName.BEFORE);
            return getRowMap(before);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }

Date handling:

    import io.debezium.spi.converter.CustomConverter;
    import io.debezium.spi.converter.RelationalColumn;
    import org.apache.kafka.connect.data.SchemaBuilder;

    import java.time.*;
    import java.time.format.DateTimeFormatter;
    import java.util.Properties;

    /**
     * Time zone / format handling for MySQL date and time columns.
     *
     * @author JGMa
     */
    public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

        private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
        private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
        private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
        private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
        private ZoneId timestampZoneId = ZoneId.systemDefault();

        @Override
        public void configure(Properties props) {
        }

        @Override
        public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
            String sqlType = column.typeName().toUpperCase();
            SchemaBuilder schemaBuilder = null;
            Converter converter = null;
            if ("DATE".equals(sqlType)) {
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
                converter = this::convertDate;
            }
            if ("TIME".equals(sqlType)) {
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
                converter = this::convertTime;
            }
            if ("DATETIME".equals(sqlType)) {
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
                converter = this::convertDateTime;
            }
            if ("TIMESTAMP".equals(sqlType)) {
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
                converter = this::convertTimestamp;
            }
            if (schemaBuilder != null) {
                registration.register(schemaBuilder, converter);
            }
        }

        private String convertDate(Object input) {
            if (input == null) {
                return null;
            }
            if (input instanceof LocalDate) {
                return dateFormatter.format((LocalDate) input);
            }
            if (input instanceof Integer) {
                LocalDate date = LocalDate.ofEpochDay((Integer) input);
                return dateFormatter.format(date);
            }
            return String.valueOf(input);
        }

        private String convertTime(Object input) {
            if (input == null) {
                return null;
            }
            if (input instanceof Duration) {
                Duration duration = (Duration) input;
                long seconds = duration.getSeconds();
                int nano = duration.getNano();
                LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
                return timeFormatter.format(time);
            }
            return String.valueOf(input);
        }

        private String convertDateTime(Object input) {
            if (input == null) {
                return null;
            }
            if (input instanceof LocalDateTime) {
                return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " ");
            }
            return String.valueOf(input);
        }

        private String convertTimestamp(Object input) {
            if (input == null) {
                return null;
            }
            if (input instanceof ZonedDateTime) {
                // MySQL stores TIMESTAMP values in UTC, so the ZonedDateTime here is in UTC
                ZonedDateTime zonedDateTime = (ZonedDateTime) input;
                LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
                return timestampFormatter.format(localDateTime).replaceAll("T", " ");
            }
            return String.valueOf(input);
        }
    }
And the job's main method, which registers the date converter through the Debezium properties and wires the source to the sink:

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        String srcTable = params.get("srcTable", "");
        String srcHost = params.get("srcHost", "");
        String srcDb = params.get("srcDb", "test_deploy");
        String srcUsername = params.get("srcUsername", "");
        String srcPassword = params.get("srcPassword", "");
        int checkpointInterval = params.getInt("checkpointInterval", 60000);
        String sinkHost = params.get("sinkHost", "192.168.10.2");
        String sinkDb = params.get("sinkDb", "data_center");
        String sinkUsername = params.get("sinkUsername", "root");
        String sinkPassword = params.get("sinkPassword", "123456");
        String sinkTable = params.get("sinkTable", srcTable);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        // Disable operator chaining so the execution graph is easier for beginners to read
        env.disableOperatorChaining();
        // env.setParallelism(parallelism);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, Time.of(10, TimeUnit.SECONDS)
        ));

        // Enable checkpointing (interval taken from the checkpointInterval parameter).
        // Exactly-once vs. at-least-once: exactly-once is the better choice for most applications;
        // at-least-once mainly matters for ultra-low-latency (a few milliseconds) workloads.
        env.enableCheckpointing(checkpointInterval);
        // Set the mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Require at least 500 ms between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // A checkpoint must complete within one minute or it is discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Tolerate two consecutive checkpoint failures
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        // Only one checkpoint may be in progress at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Use externalized checkpoints so they are retained after the job is cancelled
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Enable experimental unaligned checkpoints
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        // Checkpoint storage location
        // env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://192.168.10.245:9000/flink/flink-checkpoints/" + srcTable));
        // env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///home/txlc/txlc/flink/flink-checkpoints/" + srcTable));

        // Debezium properties: custom date/time converter configuration
        Properties properties = new Properties();
        properties.setProperty("converters", "dateConverters");
        properties.setProperty("dateConverters.type", "com.txlc.flink.core.MySqlDateTimeConverter");
        properties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
        properties.setProperty("dateConverters.format.time", "HH:mm:ss");
        properties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
        properties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
        properties.setProperty("dateConverters.format.timestamp.zone", "UTC+8");
        // The global read/write lock taken for the snapshot may affect online traffic, so skip locking
        properties.setProperty("debezium.snapshot.locking.mode", "none");
        properties.setProperty("include.schema.changes", "true");
        properties.setProperty("bigint.unsigned.handling.mode", "long");
        properties.setProperty("decimal.handling.mode", "double");

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(srcHost)
                .port(3306)
                .databaseList(srcDb)
                .tableList(srcDb + "." + srcTable)
                .username(srcUsername)
                .password(srcPassword)
                // ZoneInfoFile.oldMappings
                .debeziumProperties(properties)
                .deserializer(new JsonStringDebeziumDeserializationSchema())
                .build();

        DataStreamSource<String> streamSource = env.fromSource(mySqlSource,
                WatermarkStrategy.forMonotonousTimestamps(),
                "[" + srcTable + "<< source >>" + srcHost + srcDb + "]");
        // .setParallelism(parallelism);

        streamSource.addSink(StarRocksSink.sink(
                // the sink options
                StarRocksSinkOptions.builder()
                        .withProperty("jdbc-url", "jdbc:mysql://" + sinkHost + ":9030?characterEncoding=utf-8&useSSL=false&connectionTimeZone=Asia/Shanghai")
                        .withProperty("load-url", sinkHost + ":8030")
                        .withProperty("database-name", sinkDb)
                        .withProperty("username", sinkUsername)
                        .withProperty("password", sinkPassword)
                        .withProperty("table-name", sinkTable)
                        // Since StarRocks 2.4, partial updates of primary key model columns are supported;
                        // the columns to update can be specified with the following two properties:
                        // .withProperty("sink.properties.partial_update", "true")
                        // .withProperty("sink.properties.columns", "k1,k2,k3")
                        .withProperty("sink.properties.format", "json")
                        .withProperty("sink.properties.strip_outer_array", "true")
                        // With a sink parallelism greater than 1, you need to consider how to keep the data ordered
                        // .withProperty("sink.parallelism", parallelism + "")
                        .build())
        ).name(">>>StarRocks " + sinkTable + " Sink<<<");

        env.execute(srcTable + "<< stream sync job >>" + srcHost + srcDb);
    }
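After packaging with the shade plugin, the job can be submitted with flink run, passing the parameters that ParameterTool reads above. For example (the jar name and the parameter values are assumptions for illustration; the main class is the one configured in the shade plugin):

    flink run -c com.txlc.flink.job.CommonFlinkSingleTableStreamJob dwh-cdc-1.0-SNAPSHOT.jar \
        --srcTable temp_flink --srcHost 192.168.10.14 --srcDb xcode \
        --srcUsername root --srcPassword 123456 \
        --sinkHost 192.168.10.2 --sinkDb data_center --sinkTable temp_flink \
        --checkpointInterval 60000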