当前位置:   article > 正文

redis高级 使用canal进行mysql和redis的双写一致应用篇

redis高级 使用canal进行mysql和redis的双写一致应用篇

前言

我们昨天谈论了对应的redis和mysql进行双写一致的理论篇

我们说了五种更新策略和查看的策略

更新策略可以使用

1.先更新数据库再更新redis  (高并发可能导致数据不一致)

2.先更新redis再更新数据库  (高并发可能导致数据不一致)

上述建议加上双检加锁策略来保证mysql的负载没那么高

3.停机更新  (业务允许可以使用)

4.先删除redis再更新数据库  ----延迟双删策略(但是业务时间大概率不好估量)

5.先更新数据库再删除redis      使用canal保证数据一致

--------------------

前面我们说了canal主要是通过二进制binlog的监听来对应进行一个增量的监控

下面我们来玩一玩对应的canal

这里的canal我们可以就当做一个从机,用来做对应的同步操作

下载以及前置工作

 下载地址 : Release v1.1.6 · alibaba/canal · GitHub

对mysql进行配置,新加一个canal用户

使用以下sql即可(注:mysql5和8不同)

  1. //mysql5.7 开通用户权限
  2. DROP USER IF EXISTS 'canal'@'%';
  3. CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
  4. GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
  5. FLUSH PRIVILEGES;
  6. SELECT * FROM mysql.user;

只要结果是这样就代表是ok的

下载之后我们放在linux 下的 /mycanal文件夹下

使用 tar -zxvf 进行对应的解压

之后我们需要修改redis下对应的配置文件  

我们只需要修改 /mycanal/conf/example/instance.properties

修改为对应的mysql的ip和端口即可     

然后我们需要配置对应的my.ini开启对应的binlog二进制文件

  1. log-bin=mysql-bin #开启 binlog
  2. binlog-format=ROW #选择 ROW 模式
  3. server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复

然后我们重启mysql即可,查看binlog是否开启

开启即可

然后我们去对应的bin目录下开启对应的脚本

对应的stop.sh就是停止对应的应用

然后我们可以去对应日志文件查看是否启动成功

分别查看logs下面文件夹的canal.log和example.log即可

