当前位置:   article > 正文

Canal对MySQL进行数据迁移_canal mysql mysql

canal mysql mysql

Canal简单介绍

贴个官方网址:阿里巴巴MySQL binlog 增量订阅&消费组件

架构图:
基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

以上资料来源官网

Canal的简单使用

MySQL配置

首先需要安装MySQL数据库,目前笔者使用的MySQL版本是最新的8.0.33,这个过程就不在赘述了

在Windows下,一般在ProgramData/MySQL文件夹中就能找到配置文件"mysql.ini",打开,搜索修改或新建以下选项:

# 指定服务的id,这个要与canal中的区分开,因为每个服务节点的id都要不一样
server-id=1
# 生成的binlog文件的前缀名称,Windows下binlog文件一般存在ProgramData/MySQL/Data文件夹中
log-bin="NS9052929-bin"
# binlog日志的记录方式:row、statement、mixed
binlog_format=row
# 需要记录binlog的数据库,使用逗号分割可以指定多个,如不配置则是所有
binlog-do-db=canal-demo
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

然后重启MySQL数据库服务,Windows在服务窗口中就可以重启
在这里插入图片描述

新建数据库

新建一个架构(数据库):canal-demo

CREATE DATABASE `canal-demo` 
/* DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */
  • 1
  • 2

向其中添加一张表

-- auto-generated definition
create table user
(
    id       int auto_increment comment '用户id'
        primary key,
    username varchar(100)   not null comment '用户名',
    age      int default -1 not null comment '用户年龄'
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Canal的配置

在github中的release中下载压缩包:Releases-alibaba/canal
我下载了最新版本
在这里插入图片描述

把它解压到文件中,修改两个配置文件:
在这里插入图片描述
修改cana.properties,将其中的配置项修改:

# 这个要与上面图片中的文件夹名对应起来,其实是对应多个实例
# 如果要新建实例,复制一个example,改名字,并修改其中的配置文件即可
canal.destinations = example
# 设置服务端口,默认为11111
canal.port = 11111
# 设置服务模式,因为下面是对接Java,因此使用TCP
canal.serverMode = tcp
# 设置数据库的连接账号以及密码
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

修改实例中的配置文件:instance.properties

# 设置数据库路径
canal.instance.master.address=127.0.0.1:3306
# 设置数据库的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 服务id,需要与mysql中的区分开
canal.instance.mysql.slaveId=20
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

修改完成后,双击bin/startup.bat,启动canal,看到下面页面则说明启动成功:
在这里插入图片描述

结合Java使用

新建一个maven项目, 向pom.xml文件中加入以下依赖:

<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.6</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

编写一个客户端程序,连接canal:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class CanalClient {

    public static void main(String[] args) {
        // hostname, port, destination, username, password,username和password默认为空
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            // 监听的表,格式为数据库.表名,数据库.表名
            connector.subscribe("canal-demo.*");
            // 不断循环获取
            while (true) {
                Message message = connector.getWithoutAck(100); // 获取指定数量的数据
                List<Entry> entries = message.getEntries();
                // 如果没有数据的话就等待1秒
                if (entries.isEmpty()) {
                    log.info("没有数据,休息一下");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    // 有数据的话循环打印出数据
                    for (Entry entry : entries) {
                        String tableName = entry.getHeader().getTableName();
                        log.info("表名:{}", tableName);
                        // 判断Entry类型是否为ROW变换
                        EntryType entryType = entry.getEntryType();
                        if (EntryType.ROWDATA.equals(entryType)) {
                            log.info("ROW变换");
                            // 序列化数据
                            ByteString storeValue = entry.getStoreValue();
                            // 反序列化数据
                            RowChange rowChange = RowChange.parseFrom(storeValue);
                            // 获取事件类型
                            log.info("事件类型:{}", rowChange.getEventType());
                            // 获取具体数据
                            List<RowData> rowDatasList = rowChange.getRowDatasList();
                            for (RowData rowData : rowDatasList) {
                                List<Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                Map<String, Object> beforeMap = new HashMap<>();
                                for (Column column : beforeColumnsList) {
                                    beforeMap.put(column.getName(), column.getValue());
                                }
                                log.info("变化前的数据:{}", beforeMap);
                                List<Column> afterColumnsList = rowData.getAfterColumnsList();
                                Map<String, Object> afterMap = new HashMap<>();
                                for (Column column : afterColumnsList) {
                                    afterMap.put(column.getName(), column.getValue());
                                }
                                log.info("变化后的数据:{}", afterMap);
                            }
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 最后关闭连接
            connector.disconnect();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

运行程序,insert、update、delete监控的数据库中的数据,就会看到控制台中有打印消息
在这里插入图片描述

结合Spring Boot使用

新建一个Spring Boot应用,向其中添加以下依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>4.0.0-rc-2</version>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

Spring Booot版本是2.7.11,使用的JDK8
依赖中的canal-spring-boot-starter是其他开源者对canal在Spring Boot中的集成,测试不支持JDK17的版本,因此Spring Boot版本只能为3.0以下。

如果需要使用JDK17的话,也就是Spring Boot 3.0以上或者Spring 6的话,可以用另外一个开发者的包:behappy-canal,当然还有其他开发者也做了canal的集成,大家自己尝试下吧~

<dependency>
    <groupId>io.github.behappy-project</groupId>
    <artifactId>behappy-canal-spring-boot-starter</artifactId>
    <version>3.0.2</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

配置文件:

# 数据库连接信息
spring.datasource.url=jdbc:mysql://localhost:3306/canal-demo?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=false
spring.datasource.username=root
spring.datasource.password=root
# Spring 服务名称
spring.application.name=canal-spring-boot-demo
# canal的服务地址
canal.server=127.0.0.1:11111
# 需要监控的实例
canal.destination=example
# 关闭日志,不然一秒打印一个日志,浪费空间
logging.level.top.javatool.canal.client=OFF
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

首先新建一个实体类User对应数据库中的字段

@Data
public class User {
    private Integer id;
    private String username;
    private Integer age;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

然后书写一个CanalHandler实现接口EntryHandler<T>,在里面对insert、update、delete这几种操作加入自己的处理

import com.example.springdemo.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
@Component
@CanalTable("user")
@Slf4j
public class CanalHandler implements EntryHandler<User> {
    @Override
    public void insert(User user) {
        log.info("插入用户:{}", user.toString());
    }

    @Override
    public void update(User before, User after) {
        log.info("用户修改前:{}", before.toString());
        log.info("用户修改后:{}", after.toString());
    }

    @Override
    public void delete(User user) {
        log.info("删除用户:{}", user.toString());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

运行结果
在这里插入图片描述

总结

通过Canal可以没有侵入的,即时的将数据库的改动同步到Redis、ElasticSearch或者其他数据存储库中,如果是在大数据方面需要数据聚合的话,推荐使用Flink CDC。目前Canal还有一个问题就是似乎不再维护了,但还是为我们提供了一个轻量化的数据迁移、同步工具。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/949300
推荐阅读
相关标签
  

闽ICP备14008679号