当前位置:   article > 正文

(高阶)Redis 7 第13讲 数据双写一致性 canal篇

(高阶)Redis 7 第13讲 数据双写一致性 canal篇

 面试题

问题答案
如何保证mysql改动后,立即同步到Rediscanal

 

简介

https://github.com/alibaba/canal/wikiicon-default.png?t=N7T8https://github.com/alibaba/canal/wiki

 基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 

 业务

  1. 数据库镜像
  2. 数据库实时备份
  3. 多级索引 (卖家和买家各自分库索引)
  4. search build
  5. 业务cache刷新
  6. 价格变化等重要业务消息

下载

官网https://github.com/alibaba/canal/releases/tag/canal-1.1.6icon-default.png?t=N7T8https://github.com/alibaba/canal/releases/tag/canal-1.1.6
百度网盘链接:https://pan.baidu.com/s/1Hs7JieAZA_q4lmvIdJZgFw?pwd=aqi2 
提取码:aqi2 

 Mysql 主从复制原理

Canal原理

  1. canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  3. canal 解析 binary log 对象(原始为 byte 流)

 案例

mysql 环境 

  1. # 查看mysql 版本
  2. SELECT VERSION();
  3. # 查看当前主机的二进制日志
  4. SHOW MASTER status;
  5. # 查看binlog 开启状态
  6. SHOW VARIABLES LIKE 'log_bin'

 

 my.ini 配置

  1. # 在mysqld中加入一下内容
  2. [mysqld]
  3. log-bin=mysql-bin #开启 binlog
  4. binlog-format=ROW #选择 ROW 模式
  5. server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复

 

 

 重启Mysql

 

创建canal用户并授权

  1. DROP USER IF EXISTS 'canal'@'%';
  2. CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
  3. GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
  4. FLUSH PRIVILEGES;
  5. SELECT * FROM mysql.`user`

canal安装配置

上传安装包

解压安装包

tar -zxvf canal.deployer-1.1.6.tar.gz

 配置文件地址

修改配置

启动canal

 

启动成功

 如果出现如下错误

Caused by: java.io.IOException: caching_sha2_password Auth failed
com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:260)

 

  1. # 修改加密方式
  2. select host,user,plugin from mysql.user ;
  3. ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';

