当前位置:   article > 正文

Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性_canel同步到redis中

canel同步到redis中

目录

 

1. 背景

2. Windows系统安装canal

3.Mysql准备工作

4. 公共依赖包

5. Redis缓存设计

6. mall-canal-service


 

1. 背景

canal [kə'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。其诞生的背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。所以其核心功能如下:

  • 数据实时备份
  • 异构数据源(elasticsearch、Hbase)与数据库数据增量同步
  • 业务缓存cache 刷新,保证缓存一致性
  • 带业务逻辑的增量数据处理,如监听某个数据的变化做一定的逻辑处理

canal是借助于MySQL主从复制原理实现

08456dcd79f140de9e945633bc447524.png

  • master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  • slave将master的binary log events拷贝到它的中继日志(relay log);
  • slave重做中继日志中的事件,将改变反映它自己的数据。

 canal的工作原理:

8ca49d5857924291a2de84e733e4f296.png

 

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  • canal解析binary log对象(原始为byte流)

2. Windows系统安装canal

Github下载链接:Releases · alibaba/canal · GitHub

 77301713faec4dc1bd7554fe229c78f2.png

 解压

e31d0176c7e94ba19fb6e6b6e67af614.png

 进入/conf/example,修改instance.properties

  1. # position info
  2. canal.instance.master.address=127.0.0.1:3306
  3. canal.instance.master.journal.name=
  4. canal.instance.master.position=
  5. canal.instance.master.timestamp=
  6. canal.instance.master.gtid=
  7. # rds oss binlog
  8. canal.instance.rds.accesskey=
  9. canal.instance.rds.secretkey=
  10. canal.instance.rds.instanceId=
  11. # table meta tsdb info
  12. canal.instance.tsdb.enable=true
  13. #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
  14. #canal.instance.tsdb.dbUsername=canal
  15. #canal.instance.tsdb.dbPassword=canal
  16. #canal.instance.standby.address =
  17. #canal.instance.standby.journal.name =
  18. #canal.instance.standby.position =
  19. #canal.instance.standby.timestamp =
  20. #canal.instance.standby.gtid=
  21. # username/password
  22. canal.instance.dbUsername=root
  23. canal.instance.dbPassword=123456
  24. canal.instance.connectionCharset = UTF-8
  25. # enable druid Decrypt database password
  26. canal.instance.enableDruid=false
  27. #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
  28. # table regex
  29. canal.instance.filter.regex=.*\\..*
  30. # table black regex
  31. canal.instance.filter.black.regex=mysql\\.slave_.*
  32. # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
  33. #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
  34. # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
  35. #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
  36. # mq config
  37. canal.mq.topic=example
  38. # dynamic topic route by schema or table regex
  39. #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
  40. canal.mq.partition=0
  41. # hash partition config
  42. #canal.mq.partitionsNum=3
  43. #canal.mq.partitionHash=test.table:id^name,.*\\..*
  44. #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
  45. #################################################

之后进入到bin目录下启动startup就可以了。

3.Mysql准备工作

  1. /*
  2. SQLyog Community v13.2.0 (64 bit)
  3. MySQL - 8.0.33 : Database - shop
  4. *********************************************************************
  5. */
  6. /*!40101 SET NAMES utf8 */;
  7. /*!40101 SET SQL_MODE=''*/;
  8. /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
  9. /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
  10. /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
  11. /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
  12. CREATE DATABASE /*!32312 IF NOT EXISTS*/`shop` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
  13. USE `shop`;
  14. /*Table structure for table `brand` */
  15. DROP TABLE IF EXISTS `brand`;
  16. CREATE TABLE `brand` (
  17. `id` INT NOT NULL AUTO_INCREMENT COMMENT '品牌id',
  18. `name` VARCHAR(100) NOT NULL COMMENT '品牌名称',
  19. `image` VARCHAR(1000) DEFAULT '' COMMENT '品牌图片地址',
  20. `initial` VARCHAR(1) DEFAULT '' COMMENT '品牌的首字母',
  21. `sort` INT DEFAULT NULL COMMENT '排序',
  22. PRIMARY KEY (`id`)
  23. ) ENGINE=INNODB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 COMMENT='品牌表';
  24. /*Data for the table `brand` */
  25. INSERT INTO `brand`(`id`,`name`,`image`,`initial`,`sort`) VALUES
  26. (11,'华为','https://sklll.oss-cn-beijing.aliyuncs.com/secby/eed72cc4-a9c1-4010-949a-03cef5b933d6.jpg','',NULL),
  27. (12,'中兴','https://sklll.oss-cn-beijing.aliyuncs.com/secby/4fedb361-5ab3-4ad0-a667-580c1f37dff0.jpg','',NULL),
  28. (13,'大疆','https://sklll.oss-cn-beijing.aliyuncs.com/secby/e8382c48-0487-4a9b-8fd0-a3716c3eea19.jpg','',NULL);
  29. /*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
  30. /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
  31. /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
  32. /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

4. 公共依赖包

  1. <dependencies>
  2. <!--web包-->
  3. <dependency>
  4. <groupId>org.springframework.boot</groupId>
  5. <artifactId>spring-boot-starter-web</artifactId>
  6. </dependency>
  7. <!--MyBatis Plus-->
  8. <dependency>
  9. <groupId>com.baomidou</groupId>
  10. <artifactId>mybatis-plus-boot-starter</artifactId>
  11. <version>3.3.2</version>
  12. </dependency>
  13. <!--MySQL-->
  14. <dependency>
  15. <groupId>mysql</groupId>
  16. <artifactId>mysql-connector-java</artifactId>
  17. <scope>runtime</scope>
  18. </dependency>
  19. <!--Redis-->
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-data-redis</artifactId>
  23. </dependency>
  24. <!--Nacos-->
  25. <dependency>
  26. <groupId>com.alibaba.cloud</groupId>
  27. <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
  28. </dependency>
  29. <dependency>
  30. <groupId>com.alibaba.cloud</groupId>
  31. <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
  32. </dependency>
  33. </dependencies>

5. Redis缓存设计

这里基于Springcloud Alibaba进行设计,对应的服务名称为mall-goods

bootstrap.yaml代码如下:

  1. server:
  2. port: 8081
  3. spring:
  4. application:
  5. name: mall-goods
  6. datasource:
  7. driver-class-name: com.mysql.cj.jdbc.Driver
  8. url: jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
  9. username: root
  10. password: 123456
  11. cloud:
  12. nacos:
  13. config:
  14. file-extension: yaml
  15. server-addr: localhost:8848
  16. discovery:
  17. #Nacos的注册地址
  18. server-addr: localhost:8848
  19. redis:
  20. host: xx //改为自己redis ip地址
  21. port: 6379

导入RedisConfig配置

  1. @Configuration
  2. public class RedisConfig {
  3. @Bean
  4. public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
  5. RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
  6. redisTemplate.setConnectionFactory(factory);
  7. GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer();
  8. // 值采用json序列化
  9. redisTemplate.setValueSerializer(serializer);
  10. //使用StringRedisSerializer来序列化和反序列化redis的key值
  11. redisTemplate.setKeySerializer(new StringRedisSerializer());
  12. // 设置hash key 和value序列化模式
  13. redisTemplate.setHashKeySerializer(new StringRedisSerializer());
  14. redisTemplate.setHashValueSerializer(serializer);
  15. redisTemplate.afterPropertiesSet();
  16. return redisTemplate;
  17. }
  18. @Bean
  19. public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) {
  20. RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisTemplate.getConnectionFactory());
  21. RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
  22. .entryTtl(Duration.ofHours(12))//设置默认缓存时间
  23. .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getValueSerializer()));
  24. return new RedisCacheManager(redisCacheWriter, redisCacheConfiguration);
  25. }
  26. }

对于Brand进行redis缓存设计

在主启动类上添加@EnableCaching开启缓存。

@Cacheable 注解可以标记一个方法需要被缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会先查找缓存,如果缓存中存在相应的数据,则直接从缓存中读取,否则执行方法并将结果缓存到缓存中。

@CachePut 注解可以标记一个方法需要更新缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会更新缓存中的数据。

@CacheEvict 注解可以标记一个方法需要删除缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会删除缓存中对应的数据。

代码如下:

  1. @RestController
  2. @RequestMapping("/brand")
  3. public class BrandController {
  4. @Autowired
  5. BrandService brandService;
  6. @GetMapping("/{id}")
  7. @Cacheable(value = "brand",key = "#id")
  8. public Brand search(@PathVariable(value = "id")Integer id){
  9. return brandService.getById(id);
  10. }
  11. /****
  12. * 添加缓存
  13. */
  14. @PostMapping
  15. @CachePut(value = "brand",key = "#brand.id")
  16. public Brand add(@RequestBody Brand brand){
  17. return brand;
  18. }
  19. /****
  20. * 修改缓存
  21. */
  22. @PutMapping
  23. @CachePut(value = "brand",key = "#brand.id")
  24. public Brand update(@RequestBody Brand brand){
  25. return brand;
  26. }
  27. /****
  28. * 删除缓存
  29. */
  30. @DeleteMapping("/{id}")
  31. @CacheEvict(value = "brand",key = "#id")
  32. public void delete(@PathVariable(value = "id")Integer id){
  33. }
  34. }

