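canal [kə'næl], which translates to waterway/pipeline/ditch, is mainly used to parse MySQL incremental logs (the binlog) and to provide incremental data subscription and consumption. It originated at Alibaba, which in its early days ran data centers in both Hangzhou and the United States and therefore needed cross-data-center synchronization; this was first implemented mainly with business triggers that captured incremental changes. Starting in 2010, the business gradually moved to parsing database logs to obtain incremental changes for synchronization, which gave rise to a large number of database incremental subscription and consumption services.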
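canal is built on the MySQL master-slave replication mechanism:
- The master records changes in its binary log (these records are called binary log events and can be viewed with show binlog events);
- The slave copies the master's binary log events into its relay log;
- The slave replays the events in the relay log, applying the changes to its own data.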
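The three replication steps above can be illustrated with a toy model. This is only a sketch in plain Java: the classes `Master`, `Slave`, and `BinlogEvent` are hypothetical names invented for illustration, and real MySQL replication involves a network protocol, log positions, and row images that are not modeled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of binlog-based replication; every name here is hypothetical. */
class ReplicationSketch {

    /** Simplified event: set key to value, or delete the key when value is null. */
    static final class BinlogEvent {
        final String key;
        final String value;
        BinlogEvent(String key, String value) { this.key = key; this.value = value; }
    }

    static final class Master {
        final Map<String, String> data = new HashMap<>();
        final List<BinlogEvent> binlog = new ArrayList<>();

        void write(String key, String value) {
            if (value == null) data.remove(key); else data.put(key, value);
            binlog.add(new BinlogEvent(key, value)); // step 1: record the change
        }
    }

    static final class Slave {
        final Map<String, String> data = new HashMap<>();
        final List<BinlogEvent> relayLog = new ArrayList<>();
        int applied = 0; // number of relay log events already replayed

        /** Step 2: copy events that are not yet in the relay log. */
        void pull(Master master) {
            relayLog.addAll(master.binlog.subList(relayLog.size(), master.binlog.size()));
        }

        /** Step 3: replay the pending relay log events against local data. */
        void replay() {
            for (; applied < relayLog.size(); applied++) {
                BinlogEvent e = relayLog.get(applied);
                if (e.value == null) data.remove(e.key); else data.put(e.key, e.value);
            }
        }
    }
}
```

After a `pull` followed by a `replay`, the slave's data matches the master's. A log consumer such as canal sits in the slave's position but forwards the events to subscribers instead of applying them to a local copy.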
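How canal works:
- canal emulates the MySQL slave protocol: it presents itself as a MySQL slave and sends a dump request to the MySQL master;
- the MySQL master receives the dump request and starts pushing its binary log to the slave (that is, to canal);
- canal parses the binary log objects (originally a byte stream).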
GitHub download link: Releases · alibaba/canal · GitHub
Extract the archive.
Go to /conf/example and edit instance.properties:
- # position info
- canal.instance.master.address=127.0.0.1:3306
- canal.instance.master.journal.name=
- canal.instance.master.position=
- canal.instance.master.timestamp=
- canal.instance.master.gtid=
-
- # rds oss binlog
- canal.instance.rds.accesskey=
- canal.instance.rds.secretkey=
- canal.instance.rds.instanceId=
-
- # table meta tsdb info
- canal.instance.tsdb.enable=true
- #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
- #canal.instance.tsdb.dbUsername=canal
- #canal.instance.tsdb.dbPassword=canal
-
- #canal.instance.standby.address =
- #canal.instance.standby.journal.name =
- #canal.instance.standby.position =
- #canal.instance.standby.timestamp =
- #canal.instance.standby.gtid=
-
- # username/password
- canal.instance.dbUsername=root
- canal.instance.dbPassword=123456
- canal.instance.connectionCharset = UTF-8
- # enable druid Decrypt database password
- canal.instance.enableDruid=false
- #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
-
- # table regex
- canal.instance.filter.regex=.*\\..*
- # table black regex
- canal.instance.filter.black.regex=mysql\\.slave_.*
- # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
- #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
- # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
- #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
-
- # mq config
- canal.mq.topic=example
- # dynamic topic route by schema or table regex
- #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
- canal.mq.partition=0
- # hash partition config
- #canal.mq.partitionsNum=3
- #canal.mq.partitionHash=test.table:id^name,.*\\..*
- #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
- #################################################
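The `canal.instance.filter.regex` and `canal.instance.filter.black.regex` entries in the configuration above are regular expressions over `schema.table` names. The sketch below only illustrates what those two patterns match, using `java.util.regex`; it assumes whole-name matching and that the black list takes precedence, and the helper `accepted` is a hypothetical name, not canal's own filter implementation. Note that `\\.` in the properties file and `"\\."` in a Java string literal both denote the regex `\.`, a literal dot.

```java
import java.util.regex.Pattern;

/** Illustrates the allow/deny table patterns; not canal's actual filter code. */
class FilterRegexSketch {

    /** Returns true when the name matches the allow pattern and not the black pattern. */
    static boolean accepted(String schemaDotTable, String allowRegex, String blackRegex) {
        boolean allowed = Pattern.matches(allowRegex, schemaDotTable);
        boolean blocked = Pattern.matches(blackRegex, schemaDotTable);
        return allowed && !blocked;
    }

    public static void main(String[] args) {
        String allow = ".*\\..*";          // every table in every schema
        String black = "mysql\\.slave_.*"; // except mysql.slave_* tables

        System.out.println(accepted("shop.brand", allow, black));              // prints "true"
        System.out.println(accepted("mysql.slave_master_info", allow, black)); // prints "false"
    }
}
```

To subscribe only to the table used later in this article, the allow pattern could be narrowed to something like `shop\\.brand`.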
Once the configuration is saved, go to the bin directory and run the startup script.
- /*
- SQLyog Community v13.2.0 (64 bit)
- MySQL - 8.0.33 : Database - shop
- *********************************************************************
- */
-
- /*!40101 SET NAMES utf8 */;
-
- /*!40101 SET SQL_MODE=''*/;
-
- /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
- /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
- /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
- /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
- CREATE DATABASE /*!32312 IF NOT EXISTS*/`shop` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
-
- USE `shop`;
-
- /*Table structure for table `brand` */
-
- DROP TABLE IF EXISTS `brand`;
-
- CREATE TABLE `brand` (
-   `id` INT NOT NULL AUTO_INCREMENT COMMENT 'brand id',
-   `name` VARCHAR(100) NOT NULL COMMENT 'brand name',
-   `image` VARCHAR(1000) DEFAULT '' COMMENT 'brand image URL',
-   `initial` VARCHAR(1) DEFAULT '' COMMENT 'first letter of the brand',
-   `sort` INT DEFAULT NULL COMMENT 'sort order',
-   PRIMARY KEY (`id`)
- ) ENGINE=INNODB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 COMMENT='brand table';
-
- /*Data for the table `brand` */
-
- INSERT INTO `brand`(`id`,`name`,`image`,`initial`,`sort`) VALUES
- (11,'华为','https://sklll.oss-cn-beijing.aliyuncs.com/secby/eed72cc4-a9c1-4010-949a-03cef5b933d6.jpg','',NULL),
- (12,'中兴','https://sklll.oss-cn-beijing.aliyuncs.com/secby/4fedb361-5ab3-4ad0-a667-580c1f37dff0.jpg','',NULL),
- (13,'大疆','https://sklll.oss-cn-beijing.aliyuncs.com/secby/e8382c48-0487-4a9b-8fd0-a3716c3eea19.jpg','',NULL);
-
- /*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
- /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
- /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
- /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
- <dependencies>
- <!--Web-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <!--MyBatis Plus-->
- <dependency>
- <groupId>com.baomidou</groupId>
- <artifactId>mybatis-plus-boot-starter</artifactId>
- <version>3.3.2</version>
- </dependency>
-
- <!--MySQL-->
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <scope>runtime</scope>
- </dependency>
-
- <!--Redis-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </dependency>
-
- <!--Nacos-->
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
- </dependency>
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
- </dependency>
- </dependencies>
The design here is based on Spring Cloud Alibaba, and the service is named mall-goods.
The bootstrap.yaml is as follows:
- server:
-   port: 8081
- spring:
-   application:
-     name: mall-goods
-   datasource:
-     driver-class-name: com.mysql.cj.jdbc.Driver
-     url: jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
-     username: root
-     password: 123456
-   cloud:
-     nacos:
-       config:
-         file-extension: yaml
-         server-addr: localhost:8848
-       discovery:
-         # Nacos registry address
-         server-addr: localhost:8848
-
-   redis:
-     host: xx # replace with your own Redis IP address
-     port: 6379
Add the RedisConfig configuration class:
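- import java.time.Duration;
-
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.data.redis.cache.RedisCacheConfiguration;
- import org.springframework.data.redis.cache.RedisCacheManager;
- import org.springframework.data.redis.cache.RedisCacheWriter;
- import org.springframework.data.redis.connection.RedisConnectionFactory;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
- import org.springframework.data.redis.serializer.RedisSerializationContext;
- import org.springframework.data.redis.serializer.StringRedisSerializer;
-
- @Configuration
- public class RedisConfig {
-
-     @Bean
-     public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
-         RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
-         redisTemplate.setConnectionFactory(factory);
-         GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer();
-         // Serialize values as JSON
-         redisTemplate.setValueSerializer(serializer);
-         // Serialize and deserialize Redis keys with StringRedisSerializer
-         redisTemplate.setKeySerializer(new StringRedisSerializer());
-         // Serializers for hash keys and hash values
-         redisTemplate.setHashKeySerializer(new StringRedisSerializer());
-         redisTemplate.setHashValueSerializer(serializer);
-         redisTemplate.afterPropertiesSet();
-         return redisTemplate;
-     }
-
-     @Bean
-     public RedisCacheManager redisCacheManager(RedisTemplate<String, Object> redisTemplate) {
-         RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisTemplate.getConnectionFactory());
-         RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
-                 .entryTtl(Duration.ofHours(12)) // default cache TTL
-                 .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getValueSerializer()));
-         return new RedisCacheManager(redisCacheWriter, redisCacheConfiguration);
-     }
- }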
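Designing the Redis cache for Brand
Add @EnableCaching to the main application class to enable caching.
@Cacheable
marks a method whose result should be cached. The annotation specifies the cache name and the cache key. When the method is called, Spring first looks in the cache; if an entry exists for the key, it is returned directly, otherwise the method runs and its result is stored in the cache.
@CachePut
marks a method that updates the cache. The annotation specifies the cache name and the cache key. The method always runs, and its return value is written to the cache under that key.
@CacheEvict
marks a method that removes a cache entry. The annotation specifies the cache name and the cache key. When the method is called, Spring removes the corresponding entry from the cache.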
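The behavior of the three annotations can be mimicked with a plain in-memory map. The sketch below is not how Spring implements them (Spring uses proxies and a CacheManager); the class and method names are hypothetical and the map is not thread-safe. It is only meant to show the read-through, write, and evict semantics described above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

/** In-memory imitation of @Cacheable / @CachePut / @CacheEvict semantics. */
class CacheSemanticsSketch {
    private final Map<Integer, String> cache = new HashMap<>();
    int loads = 0; // counts how often the underlying "method" really ran

    /** Like @Cacheable: return the cached value if present, otherwise load and store it. */
    String cacheable(Integer id, Function<Integer, String> method) {
        if (cache.containsKey(id)) {
            return cache.get(id);
        }
        loads++;
        String value = method.apply(id);
        cache.put(id, value);
        return value;
    }

    /** Like @CachePut: always run the method and overwrite the cached value. */
    String cachePut(Integer id, Supplier<String> method) {
        String value = method.get();
        cache.put(id, value);
        return value;
    }

    /** Like @CacheEvict: drop the entry so the next read loads it again. */
    void cacheEvict(Integer id) {
        cache.remove(id);
    }
}
```

Calling `cacheable(11, ...)` twice runs the loader only once; a `cachePut` in between changes what the second call returns, and a `cacheEvict` forces the loader to run again.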
The code is as follows:
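- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.cache.annotation.CacheEvict;
- import org.springframework.cache.annotation.CachePut;
- import org.springframework.cache.annotation.Cacheable;
- import org.springframework.web.bind.annotation.DeleteMapping;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.PutMapping;
- import org.springframework.web.bind.annotation.RequestBody;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @RequestMapping("/brand")
- public class BrandController {
-
-     @Autowired
-     BrandService brandService;
-
-     // Read through the cache; falls back to the database on a miss
-     @GetMapping("/{id}")
-     @Cacheable(value = "brand", key = "#id")
-     public Brand search(@PathVariable(value = "id") Integer id) {
-         return brandService.getById(id);
-     }
-
-     /**
-      * Add a cache entry (the row itself is already in MySQL)
-      */
-     @PostMapping
-     @CachePut(value = "brand", key = "#brand.id")
-     public Brand add(@RequestBody Brand brand) {
-         return brand;
-     }
-
-     /**
-      * Update a cache entry
-      */
-     @PutMapping
-     @CachePut(value = "brand", key = "#brand.id")
-     public Brand update(@RequestBody Brand brand) {
-         return brand;
-     }
-
-     /**
-      * Delete a cache entry
-      */
-     @DeleteMapping("/{id}")
-     @CacheEvict(value = "brand", key = "#id")
-     public void delete(@PathVariable(value = "id") Integer id) {
-     }
- }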
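The microservice here is named mall-canal-service; it monitors data changes in the MySQL database and updates the cache in real time.
In addition to the common dependencies, add the following: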
- <dependency>
- <groupId>top.javatool</groupId>
- <artifactId>canal-spring-boot-starter</artifactId>
- <version>1.2.1-RELEASE</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-openfeign</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-loadbalancer</artifactId>
- </dependency>
The bootstrap.yaml is as follows:
- server:
-   port: 8083
- spring:
-   application:
-     name: mall-canal
-   cloud:
-     nacos:
-       config:
-         file-extension: yaml
-         server-addr: localhost:8848
-       discovery:
-         # Nacos registry address
-         server-addr: localhost:8848
- # Canal configuration
- canal:
-   server: localhost:11111
-   destination: example
Define the Feign interface:
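- import org.springframework.cloud.openfeign.FeignClient;
- import org.springframework.web.bind.annotation.DeleteMapping;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.PutMapping;
- import org.springframework.web.bind.annotation.RequestBody;
-
- @FeignClient(value = "mall-goods", contextId = "brand") // target service name
- public interface BrandFeign {
-
-     @GetMapping("/brand/{id}")
-     Brand search(@PathVariable(value = "id") Integer id);
-
-     @PostMapping("/brand")
-     Brand add(@RequestBody Brand brand);
-
-     /**
-      * Update method
-      */
-     @PutMapping("/brand")
-     Brand update(@RequestBody Brand brand);
-
-     /**
-      * Delete method
-      */
-     @DeleteMapping("/brand/{id}")
-     void delete(@PathVariable(value = "id") Integer id);
-
- }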
Canal monitors changes in the MySQL database in real time; the handler is written as follows:
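- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import top.javatool.canal.client.annotation.CanalTable;
- import top.javatool.canal.client.handler.EntryHandler;
-
- @Component
- @CanalTable(value = "brand")
- public class BrandHandler implements EntryHandler<Brand> {
-
-     @Autowired
-     BrandFeign brandFeign;
-
-     // Row inserted in MySQL: add it to the cache
-     @Override
-     public void insert(Brand brand) {
-         System.out.println(brand);
-         brandFeign.add(brand);
-     }
-
-     // Row updated in MySQL: overwrite the cached value with the new state
-     @Override
-     public void update(Brand before, Brand after) {
-         System.out.println(after);
-         brandFeign.update(after);
-     }
-
-     // Row deleted in MySQL: evict it from the cache
-     @Override
-     public void delete(Brand brand) {
-         System.out.println(brand);
-         brandFeign.delete(brand.getId());
-     }
- }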
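The handler above relies on the client library to route each row change to `insert`, `update`, or `delete`. As a rough mental model, the routing can be sketched in plain Java as below. This is not the library's code: `EventType`, `RowHandler`, `dispatch`, and `CachingHandler` are hypothetical names, and a row is reduced to a two-element `String[]` of id and name.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of routing row-change events to handler callbacks. */
class RowDispatchSketch {

    enum EventType { INSERT, UPDATE, DELETE }

    interface RowHandler<T> {
        void insert(T row);
        void update(T before, T after);
        void delete(T row);
    }

    /** Routes one row change; 'before' is unused for INSERT and 'after' for DELETE. */
    static <T> void dispatch(EventType type, T before, T after, RowHandler<T> handler) {
        switch (type) {
            case INSERT: handler.insert(after); break;
            case UPDATE: handler.update(before, after); break;
            case DELETE: handler.delete(before); break;
        }
    }

    /** Keeps an id -> name cache in sync; each row is {id, name}. */
    static final class CachingHandler implements RowHandler<String[]> {
        final Map<String, String> cache = new HashMap<>();

        @Override public void insert(String[] row) { cache.put(row[0], row[1]); }
        @Override public void update(String[] before, String[] after) { cache.put(after[0], after[1]); }
        @Override public void delete(String[] row) { cache.remove(row[0]); }
    }
}
```

In the article's setup, the map operations are replaced by Feign calls to mall-goods, whose @CachePut and @CacheEvict endpoints perform the actual Redis writes.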
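Add @EnableFeignClients(basePackages = {"org.example.feign"}) to the MallCanalApplication main class; the package name is the path of the Feign interfaces.
Note: when I used canal, NULL values in non-varchar columns (such as the INT column `sort` in the sample data) caused an exception at runtime: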
- at top.javatool.canal.client.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:52) ~[canal-client-1.2.1-RELEASE.jar:na]
- at top.javatool.canal.client.handler.impl.AsyncMessageHandlerImpl.lambda$handleMessage$0(AsyncMessageHandlerImpl.java:30) ~[canal-client-1.2.1-RELEASE.jar:na]
- at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_281]
- at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_281]
- at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_281]
- Caused by: java.lang.NumberFormatException: For input string: ""
- at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[na:1.8.0_281]
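The trace suggests that a NULL column value reaches the converter as an empty string and is then parsed as a number. The snippet below reproduces that failure with the JDK alone and shows one possible null-safe conversion. The helper `toIntegerOrNull` is a hypothetical name; where such a guard could be plugged in depends on the client library and is not shown here. An alternative that needs no code is to give the affected columns a non-NULL default in the schema.

```java
/** Reproduces the parse failure on "" and shows a null-safe alternative. */
class NullSafeParseSketch {

    /** Treats null, empty, and blank input as SQL NULL instead of parsing it. */
    static Integer toIntegerOrNull(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return null;
        }
        return Integer.valueOf(raw.trim());
    }

    public static void main(String[] args) {
        try {
            Integer.parseInt(""); // what happens to an empty column value
        } catch (NumberFormatException e) {
            System.out.println("parse failed: " + e.getMessage());
        }
        System.out.println(toIntegerOrNull(""));   // prints "null"
        System.out.println(toIntegerOrNull("14")); // prints "14"
    }
}
```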
After running the code, testing confirmed that it works.