当前位置:   article > 正文

第十五章_Redis与MySQL数据双写一致性工程落地案例_canal 实现mysql和redis双写一致

canal 实现mysql和redis双写一致

复习+面试题

采用双检加锁策略 

多个线程同时去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上使用一个 互斥锁来锁住它

其他的线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。

后面的线程进来发现已经有缓存了,就直接走缓存。 

canal

是什么

canal [kə'næl],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用Java语言开发;

历史背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更。从2010年开始,阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步,由此衍生出了canal项目;

官网地址

一句话

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL​数据库增量日志解析,提供增量数据订阅和消费

能干嘛

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

去哪下

官网下载地址

工作原理,面试回答

传统MySQL主从复制工作原理 

MySQL的主从复制将经过如下步骤:

1、当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;

2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,

如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;

3、同时 master 主服务器为每个 I/O Thread 启动一个dump  Thread,用于向其发送二进制事件日志;

4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;

5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;

6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;

canal工作原理

mysql-canal-redis双写一致性Coding

java案例,来源出处

mysql

1 查看mysql版本

SELECT VERSION();

mysql5.7.28

2 当前的主机二进制日志

show master status;

3 查看SHOW VARIABLES LIKE 'log_bin';

4 开启 MySQL的binlog写入功能

D:\devSoft\mysql\mysql5.7.28(自己安装目录)目录下打开 

最好提前备份

my.ini

mysql

log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式
server_id=1    #配置MySQL replaction需要定义,不要和canal的 slaveId重复
ROW模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但会占用较多的空间。
STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;
MIX模式比较灵活的记录,理论上说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;

window        my.ini

linux             my.cnf

5 重启mysql

再次查看SHOW VARIABLES LIKE 'log_bin';

6 授权canal连接MySQL账号

mysql默认的用户在mysql库的user表里

SELECT * FROM mysql.`user`;

默认没有canal账户,此处新建+授权

DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';  
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';  
FLUSH PRIVILEGES;
SELECT * FROM mysql.user;

canal服务端 

1.下载

GitHub地址

下载Linux版本:canal.deployer-1.1.6.tar.gz

注意发布时间+版本,2022.8.11后发布的才用

2.解压

解压后整体放入/mycanal路径下

3.配置

修改/mycanal/conf/example路径下instance.properties文件

instance.properties

换成自己的mysql主机master的IP地址 

换成自己的在mysql新建的canal账户

4. 启动

/opt/mycanal/bin路径下执行./startup.sh

5.查看

判断canal是否启动成功

查看 server 日志

查看 样例example 的日志

canal客户端(Java编写业务程序) 

SQL脚本

1 随便选个数据库,以你自己为主,本例bigdata,按照下面建表

CREATE TABLE `t_user` (

  `id` bigint(20) NOT NULL AUTO_INCREMENT,

  `userName` varchar(100) NOT NULL,

  PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;

2 建module

canal_demo02

3 改POM

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <groupId>com.atguigu.canal</groupId>
  6. <artifactId>canal_demo02</artifactId>
  7. <version>1.0-SNAPSHOT</version>
  8. <parent>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-parent</artifactId>
  11. <version>2.5.14</version>
  12. <relativePath/>
  13. </parent>
  14. <properties>
  15. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  16. <maven.compiler.source>1.8</maven.compiler.source>
  17. <maven.compiler.target>1.8</maven.compiler.target>
  18. <junit.version>4.12</junit.version>
  19. <log4j.version>1.2.17</log4j.version>
  20. <lombok.version>1.16.18</lombok.version>
  21. <mysql.version>5.1.47</mysql.version>
  22. <druid.version>1.1.16</druid.version>
  23. <mapper.version>4.1.5</mapper.version>
  24. <mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
  25. </properties>
  26. <dependencies>
  27. <!--canal-->
  28. <dependency>
  29. <groupId>com.alibaba.otter</groupId>
  30. <artifactId>canal.client</artifactId>
  31. <version>1.1.0</version>
  32. </dependency>
  33. <!--SpringBoot通用依赖模块-->
  34. <dependency>
  35. <groupId>org.springframework.boot</groupId>
  36. <artifactId>spring-boot-starter-web</artifactId>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.springframework.boot</groupId>
  40. <artifactId>spring-boot-starter-actuator</artifactId>
  41. </dependency>
  42. <!--swagger2-->
  43. <dependency>
  44. <groupId>io.springfox</groupId>
  45. <artifactId>springfox-swagger2</artifactId>
  46. <version>2.9.2</version>
  47. </dependency>
  48. <dependency>
  49. <groupId>io.springfox</groupId>
  50. <artifactId>springfox-swagger-ui</artifactId>
  51. <version>2.9.2</version>
  52. </dependency>
  53. <!--SpringBoot与Redis整合依赖-->
  54. <dependency>
  55. <groupId>org.springframework.boot</groupId>
  56. <artifactId>spring-boot-starter-data-redis</artifactId>
  57. </dependency>
  58. <dependency>
  59. <groupId>org.apache.commons</groupId>
  60. <artifactId>commons-pool2</artifactId>
  61. </dependency>
  62. <!--SpringBoot与AOP-->
  63. <dependency>
  64. <groupId>org.springframework.boot</groupId>
  65. <artifactId>spring-boot-starter-aop</artifactId>
  66. </dependency>
  67. <dependency>
  68. <groupId>org.aspectj</groupId>
  69. <artifactId>aspectjweaver</artifactId>
  70. </dependency>
  71. <!--Mysql数据库驱动-->
  72. <dependency>
  73. <groupId>mysql</groupId>
  74. <artifactId>mysql-connector-java</artifactId>
  75. <version>5.1.47</version>
  76. </dependency>
  77. <!--SpringBoot集成druid连接池-->
  78. <dependency>
  79. <groupId>com.alibaba</groupId>
  80. <artifactId>druid-spring-boot-starter</artifactId>
  81. <version>1.1.10</version>
  82. </dependency>
  83. <dependency>
  84. <groupId>com.alibaba</groupId>
  85. <artifactId>druid</artifactId>
  86. <version>${druid.version}</version>
  87. </dependency>
  88. <!--mybatis和springboot整合-->
  89. <dependency>
  90. <groupId>org.mybatis.spring.boot</groupId>
  91. <artifactId>mybatis-spring-boot-starter</artifactId>
  92. <version>${mybatis.spring.boot.version}</version>
  93. </dependency>
  94. <!--通用基础配置junit/devtools/test/log4j/lombok/hutool-->
  95. <!--hutool-->
  96. <dependency>
  97. <groupId>cn.hutool</groupId>
  98. <artifactId>hutool-all</artifactId>
  99. <version>5.2.3</version>
  100. </dependency>
  101. <dependency>
  102. <groupId>junit</groupId>
  103. <artifactId>junit</artifactId>
  104. <version>${junit.version}</version>
  105. </dependency>
  106. <dependency>
  107. <groupId>org.springframework.boot</groupId>
  108. <artifactId>spring-boot-starter-test</artifactId>
  109. <scope>test</scope>
  110. </dependency>
  111. <dependency>
  112. <groupId>log4j</groupId>
  113. <artifactId>log4j</artifactId>
  114. <version>${log4j.version}</version>
  115. </dependency>
  116. <dependency>
  117. <groupId>org.projectlombok</groupId>
  118. <artifactId>lombok</artifactId>
  119. <version>${lombok.version}</version>
  120. <optional>true</optional>
  121. </dependency>
  122. <!--persistence-->
  123. <dependency>
  124. <groupId>javax.persistence</groupId>
  125. <artifactId>persistence-api</artifactId>
  126. <version>1.0.2</version>
  127. </dependency>
  128. <!--通用Mapper-->
  129. <dependency>
  130. <groupId>tk.mybatis</groupId>
  131. <artifactId>mapper</artifactId>
  132. <version>${mapper.version}</version>
  133. </dependency>
  134. <dependency>
  135. <groupId>org.springframework.boot</groupId>
  136. <artifactId>spring-boot-autoconfigure</artifactId>
  137. </dependency>
  138. <dependency>
  139. <groupId>redis.clients</groupId>
  140. <artifactId>jedis</artifactId>
  141. <version>3.8.0</version>
  142. </dependency>
  143. </dependencies>
  144. <build>
  145. <plugins>
  146. <plugin>
  147. <groupId>org.springframework.boot</groupId>
  148. <artifactId>spring-boot-maven-plugin</artifactId>
  149. </plugin>
  150. </plugins>
  151. </build>
  152. </project>

4 写YML

  1. server.port=5555
  2. # ========================alibaba.druid=====================
  3. spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
  4. spring.datasource.driver-class-name=com.mysql.jdbc.Driver
  5. spring.datasource.url=jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=utf-8&useSSL=false
  6. spring.datasource.username=root
  7. spring.datasource.password=123456
  8. spring.datasource.druid.test-while-idle=false

5 主启动

  1. package com.test.canal;
  2. import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. /**
  6. * @auther admin
  7. */
  8. @SpringBootApplication
  9. public class CanalDemo02App {
  10. //本例不要启动CanalDemo02App实例
  11. }

6 业务类

RedisUtils

  1. package com.atguigu.canal.util;
  2. import redis.clients.jedis.Jedis;
  3. import redis.clients.jedis.JedisPool;
  4. import redis.clients.jedis.JedisPoolConfig;
  5. /**
  6. * @auther admin
  7. */
  8. public class RedisUtils {
  9. public static final String REDIS_IP_ADDR = "192.168.111.185";
  10. public static final String REDIS_PWD = "111111";
  11. public static JedisPool jedisPool;
  12. static {
  13. JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
  14. jedisPoolConfig.setMaxTotal(20);
  15. jedisPoolConfig.setMaxIdle(10);
  16. jedisPool=new JedisPool(jedisPoolConfig, REDIS_IP_ADDR, 6379, 10000, REDIS_PWD);
  17. }
  18. public static Jedis getJedis() throws Exception {
  19. if(jedisPool != null){
  20. return jedisPool.getResource();
  21. }
  22. throw new Exception("Jedispool is not ok");
  23. }
  24. }

RedisCanalClientExample

  1. package com.test.canal.biz;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.alibaba.otter.canal.client.CanalConnector;
  4. import com.alibaba.otter.canal.client.CanalConnectors;
  5. import com.alibaba.otter.canal.protocol.CanalEntry.*;
  6. import com.alibaba.otter.canal.protocol.Message;
  7. import com.test.canal.util.RedisUtils;
  8. import redis.clients.jedis.Jedis;
  9. import java.net.InetSocketAddress;
  10. import java.util.List;
  11. import java.util.UUID;
  12. import java.util.concurrent.TimeUnit;
  13. /**
  14. * @auther admin
  15. */
  16. public class RedisCanalClientExample {
  17. public static final Integer _60SECONDS = 60;
  18. public static final String REDIS_IP_ADDR = "192.168.111.185";
  19. private static void redisInsert(List<Column> columns) {
  20. JSONObject jsonObject = new JSONObject();
  21. for (Column column : columns) {
  22. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
  23. jsonObject.put(column.getName(),column.getValue());
  24. }
  25. if(columns.size() > 0) {
  26. try(Jedis jedis = RedisUtils.getJedis()) {
  27. jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
  28. }catch (Exception e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }
  33. private static void redisDelete(List<Column> columns) {
  34. JSONObject jsonObject = new JSONObject();
  35. for (Column column : columns) {
  36. jsonObject.put(column.getName(), column.getValue());
  37. }
  38. if(columns.size() > 0) {
  39. try(Jedis jedis = RedisUtils.getJedis()) {
  40. jedis.del(columns.get(0).getValue());
  41. }catch (Exception e){
  42. e.printStackTrace();
  43. }
  44. }
  45. }
  46. private static void redisUpdate(List<Column> columns) {
  47. JSONObject jsonObject = new JSONObject();
  48. for (Column column : columns) {
  49. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
  50. jsonObject.put(column.getName(), column.getValue());
  51. }
  52. if(columns.size() > 0) {
  53. try(Jedis jedis = RedisUtils.getJedis()) {
  54. jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
  55. System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
  56. }catch (Exception e) {
  57. e.printStackTrace();
  58. }
  59. }
  60. }
  61. public static void printEntry(List<Entry> entrys) {
  62. for (Entry entry : entrys) {
  63. if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
  64. continue;
  65. }
  66. RowChange rowChage = null;
  67. try {
  68. //获取变更的row数据
  69. rowChage = RowChange.parseFrom(entry.getStoreValue());
  70. } catch (Exception e) {
  71. throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
  72. }
  73. //获取变动类型
  74. EventType eventType = rowChage.getEventType();
  75. System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
  76. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  77. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
  78. for (RowData rowData : rowChage.getRowDatasList()) {
  79. if (eventType == EventType.INSERT) {
  80. redisInsert(rowData.getAfterColumnsList());
  81. } else if (eventType == EventType.DELETE) {
  82. redisDelete(rowData.getBeforeColumnsList());
  83. } else {//EventType.UPDATE
  84. redisUpdate(rowData.getAfterColumnsList());
  85. }
  86. }
  87. }
  88. }
  89. public static void main(String[] args) {
  90. System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
  91. //=================================
  92. // 创建链接canal服务端
  93. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,
  94. 11111), "example", "", "");
  95. int batchSize = 1000;
  96. //空闲空转计数器
  97. int emptyCount = 0;
  98. System.out.println("---------------------canal init OK,开始监听mysql变化------");
  99. try {
  100. connector.connect();
  101. //connector.subscribe(".*\\..*");
  102. connector.subscribe("bigdata.t_user");
  103. connector.rollback();
  104. int totalEmptyCount = 10 * _60SECONDS;
  105. while (emptyCount < totalEmptyCount) {
  106. System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
  107. // 获取指定数量的数据
  108. Message message = connector.getWithoutAck(batchSize);
  109. long batchId = message.getId();
  110. int size = message.getEntries().size();
  111. if (batchId == -1 || size == 0) {
  112. emptyCount++;
  113. try {
  114. TimeUnit.SECONDS.sleep(1);
  115. } catch (InterruptedException e) {
  116. e.printStackTrace();
  117. }
  118. } else {
  119. //计数器重新置零
  120. emptyCount = 0;
  121. printEntry(message.getEntries());
  122. }
  123. // 提交确认
  124. connector.ack(batchId);
  125. // 处理失败, 回滚数据
  126. // connector.rollback(batchId);
  127. }
  128. System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
  129. } finally {
  130. connector.disconnect();
  131. }
  132. }
  133. }

题外话

java程序下connector.subscribe配置的过滤正则

关闭资源代码简写

try-with-resources释放资源

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/165199
推荐阅读
相关标签
  

闽ICP备14008679号