6. mall-canal-service

这里微服务名称为mall-canal-service,监控Mysql数据库数据变化进行实时的更新缓存。

除公共依赖外导入依赖

  1. <dependency>
  2. <groupId>top.javatool</groupId>
  3. <artifactId>canal-spring-boot-starter</artifactId>
  4. <version>1.2.1-RELEASE</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.cloud</groupId>
  8. <artifactId>spring-cloud-starter-openfeign</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.cloud</groupId>
  12. <artifactId>spring-cloud-loadbalancer</artifactId>
  13. </dependency>

bootstrap.yaml设计如下:

  1. server:
  2. port: 8083
  3. spring:
  4. application:
  5. name: mall-canal
  6. cloud:
  7. nacos:
  8. config:
  9. file-extension: yaml
  10. server-addr: localhost:8848
  11. discovery:
  12. #Nacos的注册地址
  13. server-addr: localhost:8848
  14. #Canal配置
  15. canal:
  16. server: localhost:11111
  17. destination: example

设计Feign接口

  1. @FeignClient(value = "mall-goods",contextId = "brand")//服务名字
  2. public interface BrandFeign {
  3. @GetMapping("/brand/{id}")
  4. Brand search(@PathVariable(value = "id")Integer id);
  5. @PostMapping("/brand")
  6. Brand add(@RequestBody Brand brand);
  7. /****
  8. * 修改方法
  9. */
  10. @PutMapping("/brand")
  11. Brand update(@RequestBody Brand brand);
  12. /****
  13. * 删除方法
  14. */
  15. @DeleteMapping("/brand/{id}")
  16. void delete(@PathVariable(value = "id")Integer id);
  17. }

