赞
踩
前提: 被监听数据库需要开启bin_log , 账号需要有可查看bin_log日志权限
<flink-version>1.12.0</flink-version> <flink-mysql-version>2.0.0</flink-mysql-version> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>${flink-mysql-version}</version> </dependency> <!--flinkcdc 结束-->
/** * @author: xrp * @date: 2022/05/05/10:25 * @description */ public class LinkConfig implements DebeziumDeserializationSchema<String> { /** * * @param sourceRecord * @param collector */ @Override public void deserialize(SourceRecord sourceRecord, Collector<String> collector) { JSONObject result = new JSONObject(); String topic = sourceRecord.topic(); String[] fields = topic.split("\\."); result.put("db", fields[1]); result.put("tableName", fields[2]); //获取before数据 Struct value = (Struct) sourceRecord.value(); Struct before = value.getStruct("before"); JSONObject beforeJson = new JSONObject(); if (before != null) { beforeJson = getJson(before); } result.put("before", beforeJson); //获取after数据 Struct after = value.getStruct("after"); JSONObject afterJson = new JSONObject(); if (after != null) { //获取列信息 afterJson = getJson(after); } result.put("after", afterJson); //获取操作类型 Envelope.Operation operation = Envelope.operationFor(sourceRecord); result.put("op", operation); collector.collect(result.toJSONString()); } private JSONObject getJson(Struct struct) { JSONObject jsonObject = new JSONObject(); //获取列信息 Schema schema = struct.schema(); List<Field> fieldList = schema.fields(); for (Field field : fieldList) { jsonObject.put(field.name(), struct.get(field)); } return jsonObject; } @Override public TypeInformation<String> getProducedType() { return BasicTypeInfo.STRING_TYPE_INFO; } }
/** * @author: xrp * @date: 2022/05/05/14:49 * @description */ @Component public class StartUp implements CommandLineRunner { @Override public void run(String... args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder() .hostname("要监听的数据库地址") .port(端口号) .username("数据库账户") .password("数据库密码") .databaseList("数据库") .tableList("数据库.数据库表1", "数据库。数据库表2") //多个表逗号分隔 .deserializer(new LinkConfig()) //自定义返回结果集 .startupOptions(StartupOptions.initial()) .serverTimeZone("UTC") .build(); DataStreamSource<String> streamSource = env.addSource(sourceFunction); // 多表进行分片处理 OutputTag<String> orderTag = new OutputTag<>("表1", Types.STRING); OutputTag<String> userTag = new OutputTag<>("表2",Types.STRING); SingleOutputStreamOperator<String> process = streamSource.map((MapFunction<String, JSONObject>) JSON::parseObject).process(new ProcessFunction<JSONObject, String>() { @Override public void processElement(JSONObject value, Context context, Collector<String> collector) { if ("表1".equals(value.getString("tableName"))) { context.output(orderTag, value.toJSONString()); } else if ("表2".equals(value.getString("tableName"))) { context.output(userTag, value.toJSONString()); } } }); DataStream<String> orderStream = process.getSideOutput(orderTag); DataStream<String> userStream = process.getSideOutput(userTag); orderStream.print(); userStream.print(); //自定义sink streamSource.addSink(new ListenerOrderSink()); userStream.addSink(new ListenerUserSink()); env.executeAsync("fLinkCDC"); } }
/** * @author: xrp * @date: 2022/05/24/17:27 * @description 订单sink */ public class ListenerOrderSink extends RichSinkFunction<String> { private static final Logger LOGGER = LoggerFactory.getLogger(ListenerOrderSink.class); private PreparedStatement ps = null; private Connection connection = null; String driver = "com.mysql.cj.jdbc.Driver"; String url = "jdbc:mysql://要将监听到的数据同步到哪,另一个数据库地址:端口号/数据库名字?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"; String username = "数据库账号"; String password = "数据库密码"; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConn(); ps = connection.prepareStatement("insert into 数据库名.表名values (?,?,?,?,?,?,?,?,?,?,?)"); } private Connection getConn() { try { Class.forName(driver); connection = DriverManager.getConnection(url, username, password); LOGGER.error("数据库连接成功"); } catch (Exception e) { LOGGER.error("数据库连接失败"); } return connection; } @Override public void invoke(String p, Context context) throws Exception { //TranslateJson 将自定义的返回结果集转为具体实体 TranslateJson translateJson = JSON.parseObject(p, TranslateJson.class); if (ConstantValue.CREATE_INFO.equals(translateJson.getOp())) { ErpOrder erpOrder = JSON.parseObject(translateJson.getAfter(), ErpOrder.class); // 注意: 字段个数需要与表字段个数对应上 ps.setString(1,erpOrder.getId()); ps.setString(2,erpOrder.getCode()); ps.setString(3,erpOrder.getCustomerName()); ps.setBigDecimal(4,erpOrder.getOrderAmount()); ps.setString(5,erpOrder.getUserId()); ps.setString(6,erpOrder.getUserName()); ps.setString(7,erpOrder.getCreateName()); ps.setString(8,erpOrder.getCreateDate()); ps.setInt(9,erpOrder.getStatus()); ps.setInt(10,erpOrder.getOrderId()); ps.setString(11,erpOrder.getOrderCode()); ps.executeUpdate(); } } @Override public void close() throws Exception { super.close(); if(connection != null){ connection.close(); } if (ps != null){ ps.close(); } } }
/**
* @author: xrp
* @date: 2022/05/05/10:50
* @description
*/
@Data
public class TranslateJson {
private static final long serialVersionUID = -74375380912179188L;
private String op;
private String before;
private String after;
private String db;
private String tableName;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。