赞
踩
贴个官方网址:阿里巴巴MySQL binlog 增量订阅&消费组件
架构图:
基于日志增量订阅和消费的业务包括
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
以上资料来源官网
首先需要安装MySQL数据库,目前笔者使用的MySQL版本是最新的8.0.33,这个过程就不在赘述了
在Windows下,一般在ProgramData/MySQL文件夹中就能找到配置文件"mysql.ini",打开,搜索修改或新建以下选项:
# 指定服务的id,这个要与canal中的区分开,因为每个服务节点的id都要不一样
server-id=1
# 生成的binlog文件的前缀名称,Windows下binlog文件一般存在ProgramData/MySQL/Data文件夹中
log-bin="NS9052929-bin"
# binlog日志的记录方式:row、statement、mixed
binlog_format=row
# 需要记录binlog的数据库,使用逗号分割可以指定多个,如不配置则是所有
binlog-do-db=canal-demo
然后重启MySQL数据库服务,Windows在服务窗口中就可以重启
新建一个架构(数据库):canal-demo
CREATE DATABASE `canal-demo`
/* DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */
向其中添加一张表
-- auto-generated definition
create table user
(
id int auto_increment comment '用户id'
primary key,
username varchar(100) not null comment '用户名',
age int default -1 not null comment '用户年龄'
);
在github中的release中下载压缩包:Releases-alibaba/canal
我下载了最新版本
把它解压到文件中,修改两个配置文件:
修改cana.properties
,将其中的配置项修改:
# 这个要与上面图片中的文件夹名对应起来,其实是对应多个实例
# 如果要新建实例,复制一个example,改名字,并修改其中的配置文件即可
canal.destinations = example
# 设置服务端口,默认为11111
canal.port = 11111
# 设置服务模式,因为下面是对接Java,因此使用TCP
canal.serverMode = tcp
# 设置数据库的连接账号以及密码
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
修改实例中的配置文件:instance.properties
# 设置数据库路径
canal.instance.master.address=127.0.0.1:3306
# 设置数据库的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 服务id,需要与mysql中的区分开
canal.instance.mysql.slaveId=20
修改完成后,双击bin/startup.bat
,启动canal,看到下面页面则说明启动成功:
新建一个maven项目, 向pom.xml
文件中加入以下依赖:
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version>
</dependency>
编写一个客户端程序,连接canal:
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; @Slf4j public class CanalClient { public static void main(String[] args) { // hostname, port, destination, username, password,username和password默认为空 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", ""); try { connector.connect(); // 监听的表,格式为数据库.表名,数据库.表名 connector.subscribe("canal-demo.*"); // 不断循环获取 while (true) { Message message = connector.getWithoutAck(100); // 获取指定数量的数据 List<Entry> entries = message.getEntries(); // 如果没有数据的话就等待1秒 if (entries.isEmpty()) { log.info("没有数据,休息一下"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { // 有数据的话循环打印出数据 for (Entry entry : entries) { String tableName = entry.getHeader().getTableName(); log.info("表名:{}", tableName); // 判断Entry类型是否为ROW变换 EntryType entryType = entry.getEntryType(); if (EntryType.ROWDATA.equals(entryType)) { log.info("ROW变换"); // 序列化数据 ByteString storeValue = entry.getStoreValue(); // 反序列化数据 RowChange rowChange = RowChange.parseFrom(storeValue); // 获取事件类型 log.info("事件类型:{}", rowChange.getEventType()); // 获取具体数据 List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> beforeColumnsList = rowData.getBeforeColumnsList(); Map<String, Object> beforeMap = new HashMap<>(); for (Column column : beforeColumnsList) { beforeMap.put(column.getName(), column.getValue()); } log.info("变化前的数据:{}", beforeMap); List<Column> afterColumnsList = rowData.getAfterColumnsList(); Map<String, Object> afterMap = new HashMap<>(); for (Column column : afterColumnsList) { afterMap.put(column.getName(), column.getValue()); } log.info("变化后的数据:{}", afterMap); } } } } } } catch (Exception e) { e.printStackTrace(); } finally { // 最后关闭连接 connector.disconnect(); } } }
运行程序,insert、update、delete监控的数据库中的数据,就会看到控制台中有打印消息
新建一个Spring Boot
应用,向其中添加以下依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>top.javatool</groupId> <artifactId>canal-spring-boot-starter</artifactId> <version>1.2.1-RELEASE</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.33</version> </dependency> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>4.0.0-rc-2</version> </dependency> </dependencies>
Spring Booot
版本是2.7.11
,使用的JDK8
依赖中的canal-spring-boot-starter
是其他开源者对canal在Spring Boot中的集成,测试不支持JDK17的版本,因此Spring Boot版本只能为3.0以下。
如果需要使用JDK17的话,也就是Spring Boot 3.0以上或者Spring 6的话,可以用另外一个开发者的包:behappy-canal,当然还有其他开发者也做了canal的集成,大家自己尝试下吧~
<dependency>
<groupId>io.github.behappy-project</groupId>
<artifactId>behappy-canal-spring-boot-starter</artifactId>
<version>3.0.2</version>
</dependency>
配置文件:
# 数据库连接信息
spring.datasource.url=jdbc:mysql://localhost:3306/canal-demo?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=false
spring.datasource.username=root
spring.datasource.password=root
# Spring 服务名称
spring.application.name=canal-spring-boot-demo
# canal的服务地址
canal.server=127.0.0.1:11111
# 需要监控的实例
canal.destination=example
# 关闭日志,不然一秒打印一个日志,浪费空间
logging.level.top.javatool.canal.client=OFF
首先新建一个实体类User
对应数据库中的字段
@Data
public class User {
private Integer id;
private String username;
private Integer age;
}
然后书写一个CanalHandler
实现接口EntryHandler<T>
,在里面对insert、update、delete这几种操作加入自己的处理
import com.example.springdemo.model.User; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import top.javatool.canal.client.annotation.CanalTable; import top.javatool.canal.client.handler.EntryHandler; @Component @CanalTable("user") @Slf4j public class CanalHandler implements EntryHandler<User> { @Override public void insert(User user) { log.info("插入用户:{}", user.toString()); } @Override public void update(User before, User after) { log.info("用户修改前:{}", before.toString()); log.info("用户修改后:{}", after.toString()); } @Override public void delete(User user) { log.info("删除用户:{}", user.toString()); } }
运行结果
通过Canal可以没有侵入的,即时的将数据库的改动同步到Redis、ElasticSearch或者其他数据存储库中,如果是在大数据方面需要数据聚合的话,推荐使用Flink CDC。目前Canal还有一个问题就是似乎不再维护了,但还是为我们提供了一个轻量化的数据迁移、同步工具。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。