当前位置:   article > 正文

Canal数据库监听_canal监听

canal监听

1、什么是canal

canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)

2、canal使用场景

(1)阿里otter(阿里用于进行异地数据库之间的同步框架)中间件的一部分,这是原始场景;

(2)更新缓存:如果有大量的请求发送到mysql的话,mysql查询速度慢,QPS上不去,光查mysql可能会瘫痪,那就可以在前面加个缓存,这个缓存有2个主要的问题。一是缓存没有怎么办,二是数据不一致怎么办。对于第一个问题查缓存没有就查询mysql,mysql再往缓存中写一份。对于第二个问题,如果数据库修改了,那就采用异步的方式进行修改,启动一个canal服务,监控mysql,只要一有变化就同步缓存,这样mysql和缓存就能达到最终的一致性;

(3)抓取业务数据新增变化表,用于制作拉链表:做拉链表是需要有增加时间和修改时间的,需要数据今天新增和变化的数据,如果时间不全就没办法知道哪些是修改的。可以通过canal把变化的抽到自己的表里,以后数据就从这个表出。

(4)取业务表的新增变化数据,用于制作实时统计

3、canal工作原理

1. MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal的工作原理:把自己伪装成slave,从master复制数据。读取binlog是需要master授权的,因为binlog是加密的,授权分用户名密码才能读。master授权后不知道读他的binlog的是从机还是canal,他的所有传输协议都符合从机的标准,所以master一直以为是从机读的。

4、 Canal 实战

  1. 需求分析:

(1)、注册用户成功,需要发送邮箱信息;

方式1:我们使用AOP切面,注册成功后,发送mq消息,监听mq消息队列,发送邮件。这样会存在业务层直接有耦合关系

方式2:使用canal监听数据库,监听用户表,如果用户表有数据插入,发送mq消息到队列中。监听mq消息队列,发送邮件。这样监听发送邮件与注册的业务没有任何耦合关系;

(2)、修改或者删除或者增加数据后,redis缓存中数据没有同步更新

方式1:使用AOP切面,@CacheEvict(value = "ums", key = "'resource'", beforeInvocation = false),先执行修改或者新增或者删除的代码,执行完成后,删除redis中的key。下次执行查询的数据后,将数据写到redis中@Cacheable(value = "ums", key = "'resource'");

这种方式有个弊端:

1.修改、删除、新增,每个方法上都需要添加@CacheEvict,删除redis中的key

2.每次执行修改、删除、新增后,执行查询的时候,先要去数据库查询,然后在同步到redis中;

方式2:使用canal监听数据库,监听对应的表,如果表中有增、删、改,发送mq消息,监听到mq消息队列,则删除redis中的对应key,重新读取最新的数据写入。这样修改或者增加完成后,返回查询数据,数据是正确的,并且返回后查询不需要等待较长时间,因为开辟了其他线程消费mq消息;

  1. 环境配置

所有的应用都部署到docker容器上

  1. MySQL开启 binlog 日志

1.docker查看mysql工作目录 docker inspect xx

MergedDir表示工作目录

2.进入工作目录

查看mysql配置文件

3.编辑mysql配置文件,在mysqld下增加如下配置

  1. [mysqld]
  2. log-bin=mysql-bin # 开启binlog
  3. binlog-format=ROW # 选择ROW模式
  4. server_id=1 # 配置MySQL replaction需要定义,不和Canal的slaveId重复即可

4.重启docker容器

docker restart myMysql

2. RabbitMQ 队列创建

  • 添加交换机 canal.exchange

  • 添加队列 canal.queue

  • 队列绑定交换机,设置routingkey

  1. canal配置与启动

1.下载镜像

[root@localhost /]# docker run --name myCanal -d -p 11111:11111 canal/canal-server

-d 后台启动 --name 起名字 -p端口映射 canal/canal-server镜像名称

  1. 查看canal工作目录

docker inspect myCanal

  1. 进入工作目录,修改配置文件

我们不用linux进入工作目录了,我们使用idea的sftp的功能进入工作目录修改配置文件

修改配置文件信息

修改文件后上传到服务器

