
Flink DataStream API: CDC Sync from MySQL to StarRocks

1. Version Information

  • Flink: 1.16.1
  • Flink CDC connector (flink-sql-connector-mysql-cdc): 2.3.0
  • flink-connector-starrocks: 1.2.9_flink-1.16

2. Code Implementation

  • The pom.xml is shown below. Note that the logging dependencies and the StarRocks connector are declared with provided scope, so they must already be on the runtime classpath; for a local IDE run you may need to drop that scope.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wys</groupId>
    <artifactId>flink</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.1</flink.version>
        <flink-cdc.version>2.3.0</flink-cdc.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- mysql-cdc fat jar -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>

        <!-- flink webui -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>
        
		<!--flink-connector-starrocks -->
		<dependency>
			<groupId>com.starrocks</groupId>
			<artifactId>flink-connector-starrocks</artifactId>
			<version>1.2.9_flink-1.16</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.60</version>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.12</version>
		</dependency>
    </dependencies>

</project>
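
The pom above does not configure a fat-jar build. For the Dashboard upload described in section 3 you will likely want something like the maven-shade-plugin; a minimal sketch to place inside the <project> element (the plugin version is an assumption):

    <build>
        <plugins>
            <!-- Bundle dependencies into one jar for "flink run" / Dashboard upload -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>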

  • The main class (Java):
package com.wys.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.wys.flink.bean.DataCenterShine;
import com.wys.flink.util.DataStreamUtil;
import com.wys.flink.util.SourceAndSinkInfo;

public class DataStreamMySQLToStarRocks {

	public static void main(String[] args) throws Exception {
		// Stream execution environment
		Configuration conf = new Configuration();
		// Port the local WebUI binds to
		conf.setString(RestOptions.BIND_PORT, "8081");
		// Create a local environment with the WebUI enabled
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
		// Checkpoint every 3 minutes with exactly-once semantics
		env.enableCheckpointing(180000L, CheckpointingMode.EXACTLY_ONCE);
		// Host, port and credentials of the source (MySQL) and sink (StarRocks)
		SourceAndSinkInfo info=SourceAndSinkInfo.builder()
				.sourceIp("ip")
				.sourcePort(3306)
				.sourceUserName("root")
				.sourcePassword("****")
				.sinkIp("ip")
				.sinkPort(9030)
				.sinkUserName("root")
				.sinkPassword("")
				.build();
		
		// Register source and sink for the table mapped by the DataCenterShine entity
		DataStreamUtil.setStarRocksSourceAndSink(env, info, DataCenterShine.class);
		// Multiple tables can be synced in one job, e.g. (see the Organization sketch below):
		//DataStreamUtil.setStarRocksSourceAndSink(env, info, Organization.class);
		// Job name
		env.execute("data_center_shine_job");
		
	}
	
}
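
The commented-out Organization line above assumes a second annotated entity. A minimal sketch of what such a class could look like (hypothetical table and columns; only the annotations matter to the utility classes below, and StarRocksPrimary is defined later in this article):

package com.wys.flink.bean;

import java.io.Serializable;

import com.wys.flink.annotation.FieldInfo;
import com.wys.flink.annotation.TableName;

import lombok.Data;
import lombok.EqualsAndHashCode;

// Hypothetical second entity: table wsmg.organization with two columns
@Data
@TableName("wsmg.organization")
@EqualsAndHashCode(callSuper = false)
public class Organization extends StarRocksPrimary implements Serializable {

    private static final long serialVersionUID = 1L;

    @FieldInfo(order = 1, isPrimaryKey = true, notNull = true)
    private Integer id;

    @FieldInfo(order = 2)
    private String orgName;

}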
  • SourceAndSinkInfo class: holds host, port, username and password for the source and the sink.
package com.wys.flink.util;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SourceAndSinkInfo {
	
	/**
	 * Source (MySQL) host
	 */
	private String sourceIp;
	/**
	 * Source port
	 */
	private int sourcePort;
	/**
	 * Source username
	 */
	private String sourceUserName;
	/**
	 * Source password
	 */
	private String sourcePassword;
	
	
	/**
	 * Sink (StarRocks) host
	 */
	private String sinkIp;
	/**
	 * Sink port
	 */
	private int sinkPort;
	/**
	 * Sink username
	 */
	private String sinkUserName;
	/**
	 * Sink password
	 */
	private String sinkPassword;

}

  • DataCenterShine entity class; its fields map one-to-one to the columns of the source table.

package com.wys.flink.bean;


import com.wys.flink.annotation.FieldInfo;
import com.wys.flink.annotation.TableName;

import lombok.Data;
import lombok.EqualsAndHashCode;

import java.io.Serializable;


/**
 * <p>
 * Business type mapping table
 * </p>
 *
 * @author wys
 * @since 2023-05-23 11:16:24
 */
@Data
@TableName("wsmg.data_center_shine")
@EqualsAndHashCode(callSuper=false)
public class DataCenterShine extends StarRocksPrimary implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Primary key
     */
    @FieldInfo(order = 1,isPrimaryKey=true,notNull=true)
    private Integer id;

    /**
     * Mapper name
     */
    @FieldInfo(order = 2)
    private String busName;

    /**
     * Mapper class name
     */
    @FieldInfo(order = 3)
    private String mapperClassName;

    /**
     * Entity class name
     */
    @FieldInfo(order = 4)
    private String entityClassName;

}
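
For orientation, the StarRocks target table for this entity might be declared roughly as follows. This is a sketch, assuming StarRocks' primary key model; column names follow the camelCase-to-snake_case conversion performed in StarRocksSinkUtil below, and types follow its getDataType mapping:

CREATE TABLE wsmg.data_center_shine (
    id INT NOT NULL,
    bus_name STRING,
    mapper_class_name STRING,
    entity_class_name STRING
) PRIMARY KEY (id)
DISTRIBUTED BY HASH (id);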

  • StarRocksPrimary base class: carries the change-type flag shared by all entities.
package com.wys.flink.bean;

import org.apache.flink.types.RowKind;

import lombok.Data;

@Data
public class StarRocksPrimary {
	
	/**
	 * Stores the StarRocks row operation for this record: insert, delete or update.
	 * The sink writes rowKind.ordinal() into the trailing __op column (see StarRocksSinkUtil below).
	 */
	private RowKind rowKind;

}
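
Because the sink encodes the operation as rowKind.ordinal(), the ordinal values matter. A quick standalone check (Flink declares the enum in this order):

import org.apache.flink.types.RowKind;

public class RowKindOrdinalCheck {
	public static void main(String[] args) {
		// Prints: INSERT=0, UPDATE_BEFORE=1, UPDATE_AFTER=2, DELETE=3.
		// StarRocks' stream load __op convention is 0 = upsert and 1 = delete,
		// which appears to be why the deserializer below maps delete events to
		// RowKind.UPDATE_BEFORE (ordinal 1) rather than RowKind.DELETE (ordinal 3).
		for (RowKind kind : RowKind.values()) {
			System.out.println(kind.name() + "=" + kind.ordinal());
		}
	}
}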

  • FieldInfo annotation: marks a field's position, whether it is the primary key, and whether it is NOT NULL; used later when building the TableSchema.
package com.wys.flink.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface FieldInfo {
    /**
     * Field order: the position of the field in the insert.
     * @return
     */
    int order();
    
    /**
     * Whether the field is the primary key (needed for StarRocks' primary key model).
     * @methodName isPrimaryKey
     * @return boolean
     * @author wys
     * @date 2023-12-12
     */
    boolean isPrimaryKey() default false;
    
    /**
     * Whether the field is NOT NULL.
     * @methodName notNull
     * @return boolean
     * @author wys
     * @date 2023-12-12
     */
    boolean notNull() default false;
}

  • TableName annotation: records the database and table an entity maps to.
package com.wys.flink.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface TableName {
    /***
     * Table name in the form database.table, e.g. sys.user
     * @return
     */
    String value();
}

  • DataStreamUtil utility class: wires up source and sink. MySQL-to-MySQL and MySQL-to-StarRocks sync are defined (the MySQL-to-MySQL variant is commented out below).
package com.wys.flink.util;

import java.util.function.Supplier;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.wys.flink.annotation.TableName;

public class DataStreamUtil {

	/**
	 * Configure source and sink for MySQL-to-MySQL sync
	 * @methodName setMySQLSourceAndSink
	 * @param env
	 * @param info
	 * @param cls void
	 * @author wys
	 * @date 2023-12-12
	 */
	/*@SuppressWarnings({ "unchecked", "rawtypes" })
	public static <T> void setMySQLSourceAndSink(StreamExecutionEnvironment env,SourceAndSinkInfo info,Class<T> cls) {
		setSourceAndSink(env, info, cls, ()->new MysqlAndStarRocksSink(cls,info.getSinkIp(), info.getSinkPort()));
	}*/

	
	/**
	 * Configure source and sink for MySQL-to-StarRocks sync
	 * @methodName setStarRocksSourceAndSink
	 * @param env
	 * @param info
	 * @param cls void
	 * @author wys
	 * @date 2023-12-12
	 */
	public static <T> void setStarRocksSourceAndSink(StreamExecutionEnvironment env,SourceAndSinkInfo info,Class<T> cls) {
		setSourceAndSink(env, info, cls, ()->StarRocksSinkUtil.getStarRocksSink(cls, info));
	}
	
	/**
	 * Configure source and sink
	 * @methodName setSourceAndSink
	 * @param env
	 * @param info
	 * @param cls
	 * @param sink void
	 * @author wys
	 * @date 2023-12-12
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	private static <T> void setSourceAndSink(StreamExecutionEnvironment env,SourceAndSinkInfo info,Class<T> cls,Supplier<SinkFunction<T>> sink) {
		if(cls.isAnnotationPresent(TableName.class)){
			String table=cls.getAnnotation(TableName.class).value();
			String[] tableArr=table.split("\\.");
			// source (generic type fixed: the original hard-coded <DataCenterShine> would not compile for other entities)
			MySqlSource<T> mySQLSource= MySqlSource.<T>builder()
					.hostname(info.getSourceIp())
					.port(info.getSourcePort())
					.databaseList(tableArr[0]) // database to capture; to sync a whole database, set tableList to ".*"
					.tableList(table) // table to capture
					.username(info.getSourceUserName())
					.password(info.getSourcePassword())
					.deserializer(new CustomDebeziumDeserializationSchema(cls)).build();
			// Add the source to the execution environment
			DataStreamSource<T> source=env.fromSource(mySQLSource, WatermarkStrategy.noWatermarks(),tableArr[1]+"_source");
			// sink
			source.addSink(sink.get()).name(tableArr[1]+"_sink");
		}
	}
		
}

  • StarRocksSinkUtil helper class: builds the StarRocks sink.
package com.wys.flink.util;

import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.TableSchema.Builder;
import org.apache.flink.table.types.DataType;

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.row.sink.StarRocksSinkRowBuilder;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.wys.flink.annotation.FieldInfo;
import com.wys.flink.annotation.TableName;
import com.wys.flink.bean.StarRocksPrimary;

/**
 * Helper class for building the StarRocks sink
 * @className StarRocksSinkUtil
 * @author wys
 * @date 2023-12-12
 */
public class StarRocksSinkUtil {

	private static final Pattern TPATTERN = Pattern.compile("[A-Z0-9]");

	/**
	 * Build the StarRocks SinkFunction
	 * @methodName getStarRocksSink
	 * @param cls
	 * @param info
	 * @return SinkFunction<T>
	 * @author wys
	 * @date 2023-12-12
	 */
	@SuppressWarnings("serial")
	public static <T> SinkFunction<T> getStarRocksSink(Class<T> cls, SourceAndSinkInfo info) {
		Map<Integer, String> fieldMap = getFieldMap(cls);
		return StarRocksSink.sink(getTableSchema(cls), getStarRocksSinkOptions(info, cls),
				new StarRocksSinkRowBuilder<T>() {
					@Override
					public void accept(Object[] objects, T beanDataJava) {
						try {
							// Fill the value array via reflection, in @FieldInfo order
							for (Entry<Integer, String> entry : fieldMap.entrySet()) {
								Field field = cls.getDeclaredField(entry.getValue());
								field.setAccessible(true);
								Object obj = field.get(beanDataJava);
								// order is 1-based, the array is 0-based
								objects[entry.getKey() - 1] = obj;
							}
							// The extra last slot carries the operation (__op) as the RowKind ordinal
							if(beanDataJava instanceof StarRocksPrimary){
								objects[objects.length - 1] = ((StarRocksPrimary) beanDataJava).getRowKind().ordinal();
							}
						} catch (Exception e) {
							e.printStackTrace();
						}

					}
				});
	}

	/**
	 * Build a map of @FieldInfo order -> field name
	 * 
	 * @methodName getFieldMap
	 * @author wys
	 * @date 2023-12-11
	 */
	private static <T> Map<Integer, String> getFieldMap(Class<T> cls) {
		Map<Integer, String> fieldMap = new HashMap<>();
		Field[] fields = cls.getDeclaredFields();
		for (Field field : fields) {
			if (field.isAnnotationPresent(FieldInfo.class)) {
				fieldMap.put(field.getAnnotation(FieldInfo.class).order(), field.getName());
			}
		}
		return fieldMap;
	}

	/**
	 * Build the TableSchema from the entity's annotations
	 * @methodName getTableSchema
	 * @param cls
	 * @return TableSchema
	 * @author wys
	 * @date 2023-12-12
	 */
	@SuppressWarnings("deprecation")
	private static <T> TableSchema getTableSchema(Class<T> cls) {
		Builder builder = TableSchema.builder();
		Field[] fields = cls.getDeclaredFields();
		// Build the TableSchema via reflection
		for (Field field : fields) {
			if (!field.isAnnotationPresent(FieldInfo.class)) {
				continue;
			}
			FieldInfo fi = field.getAnnotation(FieldInfo.class);
			if (fi.isPrimaryKey()) {
				// Use the converted column name so it matches the column registered below
				builder.primaryKey(humpToUnderlined(field.getName()));
			}
			DataType dataType = getDataType(field.getType());
			if (fi.notNull()) {
				dataType = dataType.notNull();
			}
			builder.field(humpToUnderlined(field.getName()), dataType);
		}
		return builder.build();
	}
	
	/**
	 * Build the StarRocksSinkOptions
	 * @methodName getStarRocksSinkOptions
	 * @param info
	 * @param cls
	 * @return StarRocksSinkOptions
	 * @author wys
	 * @date 2023-12-12
	 */
	private static <T> StarRocksSinkOptions getStarRocksSinkOptions(SourceAndSinkInfo info, Class<T> cls) {
		String table = cls.getAnnotation(TableName.class).value();
		String[] tableArr = table.split("\\.");
		return StarRocksSinkOptions.builder()
				// MySQL-protocol query port (9030 by default)
				.withProperty("jdbc-url",String.format("jdbc:mysql://%s:%s/%s", info.getSinkIp(), info.getSinkPort(), tableArr[0]))
				// FE HTTP port for stream load (8030 by default, hard-coded here)
				.withProperty("load-url", info.getSinkIp() + ":8030")
				.withProperty("username", info.getSinkUserName())
				.withProperty("password", info.getSinkPassword())
				.withProperty("table-name", tableArr[1])
				.withProperty("database-name", tableArr[0])
				.withProperty("sink.properties.row_delimiter", "\\x02")
				.withProperty("sink.properties.column_separator", "\\x01")
				.withProperty("sink.buffer-flush.interval-ms", "5000").build();
	}

	/**
	 * Convert camelCase to snake_case
	 * 
	 * @methodName humpToUnderlined
	 * @param str
	 * @return String
	 * @author wys
	 * @date 2023-12-12
	 */
	private static String humpToUnderlined(String str) {
		Matcher matcher = TPATTERN.matcher(str);
		StringBuffer sb = new StringBuffer();
		while (matcher.find()) {
			matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
		}
		matcher.appendTail(sb);
		return sb.toString();
	}

	/**
	 * Map a Java field type to a Flink DataType
	 * @methodName getDataType
	 * @param cls
	 * @return DataType
	 * @author wys
	 * @date 2023-12-12
	 */
	private static DataType getDataType(Class<?> cls) {
		if (cls.equals(Integer.class)) {
			return DataTypes.INT();
		} else if (cls.equals(String.class)) {
			return DataTypes.STRING();
		} else if (cls.equals(Date.class)) {
			return DataTypes.TIMESTAMP();
		} else if (cls.equals(BigDecimal.class)) {
			return DataTypes.DECIMAL(8, 2);
		}
		throw new RuntimeException("未找到属性相应类型");
	}

	

}
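
A quick standalone check of the name conversion used above (same regex as humpToUnderlined; note that the [A-Z0-9] pattern also splits on digits):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HumpToUnderlinedCheck {

	private static final Pattern TPATTERN = Pattern.compile("[A-Z0-9]");

	// Same conversion as StarRocksSinkUtil.humpToUnderlined
	static String humpToUnderlined(String str) {
		Matcher matcher = TPATTERN.matcher(str);
		StringBuffer sb = new StringBuffer();
		while (matcher.find()) {
			matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
		}
		matcher.appendTail(sb);
		return sb.toString();
	}

	public static void main(String[] args) {
		System.out.println(humpToUnderlined("mapperClassName")); // mapper_class_name
		System.out.println(humpToUnderlined("busName"));         // bus_name
		System.out.println(humpToUnderlined("field1"));          // field_1 (digits split too)
	}
}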

  • CustomDebeziumDeserializationSchema: a custom deserialization schema that turns Debezium change records into entity instances.
package com.wys.flink.util;

import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

/**
 * Custom deserialization schema
 * @className CustomDebeziumDeserializationSchema
 * @author wys
 * @date 2023-12-12
 */
public class CustomDebeziumDeserializationSchema<T> implements DebeziumDeserializationSchema<T> {

	private static final long serialVersionUID = 1L;

    private Class<T> cls;

    public CustomDebeziumDeserializationSchema(Class<T> cls) {
        this.cls=cls;
    }

    /**
     * Only "after" present: insert; only "before" present: delete; both present: update
     * @methodName deserialize
     * @param sourceRecord
     * @param collector void
     * @author wys
     * @date 2023-12-12
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<T> collector) {
        JSONObject resJson = new JSONObject();
        try {
            Struct valueStruct = (Struct) sourceRecord.value();
            Struct afterStruct = valueStruct.getStruct("after");
            Struct beforeStruct = valueStruct.getStruct("before");
            // Update
            if (null!=beforeStruct &&  null!=afterStruct) {
            	setDataContent(afterStruct, resJson);
                resJson.put("rowKind", RowKind.UPDATE_AFTER);
            } 
            // Insert
            else if (null!= afterStruct) {
            	setDataContent(afterStruct, resJson);
            	resJson.put("rowKind", RowKind.INSERT);
            } 
            // Delete: UPDATE_BEFORE is used deliberately so that its ordinal (1)
            // matches the StarRocks __op code for delete
            else if (null!= beforeStruct ) {
            	setDataContent(beforeStruct, resJson);
                resJson.put("rowKind", RowKind.UPDATE_BEFORE);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("Deserialization failed", e);
        }
        T t =resJson.toJavaObject(cls);
        collector.collect(t);
    }
    
    /**
     * Copy all fields of the given struct into the JSON object
     * @methodName setDataContent
     * @param struct
     * @param resJson void
     * @author wys
     * @date 2023-12-12
     */
    private void setDataContent(Struct struct,JSONObject resJson){
    	  List<Field> fields = struct.schema().fields();
          for (Field field : fields) {
          	String name = field.name();
          	Object value = struct.get(name);
            resJson.put(name, value);
          }
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(cls);
    }
}
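
To exercise the classification logic in isolation, a small smoke test can feed a hand-built Debezium-style envelope through the deserializer. This is a sketch: it uses the shaded Kafka Connect classes imported above and a simplified envelope carrying only before/after.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.util.Collector;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;

import com.wys.flink.bean.DataCenterShine;
import com.wys.flink.util.CustomDebeziumDeserializationSchema;

public class DeserializerSmokeTest {

	public static void main(String[] args) {
		// Row schema mirroring two columns of data_center_shine (optional so a side can be null)
		Schema rowSchema = SchemaBuilder.struct().optional()
				.field("id", Schema.OPTIONAL_INT32_SCHEMA)
				.field("bus_name", Schema.OPTIONAL_STRING_SCHEMA)
				.build();
		// Simplified Debezium envelope carrying only before/after
		Schema envelopeSchema = SchemaBuilder.struct()
				.field("before", rowSchema)
				.field("after", rowSchema)
				.build();

		Struct after = new Struct(rowSchema).put("id", 1).put("bus_name", "demo");
		// "before" is left null, so the deserializer should classify this as an INSERT
		Struct value = new Struct(envelopeSchema).put("after", after);
		SourceRecord record = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(),
				"demo-topic", envelopeSchema, value);

		final List<DataCenterShine> out = new ArrayList<>();
		new CustomDebeziumDeserializationSchema<>(DataCenterShine.class)
				.deserialize(record, new Collector<DataCenterShine>() {
					@Override public void collect(DataCenterShine r) { out.add(r); }
					@Override public void close() { }
				});
		System.out.println(out.get(0).getRowKind()); // INSERT
	}
}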



3. Custom MySQL-to-StarRocks Sync Job

3.1 Feature Description

  • Upload the jar through the Apache Flink Dashboard, pass in the tables to be synced, and the job is generated automatically.

3.2 Code Implementation

package com.wys.flink;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.wys.flink.annotation.TableName;
import com.wys.flink.util.DataStreamUtil;
import com.wys.flink.util.SourceAndSinkInfo;

/**
 * Custom job; program arguments: --entity DataCenterShine,Organization
 * @className StarRocksCustomStreamCDC
 * @author wys
 * @date 2023-12-11
 */
public class StarRocksCustomStreamCDC {
	
	public static void main(String[] args) throws Exception {
		List<Class<?>> clsList=new ArrayList<>();
		StringBuilder jobName=new StringBuilder();
		ParameterTool parameters = ParameterTool.fromArgs(args);
		String entitys = parameters.get("entity",null);
		if(null==entitys){
			throw new RuntimeException("Pass the entity class names of the tables to sync in Program Arguments, e.g.: --entity User,Role...");
		}
		// The argument is a comma-separated list of entity class names
		String[] entityArr=entitys.split(",");
		for(String className:entityArr){
			Class<?> cls=getBeanClass(String.format("com.wys.flink.bean.%s", className));
			clsList.add(cls);
			jobName.append(cls.getSimpleName()).append("_");
		}
		jobName.append("job");

		// Stream execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(180000L,CheckpointingMode.EXACTLY_ONCE);
		SourceAndSinkInfo ssi=SourceAndSinkInfo.builder()
				.sourceIp("ip")
				.sourcePort(3306)
				.sourceUserName("root")
				.sourcePassword("****")
				.sinkIp("ip")
				.sinkPort(9030)
				.sinkUserName("root")
				.sinkPassword("****")
				.build();
		
		// Register source and sink for every entity
		clsList.forEach(item->DataStreamUtil.setStarRocksSourceAndSink(env, ssi, item));
		env.execute(jobName.toString().toLowerCase());
	}
	

	/**
	 * Load the entity class by name
	 * @methodName getBeanClass
	 * @param className fully qualified class name
	 * @return Class<?>
	 * @author wys
	 * @date 2023-05-18
	 */
	private static Class<?> getBeanClass(String className) {
		try {
			Class<?> cls= Class.forName(className);
			if(!cls.isAnnotationPresent(TableName.class)){
				throw new RuntimeException("The entity class to sync is missing the @TableName annotation");
			}
			return cls;
		} catch (ClassNotFoundException e) {
			// Class lookup failed
			throw new RuntimeException(String.format("Entity class [%s] not found", className));
		}
	}


}
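
Besides the Dashboard flow described next, the same jar can be submitted from the command line. Assuming the artifact name from the pom above, the invocation would look like: ./bin/flink run -c com.wys.flink.StarRocksCustomStreamCDC flink-1.0.0.jar --entity DataCenterShine,Organization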


3.3 Running the Job on the Apache Flink Dashboard

  • On the Submit New Job page of the Apache Flink Dashboard, upload the packaged jar, enter the main class to execute, and the entity classes of the tables to sync (comma-separated for multiple tables).
    (screenshot)
  • Click Submit to create the job.
    (screenshot)