然后我们创建对应的数据库和测试数据表

  1. 先创建数据库
  2. 然后执行以下语句
  3. CREATE TABLE `t_user` (
  4. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  5. `userName` varchar(100) NOT NULL,
  6. PRIMARY KEY (`id`)
  7. ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4

springboot整合

我们先创建一个项目

然后先配置对应的pom文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <groupId>com.atguigu.canal</groupId>
  6. <artifactId>canal_demo02</artifactId>
  7. <version>1.0-SNAPSHOT</version>
  8. <parent>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-parent</artifactId>
  11. <version>2.5.14</version>
  12. <relativePath/>
  13. </parent>
  14. <properties>
  15. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  16. <maven.compiler.source>1.8</maven.compiler.source>
  17. <maven.compiler.target>1.8</maven.compiler.target>
  18. <junit.version>4.12</junit.version>
  19. <log4j.version>1.2.17</log4j.version>
  20. <lombok.version>1.16.18</lombok.version>
  21. <mysql.version>5.1.47</mysql.version>
  22. <druid.version>1.1.16</druid.version>
  23. <mapper.version>4.1.5</mapper.version>
  24. <mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
  25. </properties>
  26. <dependencies>
  27. <!--canal-->
  28. <dependency>
  29. <groupId>com.alibaba.otter</groupId>
  30. <artifactId>canal.client</artifactId>
  31. <version>1.1.0</version>
  32. </dependency>
  33. <!--SpringBoot通用依赖模块-->
  34. <dependency>
  35. <groupId>org.springframework.boot</groupId>
  36. <artifactId>spring-boot-starter-web</artifactId>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.springframework.boot</groupId>
  40. <artifactId>spring-boot-starter-actuator</artifactId>
  41. </dependency>
  42. <!--swagger2-->
  43. <dependency>
  44. <groupId>io.springfox</groupId>
  45. <artifactId>springfox-swagger2</artifactId>
  46. <version>2.9.2</version>
  47. </dependency>
  48. <dependency>
  49. <groupId>io.springfox</groupId>
  50. <artifactId>springfox-swagger-ui</artifactId>
  51. <version>2.9.2</version>
  52. </dependency>
  53. <!--SpringBoot与Redis整合依赖-->
  54. <dependency>
  55. <groupId>org.springframework.boot</groupId>
  56. <artifactId>spring-boot-starter-data-redis</artifactId>
  57. </dependency>
  58. <dependency>
  59. <groupId>org.apache.commons</groupId>
  60. <artifactId>commons-pool2</artifactId>
  61. </dependency>
  62. <!--SpringBoot与AOP-->
  63. <dependency>
  64. <groupId>org.springframework.boot</groupId>
  65. <artifactId>spring-boot-starter-aop</artifactId>
  66. </dependency>
  67. <dependency>
  68. <groupId>org.aspectj</groupId>
  69. <artifactId>aspectjweaver</artifactId>
  70. </dependency>
  71. <!--Mysql数据库驱动-->
  72. <dependency>
  73. <groupId>mysql</groupId>
  74. <artifactId>mysql-connector-java</artifactId>
  75. <version>5.1.47</version>
  76. </dependency>
  77. <!--SpringBoot集成druid连接池-->
  78. <dependency>
  79. <groupId>com.alibaba</groupId>
  80. <artifactId>druid-spring-boot-starter</artifactId>
  81. <version>1.1.10</version>
  82. </dependency>
  83. <dependency>
  84. <groupId>com.alibaba</groupId>
  85. <artifactId>druid</artifactId>
  86. <version>${druid.version}</version>
  87. </dependency>
  88. <!--mybatis和springboot整合-->
  89. <dependency>
  90. <groupId>org.mybatis.spring.boot</groupId>
  91. <artifactId>mybatis-spring-boot-starter</artifactId>
  92. <version>${mybatis.spring.boot.version}</version>
  93. </dependency>
  94. <!--通用基础配置junit/devtools/test/log4j/lombok/hutool-->
  95. <!--hutool-->
  96. <dependency>
  97. <groupId>cn.hutool</groupId>
  98. <artifactId>hutool-all</artifactId>
  99. <version>5.2.3</version>
  100. </dependency>
  101. <dependency>
  102. <groupId>junit</groupId>
  103. <artifactId>junit</artifactId>
  104. <version>${junit.version}</version>
  105. </dependency>
  106. <dependency>
  107. <groupId>org.springframework.boot</groupId>
  108. <artifactId>spring-boot-starter-test</artifactId>
  109. <scope>test</scope>
  110. </dependency>
  111. <dependency>
  112. <groupId>log4j</groupId>
  113. <artifactId>log4j</artifactId>
  114. <version>${log4j.version}</version>
  115. </dependency>
  116. <dependency>
  117. <groupId>org.projectlombok</groupId>
  118. <artifactId>lombok</artifactId>
  119. <version>${lombok.version}</version>
  120. <optional>true</optional>
  121. </dependency>
  122. <!--persistence-->
  123. <dependency>
  124. <groupId>javax.persistence</groupId>
  125. <artifactId>persistence-api</artifactId>
  126. <version>1.0.2</version>
  127. </dependency>
  128. <!--通用Mapper-->
  129. <dependency>
  130. <groupId>tk.mybatis</groupId>
  131. <artifactId>mapper</artifactId>
  132. <version>${mapper.version}</version>
  133. </dependency>
  134. <dependency>
  135. <groupId>org.springframework.boot</groupId>
  136. <artifactId>spring-boot-autoconfigure</artifactId>
  137. </dependency>
  138. <dependency>
  139. <groupId>redis.clients</groupId>
  140. <artifactId>jedis</artifactId>
  141. <version>3.8.0</version>
  142. </dependency>
  143. </dependencies>
  144. <build>
  145. <plugins>
  146. <plugin>
  147. <groupId>org.springframework.boot</groupId>
  148. <artifactId>spring-boot-maven-plugin</artifactId>
  149. </plugin>
  150. </plugins>
  151. </build>
  152. </project>

然后是对应的properties文件,记得按照自己的设置修改

记得这里修改数据库的名字以及密码

  1. server.port=5555
  2. # ========================alibaba.druid=====================
  3. spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
  4. spring.datasource.driver-class-name=com.mysql.jdbc.Driver
  5. spring.datasource.url=jdbc:mysql://localhost:13306/canal_test?useUnicode=true&characterEncoding=utf-8&useSSL=false
  6. spring.datasource.username=root
  7. spring.datasource.password=abc123
  8. spring.datasource.druid.test-while-idle=false

接着主启动类这里用不到,无需配置

下面写业务类

首先先搞连接池

注意修改对应的配置密码以及redis的IP地址

  1. public class RedisUtils
  2. {
  3. public static final String REDIS_IP_ADDR = "192.168.188.136";
  4. public static final String REDIS_pwd = "abc123";
  5. public static JedisPool jedisPool;
  6. static {
  7. JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
  8. jedisPoolConfig.setMaxTotal(20);
  9. jedisPoolConfig.setMaxIdle(10);
  10. jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);
  11. }
  12. public static Jedis getJedis() throws Exception {
  13. if(null!=jedisPool){
  14. return jedisPool.getResource();
  15. }
  16. throw new Exception("Jedispool is not ok");
  17. }
  18. }