3、springboot整合canal

  1. 引入jar包

  1. <!-- canal数据库监听,依赖的jar包。 需要注意canal监听的时候还需要mybatis-plus相关jar,没有监听的时候会报错 -->
  2. <dependency>
  3. <groupId>top.javatool</groupId>
  4. <artifactId>canal-spring-boot-starter</artifactId>
  5. </dependency>
  6. <!-- 依赖 数据库已经mybatis-plus相关jar包 -->
  7. <dependency>
  8. <groupId>mysql</groupId>
  9. <artifactId>mysql-connector-java</artifactId>
  10. </dependency>
  11. <dependency>
  12. <groupId>com.baomidou</groupId>
  13. <artifactId>mybatis-plus-boot-starter</artifactId>
  14. </dependency>
  1. springboot配置文件,配置canal信息

  1. #如果配置了数据库监听发送mq消息到队列中,这个配置就不要了,因为这个默认是按照tcp连接方式连接的,我们修改了配置文件,连接方式修改为了mq
  2. canal:
  3. server: 192.168.75.131:11111
  4. destination: example
  1. 编写测试代码

  1. import com.powernode.entity.UmsUser;
  2. import org.springframework.mail.javamail.JavaMailSender;
  3. import org.springframework.stereotype.Component;
  4. import top.javatool.canal.client.annotation.CanalTable;
  5. import top.javatool.canal.client.handler.EntryHandler;
  6. import javax.annotation.Resource;
  7. /**
  8. * canal监听数据库,如果ums_user数据库插入数据,则发送邮件
  9. * 注意:
  10. * 如果我们先删除或者添加或者修改数据,服务启动后,依然会监听到。因此服务短暂的暂停不影响数据的完整性
  11. *
  12. * canal用在数据同步很好,例如:数据库更新,缓存也可以进行更新
  13. *
  14. * CanalTable 监听那个表
  15. */
  16. @Component
  17. @CanalTable(value = "ums_user")
  18. public class UserHandler implements EntryHandler<UmsUser> {
  19. @Resource
  20. private JavaMailSender javaMailSender;
  21. /**
  22. * 监听添加
  23. * @param umsUser 添加的umsUser对象数据
  24. */
  25. public void insert(UmsUser umsUser) {
  26. System.out.println("监听添加");
  27. System.out.println("添加====" + umsUser);
  28. //监听到添加数据后,进行发送邮件,不需要在往mq中发现消息,mq消费消息发送邮件了。彻底与业务层解耦了
  29. }
  30. /**
  31. * 监听修改
  32. * @param before 监听修改前user对象的被修改字段的数据,其他没有修改的都是null
  33. * 修改前=======UmsUser(name=BB, phone=null, email=null, icon=null, password=null, active=null, sort=null, fileInfoListList=null, description=null)
  34. * @param after 监听修改后的user对象的数据
  35. */
  36. public void update(UmsUser before, UmsUser after) {
  37. System.out.println("监听修改");
  38. System.out.println("修改前=======" + before);
  39. System.out.println("修改后=======" + after);
  40. }
  41. /**
  42. * 监听删除
  43. * @param umsUser 删除umsUser对象数据
  44. */
  45. public void delete(UmsUser umsUser) {
  46. System.out.println("监听删除");
  47. System.out.println("删除====" + umsUser);
  48. }
  49. }

启动spring项目

修改ums_user表信息,监听到休息的信息。

注意:canal只监听,增删改,不监听查询

通过上面我们可以发现,通过canal监听可以,对数据库的表进行监听,结合EntryHandler实现的增删改的方法,我们拿到增加、删除、修改的数据。可以对数据进行操作。

例如。添加数据后,我们可以在增加方法里面发送邮件;

4、springboot + canal +rabbitmq 整合

上面那种方法数据库表变更后,就会canal就会监听到,调用对应的方法处理。

