赞
踩
canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。
canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。
阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。
原理相对比较简单:
加个canal slave 监听mysql数据变化日志,只适用mysql数据库。读取的数据可以同步到redis、其他数据库、ES等。
canal需要使用到mysql,我们需要先安装mysql, 安装mysql容器,但canal是基于mysql的主从模式实现的,所以必须先开启binlog.
binlog可以拿到mysql数据日志,canal再去获取日志信息。
先使用docker 创建mysql容器。
(1) 连接到mysql中,并修改/etc/mysql/mysql.conf.d/mysqld.cnf 需要开启主从模式,开启binlog模式。
执行如下命令,编辑mysql配置文件
命令行如下:
docker exec -it mysql /bin/bash
cd /etc/mysql/mysql.conf.d
vi mysqld.cnf
修改mysqld.cnf配置文件,添加如下配置:
上图配置如下:
二进制模式的日志目录
数据库唯一id
log-bin/var/lib/mysql/mysql-bin
server-id=12345
(2) 创建账号,用于测试使用
使用root账号创建用户并授予权限。账号canal,密码canal
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
(3)重启mysql容器
docker restart mysql
下载镜像:
docker pull docker.io/canal/canal-server
容器安装
-p端口映射,-d后台运行
docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server
进入容器,修改核心配置canal.properties 和instance.properties,canal.properties 是canal自身的配置,instance.properties是需要同步数据的数据库连接配置。
执行代码如下:
docker exec -it canal /bin/bash
cd canal-server/conf/
vi canal.properties
cd example/
vi instance.properties
修改canal.properties的id,不能和mysql的server-id重复,如下图:
修改instance.properties,配置数据库连接地址:
改需要监听的数据库ip地址
配置里面有username和password,都是canal
这里的canal.instance.filter.regex
有多种配置,如下:
数据库正则表达式
.*
表示所有数据库
\\..*
表示所有表
.*\\..*
表示所有数据库的所有表都被监听
可以参考地址如下:
https://github.com/alibaba/canal/wiki/AdminGuide
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)
配置完成后,设置开机启动,并记得重启canal。
exit
docker update --restart=always canal
docker restart canal
当用户执行数据库的操作的时候,binlog 日志会被canal捕获到,并解析出数据。我们就可以将解析出来的数据进行同步到redis(也可以是另一个数据库、ES)中即可。
思路:创建一个独立的程序,并监控canal服务器,获取binlog日志,解析数据,将数据更新到redis中。这样数据就更新了。
github地址:https://github.com/wanwujiedao/spring-boot-starter-canal、
https://github.com/alibaba/canal
(1)安装辅助jar包
在spring-boot-starter-canal-master
中有一个工程starter-canal
,它主要提供了SpringBoot环境下canal
的支持,我们需要先安装该工程,在starter-canal
目录下执行mvn install
下载jar到本地,如下图:
(2)canal微服务工程搭建
创建service-canal工程,并引入相关配置。
pom.xml
<!--canal依赖-->
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
application.yml配置
server:
port: 18082
spring:
application:
name: canal
#example实例,对应canal配置文件
canal:
client:
instances:
example:
host: 192.168.169.140
port: 11111
userName: canal
password: canal
#springCloud eureka: client: service-url: defaultZone: http://127.0.0.1:7001/eureka instance: prefer-ip-address: true feign: hystrix: enabled: true #hystrix 配置 hystrix: command: default: execution: timeout: #如果enabled设置为false,则请求超时交给ribbon控制 enabled: true isolation: strategy: SEMAPHORE
(3)启动类创建
在包下创建启动类,代码如下:
@SpringBootApplication(exclude={DataSourceAutoConfiguration.class}) // 忽略数据库连接
@EnableEurekaClient // springCloud注册中心
@EnableCanalClient // canal客户端
public class CanalApplication {
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class,args);
}
}
(4)监听创建
创建一个CanalDataEventListener类,实现对表增删改操作的监听,代码如下:
package com.changgou.service.canal.listener; import com.alibaba.otter.canal.protocol.CanalEntry; import com.xpand.starter.canal.annotation.CanalEventListener; import com.xpand.starter.canal.annotation.DeleteListenPoint; import com.xpand.starter.canal.annotation.InsertListenPoint; import com.xpand.starter.canal.annotation.ListenPoint; import com.xpand.starter.canal.annotation.UpdateListenPoint; /** * Title:实现对mysql数据库数据日志的监听 * Description: * @author WZQ * @version 1.0.0 * @date 2020/3/4 */ @CanalEventListener public class CanalDataEventListener { /*** * 增加数据监听 * @param eventType 当前操作的类型 增加数据 * @param rowData 发生变更的数据-->>增加的数据 */ @InsertListenPoint public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { System.out.println("增加数据:"); rowData.getAfterColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue())); } // rowData.getAfterColumnsList() 之后的数据,适用于增加、修改 // rowData.getBeforeColumnsList() 之前的数据,适用于删除 /** * 修改数据监听 * @param eventType * @param rowData 发生变更的数据-->>修改的数据 */ @UpdateListenPoint public void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { System.out.println("修改前的数据:"); rowData.getBeforeColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue())); System.out.println("修改后的数据:"); rowData.getAfterColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue())); } /*** * 删除数据监听 * @param eventType */ @DeleteListenPoint public void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { System.out.println("删除数据:"); rowData.getBeforeColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue())); } /*** * 自定义数据修改监听,指定监听的库,表 * @param eventType * @param rowData */ @ListenPoint(destination = "example", // 实例配置 schema = "changgou_content", // 库 table = {"tb_content_category", "tb_content"}, // 表 eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.DELETE}) // 监听类型,修改数据,删除数据 public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { System.out.println("自定义修改前的数据:"); rowData.getBeforeColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue())); System.out.println("自定义修改后的数据:"); rowData.getAfterColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue())); } }
(5)测试
启动canal微服务,然后修改任意数据库的表数据,canal微服务后台输出如下:
监听修改的数据并同步到redis缓存(Mysql、ES)。一般缓存的数据是静态数据,防止高并发。
广告图片缓存同步:
如上图,每次执行广告操作的时候,会记录操作日志到,然后将操作日志发送给canal,canal将操作记录发送给canal微服务,canal微服务根据修改的分类ID调用content微服务查询分类对应的所有广告,canal微服务再将所有广告存入到Redis缓存。
service-content微服务是广告微服务的增删改查,这里不用写出来。canal微服务根据修改的分类ID调用content微服务查询分类对应的所有广告,这里要使用到fegin,微服务之间的调用。
/***
* 根据categoryId查询广告集合
*/
@GetMapping(value = "/list/category/{id}")
public ResponseResult<List<Content>> findByCategory(@PathVariable Long id){
//根据分类ID查询广告集合
List<Content> contents = contentService.findByCategory(id);
return new ResponseResult<List<Content>>(true,StatusCode.OK,"查询成功!",contents);
}
#springCloud配置 eureka: client: service-url: defaultZone: http://127.0.0.1:7001/eureka instance: prefer-ip-address: true feign: hystrix: enabled: true #hystrix 配置 hystrix: command: default: execution: timeout: #如果enabled设置为false,则请求超时交给ribbon控制 enabled: true isolation: strategy: SEMAPHORE
<!-- Spring Cloud --> <spring-cloud.version>Greenwich.SR2</spring-cloud.version> <!-- Spring Cloud --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <!-- eureka注册中心,只有eureka-server微服务用到 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> <!-- eureka-client客户端 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!-- openfeign --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <!-- redis 使用--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
另外在另一个工程中加入fegin,方法和地址跟微服务controller一样,service-content-fegin中直接加,不用配置文件,不用启动类,放个接口就行:
@FeignClient(name="content")
@RequestMapping(value = "/content")
public interface ContentFeign {
/***
* 根据分类ID查询所有广告
*/
@GetMapping(value = "/list/category/{id}")
ResponseResult<List<Content>> findByCategory(@PathVariable Long id);
}
在canal微服务中修改如下:
(1)配置redis
修改application.yml配置文件,添加redis配置,如下代码:
redis有设置密码则添加password
(2)启动类中开启feign
修改CanalApplication,添加@EnableFeignClients
注解,扫描fegin包,可调用content微服务controller方法。代码如下:
(3)同步实现
修改监听类CanalDataEventListener,实现监听广告的增删改,并根据增删改的数据使用feign查询对应分类的所有广告,将广告存入到Redis中,代码如下:
上图代码如下:
/** * Title:实现对mysql数据库数据日志的监听 * Description: * @author WZQ * @version 1.0.0 * @date 2020/3/4 */ @CanalEventListener public class CanalDataEventListener { @Resource private ContentFeign contentFeign; //字符串 @Resource private StringRedisTemplate stringRedisTemplate; //自定义数据库的 操作来监听 //destination = "example" /** * 自定义数据库的 操作来监听 * @param eventType 数据库修改数据类型,增改删 * @param rowData 数据 */ @ListenPoint(destination = "example", schema = "changgou_content", table = {"tb_content", "tb_content_category"}, eventType = { CanalEntry.EventType.UPDATE, CanalEntry.EventType.DELETE, CanalEntry.EventType.INSERT}) public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { //1.获取列名 为category_id的值 String categoryId = getColumnValue(eventType, rowData); //2.调用feign 获取该分类下的所有的广告集合 ResponseResult<List<Content>> categoryresut = contentFeign.findByCategory(Long.valueOf(categoryId)); List<Content> data = categoryresut.getData(); //3.使用redisTemplate存储到redis中,存json值 stringRedisTemplate.boundValueOps("content_" + categoryId).set(JSON.toJSONString(data)); } private String getColumnValue(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { String categoryId = ""; //判断 如果是删除 则获取beforlist if (eventType == CanalEntry.EventType.DELETE) { for (CanalEntry.Column column : rowData.getBeforeColumnsList()) { // 列名为category_id if (column.getName().equalsIgnoreCase("category_id")) { categoryId = column.getValue(); return categoryId; } } } else { //判断 如果是添加 或者是更新 获取afterlist for (CanalEntry.Column column : rowData.getAfterColumnsList()) { if (column.getName().equalsIgnoreCase("category_id")) { categoryId = column.getValue(); return categoryId; } } } return categoryId; } }
测试:
修改数据库数据,可以看到Redis中的缓存跟着一起变化
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。