当前位置:   article > 正文

SpringCloudAlibaba - 数据同步中间件Canal_spring cloud cancl

spring cloud cancl

简介

官方文档:https://github.com/alibaba/canal

canal ,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

工作原理

MySQL主备复制原理

 

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

本文讲解MySQL同步Redis,分为两种方式:CanalClient,MQ形式。

一. CanalClient方式

1. MySQL配置

配置MySQL的  my.ini/my.cnf  开启允许基于binlog文件主从同步

  1. log-bin=mysql-bin
  2. binlog-format=ROW
  3. server_id=108

配置该文件后,重启mysql服务器即可

手动创建cannl账号或者直接使用root账号

  1. drop user 'canal'@'%';
  2. CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
  3. grant all privileges on *.* to 'canal'@'%' identified by 'canal';
  4. flush privileges;

创建完后,在mysql库user表里检查配置都为yes即代表创建并授权成功。

2. CanalServer端配置

修改\conf\example下的instance.properties 配置文件内容

  1. canal.instance.master.address=192.168.0.108:3306
  2. canal.instance.dbUsername=root
  3. canal.instance.dbPassword=root

启动\bin\startup.bat,查看 \logs\example example.log日志文件出现 start successful....则代表启动成功。

3. CanalClient

  核心Jar包:

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.alibaba.otter</groupId>
  4. <artifactId>canal.client</artifactId>
  5. <version>1.1.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>redis.clients</groupId>
  9. <artifactId>jedis</artifactId>
  10. <version>2.9.0</version>
  11. </dependency>
  12. </dependencies>

  RedisUtil:

  1. import redis.clients.jedis.Jedis;
  2. public class RedisUtil {
  3. private static Jedis jedis = null;
  4. public static synchronized Jedis getJedis() {
  5. if (jedis == null) {
  6. jedis = new Jedis("127.0.0.1", 6379);
  7. }
  8. return jedis;
  9. }
  10. public static boolean existKey(String key) {
  11. return getJedis().exists(key);
  12. }
  13. public static void delKey(String key) {
  14. getJedis().del(key);
  15. }
  16. public static String stringGet(String key) {
  17. return getJedis().get(key);
  18. }
  19. public static String stringSet(String key, String value) {
  20. return getJedis().set(key, value);
  21. }
  22. public static void hashSet(String key, String field, String value) {
  23. getJedis().hset(key, field, value);
  24. }
  25. }

  CanalClient:

  1. import com.alibaba.fastjson.JSONObject;
  2. import com.alibaba.otter.canal.client.CanalConnector;
  3. import com.alibaba.otter.canal.client.CanalConnectors;
  4. import com.alibaba.otter.canal.protocol.CanalEntry.*;
  5. import com.alibaba.otter.canal.protocol.Message;
  6. import java.net.InetSocketAddress;
  7. import java.util.List;
  8. public class CanalClient {
  9. public static void main(String args[]) {
  10. // 连接我们的CanalServer端
  11. CanalConnector connector = CanalConnectors.newSingleConnector(new
  12. InetSocketAddress("127.0.0.1",
  13. 11111), "example", "", "");
  14. int batchSize = 100;
  15. try {
  16. connector.connect();
  17. connector.subscribe("cacal.user_table"); //同步cacal库下的user_table
  18. connector.rollback();
  19. while (true) {
  20. // 获取指定数量的数据
  21. Message message = connector.getWithoutAck(batchSize);
  22. long batchId = message.getId();
  23. int size = message.getEntries().size();
  24. if (batchId == -1 || size == 0) {
  25. try {
  26. Thread.sleep(1000);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. } else {
  31. printEntry(message.getEntries());
  32. }
  33. // 提交确认
  34. connector.ack(batchId);
  35. // connector.rollback(batchId); // 处理失败, 回滚数据
  36. }
  37. } finally {
  38. connector.disconnect();
  39. }
  40. }
  41. private static void printEntry(List<Entry> entrys) {
  42. for (Entry entry : entrys) {
  43. if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
  44. continue;
  45. }
  46. RowChange rowChage = null;
  47. try {
  48. rowChage = RowChange.parseFrom(entry.getStoreValue());
  49. } catch (Exception e) {
  50. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
  51. e);
  52. }
  53. EventType eventType = rowChage.getEventType();
  54. System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
  55. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  56. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
  57. eventType));
  58. for (RowData rowData : rowChage.getRowDatasList()) {
  59. if (eventType == EventType.DELETE) {
  60. redisDelete(rowData.getBeforeColumnsList());
  61. } else if (eventType == EventType.INSERT) {
  62. redisInsert(rowData.getAfterColumnsList());
  63. } else {
  64. System.out.println("-------> before");
  65. printColumn(rowData.getBeforeColumnsList());
  66. System.out.println("-------> after");
  67. redisUpdate(rowData.getAfterColumnsList());
  68. }
  69. }
  70. }
  71. }
  72. private static void printColumn(List<Column> columns) {
  73. for (Column column : columns) {
  74. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
  75. }
  76. }
  77. private static void redisInsert(List<Column> columns) {
  78. JSONObject json = new JSONObject();
  79. for (Column column : columns) {
  80. json.put(column.getName(), column.getValue());
  81. }
  82. if (columns.size() > 0) {
  83. RedisUtil.stringSet("canal:user:" + columns.get(0).getValue(), json.toJSONString());
  84. }
  85. }
  86. private static void redisUpdate(List<Column> columns) {
  87. JSONObject json = new JSONObject();
  88. for (Column column : columns) {
  89. json.put(column.getName(), column.getValue());
  90. }
  91. if (columns.size() > 0) {
  92. RedisUtil.stringSet("canal:user:" + columns.get(0).getValue(), json.toJSONString());
  93. }
  94. }
  95. private static void redisDelete(List<Column> columns) {
  96. JSONObject json = new JSONObject();
  97. for (Column column : columns) {
  98. json.put(column.getName(), column.getValue());
  99. }
  100. if (columns.size() > 0) {
  101. RedisUtil.delKey("canal:user:" + columns.get(0).getValue());
  102. }
  103. }
  104. }

