赞
踩
包com.mindata;
导入com.alibaba.otter.canal.client.CanalConnector;
导入com.alibaba.otter.canal.client.CanalConnectors;
导入com.alibaba.otter.canal.common.utils.AddressUtils;
导入com.alibaba.otter.canal.protocol.CanalEntry;
导入com.alibaba.otter.canal.protocol.Message;
导入java.net.InetSocketAddress;
导入java.util.List;
公共类MainApp {
公共静态void主对象(String...args)引发异常{
//创建链接
CanalConnector连接器= CanalConnectors.newSingleConnector(新的InetSocketAddress(AddressUtils.getHostIp(),
11111),\ quot;例如\ quot;,\ quot; \ quot ;、 \ quot; \ quot;);
int batchSize = 1000;
int emptyCount = 0;
尝试{
connector.connect();
connector.subscribe(\ quot..* \\\\\\\\..* \ quot;);
connector.rollback();
int totalEmptyCount = 120;
while(emptyCount \\ ult; totalEmptyCount){
消息消息= connector.getWithoutAck(batchSize);//获取指定数量的数据
long batchId = message.getId();
int大小= message.getEntries()。大小();
if(batchId ==-1 || size == 0){
emptyCount ++;
System.out.println(\ quot;空计数:\ quot + emptyCount);
尝试{
线程睡眠(1000);
} catch(InterruptedException e){
}
}其他{
emptyCount = 0;
//System.out.printf(\ quot;消息[batchId =%s,size =%s] \\\\ n \ quot ;, batchId,size);
printEntry(message.getEntries());
}
connector.ack(batchId);//提交确认
//connector.rollback(batchId);//处理失败,回滚数据
}
System.out.println("空太多",退出" u");
}最后{
connector.disconnect();
}
}
私有静态无效printEntry(列出条目){
对于(CanalEntry.Entry条目:条目){
如果(entry.getEntryType()== CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType()== CanalEntry
.EntryType
.TRANSACTIONEND){
继续;
}
CanalEntry.RowChange rowChage = null;
尝试{
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch(Exception e){
抛出新的RuntimeException(\ quot; eromanga-event的错误##解析器有错误,数据:\ quot; + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format(\ quot; ================ uu2626 gt; binlog [%s:%s],名称[%s,%s ],eventType:%s \\,
entry.getHeader()。 getLogfileName(),entry.getHeader()。 getLogfileOffset(),
entry.getHeader()。 getSchemaName(),entry.getHeader()。 getTableName(),
事件类型));
对于(CanalEntry.RowData rowData:rowChage.getRowDatasList()){
如果(eventType == CanalEntry.EventType.DELETE){
printColumn(rowData.getBeforeColumnsList());
}否则,如果(eventType == CanalEntry.EventType.INSERT){
printColumn(rowData.getAfterColumnsList());
}其他{
System.out.println(\\-之前的-------\\)
printColumn(rowData.getBeforeColumnsList());
System.out.println(\\-之后的-------\\)
printColumn(rowData.getAfterColumnsList());
}
}
}
}
私有静态void printColumn(列出列){
对于(CanalEntry.Column列:列){
System.out.println(column.getName()+ \ quot ;: \ quot; + column.getValue()+ \ quot; update = \ quot; + column.getUpdated());
}
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。