Canal实时监控Mysql数据库变化,代码设计如下:

  1. @Component
  2. @CanalTable(value = "brand")
  3. public class BrandHandler implements EntryHandler<Brand> {
  4. @Autowired
  5. BrandFeign brandFeign;
  6. @Override
  7. public void insert(Brand brand) {
  8. System.out.println(brand);
  9. brandFeign.add(brand);
  10. }
  11. @Override
  12. public void update(Brand before, Brand after) {
  13. System.out.println(after);
  14. brandFeign.update(after);
  15. }
  16. @Override
  17. public void delete(Brand brand) {
  18. System.out.println(brand);
  19. brandFeign.delete(brand.getId());
  20. }
  21. }

在MallCanalApplication主启动类上添加@EnableFeignClients(basePackages = {"org.example.feign"}),包名为feign接口路径。

注意:本人在使用canal时,数据库有非varchar类型的null值,在运行时会抛异常错误。

  1. at top.javatool.canal.client.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:52) ~[canal-client-1.2.1-RELEASE.jar:na]
  2. at top.javatool.canal.client.handler.impl.AsyncMessageHandlerImpl.lambda$handleMessage$0(AsyncMessageHandlerImpl.java:30) ~[canal-client-1.2.1-RELEASE.jar:na]
  3. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_281]
  4. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_281]
  5. at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_281]
  6. Caused by: java.lang.NumberFormatException: For input string: ""
  7. at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[na:1.8.0_281]

代码运行后,经过测试成功。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/727804
推荐阅读
相关标签
  

闽ICP备14008679号