赞
踩
1、Canal 是一个开源的数据库数据同步工具,用于监控数据库变更并将变更数据传递到指定的消费端,通常用于将MySQL数据库的变更数据传递到其他系统进行处理
2、使用 Canal 同步 MySQL 数据的基本步骤:
安装和配置 Canal 服务器:
canal.properties
文件中设置。启动 Canal 服务器:
编写消费端应用程序:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency>
- package cn.liu.demo.canal;
-
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.protocol.CanalEntry;
- import com.alibaba.otter.canal.protocol.Message;
- import com.google.protobuf.InvalidProtocolBufferException;
-
- import java.net.InetSocketAddress;
- import java.util.List;
- import java.util.Queue;
- import java.util.concurrent.ConcurrentLinkedQueue;
-
- import static com.alibaba.otter.canal.protocol.CanalEntry.*;
- import static com.alibaba.otter.canal.protocol.CanalEntry.EntryType.*;
-
- /**
- * @Author:
- * @Date:2023/8/22 14:42
- * @Description:
- */
- public class CanalClient {
- private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
-
- public static void main(String[] args) {
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "instance1", "admin", "admin");
- try {
- connector.connect();
- // 监听的表,格式为数据库.表名,数据库.表名
- connector.subscribe("user.*");
- //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
- connector.rollback();
- while (true){
- Message message = connector.getWithoutAck(100); // 获取指定数量的数据
- //获取批量ID
- long batchId = message.getId();
- //获取批量的数量
- int size = message.getEntries().size();
-
- //如果没有数据
- if (batchId == -1 || size == 0) {
- try {
- //线程休眠2秒
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } else {
- //如果有数据,处理数据
- printEntry(message.getEntries());
- dataHandle(message.getEntries());
- }
- //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
- connector.ack(batchId);
-
-
- //当队列里面堆积的sql大于一定数值的时候就模拟执行
- if (SQL_QUEUE.size() >= 1) {
- executeQueueSql();
- }
- }
-
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- connector.disconnect();
- }
-
- }
-
- /**
- * 模拟执行队列里面的sql语句
- */
- public static void executeQueueSql() {
- int size = SQL_QUEUE.size();
- for (int i = 0; i < size; i++) {
- String sql = SQL_QUEUE.poll();
- System.out.println("[sql]----> " + sql);
- }
- }
-
-
- /**
- * 打印canal server解析binlog获得的实体类信息
- */
- private static void printEntry(List<Entry> entrys) {
- for (Entry entry : entrys) {
- if (entry.getEntryType() == TRANSACTIONBEGIN || entry.getEntryType() == TRANSACTIONEND) {
- //开启/关闭事务的实体类型,跳过
- continue;
- }
- //RowChange对象,包含了一行数据变化的所有特征
- //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
- CanalEntry.RowChange rowChage;
- try {
- rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
- }
- //获取操作类型:insert/update/delete类型
- CanalEntry.EventType eventType = rowChage.getEventType();
- //打印Header信息
- System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
- eventType));
- //判断是否是DDL语句
- if (rowChage.getIsDdl()) {
- System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
- }
- //获取RowChange对象里的每一行数据,打印出来
- for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
- //如果是删除语句
- if (eventType == CanalEntry.EventType.DELETE) {
- printColumn(rowData.getBeforeColumnsList());
- //如果是新增语句
- } else if (eventType == CanalEntry.EventType.INSERT) {
- printColumn(rowData.getAfterColumnsList());
- //如果是更新的语句
- } else {
- //变更前的数据
- System.out.println("------->; before");
- printColumn(rowData.getBeforeColumnsList());
- //变更后的数据
- System.out.println("------->; after");
- printColumn(rowData.getAfterColumnsList());
- }
- }
- }
- }
-
- /**
- * 数据处理
- *
- * @param entrys
- */
- private static void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
- for (Entry entry : entrys) {
- if (ROWDATA == entry.getEntryType()) {
- CanalEntry.RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
- EventType eventType = rowChange.getEventType();
- if (eventType == EventType.DELETE) {
- saveDeleteSql(entry);
- } else if (eventType == EventType.UPDATE) {
- saveUpdateSql(entry);
- } else if (eventType == EventType.INSERT) {
- saveInsertSql(entry);
- }
- }
- }
- }
-
- private static void saveUpdateSql(Entry entry) {
- try {
- RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
- List<RowData> rowDatasList = rowChange.getRowDatasList();
- for (RowData rowData : rowDatasList) {
- List<Column> newColumnList = rowData.getAfterColumnsList();
- StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");
- for (int i = 0; i < newColumnList.size(); i++) {
- sql.append(" " + newColumnList.get(i).getName()
- + " = '" + newColumnList.get(i).getValue() + "'");
- if (i != newColumnList.size() - 1) {
- sql.append(",");
- }
- }
- sql.append(" where ");
- List<Column> oldColumnList = rowData.getBeforeColumnsList();
- for (Column column : oldColumnList) {
- if (column.getIsKey()) {
- //暂时只支持单一主键
- sql.append(column.getName() + "=" + column.getValue());
- break;
- }
- }
- SQL_QUEUE.add(sql.toString());
- }
- } catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 保存删除语句
- *
- * @param entry
- */
- private static void saveDeleteSql(Entry entry) {
- try {
- RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
- List<RowData> rowDatasList = rowChange.getRowDatasList();
- for (RowData rowData : rowDatasList) {
- List<Column> columnList = rowData.getBeforeColumnsList();
- StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");
- for (Column column : columnList) {
- if (column.getIsKey()) {
- //暂时只支持单一主键
- sql.append(column.getName() + "=" + column.getValue());
- break;
- }
- }
- SQL_QUEUE.add(sql.toString());
- }
- } catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 保存插入语句
- *
- * @param entry
- */
- private static void saveInsertSql(Entry entry) {
- try {
- RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
- List<RowData> rowDatasList = rowChange.getRowDatasList();
- for (RowData rowData : rowDatasList) {
- List<Column> columnList = rowData.getAfterColumnsList();
- StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");
- for (int i = 0; i < columnList.size(); i++) {
- sql.append(columnList.get(i).getName());
- if (i != columnList.size() - 1) {
- sql.append(",");
- }
- }
- sql.append(") VALUES (");
- for (int i = 0; i < columnList.size(); i++) {
- sql.append("'" + columnList.get(i).getValue() + "'");
- if (i != columnList.size() - 1) {
- sql.append(",");
- }
- }
- sql.append(")");
- SQL_QUEUE.add(sql.toString());
- }
- } catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
- }
- }
-
-
- private static void printColumn(List<CanalEntry.Column> columns) {
- for (CanalEntry.Column column : columns) {
- System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
- }
- }
-
-
-
- }
总结:
总结起来,使用 Canal 同步 MySQL 数据的过程涉及到配置 Canal 服务器、编写消费端应用程序以及处理接收到的变更数据。这使您能够将 MySQL 数据库的变更同步到其他系统中,实现数据的实时传输和处理。具体的实现方式会根据您的业务需求和技术栈而有所不同
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。