当前位置:   article > 正文

canal 同步mysql数据_使用canal同步mysql

使用canal同步mysql

1、Canal 是一个开源的数据库数据同步工具,用于监控数据库变更并将变更数据传递到指定的消费端,通常用于将MySQL数据库的变更数据传递到其他系统进行处理

2、使用 Canal 同步 MySQL 数据的基本步骤:

  1. 安装和配置 Canal 服务器:

    • 下载 Canal 服务器安装包或从源代码构建。
    • 配置 Canal 服务器的参数,包括数据库连接信息、监听的数据表、认证配置等。这些配置通常在 canal.properties 文件中设置。
  2. 启动 Canal 服务器:

    • 启动已配置好的 Canal 服务器,它会连接到 MySQL 数据库,监控数据变更。
  3. 编写消费端应用程序:

    • 消费端应用程序可以是您自己编写的任何程序,它会从 Canal 服务器获取数据库变更数据并进行处理。
    • 使用 Canal 提供的客户端 SDK(如 Java 客户端)来连接 Canal 服务器,订阅特定的数据库和数据表。
    • 编写逻辑来处理接收到的变更数据,可以根据业务需求将数据写入其他系统、存储、消息队列等。
  4. 客户端连接canal-server,订阅mysql数据变更逻辑,以及将binglog日志解析成相应的sql语句,可以直接在mysql中执行
  5. 引入的pom
    • <dependency>
          <groupId>com.alibaba.otter</groupId>
          <artifactId>canal.client</artifactId>
          <version>1.1.4</version>
      </dependency>
    1. package cn.liu.demo.canal;
    2. import com.alibaba.otter.canal.client.CanalConnector;
    3. import com.alibaba.otter.canal.client.CanalConnectors;
    4. import com.alibaba.otter.canal.protocol.CanalEntry;
    5. import com.alibaba.otter.canal.protocol.Message;
    6. import com.google.protobuf.InvalidProtocolBufferException;
    7. import java.net.InetSocketAddress;
    8. import java.util.List;
    9. import java.util.Queue;
    10. import java.util.concurrent.ConcurrentLinkedQueue;
    11. import static com.alibaba.otter.canal.protocol.CanalEntry.*;
    12. import static com.alibaba.otter.canal.protocol.CanalEntry.EntryType.*;
    13. /**
    14. * @Author
    15. * @Date:2023/8/22 14:42
    16. * @Description
    17. */
    18. public class CanalClient {
    19. private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
    20. public static void main(String[] args) {
    21. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "instance1", "admin", "admin");
    22. try {
    23. connector.connect();
    24. // 监听的表,格式为数据库.表名,数据库.表名
    25. connector.subscribe("user.*");
    26. //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
    27. connector.rollback();
    28. while (true){
    29. Message message = connector.getWithoutAck(100); // 获取指定数量的数据
    30. //获取批量ID
    31. long batchId = message.getId();
    32. //获取批量的数量
    33. int size = message.getEntries().size();
    34. //如果没有数据
    35. if (batchId == -1 || size == 0) {
    36. try {
    37. //线程休眠2秒
    38. Thread.sleep(2000);
    39. } catch (InterruptedException e) {
    40. e.printStackTrace();
    41. }
    42. } else {
    43. //如果有数据,处理数据
    44. printEntry(message.getEntries());
    45. dataHandle(message.getEntries());
    46. }
    47. //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
    48. connector.ack(batchId);
    49. //当队列里面堆积的sql大于一定数值的时候就模拟执行
    50. if (SQL_QUEUE.size() >= 1) {
    51. executeQueueSql();
    52. }
    53. }
    54. }catch (Exception e){
    55. e.printStackTrace();
    56. }finally {
    57. connector.disconnect();
    58. }
    59. }
    60. /**
    61. * 模拟执行队列里面的sql语句
    62. */
    63. public static void executeQueueSql() {
    64. int size = SQL_QUEUE.size();
    65. for (int i = 0; i < size; i++) {
    66. String sql = SQL_QUEUE.poll();
    67. System.out.println("[sql]----> " + sql);
    68. }
    69. }
    70. /**
    71. * 打印canal server解析binlog获得的实体类信息
    72. */
    73. private static void printEntry(List<Entry> entrys) {
    74. for (Entry entry : entrys) {
    75. if (entry.getEntryType() == TRANSACTIONBEGIN || entry.getEntryType() == TRANSACTIONEND) {
    76. //开启/关闭事务的实体类型,跳过
    77. continue;
    78. }
    79. //RowChange对象,包含了一行数据变化的所有特征
    80. //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
    81. CanalEntry.RowChange rowChage;
    82. try {
    83. rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    84. } catch (Exception e) {
    85. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
    86. }
    87. //获取操作类型:insert/update/delete类型
    88. CanalEntry.EventType eventType = rowChage.getEventType();
    89. //打印Header信息
    90. System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
    91. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
    92. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
    93. eventType));
    94. //判断是否是DDL语句
    95. if (rowChage.getIsDdl()) {
    96. System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
    97. }
    98. //获取RowChange对象里的每一行数据,打印出来
    99. for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
    100. //如果是删除语句
    101. if (eventType == CanalEntry.EventType.DELETE) {
    102. printColumn(rowData.getBeforeColumnsList());
    103. //如果是新增语句
    104. } else if (eventType == CanalEntry.EventType.INSERT) {
    105. printColumn(rowData.getAfterColumnsList());
    106. //如果是更新的语句
    107. } else {
    108. //变更前的数据
    109. System.out.println("------->; before");
    110. printColumn(rowData.getBeforeColumnsList());
    111. //变更后的数据
    112. System.out.println("------->; after");
    113. printColumn(rowData.getAfterColumnsList());
    114. }
    115. }
    116. }
    117. }
    118. /**
    119. * 数据处理
    120. *
    121. * @param entrys
    122. */
    123. private static void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
    124. for (Entry entry : entrys) {
    125. if (ROWDATA == entry.getEntryType()) {
    126. CanalEntry.RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
    127. EventType eventType = rowChange.getEventType();
    128. if (eventType == EventType.DELETE) {
    129. saveDeleteSql(entry);
    130. } else if (eventType == EventType.UPDATE) {
    131. saveUpdateSql(entry);
    132. } else if (eventType == EventType.INSERT) {
    133. saveInsertSql(entry);
    134. }
    135. }
    136. }
    137. }
    138. private static void saveUpdateSql(Entry entry) {
    139. try {
    140. RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
    141. List<RowData> rowDatasList = rowChange.getRowDatasList();
    142. for (RowData rowData : rowDatasList) {
    143. List<Column> newColumnList = rowData.getAfterColumnsList();
    144. StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");
    145. for (int i = 0; i < newColumnList.size(); i++) {
    146. sql.append(" " + newColumnList.get(i).getName()
    147. + " = '" + newColumnList.get(i).getValue() + "'");
    148. if (i != newColumnList.size() - 1) {
    149. sql.append(",");
    150. }
    151. }
    152. sql.append(" where ");
    153. List<Column> oldColumnList = rowData.getBeforeColumnsList();
    154. for (Column column : oldColumnList) {
    155. if (column.getIsKey()) {
    156. //暂时只支持单一主键
    157. sql.append(column.getName() + "=" + column.getValue());
    158. break;
    159. }
    160. }
    161. SQL_QUEUE.add(sql.toString());
    162. }
    163. } catch (InvalidProtocolBufferException e) {
    164. e.printStackTrace();
    165. }
    166. }
    167. /**
    168. * 保存删除语句
    169. *
    170. * @param entry
    171. */
    172. private static void saveDeleteSql(Entry entry) {
    173. try {
    174. RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
    175. List<RowData> rowDatasList = rowChange.getRowDatasList();
    176. for (RowData rowData : rowDatasList) {
    177. List<Column> columnList = rowData.getBeforeColumnsList();
    178. StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");
    179. for (Column column : columnList) {
    180. if (column.getIsKey()) {
    181. //暂时只支持单一主键
    182. sql.append(column.getName() + "=" + column.getValue());
    183. break;
    184. }
    185. }
    186. SQL_QUEUE.add(sql.toString());
    187. }
    188. } catch (InvalidProtocolBufferException e) {
    189. e.printStackTrace();
    190. }
    191. }
    192. /**
    193. * 保存插入语句
    194. *
    195. * @param entry
    196. */
    197. private static void saveInsertSql(Entry entry) {
    198. try {
    199. RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
    200. List<RowData> rowDatasList = rowChange.getRowDatasList();
    201. for (RowData rowData : rowDatasList) {
    202. List<Column> columnList = rowData.getAfterColumnsList();
    203. StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");
    204. for (int i = 0; i < columnList.size(); i++) {
    205. sql.append(columnList.get(i).getName());
    206. if (i != columnList.size() - 1) {
    207. sql.append(",");
    208. }
    209. }
    210. sql.append(") VALUES (");
    211. for (int i = 0; i < columnList.size(); i++) {
    212. sql.append("'" + columnList.get(i).getValue() + "'");
    213. if (i != columnList.size() - 1) {
    214. sql.append(",");
    215. }
    216. }
    217. sql.append(")");
    218. SQL_QUEUE.add(sql.toString());
    219. }
    220. } catch (InvalidProtocolBufferException e) {
    221. e.printStackTrace();
    222. }
    223. }
    224. private static void printColumn(List<CanalEntry.Column> columns) {
    225. for (CanalEntry.Column column : columns) {
    226. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
    227. }
    228. }
    229. }

 总结:

总结起来,使用 Canal 同步 MySQL 数据的过程涉及到配置 Canal 服务器、编写消费端应用程序以及处理接收到的变更数据。这使您能够将 MySQL 数据库的变更同步到其他系统中,实现数据的实时传输和处理。具体的实现方式会根据您的业务需求和技术栈而有所不同

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号