赞
踩
假设一个场景:编写一个博客系统,需要引入elasticsearch搜索引擎实现对文章内容的检索。则需要解决MySQL与elasticsearch数据同步的问题。
此时我们有三种选择:
在业务层执行增加、修改、删除改变mysql数据库之后,也执行操作redis的逻辑代码。
优点:操作简单
缺点:与业务操作代码耦合度变高;执行效率低。
在执行完增加、修改、删除之后, 往MQ中发送一条消息 ;同步程序作为MQ中的消费者,从消息队列中获取消息,然后执行同步elasticsearch数据库的逻辑。
优点:业务代码解耦, 并且可以做到准实时。
缺点:需要在业务代码中加入发送消息到MQ中的代码 , API耦合。
binglog实现同步的方法再细分不止一种,这个笔记主要学习canal,所以以canal为例。而且canal不止可以将数据同步给redis,也可以同步给其他类型的数据库。
优点:与业务代码完全解耦,API完全解耦,可以做到准实时。
缺点:canal是第三方实现的,需要学习成本(学无止尽,技多不压身)。
本章我们学习第三种学习思路,仅实现canal和mysql的数据同步。
create user canal identified by 'canal';
grant select,replication slave, replication client on *.* to 'canal'@'%';
flush privileges;
# 查看bin-log是否开启 on: 开启 off: 关闭
show variables like 'log_bin';
编写my.conf, 挂载容器中的mysql配置文件
[client]
default-character-set=utf8
[mysql]
default-character-set=utf8
[mysqld]
character-set-server=utf8
collation-server=utf8_unicode_ci
skip-character-set-client-handshake
skip-name-resolve
# start binlog
log-bin=mysql-bin
binlog-format=ROW
server_id=1
docker pull canal/canal-server:v1.1.5
# 创建一个容器
docker run --name canal -d canal/canal-server:v1.1.5
# 复制容器中的配置文件到本地
docker cp canal:/home/admin/canal-server/conf/canal.properties /wuming/canal
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /wuming/canal
查看容器mysql-8的ip
docker inspect mysql # 这里假设此处ip是 177.17.0.1
修改canal.properties配置文件
# 默认端口 11111
# 默认输出model为tcp, 这里根据使用的mq类型进行修改
# tcp, kafka, RocketMQ
canal.serverMode = tcp
#################################################
######### destinations #############
#################################################
# canal可以有多个instance,每个实例有独立的配置文件,默认只 有一个example实例。
# 如果需要处理多个mysql数据的话,可以复制出多个example,对其重新命名,
# 命令和配置文件中指定的名称一致。然后修改canal.properties 中的 canal.destinations
# canal.destinations=实例 1,实例 2,实例 3
canal.destinations = example
修改instance.properties配置文件
# 不能和mysql重复 canal.instance.mysql.slaveId=2 # 使用mysql的虚拟ip和端口 canal.instance.master.address=177.17.0.1:3306 # 使用已创建的canal用户 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 # canal.instance.defaultDatabaseName =test # 问题:(原本这样的,值同步test库,此处没能解决,单据指定数据库同步配置) # canal.instance.filter.regex=.*\\..* # canal.instance.defaultDatabaseName =test # 注掉上面,然后添加,同步所有的库。 # .\*\\\\..\*: 表示匹配所有的库所有的表 canal.instance.filter.regex =.\*\\\\..\* # 目的地,可以认识一个消息队列,不需要更改。 canal.mq.topic=example # 如果是
docker run --name canal -p 11111:11111 \
-v /wuming/canal/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-v /wuming/canal/canal.properties:/home/admin/canal-server/conf/canal.properties \
-d canal/canal-server:v1.1.5
引入依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
程序连接
@Slf4j public class CanalTest { //Canal服务地址 使用自己虚拟机的ip地址 private static final String SERVER_ADDRESS = "127.0.0.1"; //Canal Server 服务端口号 private static final Integer PORT = 11111; //目的地,其实Canal Service内部有一个队列,和配置文件中一致即可,参考【修改instance.properties】图中 private static final String DESTINATION = "example"; //用户名和密码,但是目前不支持,只能为空 private static final String USERNAME = ""; //用户名和密码,但是目前不支持,只能为空 private static final String PASSWORD= ""; public static void main(String[] args){ CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD); canalConnector.connect(); //订阅所有消息 canalConnector.subscribe(".*\\..*"); // 只订阅test数据库下的所有表 //canalConnector.subscribe("test"); //恢复到之前同步的那个位置 canalConnector.rollback(); for(;;){ //获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少 Message message = canalConnector.getWithoutAck(100); //获取消息id long batchId = message.getId(); if(batchId != -1){ log.info("msgId -> " + batchId); printEnity(message.getEntries()); //提交确认 //canalConnector.ack(batchId); //处理失败,回滚数据 //canalConnector.rollback(batchId); } } } private static void printEnity(List<CanalEntry.Entry> entries) { for (CanalEntry.Entry entry : entries) { if(entry.getEntryType() != CanalEntry.EntryType.ROWDATA){ continue; } try{ // 序列化数据 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { System.out.println(rowChange.getEventType()); switch (rowChange.getEventType()){ //如果希望监听多种事件,可以手动增加case case INSERT: // 表名 String tableName = entry.getHeader().getTableName(); //System.out.println("表名:"+tableName); //测试users表进行映射处 List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); //for(CanalEntry.Column c:afterColumnsList){ // System.out.println("字段:"+c.getName()+"值:"+c.getValue()); //} System.out.println(afterColumnsList); break; case UPDATE: List<CanalEntry.Column> afterColumnsList2 = rowData.getAfterColumnsList(); System.out.println("新插入的数据是:" + afterColumnsList2); break; case DELETE: List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); System.out.println("被删除的数据是:" + beforeColumnsList); break; default: } } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } } }
执行程序并,使用navicat操作数据,查看程序是否读取bin-log。
6.1、canal是什么
Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。
6.2、Canal 的工作原理
很简单,就是把自己伪装成 Slave,假装从 Master 复制数据。
1)Master 主库将改变记录,写到二进制日志(Binary Log)中;
2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝到它的中继日志(relay log);
3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
# 查看binlog文件列表
show binary logs;
# 查看当前正在写入的binlog文件
show master status;
# 查看指定binlog文件的内容
show binlog events [in 'log_name'] [FROM pos] [limit [offset,] row_count]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。