赞
踩
canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)
(1)阿里otter(阿里用于进行异地数据库之间的同步框架)中间件的一部分,这是原始场景;
(2)更新缓存:如果有大量的请求发送到mysql的话,mysql查询速度慢,QPS上不去,光查mysql可能会瘫痪,那就可以在前面加个缓存,这个缓存有2个主要的问题。一是缓存没有怎么办,二是数据不一致怎么办。对于第一个问题查缓存没有就查询mysql,mysql再往缓存中写一份。对于第二个问题,如果数据库修改了,那就采用异步的方式进行修改,启动一个canal服务,监控mysql,只要一有变化就同步缓存,这样mysql和缓存就能达到最终的一致性;
(3)抓取业务数据新增变化表,用于制作拉链表:做拉链表是需要有增加时间和修改时间的,需要数据今天新增和变化的数据,如果时间不全就没办法知道哪些是修改的。可以通过canal把变化的抽到自己的表里,以后数据就从这个表出。
(4)取业务表的新增变化数据,用于制作实时统计
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal的工作原理:把自己伪装成slave,从master复制数据。读取binlog是需要master授权的,因为binlog是加密的,授权分用户名密码才能读。master授权后不知道读他的binlog的是从机还是canal,他的所有传输协议都符合从机的标准,所以master一直以为是从机读的。
(1)、注册用户成功,需要发送邮箱信息;
方式1:我们使用AOP切面,注册成功后,发送mq消息,监听mq消息队列,发送邮件。这样会存在业务层直接有耦合关系
方式2:使用canal监听数据库,监听用户表,如果用户表有数据插入,发送mq消息到队列中。监听mq消息队列,发送邮件。这样监听发送邮件与注册的业务没有任何耦合关系;
(2)、修改或者删除或者增加数据后,redis缓存中数据没有同步更新
方式1:使用AOP切面,@CacheEvict(value = "ums", key = "'resource'", beforeInvocation = false),先执行修改或者新增或者删除的代码,执行完成后,删除redis中的key。下次执行查询的数据后,将数据写到redis中@Cacheable(value = "ums", key = "'resource'");
这种方式有个弊端:
1.修改、删除、新增,每个方法上都需要添加@CacheEvict,删除redis中的key
2.每次执行修改、删除、新增后,执行查询的时候,先要去数据库查询,然后在同步到redis中;
方式2:使用canal监听数据库,监听对应的表,如果表中有增、删、改,发送mq消息,监听到mq消息队列,则删除redis中的对应key,重新读取最新的数据写入。这样修改或者增加完成后,返回查询数据,数据是正确的,并且返回后查询不需要等待较长时间,因为开辟了其他线程消费mq消息;
所有的应用都部署到docker容器上
1.docker查看mysql工作目录 docker inspect xx
MergedDir表示工作目录
2.进入工作目录
查看mysql配置文件
3.编辑mysql配置文件,在mysqld下增加如下配置
- [mysqld]
- log-bin=mysql-bin # 开启binlog
- binlog-format=ROW # 选择ROW模式
- server_id=1 # 配置MySQL replaction需要定义,不和Canal的slaveId重复即可
4.重启docker容器
docker restart myMysql
添加交换机 canal.exchange
添加队列 canal.queue
队列绑定交换机,设置routingkey
1.下载镜像
[root@localhost /]# docker run --name myCanal -d -p 11111:11111 canal/canal-server
-d 后台启动 --name 起名字 -p端口映射 canal/canal-server镜像名称
查看canal工作目录
docker inspect myCanal
进入工作目录,修改配置文件
我们不用linux进入工作目录了,我们使用idea的sftp的功能进入工作目录修改配置文件
修改配置文件信息
修改文件后上传到服务器
引入jar包
- <!-- canal数据库监听,依赖的jar包。 需要注意canal监听的时候还需要mybatis-plus相关jar,没有监听的时候会报错 -->
- <dependency>
- <groupId>top.javatool</groupId>
- <artifactId>canal-spring-boot-starter</artifactId>
- </dependency>
-
- <!-- 依赖 数据库已经mybatis-plus相关jar包 -->
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.baomidou</groupId>
- <artifactId>mybatis-plus-boot-starter</artifactId>
- </dependency>
springboot配置文件,配置canal信息
- #如果配置了数据库监听发送mq消息到队列中,这个配置就不要了,因为这个默认是按照tcp连接方式连接的,我们修改了配置文件,连接方式修改为了mq
- canal:
- server: 192.168.75.131:11111
- destination: example
编写测试代码
-
- import com.powernode.entity.UmsUser;
- import org.springframework.mail.javamail.JavaMailSender;
-
- import org.springframework.stereotype.Component;
- import top.javatool.canal.client.annotation.CanalTable;
- import top.javatool.canal.client.handler.EntryHandler;
-
- import javax.annotation.Resource;
-
- /**
- * canal监听数据库,如果ums_user数据库插入数据,则发送邮件
- * 注意:
- * 如果我们先删除或者添加或者修改数据,服务启动后,依然会监听到。因此服务短暂的暂停不影响数据的完整性
- *
- * canal用在数据同步很好,例如:数据库更新,缓存也可以进行更新
- *
- * CanalTable 监听那个表
- */
- @Component
- @CanalTable(value = "ums_user")
- public class UserHandler implements EntryHandler<UmsUser> {
-
- @Resource
- private JavaMailSender javaMailSender;
-
- /**
- * 监听添加
- * @param umsUser 添加的umsUser对象数据
- */
- public void insert(UmsUser umsUser) {
- System.out.println("监听添加");
- System.out.println("添加====" + umsUser);
- //监听到添加数据后,进行发送邮件,不需要在往mq中发现消息,mq消费消息发送邮件了。彻底与业务层解耦了
- }
-
-
- /**
- * 监听修改
- * @param before 监听修改前user对象的被修改字段的数据,其他没有修改的都是null
- * 修改前=======UmsUser(name=BB, phone=null, email=null, icon=null, password=null, active=null, sort=null, fileInfoListList=null, description=null)
- * @param after 监听修改后的user对象的数据
- */
- public void update(UmsUser before, UmsUser after) {
- System.out.println("监听修改");
- System.out.println("修改前=======" + before);
- System.out.println("修改后=======" + after);
- }
-
- /**
- * 监听删除
- * @param umsUser 删除umsUser对象数据
- */
- public void delete(UmsUser umsUser) {
- System.out.println("监听删除");
- System.out.println("删除====" + umsUser);
- }
- }
启动spring项目
修改ums_user表信息,监听到休息的信息。
注意:canal只监听,增删改,不监听查询
通过上面我们可以发现,通过canal监听可以,对数据库的表进行监听,结合EntryHandler实现的增删改的方法,我们拿到增加、删除、修改的数据。可以对数据进行操作。
例如。添加数据后,我们可以在增加方法里面发送邮件;
上面那种方法数据库表变更后,就会canal就会监听到,调用对应的方法处理。
我们可以使用异步进行处理,表变更后,发送mq消息到队列,消费消息发送邮件;
继续修改canal instance.properties配置文件
修改canal.properties配置文件信息
删除springboot配置文件中canal配置信息,因为如果使用mq连接形式就不是tcp了
编写测试代码,ums_user表修改、添加、删除数据,发送邮件
- package com.powernode.core;
-
- import com.alibaba.fastjson.JSONObject;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.mail.javamail.JavaMailSender;
- import org.springframework.mail.javamail.MimeMessageHelper;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import javax.mail.MessagingException;
- import javax.mail.internet.MimeMessage;
- import java.util.List;
- import java.util.Map;
- import java.util.stream.Collectors;
- import java.util.stream.Stream;
-
- /**
- * canal监听数据库所有的表,如果数据库ums_user表插入数据,就给email队列中发送mq消息
- */
- @Slf4j
- @Component
- public class ListenerMysqlEmail {
-
- public static final String QUEUE_NAME = "email";
-
- @Resource
- JavaMailSender javaMailSender;
-
- @Value("${spring.mail.username}")
- String from;
-
- /**
- * canal监控
- * 消费mq, 判断如果监控到shop.ums_user表数据有更新或者新增或者删除,发送邮件
- * @param message
- */
- @RabbitListener(queues = QUEUE_NAME)
- public void listenerMysqlEmailQueue(Message message) {
- String body = new String(message.getBody());
- List<String> typeList = Stream.of("INSERT", "UPDATE", "DELETE").collect(Collectors.toList());
- Map<String, Object> result = JSONObject.parseObject(body, Map.class);
- if("shop".equals(result.get("database")) && "ums_user".equals(result.get("table"))){
- if(typeList.contains((String) result.get("type"))){
-
- MimeMessage mimeMessage = javaMailSender.createMimeMessage();
- try {
- MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage,true);
- List<Map<String, Object>> data =(List)result.get("data");
- for (Map<String, Object> map:data) {
- String text = "<p>尊敬的" +map.get("name") + "先生/女士" + "</p><br/>" + "<p>系统为您创建了了用户,登录名是您的手机号或者邮箱</p></br>"
- + "<p>密码是" + map.get("password") +"</p>";
- mimeMessageHelper.setSubject("系统消息");
- mimeMessageHelper.setFrom(from);
- mimeMessageHelper.setTo((String) map.get("email"));
- mimeMessageHelper.setText(text);
-
- javaMailSender.send(mimeMessage);
- }
- } catch (MessagingException e) {
- e.printStackTrace();
- log.error("邮件发送异常" + e);
- }
- }
- }
- }
-
- }
5、编写测试代码,修改ums_resource表信息,删除redis缓存,重新刷新redis
- package com.powernode.core;
-
- import com.alibaba.fastjson.JSONObject;
- import com.powernode.service.UmsResourceService;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.util.List;
- import java.util.Map;
- import java.util.stream.Collectors;
- import java.util.stream.Stream;
-
- @Component
- public class ListenerResource {
- @Resource
- private UmsResourceService umsResourceService;
- @Resource
- private RedisTemplate redisTemplate;
-
- /**
- * 监听数据库
- * 如果监听到了shop.ums_resource表数据更新或者删除,更新redis数据
- * @param message
- */
- @RabbitListener(bindings = {
- @QueueBinding(
- value = @Queue(value = "email", durable = "true"),
- exchange = @Exchange(value = "canal.exchage"),
- key = "canalEmail"
- )
- })
- public void handleDataChange(Message message) {
- String body = new String(message.getBody());
- List<String> typeList = Stream.of("INSERT", "UPDATE", "DELETE").collect(Collectors.toList());
- Map<String, Object> result = JSONObject.parseObject(body, Map.class);
- if ("shop".equals(result.get("database")) && "ums_resource".equals(result.get("table"))) {
- if (typeList.contains((String) result.get("type"))) {
- //如果是增删改,删除redis中的key,重新将数据刷新到redis中
- if (redisTemplate.delete("ums::resource")) {
- umsResourceService.getAll();
- }
- }
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。