创建测试表

  1. CREATE TABLE `t_user` (
  2. `id` int NOT NULL AUTO_INCREMENT,
  3. `userName` varchar(255) NOT NULL,
  4. PRIMARY KEY (`id`)
  5. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

 创建Redis工具类

  1. package com.mco.utils;
  2. import cn.hutool.core.util.RandomUtil;
  3. import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import redis.clients.jedis.GeoCoordinate;
  7. import redis.clients.jedis.Jedis;
  8. import redis.clients.jedis.JedisPool;
  9. import redis.clients.jedis.JedisPoolConfig;
  10. import redis.clients.jedis.args.GeoUnit;
  11. import java.time.Duration;
  12. import java.util.HashMap;
  13. import java.util.Map;
  14. import java.util.Objects;
  15. /**
  16. * @author :liao.wei
  17. * @date :2023/9/18 21:15
  18. * @package : com.mco.utils
  19. */
  20. public class RedisUtils {
  21. private static Logger logger = LoggerFactory.getLogger(JedisPoolUtil.class);
  22. public static final String REDIS_IP_ADDR = "120.77.64.190";
  23. public static final String REDIS_PWD = "111111";
  24. public static JedisPool jedisPool;
  25. static {
  26. JedisPoolConfig poolConfig = new JedisPoolConfig();
  27. poolConfig.setMaxIdle(8);
  28. poolConfig.setMinIdle(2);
  29. poolConfig.setMaxWait(Duration.ofSeconds(30000));
  30. jedisPool = new JedisPool(poolConfig, REDIS_IP_ADDR, 6379, 10000, REDIS_PWD);
  31. }
  32. public static Jedis getJedis() throws Exception {
  33. if (null != jedisPool) {
  34. return jedisPool.getResource();
  35. }
  36. throw new Exception("Jedispool is not ok");
  37. }
  38. }

Canal 业务类

  1. public class RedisCanalClient {
  2. public static final Integer _60SECONDS = 60;
  3. public static final String CANAL_IP_ADDR = "192.168.1.11";
  4. private static void redisInsert(List<Column> columns)
  5. {
  6. JSONObject jsonObject = new JSONObject();
  7. for (Column column : columns)
  8. {
  9. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
  10. jsonObject.put(column.getName(),column.getValue());
  11. }
  12. if(columns.size() > 0)
  13. {
  14. try(Jedis jedis = RedisUtils.getJedis())
  15. {
  16. jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
  17. }catch (Exception e){
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  22. private static void redisDelete(List<Column> columns)
  23. {
  24. JSONObject jsonObject = new JSONObject();
  25. for (Column column : columns)
  26. {
  27. jsonObject.put(column.getName(),column.getValue());
  28. }
  29. if(columns.size() > 0)
  30. {
  31. try(Jedis jedis = RedisUtils.getJedis())
  32. {
  33. jedis.del(columns.get(0).getValue());
  34. }catch (Exception e){
  35. e.printStackTrace();
  36. }
  37. }
  38. }
  39. private static void redisUpdate(List<Column> columns)
  40. {
  41. JSONObject jsonObject = new JSONObject();
  42. for (Column column : columns)
  43. {
  44. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
  45. jsonObject.put(column.getName(),column.getValue());
  46. }
  47. if(columns.size() > 0)
  48. {
  49. try(Jedis jedis = RedisUtils.getJedis())
  50. {
  51. jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
  52. System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
  53. }catch (Exception e){
  54. e.printStackTrace();
  55. }
  56. }
  57. }
  58. public static void printEntry(List<Entry> entrys)
  59. {
  60. for (Entry entry : entrys) {
  61. if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
  62. continue;
  63. }
  64. RowChange rowChage = null;
  65. try {
  66. //获取变更的row数据
  67. rowChage = RowChange.parseFrom(entry.getStoreValue());
  68. } catch (Exception e) {
  69. throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
  70. }
  71. //获取变动类型
  72. EventType eventType = rowChage.getEventType();
  73. System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
  74. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  75. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
  76. for (RowData rowData : rowChage.getRowDatasList()) {
  77. if (eventType == EventType.INSERT) {
  78. redisInsert(rowData.getAfterColumnsList());
  79. } else if (eventType == EventType.DELETE) {
  80. redisDelete(rowData.getBeforeColumnsList());
  81. } else {//EventType.UPDATE
  82. redisUpdate(rowData.getAfterColumnsList());
  83. }
  84. }
  85. }
  86. }
  87. public static void main(String[] args)
  88. {
  89. System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
  90. //=================================
  91. // 创建链接canal服务端
  92. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_IP_ADDR,
  93. 11111), "example", "", ""); // 这里用户名和密码如果在这写了,会覆盖canal配置文件的账号密码,如果不填从配置文件中读
  94. int batchSize = 1000;
  95. //空闲空转计数器
  96. int emptyCount = 0;
  97. System.out.println("---------------------canal init OK,开始监听mysql变化------");
  98. try {
  99. connector.connect();
  100. //connector.subscribe(".*\\..*");
  101. connector.subscribe("test.t_user"); // 设置监听哪个表
  102. connector.rollback();
  103. int totalEmptyCount = 10 * _60SECONDS;
  104. while (emptyCount < totalEmptyCount) {
  105. System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
  106. Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
  107. long batchId = message.getId();
  108. int size = message.getEntries().size();
  109. if (batchId == -1 || size == 0) {
  110. emptyCount++;
  111. try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
  112. } else {
  113. //计数器重新置零
  114. emptyCount = 0;
  115. printEntry(message.getEntries());
  116. }
  117. connector.ack(batchId); // 提交确认
  118. // connector.rollback(batchId); // 处理失败, 回滚数据
  119. }
  120. System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
  121. } finally {
  122. connector.disconnect();
  123. }
  124. }
  125. }

 说明:

        

CANAL_IP_ADDR:canal 服务部署ip

InetSocketAddress: 端口可从canal.log 中查看

启动main方法

数据库中新增一条数据

 查看Canal客户端监听

 查看Redis数据

POM引入

  1. <properties>
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  3. <maven.compiler.source>1.8</maven.compiler.source>
  4. <maven.compiler.target>1.8</maven.compiler.target>
  5. <junit.version>4.12</junit.version>
  6. <log4j.version>1.2.17</log4j.version>
  7. <lombok.version>1.16.18</lombok.version>
  8. <mysql.version>5.1.47</mysql.version>
  9. <druid.version>1.1.16</druid.version>
  10. <mapper.version>4.1.5</mapper.version>
  11. <mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
  12. </properties>
  13. <dependencies>
  14. <dependency>
  15. <groupId>redis.clients</groupId>
  16. <artifactId>jedis</artifactId>
  17. <version>4.3.1</version>
  18. </dependency>
  19. <!--canal-->
  20. <dependency>
  21. <groupId>com.alibaba.otter</groupId>
  22. <artifactId>canal.client</artifactId>
  23. <version>1.1.0</version>
  24. </dependency>
  25. <!--SpringBoot通用依赖模块-->
  26. <dependency>
  27. <groupId>org.springframework.boot</groupId>
  28. <artifactId>spring-boot-starter-web</artifactId>
  29. </dependency>
  30. <dependency>
  31. <groupId>org.springframework.boot</groupId>
  32. <artifactId>spring-boot-starter-actuator</artifactId>
  33. </dependency>
  34. <!--SpringBoot与Redis整合依赖-->
  35. <dependency>
  36. <groupId>org.springframework.boot</groupId>
  37. <artifactId>spring-boot-starter-data-redis</artifactId>
  38. </dependency>
  39. <dependency>
  40. <groupId>org.apache.commons</groupId>
  41. <artifactId>commons-pool2</artifactId>
  42. </dependency>
  43. <!--SpringBoot与AOP-->
  44. <dependency>
  45. <groupId>org.springframework.boot</groupId>
  46. <artifactId>spring-boot-starter-aop</artifactId>
  47. </dependency>
  48. <dependency>
  49. <groupId>org.aspectj</groupId>
  50. <artifactId>aspectjweaver</artifactId>
  51. </dependency>
  52. <!--Mysql数据库驱动-->
  53. <dependency>
  54. <groupId>mysql</groupId>
  55. <artifactId>mysql-connector-java</artifactId>
  56. <version>5.1.47</version>
  57. </dependency>
  58. <!--SpringBoot集成druid连接池-->
  59. <dependency>
  60. <groupId>com.alibaba</groupId>
  61. <artifactId>druid-spring-boot-starter</artifactId>
  62. <version>1.1.10</version>
  63. </dependency>
  64. <dependency>
  65. <groupId>com.alibaba</groupId>
  66. <artifactId>druid</artifactId>
  67. <version>${druid.version}</version>
  68. </dependency>
  69. <!--mybatis和springboot整合-->
  70. <dependency>
  71. <groupId>org.mybatis.spring.boot</groupId>
  72. <artifactId>mybatis-spring-boot-starter</artifactId>
  73. <version>${mybatis.spring.boot.version}</version>
  74. </dependency>
  75. <!--hutool-->
  76. <dependency>
  77. <groupId>cn.hutool</groupId>
  78. <artifactId>hutool-all</artifactId>
  79. <version>5.2.3</version>
  80. </dependency>
  81. <dependency>
  82. <groupId>junit</groupId>
  83. <artifactId>junit</artifactId>
  84. <version>${junit.version}</version>
  85. </dependency>
  86. <dependency>
  87. <groupId>org.springframework.boot</groupId>
  88. <artifactId>spring-boot-starter-test</artifactId>
  89. <scope>test</scope>
  90. </dependency>
  91. <dependency>
  92. <groupId>log4j</groupId>
  93. <artifactId>log4j</artifactId>
  94. <version>${log4j.version}</version>
  95. </dependency>
  96. <dependency>
  97. <groupId>org.projectlombok</groupId>
  98. <artifactId>lombok</artifactId>
  99. <version>${lombok.version}</version>
  100. <optional>true</optional>
  101. </dependency>
  102. <!--persistence-->
  103. <dependency>
  104. <groupId>javax.persistence</groupId>
  105. <artifactId>persistence-api</artifactId>
  106. <version>1.0.2</version>
  107. </dependency>
  108. <!--通用Mapper-->
  109. <dependency>
  110. <groupId>tk.mybatis</groupId>
  111. <artifactId>mapper</artifactId>
  112. <version>${mapper.version}</version>
  113. </dependency>
  114. <dependency>
  115. <groupId>org.springframework.boot</groupId>
  116. <artifactId>spring-boot-autoconfigure</artifactId>
  117. </dependency>
  118. <dependency>
  119. <groupId>org.apache.commons</groupId>
  120. <artifactId>commons-pool2</artifactId>
  121. <version>2.11.1</version>
  122. </dependency>
  123. <dependency>
  124. <groupId>com.baomidou</groupId>
  125. <artifactId>mybatis-plus-boot-starter</artifactId>
  126. <version>3.4.1</version>
  127. </dependency>
  128. </dependencies>

connector.subscribe过滤规则

源码地址icon-default.png?t=N7T8https://gitee.com/UniQue006/redis_example.git 

  1. 声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/165197?site
    推荐阅读
    相关标签