新建cacal库,user_table,无论新增,更改,删除,都会同步到Redis。

    

二. MQ方式

Canal支持两种MQ:Kafka和RocketMQ,本文讲解Kafka。

1. Kafka环境安装

启动zookeeper,并运行ZooInspector,具体安装前面博客有讲解:

  1. 解压 kafka_2.13-2.4.0 改名为 kafka
  2. 修改 server.properties中的配置
  3. log.dirs=D:\MyTools\Kafka\logs
  4. Cmd 进入到该目录:D:\MyTools\Kafka
  5. .\bin\windows\kafka-server-start.bat .\config\server.properties

2. Canal配置更改

  1. 1.修改 example/instance.properties
  2. canal.mq.topic=zb-topic
  3. 2.修改 canal.properties
  4. # tcp, kafka, RocketMQ
  5. canal.serverMode = kafka
  6. canal.mq.servers = 127.0.0.1:9092

3. 编写消费者代码

  1. @RestController
  2. public class KafkaController {
  3. @Autowired
  4. private RedisUtils redisUtils;
  5. // 消费者使用日志打印消息
  6. @KafkaListener(topics = "zb-topic")
  7. public void receive(ConsumerRecord<?, ?> consumer) {
  8. System.out.println("topic名称:" + consumer.topic() + ",key:" +
  9. consumer.key() + "," +
  10. "分区位置:" + consumer.partition()
  11. + ", 下标" + consumer.offset() + "," + consumer.value());
  12. String json = (String) consumer.value();
  13. JSONObject jsonObject = JSONObject.parseObject(json);
  14. String sqlType = jsonObject.getString("type");
  15. JSONArray data = jsonObject.getJSONArray("data");
  16. JSONObject userObject = data.getJSONObject(0);
  17. String id = userObject.getString("id");
  18. String database = jsonObject.getString("database");
  19. String table = jsonObject.getString("table");
  20. String key = database + "_" + table + "_" + id;
  21. if ("UPDATE".equals(sqlType) || "INSERT".equals(sqlType)) {
  22. redisUtils.setString(key, userObject.toJSONString());
  23. return;
  24. }
  25. if ("DELETE".equals(sqlType)) {
  26. redisUtils.deleteKey(key);
  27. }
  28. }
  29. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/85648
推荐阅读
相关标签
  

闽ICP备14008679号