当前位置:   article > 正文

canal搭配RabbitMQ实现数据同步解决方案_canal 同步消息到rabbitmq

canal 同步消息到rabbitmq

目录

1.cannal的使用及原理

2.docker安装cannal

3.在java中使用cannal及RabbitMQ


1.cannal的使用及原理

cannal主要支持MySQL数据库,它会将自己伪装成从节点产生与主节点的交互操作,主节点一旦数据发生变更也就是发生增删改的操作就会将这些信息写入到binlog日志中并且推送到从节点也就是cannal,然后从节点去执行信息里增删改的操作,从而实现数据主从节点数据一致。

2.docker安装cannal

1.下载镜像

docker pull docker.io/canal/canal-server

2.容器安装

docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server

进入容器,修改核心配置canal.properties 和instance.properties,canal.properties 是canal自身的配置,instance.properties是需要同步数据的数据库连接配置。

执行代码如下:

  1. docker exec -it canal /bin/bash
  2. cd canal-server/conf/
  3. vi canal.properties
  4. cd example/
  5. vi instance.properties

修改canal.properties的id,不能和mysql的server-id重复,如下图:

修改instance.properties,配置数据库连接地址:   其实这个不做也可以在Java代码中也可以指定监听那个数据库那张表

设置开机启动,并重启cannal

  1. docker update --restart=always canal
  2. docker restart canal

3.在java中使用cannal及RabbitMQ

1.POM.XML

  1. #cannal
  2. <dependency>
  3. <groupId>com.xpand</groupId>
  4. <artifactId>starter-canal</artifactId>
  5. <version>0.0.1-SNAPSHOT</version>
  6. </dependency>
  7. #RabbitMQ
  8. <dependency>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-amqp</artifactId>
  11. </dependency>

2.application.yml

  1. #canal配置
  2. canal:
  3. client:
  4. instances:
  5. # exmaple
  6. example:
  7. host: 127.0.0.1
  8. port: 11111

3.SpringBoot启动类

  1. @SpringBootApplication
  2. @EnableCanalClient #声明当前服务是cannal监控数据变化客户端
  3. public class CanalApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(CanalApplication.class,args);
  6. }
  7. }

4.实例代码cannal将监听到的数据通过MQ进行发送

  1. /**
  2. * @author JiaWei
  3. */
  4. @Slf4j
  5. @CanalEventListener //声明当前的类是canal的监听类
  6. public class MycannalListener {
  7. @Autowired
  8. RabbitTemplate rabbitTemplate;
  9. /**
  10. * @param eventType 当前操作数据库
  11. * @param rowData 当前操作数据库数据
  12. */
  13. @ListenPoint(schema = "changgou_businness",table = "tb_ad")
  14. public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
  15. System.out.println("广告数据发生改变");
  16. //改变之前的数据
  17. rowData.getBeforeColumnsList().forEach((x) -> System.out.println("改变前的数据:" + x.getName() + "::" + x.getValue()));
  18. //改变之后的数据
  19. rowData.getAfterColumnsList().forEach((x) -> System.out.println("改变后的数据:" + x.getName() + "::" + x.getValue()));
  20. for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
  21. if ("position".equals(column)){
  22. log.info("cannal监听到的数据为{}",column + "已经发送到MQ队列为{}",RabbitMQConfig.AD_UPDATE_QUEUE);
  23. rabbitTemplate.convertAndSend("", RabbitMQConfig.AD_UPDATE_QUEUE,column);
  24. }
  25. }
  26. }
  27. }

5.MQ消费者

  1. package com.changgou.business.listener;
  2. import okhttp3.*;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.io.IOException;
  6. /**
  7. * @author JiaWei
  8. */
  9. @Component
  10. public class AdListener {
  11. @RabbitListener(queues = "ad_update_queue")
  12. public void receviceMessage(String message){
  13. System.out.println("接收到的消息为" + message);
  14. 《《《《业务逻辑》》》》 可以是feign调用
  15. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/445322
推荐阅读
相关标签
  

闽ICP备14008679号