<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.wys</groupId> <artifactId>flink</artifactId> <version>1.0.0</version> <packaging>jar</packaging> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.16.1</flink.version> <flink-cdc.version>2.3.0</flink-cdc.version> <slf4j.version>1.7.30</slf4j.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc</artifactId> <version>${flink.version}</version> </dependency> <!-- mysql-cdc fat jar --> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-mysql-cdc</artifactId> <version>${flink-cdc.version}</version> </dependency> <!-- flink webui --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> <version>${flink.version}</version> </dependency> <!--日志相关的依赖 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j</artifactId> <version>2.14.0</version> <scope>provided</scope> </dependency> <!--flink-connector-starrocks --> <dependency> <groupId>com.starrocks</groupId> <artifactId>flink-connector-starrocks</artifactId> <version>1.2.9_flink-1.16</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.60</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> </dependencies> </project>
package com.wys.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.wys.flink.bean.DataCenterShine;
import com.wys.flink.util.DataStreamUtil;
import com.wys.flink.util.SourceAndSinkInfo;

public class DataStreamMySQLToStarRocks {

    public static void main(String[] args) throws Exception {
        // Local stream execution environment with the web UI bound to port 8081
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // Checkpoint every 3 minutes with exactly-once semantics
        env.enableCheckpointing(180000L, CheckpointingMode.EXACTLY_ONCE);

        // Host, port and credentials for the source (MySQL) and the sink (StarRocks)
        SourceAndSinkInfo info = SourceAndSinkInfo.builder()
                .sourceIp("ip")
                .sourcePort(3306)
                .sourceUserName("root")
                .sourcePassword("****")
                .sinkIp("ip")
                .sinkPort(9030)
                .sinkUserName("root")
                .sinkPassword("")
                .build();

        // Register source and sink for the table mapped by the DataCenterShine entity
        DataStreamUtil.setStarRocksSourceAndSink(env, info, DataCenterShine.class);
        // Several tables can be synchronized in the same job:
        // DataStreamUtil.setStarRocksSourceAndSink(env, info, Organization.class);

        // Job name
        env.execute("data_center_shine_job");
    }
}
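Because RestOptions.BIND_PORT is set to 8081, this job exposes the Flink web UI at http://localhost:8081 while it runs locally, which is convenient for watching the source and sink operators and checkpoint progress.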
package com.wys.flink.util;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SourceAndSinkInfo {

    /** Source host */
    private String sourceIp;

    /** Source port */
    private int sourcePort;

    /** Source username */
    private String sourceUserName;

    /** Source password */
    private String sourcePassword;

    /** Sink host */
    private String sinkIp;

    /** Sink port */
    private int sinkPort;

    /** Sink username */
    private String sinkUserName;

    /** Sink password */
    private String sinkPassword;
}
package com.wys.flink.bean;

import java.io.Serializable;

import com.wys.flink.annotation.FieldInfo;
import com.wys.flink.annotation.TableName;

import lombok.Data;
import lombok.EqualsAndHashCode;

/**
 * Business type mapping table.
 *
 * @author wys
 * @since 2023-05-23 11:16:24
 */
@Data
@TableName("wsmg.data_center_shine")
@EqualsAndHashCode(callSuper = false)
public class DataCenterShine extends StarRocksPrimary implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Primary key */
    @FieldInfo(order = 1, isPrimaryKey = true, notNull = true)
    private Integer id;

    /** Business name */
    @FieldInfo(order = 2)
    private String busName;

    /** Mapper class name */
    @FieldInfo(order = 3)
    private String mapperClassName;

    /** Entity class name */
    @FieldInfo(order = 4)
    private String entityClassName;
}
package com.wys.flink.bean;

import org.apache.flink.types.RowKind;

import lombok.Data;

@Data
public class StarRocksPrimary {

    /** Change type of the record (insert / update / delete), used when writing to StarRocks */
    private RowKind rowKind;
}
package com.wys.flink.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface FieldInfo {

    /**
     * Column order: the position of the field in the inserted row.
     */
    int order();

    /**
     * Whether the field is a primary key; required for the StarRocks primary key model.
     */
    boolean isPrimaryKey() default false;

    /**
     * Whether the column is NOT NULL.
     */
    boolean notNull() default false;
}
package com.wys.flink.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface TableName {

    /**
     * Fully qualified table name in the form database.table, e.g. sys.user.
     */
    String value();
}
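Together these two annotations carry everything the utility classes below need: the target table and the column layout. A quick sketch of how they are read back at runtime (the AnnotationSketch class is illustrative only, not part of the project):

package com.wys.flink;

import java.lang.reflect.Field;

import com.wys.flink.annotation.FieldInfo;
import com.wys.flink.annotation.TableName;
import com.wys.flink.bean.DataCenterShine;

public class AnnotationSketch {
    public static void main(String[] args) {
        // @TableName supplies "database.table"
        System.out.println(DataCenterShine.class.getAnnotation(TableName.class).value()); // wsmg.data_center_shine
        // @FieldInfo supplies column order and primary key / nullability flags
        for (Field f : DataCenterShine.class.getDeclaredFields()) {
            if (f.isAnnotationPresent(FieldInfo.class)) {
                FieldInfo fi = f.getAnnotation(FieldInfo.class);
                System.out.printf("%d -> %s (pk=%b, notNull=%b)%n",
                        fi.order(), f.getName(), fi.isPrimaryKey(), fi.notNull());
            }
        }
    }
}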
package com.wys.flink.util;

import java.util.function.Supplier;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.wys.flink.annotation.TableName;

public class DataStreamUtil {

    /*
     * Source/sink wiring for MySQL-to-MySQL synchronization, kept for reference
     * (MysqlAndStarRocksSink lives in com.wys.flink.sink):
     *
     * public static <T> void setMySQLSourceAndSink(StreamExecutionEnvironment env, SourceAndSinkInfo info, Class<T> cls) {
     *     setSourceAndSink(env, info, cls, () -> new MysqlAndStarRocksSink(cls, info.getSinkIp(), info.getSinkPort()));
     * }
     */

    /**
     * Sets up the MySQL CDC source and the StarRocks sink for one entity class.
     */
    public static <T> void setStarRocksSourceAndSink(StreamExecutionEnvironment env, SourceAndSinkInfo info, Class<T> cls) {
        setSourceAndSink(env, info, cls, () -> StarRocksSinkUtil.getStarRocksSink(cls, info));
    }

    /**
     * Generic source/sink wiring, driven by the @TableName annotation on the entity class.
     */
    private static <T> void setSourceAndSink(StreamExecutionEnvironment env, SourceAndSinkInfo info, Class<T> cls, Supplier<SinkFunction<T>> sink) {
        if (cls.isAnnotationPresent(TableName.class)) {
            String table = cls.getAnnotation(TableName.class).value();
            String[] tableArr = table.split("\\.");
            // Source: MySQL CDC connector
            MySqlSource<T> mySQLSource = MySqlSource.<T>builder()
                    .hostname(info.getSourceIp())
                    .port(info.getSourcePort())
                    .databaseList(tableArr[0]) // captured database; set tableList to ".*" to capture the whole database
                    .tableList(table)          // captured table
                    .username(info.getSourceUserName())
                    .password(info.getSourcePassword())
                    .deserializer(new CustomDebeziumDeserializationSchema<>(cls))
                    .build();
            // Attach the source to the execution environment
            DataStreamSource<T> source = env.fromSource(mySQLSource, WatermarkStrategy.noWatermarks(), tableArr[1] + "_source");
            // Attach the sink
            source.addSink(sink.get()).name(tableArr[1] + "_sink");
        }
    }
}
package com.wys.flink.util;

import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.TableSchema.Builder;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.row.sink.StarRocksSinkRowBuilder;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.wys.flink.annotation.FieldInfo;
import com.wys.flink.annotation.TableName;
import com.wys.flink.bean.StarRocksPrimary;

/**
 * Helper for building the StarRocks sink.
 *
 * @author wys
 * @date 2023-12-12
 */
public class StarRocksSinkUtil {

    private static final Pattern TPATTERN = Pattern.compile("[A-Z0-9]");

    /**
     * Builds a StarRocks SinkFunction for the given entity class.
     */
    @SuppressWarnings("serial")
    public static <T> SinkFunction<T> getStarRocksSink(Class<T> cls, SourceAndSinkInfo info) {
        Map<Integer, String> fieldMap = getFieldMap(cls);
        return StarRocksSink.sink(getTableSchema(cls), getStarRocksSinkOptions(info, cls),
                new StarRocksSinkRowBuilder<T>() {
                    @Override
                    public void accept(Object[] objects, T beanDataJava) {
                        try {
                            // Copy the annotated fields into the row array via reflection,
                            // honoring the order declared in @FieldInfo
                            for (Entry<Integer, String> entry : fieldMap.entrySet()) {
                                Field field = cls.getDeclaredField(entry.getValue());
                                field.setAccessible(true);
                                objects[entry.getKey() - 1] = field.get(beanDataJava);
                            }
                            // The last slot carries the StarRocks __op flag: 0 = upsert, 1 = delete
                            if (beanDataJava instanceof StarRocksPrimary) {
                                RowKind rowKind = ((StarRocksPrimary) beanDataJava).getRowKind();
                                objects[objects.length - 1] = RowKind.DELETE.equals(rowKind) ? 1 : 0;
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
    }

    /**
     * Maps the @FieldInfo order of each annotated field to its field name.
     */
    private static <T> Map<Integer, String> getFieldMap(Class<T> cls) {
        Map<Integer, String> fieldMap = new HashMap<>();
        for (Field field : cls.getDeclaredFields()) {
            if (field.isAnnotationPresent(FieldInfo.class)) {
                fieldMap.put(field.getAnnotation(FieldInfo.class).order(), field.getName());
            }
        }
        return fieldMap;
    }

    /**
     * Builds the TableSchema from the annotated entity class via reflection.
     */
    @SuppressWarnings("deprecation")
    private static <T> TableSchema getTableSchema(Class<T> cls) {
        Builder builder = TableSchema.builder();
        List<String> primaryKeys = new ArrayList<>();
        for (Field field : cls.getDeclaredFields()) {
            if (!field.isAnnotationPresent(FieldInfo.class)) {
                continue;
            }
            FieldInfo fi = field.getAnnotation(FieldInfo.class);
            // Columns are registered under their snake_case names, so primary keys
            // must be collected under the same names
            String columnName = humpToUnderlined(field.getName());
            if (fi.isPrimaryKey()) {
                primaryKeys.add(columnName);
            }
            DataType dataType = getDataType(field.getType());
            if (fi.notNull()) {
                dataType = dataType.notNull();
            }
            builder.field(columnName, dataType);
        }
        if (!primaryKeys.isEmpty()) {
            builder.primaryKey(primaryKeys.toArray(new String[0]));
        }
        return builder.build();
    }

    /**
     * Builds the StarRocksSinkOptions; 9030 is the FE MySQL-protocol port, 8030 the FE HTTP port.
     */
    private static <T> StarRocksSinkOptions getStarRocksSinkOptions(SourceAndSinkInfo info, Class<T> cls) {
        String table = cls.getAnnotation(TableName.class).value();
        String[] tableArr = table.split("\\.");
        return StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", String.format("jdbc:mysql://%s:%s/%s", info.getSinkIp(), info.getSinkPort(), tableArr[0]))
                .withProperty("load-url", info.getSinkIp() + ":8030")
                .withProperty("username", info.getSinkUserName())
                .withProperty("password", info.getSinkPassword())
                .withProperty("table-name", tableArr[1])
                .withProperty("database-name", tableArr[0])
                // Non-printable separators so field values containing commas or newlines survive stream load
                .withProperty("sink.properties.row_delimiter", "\\x02")
                .withProperty("sink.properties.column_separator", "\\x01")
                .withProperty("sink.buffer-flush.interval-ms", "5000")
                .build();
    }

    /**
     * Converts camelCase to snake_case, e.g. humpToUnderlined("mapperClassName") returns "mapper_class_name".
     */
    private static String humpToUnderlined(String str) {
        Matcher matcher = TPATTERN.matcher(str);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

    /**
     * Maps a Java field type to a Flink DataType.
     */
    private static DataType getDataType(Class<?> cls) {
        if (cls.equals(Integer.class)) {
            return DataTypes.INT();
        } else if (cls.equals(String.class)) {
            return DataTypes.STRING();
        } else if (cls.equals(Date.class)) {
            return DataTypes.TIMESTAMP();
        } else if (cls.equals(BigDecimal.class)) {
            return DataTypes.DECIMAL(8, 2);
        }
        throw new RuntimeException("No DataType mapping for field type: " + cls.getName());
    }
}
package com.wys.flink.util;

import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

/**
 * Custom deserialization schema that turns Debezium change records into entity instances.
 *
 * @author wys
 * @date 2023-12-12
 */
public class CustomDebeziumDeserializationSchema<T> implements DebeziumDeserializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private final Class<T> cls;

    public CustomDebeziumDeserializationSchema(Class<T> cls) {
        this.cls = cls;
    }

    /**
     * If only "after" is present the record is an insert; only "before" means a delete;
     * both "before" and "after" mean an update.
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<T> collector) {
        JSONObject resJson = new JSONObject();
        try {
            Struct valueStruct = (Struct) sourceRecord.value();
            Struct afterStruct = valueStruct.getStruct("after");
            Struct beforeStruct = valueStruct.getStruct("before");
            if (null != beforeStruct && null != afterStruct) {
                // update
                setDataContent(afterStruct, resJson);
                resJson.put("rowKind", RowKind.UPDATE_AFTER);
            } else if (null != afterStruct) {
                // insert
                setDataContent(afterStruct, resJson);
                resJson.put("rowKind", RowKind.INSERT);
            } else if (null != beforeStruct) {
                // delete
                setDataContent(beforeStruct, resJson);
                resJson.put("rowKind", RowKind.DELETE);
            } else {
                // nothing usable in this record (e.g. a tombstone); skip it
                return;
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("Deserialization failed");
        }
        collector.collect(resJson.toJavaObject(cls));
    }

    /**
     * Copies all fields of the Debezium struct into the JSON object.
     */
    private void setDataContent(Struct struct, JSONObject resJson) {
        List<Field> fields = struct.schema().fields();
        for (Field field : fields) {
            resJson.put(field.name(), struct.get(field.name()));
        }
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(cls);
    }
}
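To make the fastjson step concrete, here is roughly what deserialize() hands to the collector for an update (illustrative only; fastjson's default smart matching maps snake_case keys such as bus_name onto camelCase fields such as busName):

package com.wys.flink.util;

import org.apache.flink.types.RowKind;

import com.alibaba.fastjson.JSONObject;
import com.wys.flink.bean.DataCenterShine;

public class DeserializationSketch {
    public static void main(String[] args) {
        JSONObject resJson = new JSONObject();
        resJson.put("id", 1);                         // columns copied from the Debezium "after" struct
        resJson.put("bus_name", "demo");              // snake_case key, matched to busName by fastjson
        resJson.put("rowKind", RowKind.UPDATE_AFTER); // change type added by deserialize()
        DataCenterShine row = resJson.toJavaObject(DataCenterShine.class);
        System.out.println(row.getId() + " " + row.getBusName() + " " + row.getRowKind()); // 1 demo UPDATE_AFTER
    }
}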
1. Feature Description

Flink CDC reads the MySQL binlog and writes the captured changes (inserts, updates, deletes) to StarRocks in real time. Each table to synchronize is described by an entity class annotated with @TableName and @FieldInfo, and one job can synchronize several tables at once.
2. Code Implementation

In addition to the classes above, the StarRocksCustomStreamCDC variant below takes the entity classes to synchronize from the program arguments instead of hard-coding them.
package com.wys.flink;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.wys.flink.annotation.TableName;
import com.wys.flink.util.DataStreamUtil;
import com.wys.flink.util.SourceAndSinkInfo;

/**
 * Parameterized job: pass the entity classes to synchronize as program arguments,
 * e.g. --entity DataCenterShine,Organization
 *
 * @author wys
 * @date 2023-12-11
 */
public class StarRocksCustomStreamCDC {

    public static void main(String[] args) throws Exception {
        List<Class<?>> clsList = new ArrayList<>();
        StringBuilder jobName = new StringBuilder();
        ParameterTool parameters = ParameterTool.fromArgs(args);
        String entitys = parameters.get("entity", null);
        if (null == entitys) {
            throw new RuntimeException("Pass the entity class names of the tables to synchronize in the program arguments, e.g. --entity User,Role...");
        }
        // The parameter value is a comma-separated list of entity class names
        String[] entityArr = entitys.split(",");
        for (String className : entityArr) {
            Class<?> cls = getBeanClass(String.format("com.wys.flink.bean.%s", className));
            clsList.add(cls);
            jobName.append(cls.getSimpleName()).append("_");
        }
        jobName.append("job");

        // Stream execution environment, checkpointing every 3 minutes with exactly-once semantics
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(180000L, CheckpointingMode.EXACTLY_ONCE);

        SourceAndSinkInfo ssi = SourceAndSinkInfo.builder()
                .sourceIp("ip")
                .sourcePort(3306)
                .sourceUserName("root")
                .sourcePassword("****")
                .sinkIp("ip")
                .sinkPort(9030)
                .sinkUserName("root")
                .sinkPassword("****")
                .build();

        // Register a source and sink pair for every entity class
        clsList.forEach(item -> DataStreamUtil.setStarRocksSourceAndSink(env, ssi, item));

        env.execute(jobName.toString().toLowerCase());
    }

    /**
     * Loads the entity class by its fully qualified name and verifies it carries @TableName.
     */
    private static Class<?> getBeanClass(String className) {
        try {
            Class<?> cls = Class.forName(className);
            if (!cls.isAnnotationPresent(TableName.class)) {
                throw new RuntimeException("The entity class to synchronize has no @TableName annotation");
            }
            return cls;
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(String.format("Entity class [%s] not found", className));
        }
    }
}
3. Running the Job on the Apache Flink Dashboard
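In a typical setup, package the project into a jar (for example with mvn clean package), open the Flink web UI, and use Submit New Job to upload it. Set the entry class to com.wys.flink.StarRocksCustomStreamCDC and put the tables to synchronize in Program Arguments, e.g. --entity DataCenterShine,Organization. Once submitted, the job appears on the dashboard with one source/sink pair per entity, and checkpoints run every three minutes as configured above.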