当前位置:   article > 正文

canal应用

canal应用
1. canal分享模块链接

分享模块

分享链接
MySQL日志-binlog:有道云笔记 (youdao.com)
canal-deployer实战:有道云笔记 (youdao.com)
canal-tcp客户端:有道云笔记 (youdao.com)
canal-adapter实战:有道云笔记 (youdao.com)

canal源码下载地址

Gitee 极速下载/canal
2. 客户端测试连接代码
  1. package com.lx.utils.canalClient;
  2. import com.alibaba.otter.canal.client.CanalConnector;
  3. import com.alibaba.otter.canal.client.CanalConnectors;
  4. import com.alibaba.otter.canal.common.utils.AddressUtils;
  5. import com.alibaba.otter.canal.protocol.CanalEntry.*;
  6. import com.alibaba.otter.canal.protocol.Message;
  7. import java.net.InetSocketAddress;
  8. import java.util.List;
  9. public class SimpleCanalClientExample {
  10. public static void main(String args[]) {
  11. // 创建链接
  12. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
  13. 11121), "example", "", "");
  14. // CanalConnector connector = CanalConnectors.newClusterConnector("127.0.0.1:2181", "example", "", "");
  15. int batchSize = 1000;
  16. int emptyCount = 0;
  17. try {
  18. connector.connect();
  19. connector.subscribe(".*\\..*");
  20. connector.rollback();
  21. int totalEmptyCount = 120;
  22. while (emptyCount < totalEmptyCount) {
  23. Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
  24. long batchId = message.getId();
  25. int size = message.getEntries().size();
  26. if (batchId == -1 || size == 0) {
  27. emptyCount++;
  28. System.out.println("empty count : " + emptyCount);
  29. try {
  30. Thread.sleep(10000);
  31. } catch (InterruptedException e) {
  32. }
  33. } else {
  34. emptyCount = 0;
  35. // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
  36. printEntry(message.getEntries());
  37. }
  38. //connector.ack(batchId); // 提交确认
  39. // connector.rollback(batchId); // 处理失败, 回滚数据
  40. }
  41. System.out.println("empty too many times, exit");
  42. } finally {
  43. connector.disconnect();
  44. }
  45. }
  46. private static void printEntry(List<Entry> entrys) {
  47. for (Entry entry : entrys) {
  48. if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
  49. continue;
  50. }
  51. RowChange rowChage = null;
  52. try {
  53. rowChage = RowChange.parseFrom(entry.getStoreValue());
  54. } catch (Exception e) {
  55. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
  56. e);
  57. }
  58. EventType eventType = rowChage.getEventType();
  59. System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
  60. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  61. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
  62. eventType));
  63. for (RowData rowData : rowChage.getRowDatasList()) {
  64. if (eventType == EventType.DELETE) {
  65. printColumn(rowData.getBeforeColumnsList());
  66. } else if (eventType == EventType.INSERT) {
  67. printColumn(rowData.getAfterColumnsList());
  68. } else {
  69. System.out.println("-------&gt; before");
  70. printColumn(rowData.getBeforeColumnsList());
  71. System.out.println("-------&gt; after");
  72. printColumn(rowData.getAfterColumnsList());
  73. }
  74. }
  75. }
  76. }
  77. private static void printColumn(List<Column> columns) {
  78. for (Column column : columns) {
  79. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
  80. }
  81. }
  82. }
3. pom.xml依赖
  1. <!-- canal dependencies: canal.client (connector API), canal.protocol (Message / CanalEntry types), canal.common (AddressUtils) -->
  2. <dependency>
  3. <groupId>com.alibaba.otter</groupId>
  4. <artifactId>canal.client</artifactId>
  5. <version>1.1.7</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>com.alibaba.otter</groupId>
  9. <artifactId>canal.protocol</artifactId>
  10. <version>1.1.7</version>
  11. <optional>true</optional>
  12. </dependency>
  13. <dependency>
  14. <groupId>com.alibaba.otter</groupId>
  15. <artifactId>canal.common</artifactId>
  16. <version>1.1.7</version>
  17. </dependency>

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/716237
推荐阅读
相关标签
  

闽ICP备14008679号