赞
踩
分享模块 | 分享链接 | |
Mysql日志-binlog | 有道云笔记 (youdao.com) | |
canal-deployer实战 | 有道云笔记 (youdao.com) | |
canal-tcp客户端 | 有道云笔记 (youdao.com) | |
canal-adapter实战 | 有道云笔记 (youdao.com) | |
canal源码下载地址 | Gitee 极速下载/canal |
- package com.lx.utils.canalClient;
-
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.common.utils.AddressUtils;
- import com.alibaba.otter.canal.protocol.CanalEntry.*;
- import com.alibaba.otter.canal.protocol.Message;
-
- import java.net.InetSocketAddress;
- import java.util.List;
-
-
- /**
-  * Minimal canal TCP client: connects to a single canal server, subscribes to
-  * every schema and table, and prints each row change it receives to standard
-  * output.
-  *
-  * <p>The client polls in batches and acknowledges each batch after it has been
-  * printed. It exits after 120 consecutive empty polls (with a 10 second pause
-  * after each empty poll) or when the polling thread is interrupted.
-  */
- public class SimpleCanalClientExample {
-
-     /**
-      * Entry point. Connects to the canal server on this host's IP, port 11121,
-      * destination {@code example}, with empty username and password.
-      *
-      * @param args command-line arguments (unused)
-      */
-     public static void main(String[] args) {
-         // Create the connection (single-node connector).
-         CanalConnector connector = CanalConnectors.newSingleConnector(
-                 new InetSocketAddress(AddressUtils.getHostIp(), 11121), "example", "", "");
-
-         // Cluster alternative:
-         // CanalConnector connector = CanalConnectors.newClusterConnector("127.0.0.1:2181", "example", "", "");
-         int batchSize = 1000;
-         int emptyCount = 0;
-         try {
-             connector.connect();
-             connector.subscribe(".*\\..*");
-             // Roll back to the last acknowledged position so that batches fetched
-             // but never acknowledged by a previous run are delivered again.
-             connector.rollback();
-             int totalEmptyCount = 120;
-             while (emptyCount < totalEmptyCount) {
-                 // Fetch up to batchSize entries without acknowledging them yet.
-                 Message message = connector.getWithoutAck(batchSize);
-                 long batchId = message.getId();
-                 int size = message.getEntries().size();
-                 if (batchId == -1 || size == 0) {
-                     emptyCount++;
-                     System.out.println("empty count : " + emptyCount);
-                     try {
-                         Thread.sleep(10000);
-                     } catch (InterruptedException e) {
-                         // Preserve the interrupt status and stop polling instead of
-                         // silently swallowing the interruption.
-                         Thread.currentThread().interrupt();
-                         break;
-                     }
-                 } else {
-                     emptyCount = 0;
-                     // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
-                     printEntry(message.getEntries());
-                 }
-
-                 // Confirm the batch only after it was processed successfully. If
-                 // printEntry throws, the batch stays unacknowledged and is delivered
-                 // again after the rollback() on the next start. A batch id of -1
-                 // means the poll returned nothing, so there is nothing to confirm.
-                 if (batchId != -1) {
-                     connector.ack(batchId);
-                 }
-             }
-
-             // Printed on every exit path of the loop, including interruption,
-             // exactly as the message was emitted before.
-             System.out.println("empty too many times, exit");
-         } finally {
-             connector.disconnect();
-         }
-     }
-
-     /**
-      * Prints every row change contained in the given entries. Transaction
-      * begin/end markers are skipped.
-      *
-      * @param entries entries of one fetched batch
-      * @throws RuntimeException if an entry's stored value cannot be parsed as a
-      *                          {@code RowChange}; the parse failure is kept as the cause
-      */
-     private static void printEntry(List<Entry> entries) {
-         for (Entry entry : entries) {
-             if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
-                     || entry.getEntryType() == EntryType.TRANSACTIONEND) {
-                 continue;
-             }
-
-             RowChange rowChange;
-             try {
-                 rowChange = RowChange.parseFrom(entry.getStoreValue());
-             } catch (Exception e) {
-                 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
-                         e);
-             }
-
-             EventType eventType = rowChange.getEventType();
-             System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
-                     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
-                     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
-                     eventType));
-
-             for (RowData rowData : rowChange.getRowDatasList()) {
-                 if (eventType == EventType.DELETE) {
-                     // A delete is printed from its "before" columns only.
-                     printColumn(rowData.getBeforeColumnsList());
-                 } else if (eventType == EventType.INSERT) {
-                     // An insert is printed from its "after" columns only.
-                     printColumn(rowData.getAfterColumnsList());
-                 } else {
-                     // Every other event type prints both images.
-                     System.out.println("-------> before");
-                     printColumn(rowData.getBeforeColumnsList());
-                     System.out.println("-------> after");
-                     printColumn(rowData.getAfterColumnsList());
-                 }
-             }
-         }
-     }
-
-     /**
-      * Prints one line per column: its name, its value and its updated flag.
-      *
-      * @param columns columns of a single row image
-      */
-     private static void printColumn(List<Column> columns) {
-         for (Column column : columns) {
-             System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
-         }
-     }
-
- }
- <!--canal依赖-->
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.client</artifactId>
- <version>1.1.7</version>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.protocol</artifactId>
- <version>1.1.7</version>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.common</artifactId>
- <version>1.1.7</version>
- </dependency>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。