我们可以使用异步进行处理,表变更后,发送mq消息到队列,消费消息发送邮件;

  1. 继续修改canal instance.properties配置文件

  1. 修改canal.properties配置文件信息

  1. 删除springboot配置文件中canal配置信息,因为如果使用mq连接形式就不是tcp了

  1. 编写测试代码,ums_user表修改、添加、删除数据,发送邮件

  1. package com.powernode.core;
  2. import com.alibaba.fastjson.JSONObject;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.mail.javamail.JavaMailSender;
  7. import org.springframework.mail.javamail.MimeMessageHelper;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.Resource;
  10. import javax.mail.MessagingException;
  11. import javax.mail.internet.MimeMessage;
  12. import java.util.List;
  13. import java.util.Map;
  14. import java.util.stream.Collectors;
  15. import java.util.stream.Stream;
  16. /**
  17. * canal监听数据库所有的表,如果数据库ums_user表插入数据,就给email队列中发送mq消息
  18. */
  19. @Slf4j
  20. @Component
  21. public class ListenerMysqlEmail {
  22. public static final String QUEUE_NAME = "email";
  23. @Resource
  24. JavaMailSender javaMailSender;
  25. @Value("${spring.mail.username}")
  26. String from;
  27. /**
  28. * canal监控
  29. * 消费mq, 判断如果监控到shop.ums_user表数据有更新或者新增或者删除,发送邮件
  30. * @param message
  31. */
  32. @RabbitListener(queues = QUEUE_NAME)
  33. public void listenerMysqlEmailQueue(Message message) {
  34. String body = new String(message.getBody());
  35. List<String> typeList = Stream.of("INSERT", "UPDATE", "DELETE").collect(Collectors.toList());
  36. Map<String, Object> result = JSONObject.parseObject(body, Map.class);
  37. if("shop".equals(result.get("database")) && "ums_user".equals(result.get("table"))){
  38. if(typeList.contains((String) result.get("type"))){
  39. MimeMessage mimeMessage = javaMailSender.createMimeMessage();
  40. try {
  41. MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage,true);
  42. List<Map<String, Object>> data =(List)result.get("data");
  43. for (Map<String, Object> map:data) {
  44. String text = "<p>尊敬的" +map.get("name") + "先生/女士" + "</p><br/>" + "<p>系统为您创建了了用户,登录名是您的手机号或者邮箱</p></br>"
  45. + "<p>密码是" + map.get("password") +"</p>";
  46. mimeMessageHelper.setSubject("系统消息");
  47. mimeMessageHelper.setFrom(from);
  48. mimeMessageHelper.setTo((String) map.get("email"));
  49. mimeMessageHelper.setText(text);
  50. javaMailSender.send(mimeMessage);
  51. }
  52. } catch (MessagingException e) {
  53. e.printStackTrace();
  54. log.error("邮件发送异常" + e);
  55. }
  56. }
  57. }
  58. }
  59. }

5、编写测试代码,修改ums_resource表信息,删除redis缓存,重新刷新redis

  1. package com.powernode.core;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.powernode.service.UmsResourceService;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.Exchange;
  6. import org.springframework.amqp.rabbit.annotation.Queue;
  7. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  8. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  9. import org.springframework.data.redis.core.RedisTemplate;
  10. import org.springframework.stereotype.Component;
  11. import javax.annotation.Resource;
  12. import java.util.List;
  13. import java.util.Map;
  14. import java.util.stream.Collectors;
  15. import java.util.stream.Stream;
  16. @Component
  17. public class ListenerResource {
  18. @Resource
  19. private UmsResourceService umsResourceService;
  20. @Resource
  21. private RedisTemplate redisTemplate;
  22. /**
  23. * 监听数据库
  24. * 如果监听到了shop.ums_resource表数据更新或者删除,更新redis数据
  25. * @param message
  26. */
  27. @RabbitListener(bindings = {
  28. @QueueBinding(
  29. value = @Queue(value = "email", durable = "true"),
  30. exchange = @Exchange(value = "canal.exchage"),
  31. key = "canalEmail"
  32. )
  33. })
  34. public void handleDataChange(Message message) {
  35. String body = new String(message.getBody());
  36. List<String> typeList = Stream.of("INSERT", "UPDATE", "DELETE").collect(Collectors.toList());
  37. Map<String, Object> result = JSONObject.parseObject(body, Map.class);
  38. if ("shop".equals(result.get("database")) && "ums_resource".equals(result.get("table"))) {
  39. if (typeList.contains((String) result.get("type"))) {
  40. //如果是增删改,删除redis中的key,重新将数据刷新到redis中
  41. if (redisTemplate.delete("ums::resource")) {
  42. umsResourceService.getAll();
  43. }
  44. }
  45. }
  46. }
  47. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/85681
推荐阅读
相关标签
  

闽ICP备14008679号