当前位置:   article > 正文

Flink CDC：使用 DataStream API 同步 MySQL 数据表

flink mysql数据表同步API CDC

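概述：本文演示如何使用 Flink CDC 的 DataStream API（MySqlSource）实时捕获 MySQL 表 zentao.zt_group 的变更，并将其写入同一 MySQL 实例上 wfg 库中的 zentao_zt_group 表。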

CDC 简介：CDC（Change Data Capture，变更数据捕获）指捕获数据库中行级数据的插入、更新、删除等变更，并将其实时同步到下游系统；MySQL CDC 通常通过读取 binlog 实现。

使用 API 方式实现 CDC 数据同步的代码：

  package com.yclxiao.flinkcdcdemo.api;

  import com.alibaba.fastjson.JSON;
  import com.alibaba.fastjson.JSONObject;
  import com.ververica.cdc.connectors.mysql.source.MySqlSource;
  import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
  import com.yclxiao.flinkcdcdemo.util.JdbcUtil;

  import org.apache.commons.lang3.StringUtils;
  import org.apache.commons.lang3.time.DateFormatUtils;
  import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  import org.apache.flink.api.common.functions.FilterFunction;
  import org.apache.flink.api.common.functions.FlatMapFunction;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.streaming.api.datastream.DataStreamSource;
  import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  import org.apache.flink.util.Collector;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.PreparedStatement;
  import java.sql.SQLException;
  import java.sql.Timestamp;
  import java.sql.Types;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;
  import java.util.TimeZone;
  28. /**
  29. * league.oc_settle_profit -> cloud.dws_profit_record_hdj
  30. * API方式
  31. */
  32. public class Wfg2userApi {
  33. private static final Logger LOG = LoggerFactory.getLogger(Wfg2userApi.class);
  34. private static String MYSQL_HOST = "192.168.1.12";
  35. private static int MYSQL_PORT = 3306;
  36. private static String MYSQL_USER = "root";
  37. private static String MYSQL_PASSWD = "123456";
  38. private static String SYNC_DB = "zentao";
  39. private static List<String> SYNC_TABLES = Arrays.asList("zentao.zt_group");
  40. public static void main(String[] args) throws Exception {
  41. MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
  42. .hostname(MYSQL_HOST)
  43. .port(MYSQL_PORT)
  44. .databaseList(SYNC_DB) // set captured database
  45. .tableList(String.join(",", SYNC_TABLES)) // set captured table
  46. .username(MYSQL_USER)
  47. .password(MYSQL_PASSWD)
  48. .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
  49. .build();
  50. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  51. env.setParallelism(3);
  52. env.enableCheckpointing(5000);
  53. DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + Wfg2userApi.class.getName());
  54. List<String> tableList = getTableList();
  55. for (String tbl : tableList) {
  56. SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);
  57. SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
  58. SingleOutputStreamOperator<String> logicStream = logic(cleanStream);
  59. logicStream.addSink(new CustomDealDataSink());
  60. }
  61. env.execute(Wfg2userApi.class.getName());
  62. }
  63. private static class CustomDealDataSink extends RichSinkFunction<String> {
  64. private transient Connection cloudConnection;
  65. private transient PreparedStatement cloudPreparedStatement;
  66. private String insertSql = "INSERT INTO `zentao_zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) \n" +
  67. " VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
  68. private String deleteSql = "delete from zentao_zt_group where id = '%s'";
  69. @Override
  70. public void open(Configuration parameters) throws Exception {
  71. super.open(parameters);
  72. // 在这里初始化 JDBC 连接
  73. cloudConnection = DriverManager.getConnection("jdbc:mysql://" + MYSQL_HOST + ":3306/wfg", "root", "123456");
  74. cloudPreparedStatement = cloudConnection.prepareStatement(insertSql);
  75. }
  76. @Override
  77. public void invoke(String value, Context context) throws Exception {
  78. JSONObject dataJson = JSON.parseObject(value);
  79. Long id = dataJson.getLong("id");
  80. Integer project = dataJson.getInteger("project");
  81. String vision = dataJson.getString("vision");
  82. String name = dataJson.getString("name");
  83. String role = dataJson.getString("role");
  84. String desc = dataJson.getString("desc");
  85. String acl = dataJson.getString("acl");
  86. Integer developer = dataJson.getInteger("developer");
  87. cloudPreparedStatement.setLong(1, id);
  88. cloudPreparedStatement.setInt(2, project);
  89. cloudPreparedStatement.setString(3, vision);
  90. cloudPreparedStatement.setString(4, name);
  91. cloudPreparedStatement.setString(5, role);
  92. cloudPreparedStatement.setString(6, desc);
  93. cloudPreparedStatement.setString(7, acl);
  94. cloudPreparedStatement.setInt(8, developer);
  95. cloudPreparedStatement.execute(String.format(deleteSql, id));
  96. cloudPreparedStatement.execute();
  97. }
  98. @Override
  99. public void close() throws Exception {
  100. super.close();
  101. // 在这里关闭 JDBC 连接
  102. cloudPreparedStatement.close();
  103. cloudConnection.close();
  104. }
  105. }
  106. /**
  107. * 处理逻辑:过滤掉部分数据
  108. *
  109. * @param cleanStream
  110. * @return
  111. */
  112. private static SingleOutputStreamOperator<String> logic(SingleOutputStreamOperator<String> cleanStream) {
  113. return cleanStream.filter(new FilterFunction<String>() {
  114. @Override
  115. public boolean filter(String data) throws Exception {
  116. try {
  117. // JSONObject dataJson = JSON.parseObject(data);
  118. // String id = dataJson.getString("id");
  119. // Integer bizType = dataJson.getInteger("biz_type");
  120. // if (StringUtils.isBlank(id) || bizType == null) {
  121. // return false;
  122. // }
  123. // 只处理上岗卡数据
  124. // return bizType == 9;
  125. return true;
  126. } catch (Exception ex) {
  127. LOG.warn("filter other format binlog:{}", data);
  128. return false;
  129. }
  130. }
  131. });
  132. }
  133. /**
  134. * 清晰数据
  135. *
  136. * @param source
  137. * @return
  138. */
  139. private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
  140. return source.flatMap(new FlatMapFunction<String, String>() {
  141. @Override
  142. public void flatMap(String row, Collector<String> out) throws Exception {
  143. try {
  144. LOG.info("============================row:{}", row);
  145. JSONObject rowJson = JSON.parseObject(row);
  146. String op = rowJson.getString("op");
  147. //history,insert,update
  148. if (Arrays.asList("r", "c", "u").contains(op)) {
  149. out.collect(rowJson.getJSONObject("after").toJSONString());
  150. } else {
  151. LOG.info("filter other op:{}", op);
  152. }
  153. } catch (Exception ex) {
  154. LOG.warn("filter other format binlog:{}", row);
  155. }
  156. }
  157. });
  158. }
  159. /**
  160. * 过滤数据
  161. *
  162. * @param source
  163. * @param table
  164. * @return
  165. */
  166. private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
  167. return source.filter(new FilterFunction<String>() {
  168. @Override
  169. public boolean filter(String row) throws Exception {
  170. try {
  171. JSONObject rowJson = JSON.parseObject(row);
  172. JSONObject source = rowJson.getJSONObject("source");
  173. String tbl = source.getString("table");
  174. return table.equals(tbl);
  175. } catch (Exception ex) {
  176. ex.printStackTrace();
  177. return false;
  178. }
  179. }
  180. });
  181. }
  182. private static List<String> getTableList() {
  183. List<String> tables = new ArrayList<>();
  184. String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
  185. List<JSONObject> tableList = JdbcUtil.executeQuery(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
  186. for (JSONObject jsob : tableList) {
  187. String schemaName = jsob.getString("TABLE_SCHEMA");
  188. String tblName = jsob.getString("TABLE_NAME");
  189. String schemaTbl = schemaName + "." + tblName;
  190. if (SYNC_TABLES.contains(schemaTbl)) {
  191. tables.add(tblName);
  192. }
  193. }
  194. return tables;
  195. }
  196. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/545879
推荐阅读
相关标签
  

闽ICP备14008679号