当前位置:   article > 正文

Canal实现数据同步_canal数据库同步

canal数据库同步

1、Canal实现数据同步

canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。

1.1 Canal工作原理

在这里插入图片描述

原理相对比较简单:

1、canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议。

2、mysql master收到dump请求,开始推送binary log给slave(也就是canal)。

3、canal解析binary log对象(原始为byte流)。

canal需要使用到mysql,我们需要先安装mysql,canal是基于mysql的主从模式实现的,所以必须先开启

binlog。

1.2 安装MySQL

# 搜索MySQL镜像
$ docker search mysql
  • 1
  • 2
# 拉取MySQL镜像
$ docker pull mysql:5.5
  • 1
  • 2
# 启动MySQL
$ docker run -d -p 3306:3306 -v /home/zhangshixing/linuxmysql/conf:/etc/mysql/conf.d -v /home/zhangshixing/linuxmysql/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name zsxmysq1 mysql:5.5
  • 1
  • 2
$ docker exec -it fc566f7d1857 /bin/bash
  • 1
$ mysql -h 192.168.94.186 -u root -p
  • 1

在这里插入图片描述

1.3 开启binlog模式

(1) 、连接到mysql中,并修改 /etc/mysql/my.cnf 需要开启主从模式,开启binlog模式。

执行如下命令,编辑 mysql 配置文件:

$ docker exec -it fc566f7d1857 /bin/bash
# 该目录一定要存在,否则log日志无法写入
$ cd /etc/mysql/
$ vim my.cnf
  • 1
  • 2
  • 3
  • 4

没有vim,需要安装vim:

$ apt-get update
$ apt-get install vim
  • 1
  • 2

修改 my.cnf 配置文件,添加如下配置:

# 开启logbin
log-bin=/var/lib/mysql/mysql-bin
# binlog日志格式
binlog-format=ROW
# mysql主从备份serverId,canal中不能与此相同
server-id=12345
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述

(2) 、创建账号,用于测试使用,使用root账号创建用户并授予权限

# CREATE USER <用户名> [ IDENTIFIED ] BY [ PASSWORD ] <口令>
# <用户名>格式为 'user_name'@'host_name',这里user_name是用户名,host_name为主机名
# "%" 表示一组主机
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, INSERT, DELETE, UPDATE, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

(3)、重启mysql容器

$ docker restart fc566f7d1857
  • 1

(4)、查看配置是否生效

$ docker exec -it fc566f7d1857 /bin/bash
$ mysql -h 192.168.94.186 -u canal -p
  • 1
  • 2

在这里插入图片描述

# binlog日志文件
$ show master status;
  • 1
  • 2

在这里插入图片描述

# 查看是否配置成功
$ show variables like 'binlog_format';
  • 1
  • 2

在这里插入图片描述

查看日志文件:

在这里插入图片描述

使用Navicat新建canal_test 数据库和t_user_1表。

# 查看日志信息
$ mysqlbinlog -vv mysql-bin.000001
  • 1
  • 2

在这里插入图片描述

1.4 canal容器安装

下载镜像:

$ docker pull docker.io/canal/canal-server
  • 1

容器安装:

$ docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server
  • 1

进入容器,修改核心配置canal.propertiesinstance.propertiescanal.properties 是canal自身的

配置,instance.properties是需要同步数据的数据库连接配置。

执行代码如下:

$ docker exec -it canal /bin/bash
$ cd canal-server/conf/
$ vim canal.properties
$ cd example/
$ vim instance.properties
  • 1
  • 2
  • 3
  • 4
  • 5

修改canal.properties的id,不能和mysql的server-id重复,如下图:

在这里插入图片描述

修改instance.properties,配置数据库连接地址:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

这里的canal.instance.filter.regex有多种配置,如下:

可以参考地址如下:

https://github.com/alibaba/canal/wiki/AdminGuide
  • 1

mysql 数据解析关注的表,Perl正则表达式。

多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)

常见例子:

1、所有表:.* or .*\\..*

2、canal schema下所有表:canal\\..*

3、canal下的以canal打头的表:canal\\.canal.*

4、canal schema下的一张表:canal.test1

5、多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)

注意:此过滤条件只针对row模式的数据有效(mixed/statement因为不解析sql,所以无法准确提取

tableName进行过滤)。

配置完成后,设置开机启动,并记得重启canal。

$ docker update --restart=always canal
$ docker restart canal
  • 1
  • 2

验证配置是否成功:

$ docker exec -it canal /bin/bash
$ cd canal-server/logs/example/
# 查看日志
$ tail -100f example.log  
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

1.5 canal服务搭建

当用户执行数据库的操作的时候,binlog 日志会被canal捕获到,并解析出数据。

1.5.1 安装辅助jar包

spring-boot-starter-canal-master中有一个工程starter-canal,它主要提供了SpringBoot环境下

canal的支持,我们需要先安装该工程,在starter-canal目录下执行mvn install,如下图:

在这里插入图片描述

1.5.2 canal服务工程搭建

spring-boot-starter-canal-master中创建一个工程canal-test

pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.example</groupId>
	<artifactId>cana-test</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>cana-test</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.0.RELEASE</version>
		<relativePath/>
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>com.xpand</groupId>
			<artifactId>starter-canal</artifactId>
			<version>0.0.1-SNAPSHOT</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

application.yml配置

# canal配置
canal.client.instances.example.host=192.168.94.186
canal.client.instances.example.port=11111
  • 1
  • 2
  • 3

监听创建

创建一个CanalDataEventListener类,实现对表增删改操作的监听,代码如下:

package com.example.canatest.config;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.*;

/**
 * @author zhangshixing
 */
@CanalEventListener
public class MyEventListener {

    /**
     * 增加数据监听
     *
     * @param eventType
     * @param rowData
     */
    @InsertListenPoint
    public void onEvent1(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.out.println("InsertListenPoint");
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));
    }

    /**
     * 修改数据监听
     *
     * @param rowData
     */
    @UpdateListenPoint
    public void onEvent2(CanalEntry.RowData rowData) {
        System.out.println("UpdateListenPoint");
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));
    }

    /**
     * 刪除数据监听
     *
     * @param eventType
     */
    @DeleteListenPoint
    public void onEvent3(CanalEntry.EventType eventType) {
        System.out.println("DeleteListenPoint");
    }

    /**
     * 自定义数据修改监听
     *
     * @param eventType
     * @param rowData
     */
    @ListenPoint(destination = "example", schema = "canal_test", table = {"t_user_1", "t_user_2"}, eventType = CanalEntry.EventType.INSERT)
    public void onEvent4(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.out.println("CustomListenPoint");
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

启动类创建

创建启动类,代码如下:

package com.example.canatest;

import com.xpand.starter.canal.annotation.EnableCanalClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableCanalClient
public class CanaTestApplication {

	public static void main(String[] args) {

		SpringApplication.run(CanaTestApplication.class, args);
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

测试

启动canal微服务,然后修改任意数据库的表数据,canal微服务后台输出信息。

1、插入数据

在这里插入图片描述

在这里插入图片描述

2、修改数据

在这里插入图片描述

在这里插入图片描述

3、删除数据

在这里插入图片描述

在这里插入图片描述

4、自定义插入

在这里插入图片描述

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/801338
推荐阅读
  

闽ICP备14008679号