赞
踩
项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数据查询。
那么这个时候问题来了,怎么把MySQL中增量数据同步到ES?
Java web应用性能分析之【高并发之缓存-多级缓存】_java多级缓存-CSDN博客
在前面文章中有提到这个场景,如何保证redis、EhCache、MySQL数据一致?
我们都知道作为数据库写操作,是不通过缓存的。假设商品服务实例 1 将 1 号商品价格调整为 80 元,这会衍生一个新问题:如何主动向应用程序推送数据变更的消息来保证它们也能同步更新缓存呢?
针对上面两种场景,可以看看canal的解决方案。
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。项目起源于阿里巴巴内部对于跨机房数据同步的需求,通过解析MySQL的二进制日志(Binary Log),Canal能够捕获并推送数据库的变更事件,满足了诸如数据库镜像、实时备份、索引实时维护等多种业务场景的需求。
GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
Home · alibaba/canal Wiki · GitHub
Canal当前支持MySQL数据库的多个版本,包括但不限于5.1.x、5.5.x、5.6.x、5.7.x及8.0.x,同时也兼容阿里云RDS等云数据库服务,为用户提供了广泛的数据库兼容性保障。
部分支持MySQL体系数据库:Mariadb 10.x、PolarDB-X
高性能与低延迟:Canal 1.1.x版本进行了深度优化,性能提升高达150%。
Prometheus监控:原生集成Prometheus监控,便于系统健康状况的跟踪。
消息系统集成:直接支持Kafka、RocketMQ消息投递,便于与大数据平台对接。
云数据库支持:无缝对接阿里云RDS,解决了自动主备切换及离线Binlog解析问题。
Docker部署:提供Docker镜像,简化部署流程。
WebUI管理:Canal-Admin工程引入WebUI,实现动态配置、任务管理与日志查看等功能。
MySQL master 将数据变更写入二进制日志(binary log), 日志中的记录叫做二进制日志事件(binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映到它自己的数据
Canal巧妙地模拟了MySQL主从复制的机制。具体而言:
优点: 可以完全和业务代码解耦,增量日志订阅。
缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步至canal。
对应这些软件包:
除了基础功能,Canal还支持丰富的进阶特性和周边生态工具,如:
Canal-Admin:提供Web界面管理Canal实例,实现配置、监控和运维的可视化操作。
canal2sql:一个工具项目,能根据Binlog生成SQL,便于数据迁移或备份。
Otter:Canal的消费端开源项目,用于数据同步与数据集成。
整个 HA 机制的控制主要是依赖了zookeeper的两个特性:watcher、EPHEMERAL节点。canal的 HA 机制实现分为两部分,canal server 和 canal client分别有对应的实现。
canal server实现流程如下:
为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态。
名称 | 版本 |
---|---|
MySQL | 5.7 |
elasticsearch | 7.17.9 |
canal | 1.1.7 |
jdk | 1.8 |
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
- [mysqld]
- log-bin=mysql-bin # 开启 binlog
- binlog-format=ROW # 选择 ROW 模式
- server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
- CREATE USER canal IDENTIFIED BY 'canal';
- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
- -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
- FLUSH PRIVILEGES;
本次操作只需要下载admin、deployer两个包。
下载后解压即可用,当然,要运行起来的话,需要配置jdk1.8,这里就不再多说。
启动admin前,先完成建库和见表(包括admin的默认登录账号密码 admin/123456),否则启动报错。主要是如下六张表。
切记数据库和配置文件中的密码要一致。
分别启动admin和server
启动server:bin/startup.bat
启动admin:bin/startup.bat
登录admin:http://192.168.1.4:8089/#/login?redirect=%2Fdashboard
默认账号密码:admin/123456
引入依赖
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.client</artifactId>
- <version>1.1.4</version>
- </dependency>
客户端代码
- package com.zxx.study.base.canal;
-
-
- import com.alibaba.fastjson.JSONObject;
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.protocol.CanalEntry;
- import com.alibaba.otter.canal.protocol.Message;
- import com.google.protobuf.ByteString;
- import com.google.protobuf.InvalidProtocolBufferException;
- import lombok.SneakyThrows;
-
- import java.net.InetSocketAddress;
- import java.util.List;
-
- /**
- * @author zhouxx
- * @create 2024-08-01 21:59
- */
- public class CanalClient {
-
- @SneakyThrows
- public static void main(String[] args) {
- try {
- // 创建canal客户端,单链接模式
- CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.4",
- 11111), "example", "", "");
- // 创建连接
- canalConnector.connect();
- while (true) {
- // 订阅数据库
- // canalConnector.subscribe("mall");
-
- // 获取数据
- Message message = canalConnector.get(100);
-
- // 获取Entry集合
- List<CanalEntry.Entry> entries = message.getEntries();
-
- // 判断集合是否为空,如果为空,则等待一会继续拉取数据
- if (entries.size() <= 0) {
- // System.out.println("当次抓取没有数据,休息一会。。。。。。");
- Thread.sleep(1000);
- } else {
- // 遍历entries,单条解析
- for (CanalEntry.Entry entry : entries) {
-
- //1.获取表名
- String tableName = entry.getHeader().getTableName();
-
- //2.获取类型
- CanalEntry.EntryType entryType = entry.getEntryType();
-
- //3.获取序列化后的数据
- ByteString storeValue = entry.getStoreValue();
-
- //4.判断当前entryType类型是否为ROWDATA
- if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
-
- //5.反序列化数据
- CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
-
- //6.获取当前事件的操作类型
- CanalEntry.EventType eventType = rowChange.getEventType();
-
- //7.获取数据集
- List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
-
- //8.遍历rowDataList,并打印数据集
- for (CanalEntry.RowData rowData : rowDataList) {
-
- JSONObject beforeData = new JSONObject();
- List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
- for (CanalEntry.Column column : beforeColumnsList) {
- beforeData.put(column.getName(), column.getValue());
- }
-
- JSONObject afterData = new JSONObject();
- List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
- for (CanalEntry.Column column : afterColumnsList) {
- afterData.put(column.getName(), column.getValue());
- }
-
- //数据打印
- System.out.println("Table:" + tableName +
- ",EventType:" + eventType +
- ",Before:" + beforeData +
- ",After:" + afterData);
- /**
- * Table:test_user,EventType:UPDATE,Before:{"name":"1111221","id":"1"},After:{"name":"11tom","id":"1"}
- * Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx","id":"17"}
- * Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx1","id":"18"}
- * */
- }
- }
- }
- }
- }
- }catch (Exception e){
- e.printStackTrace();
- }
-
- }
-
- }
运行效果:在数据库里面忝删改查,均可以在客户端中打印出来
参考:cancel框架同步mysql数据到kafka_mysql cancel-CSDN博客
参考:利用canal进行MySQL到ES的数据实时同步_canal es-CSDN博客
结合上图,至此,场景一和场景二中的问题,均可以解决。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。