赞
踩
package com.yclxiao.flinkcdcdemo.api;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.yclxiao.flinkcdcdemo.util.JdbcUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;

- /**
- * league.oc_settle_profit -> cloud.dws_profit_record_hdj
- * API方式
- */
- public class Wfg2userApi {
- private static final Logger LOG = LoggerFactory.getLogger(Wfg2userApi.class);
- private static String MYSQL_HOST = "192.168.1.12";
- private static int MYSQL_PORT = 3306;
- private static String MYSQL_USER = "root";
- private static String MYSQL_PASSWD = "123456";
- private static String SYNC_DB = "zentao";
- private static List<String> SYNC_TABLES = Arrays.asList("zentao.zt_group");
-
- public static void main(String[] args) throws Exception {
-
-
-
- MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
- .hostname(MYSQL_HOST)
- .port(MYSQL_PORT)
- .databaseList(SYNC_DB) // set captured database
- .tableList(String.join(",", SYNC_TABLES)) // set captured table
- .username(MYSQL_USER)
- .password(MYSQL_PASSWD)
- .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
- .build();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
- env.enableCheckpointing(5000);
-
- DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + Wfg2userApi.class.getName());
-
- List<String> tableList = getTableList();
- for (String tbl : tableList) {
- SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);
- SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
- SingleOutputStreamOperator<String> logicStream = logic(cleanStream);
- logicStream.addSink(new CustomDealDataSink());
- }
- env.execute(Wfg2userApi.class.getName());
- }
-
- private static class CustomDealDataSink extends RichSinkFunction<String> {
- private transient Connection cloudConnection;
- private transient PreparedStatement cloudPreparedStatement;
-
- private String insertSql = "INSERT INTO `zentao_zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) \n" +
- " VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
- private String deleteSql = "delete from zentao_zt_group where id = '%s'";
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- // 在这里初始化 JDBC 连接
- cloudConnection = DriverManager.getConnection("jdbc:mysql://" + MYSQL_HOST + ":3306/wfg", "root", "123456");
- cloudPreparedStatement = cloudConnection.prepareStatement(insertSql);
- }
-
- @Override
- public void invoke(String value, Context context) throws Exception {
- JSONObject dataJson = JSON.parseObject(value);
- Long id = dataJson.getLong("id");
- Integer project = dataJson.getInteger("project");
- String vision = dataJson.getString("vision");
- String name = dataJson.getString("name");
- String role = dataJson.getString("role");
- String desc = dataJson.getString("desc");
- String acl = dataJson.getString("acl");
- Integer developer = dataJson.getInteger("developer");
-
- cloudPreparedStatement.setLong(1, id);
- cloudPreparedStatement.setInt(2, project);
- cloudPreparedStatement.setString(3, vision);
- cloudPreparedStatement.setString(4, name);
- cloudPreparedStatement.setString(5, role);
- cloudPreparedStatement.setString(6, desc);
- cloudPreparedStatement.setString(7, acl);
- cloudPreparedStatement.setInt(8, developer);
- cloudPreparedStatement.execute(String.format(deleteSql, id));
- cloudPreparedStatement.execute();
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- // 在这里关闭 JDBC 连接
- cloudPreparedStatement.close();
- cloudConnection.close();
- }
- }
-
- /**
- * 处理逻辑:过滤掉部分数据
- *
- * @param cleanStream
- * @return
- */
- private static SingleOutputStreamOperator<String> logic(SingleOutputStreamOperator<String> cleanStream) {
- return cleanStream.filter(new FilterFunction<String>() {
- @Override
- public boolean filter(String data) throws Exception {
- try {
- // JSONObject dataJson = JSON.parseObject(data);
- // String id = dataJson.getString("id");
- // Integer bizType = dataJson.getInteger("biz_type");
- // if (StringUtils.isBlank(id) || bizType == null) {
- // return false;
- // }
- // 只处理上岗卡数据
- // return bizType == 9;
- return true;
- } catch (Exception ex) {
- LOG.warn("filter other format binlog:{}", data);
- return false;
- }
- }
- });
- }
-
- /**
- * 清晰数据
- *
- * @param source
- * @return
- */
- private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
- return source.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public void flatMap(String row, Collector<String> out) throws Exception {
- try {
- LOG.info("============================row:{}", row);
- JSONObject rowJson = JSON.parseObject(row);
- String op = rowJson.getString("op");
- //history,insert,update
- if (Arrays.asList("r", "c", "u").contains(op)) {
- out.collect(rowJson.getJSONObject("after").toJSONString());
- } else {
- LOG.info("filter other op:{}", op);
- }
- } catch (Exception ex) {
- LOG.warn("filter other format binlog:{}", row);
- }
- }
- });
- }
-
- /**
- * 过滤数据
- *
- * @param source
- * @param table
- * @return
- */
- private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
- return source.filter(new FilterFunction<String>() {
- @Override
- public boolean filter(String row) throws Exception {
- try {
- JSONObject rowJson = JSON.parseObject(row);
- JSONObject source = rowJson.getJSONObject("source");
- String tbl = source.getString("table");
- return table.equals(tbl);
- } catch (Exception ex) {
- ex.printStackTrace();
- return false;
- }
- }
- });
- }
-
- private static List<String> getTableList() {
- List<String> tables = new ArrayList<>();
- String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
- List<JSONObject> tableList = JdbcUtil.executeQuery(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
- for (JSONObject jsob : tableList) {
- String schemaName = jsob.getString("TABLE_SCHEMA");
- String tblName = jsob.getString("TABLE_NAME");
- String schemaTbl = schemaName + "." + tblName;
- if (SYNC_TABLES.contains(schemaTbl)) {
- tables.add(tblName);
- }
- }
- return tables;
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。