最后就是业务类了

  1. public class RedisCanalClientExample
  2. {
  3. public static final Integer _60SECONDS = 60;
  4. public static final String REDIS_IP_ADDR = "192.168.111.185";
  5. private static void redisInsert(List<Column> columns)
  6. {
  7. JSONObject jsonObject = new JSONObject();
  8. for (Column column : columns)
  9. {
  10. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
  11. jsonObject.put(column.getName(),column.getValue());
  12. }
  13. if(columns.size() > 0)
  14. {
  15. try(Jedis jedis = RedisUtils.getJedis())
  16. {
  17. jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
  18. }catch (Exception e){
  19. e.printStackTrace();
  20. }
  21. }
  22. }
  23. private static void redisDelete(List<Column> columns)
  24. {
  25. JSONObject jsonObject = new JSONObject();
  26. for (Column column : columns)
  27. {
  28. jsonObject.put(column.getName(),column.getValue());
  29. }
  30. if(columns.size() > 0)
  31. {
  32. try(Jedis jedis = RedisUtils.getJedis())
  33. {
  34. jedis.del(columns.get(0).getValue());
  35. }catch (Exception e){
  36. e.printStackTrace();
  37. }
  38. }
  39. }
  40. private static void redisUpdate(List<Column> columns)
  41. {
  42. JSONObject jsonObject = new JSONObject();
  43. for (Column column : columns)
  44. {
  45. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
  46. jsonObject.put(column.getName(),column.getValue());
  47. }
  48. if(columns.size() > 0)
  49. {
  50. try(Jedis jedis = RedisUtils.getJedis())
  51. {
  52. jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
  53. System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
  54. }catch (Exception e){
  55. e.printStackTrace();
  56. }
  57. }
  58. }
  59. public static void printEntry(List<Entry> entrys) {
  60. for (Entry entry : entrys) {
  61. if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
  62. continue;
  63. }
  64. RowChange rowChage = null;
  65. try {
  66. //获取变更的row数据
  67. rowChage = RowChange.parseFrom(entry.getStoreValue());
  68. } catch (Exception e) {
  69. throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
  70. }
  71. //获取变动类型
  72. EventType eventType = rowChage.getEventType();
  73. System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
  74. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  75. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
  76. for (RowData rowData : rowChage.getRowDatasList()) {
  77. if (eventType == EventType.INSERT) {
  78. redisInsert(rowData.getAfterColumnsList());
  79. } else if (eventType == EventType.DELETE) {
  80. redisDelete(rowData.getBeforeColumnsList());
  81. } else {//EventType.UPDATE
  82. redisUpdate(rowData.getAfterColumnsList());
  83. }
  84. }
  85. }
  86. }
  87. public static void main(String[] args)
  88. {
  89. System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
  90. //=================================
  91. // 创建链接canal服务端
  92. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,
  93. 11111), "example", "", "");
  94. int batchSize = 1000;
  95. //空闲空转计数器
  96. int emptyCount = 0;
  97. System.out.println("---------------------canal init OK,开始监听mysql变化------");
  98. try {
  99. connector.connect();
  100. //connector.subscribe(".*\\..*");
  101. connector.subscribe("bigdata.t_user");
  102. connector.rollback();
  103. int totalEmptyCount = 10 * _60SECONDS;
  104. while (emptyCount < totalEmptyCount) {
  105. System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
  106. Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
  107. long batchId = message.getId();
  108. int size = message.getEntries().size();
  109. if (batchId == -1 || size == 0) {
  110. emptyCount++;
  111. try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
  112. } else {
  113. //计数器重新置零
  114. emptyCount = 0;
  115. printEntry(message.getEntries());
  116. }
  117. connector.ack(batchId); // 提交确认
  118. // connector.rollback(batchId); // 处理失败, 回滚数据
  119. }
  120. System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
  121. } finally {
  122. connector.disconnect();
  123. }
  124. }
  125. }

此时我们修改mysql,redis就会自动做同步,原因就是我们在这里打日志的时候也进行了redis的回写操作,大家可以在业务类中很容易的发现

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/727788
推荐阅读
相关标签
  

闽ICP备14008679号