当前位置:   article > 正文

【Canal】数据同步的终极解决方案,阿里巴巴开源的Canal框架当之无愧!_alibaba canal

alibaba canal

@Override

public String toString() {

return “Book{” +

“id=” + id +

“, name='” + name + ‘’’ +

“, author='” + author + ‘’’ +

“, publishtime=” + publishtime +

“, price=” + price +

“, publishgroup='” + publishgroup + ‘’’ +

‘}’;

}

}

其中,我们在Book实体类中,使用Solr的注解@Field定义了实体类字段与Solr域之间的关系。

各种工具类的实现

接下来,我们就在io.mykit.canal.demo.utils包下创建各种工具类。

  • BinlogValue

用于存储binlog分析的每行每列的value值,代码如下所示。

package io.mykit.canal.demo.utils;

import java.io.Serializable;

/**

  • ClassName: BinlogValue

  • binlog分析的每行每列的value值;

  • 新增数据:beforeValue 和 value 均为现有值;

  • 修改数据:beforeValue是修改前的值;value为修改后的值;

  • 删除数据:beforeValue和value均是删除前的值; 这个比较特殊主要是为了删除数据时方便获取删除前的值

*/

public class BinlogValue implements Serializable {

private static final long serialVersionUID = -6350345408773943086L;

private String value;

private String beforeValue;

/**

  • binlog分析的每行每列的value值;

  • 新增数据: value:为现有值;

  • 修改数据:value为修改后的值;

  • 删除数据:value是删除前的值; 这个比较特殊主要是为了删除数据时方便获取删除前的值

*/

public String getValue() {

return value;

}

public void setValue(String value) {

this.value = value;

}

/**

  • binlog分析的每行每列的beforeValue值;

  • 新增数据:beforeValue为现有值;

  • 修改数据:beforeValue是修改前的值;

  • 删除数据:beforeValue为删除前的值;

*/

public String getBeforeValue() {

return beforeValue;

}

public void setBeforeValue(String beforeValue) {

this.beforeValue = beforeValue;

}

}

  • CanalDataParser

用于解析数据,代码如下所示。

package io.mykit.canal.demo.utils;

import java.text.SimpleDateFormat;

import java.util.ArrayList;

import java.util.Date;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import org.apache.commons.lang.SystemUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.util.CollectionUtils;

import com.alibaba.otter.canal.protocol.Message;

import com.alibaba.otter.canal.protocol.CanalEntry.Column;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;

import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;

import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;

import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;

import com.google.protobuf.InvalidProtocolBufferException;

/**

  • 解析数据

*/

public class CanalDataParser {

protected static final String DATE_FORMAT = “yyyy-MM-dd HH:mm:ss”;

protected static final String yyyyMMddHHmmss = “yyyyMMddHHmmss”;

protected static final String yyyyMMdd = “yyyyMMdd”;

protected static final String SEP = SystemUtils.LINE_SEPARATOR;

protected static String context_format = null;

protected static String row_format = null;

protected static String transaction_format = null;

protected static String row_log = null;

private static Logger logger = LoggerFactory.getLogger(CanalDataParser.class);

static {

context_format = SEP + “****************************************************” + SEP;

context_format += “* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}” + SEP;

context_format += "* Start : [{}] " + SEP;

context_format += "* End : [{}] " + SEP;

context_format += “****************************************************” + SEP;

row_format = SEP

  • “----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms”

  • SEP;

transaction_format = SEP + “================> binlog[{}:{}] , executeTime : {} , delay : {}ms” + SEP;

row_log = “schema[{}], table[{}]”;

}

public static List convertToInnerBinlogEntry(Message message) {

List innerBinlogEntryList = new ArrayList();

if(message == null) {

logger.info(“接收到空的 message; 忽略”);

return innerBinlogEntryList;

}

long batchId = message.getId();

int size = message.getEntries().size();

if (batchId == -1 || size == 0) {

logger.info(“接收到空的message[size=” + size + “]; 忽略”);

return innerBinlogEntryList;

}

printLog(message, batchId, size);

List entrys = message.getEntries();

//输出日志

for (Entry entry : entrys) {

long executeTime = entry.getHeader().getExecuteTime();

long delayTime = new Date().getTime() - executeTime;

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {

TransactionBegin begin = null;

try {

begin = TransactionBegin.parseFrom(entry.getStoreValue());

} catch (InvalidProtocolBufferException e) {

throw new RuntimeException(“parse event has an error , data:” + entry.toString(), e);

}

// 打印事务头信息,执行的线程id,事务耗时

logger.info(“BEGIN ----> Thread id: {}”, begin.getThreadId());

logger.info(transaction_format, new Object[] {entry.getHeader().getLogfileName(),

String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });

} else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {

TransactionEnd end = null;

try {

end = TransactionEnd.parseFrom(entry.getStoreValue());

} catch (InvalidProtocolBufferException e) {

throw new RuntimeException(“parse event has an error , data:” + entry.toString(), e);

}

// 打印事务提交信息,事务id

logger.info(“END ----> transaction id: {}”, end.getTransactionId());

logger.info(transaction_format,

new Object[] {entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()),

String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });

}

continue;

}

//解析结果

if (entry.getEntryType() == EntryType.ROWDATA) {

RowChange rowChage = null;

try {

rowChage = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

throw new RuntimeException(“parse event has an error , data:” + entry.toString(), e);

}

EventType eventType = rowChage.getEventType();

logger.info(row_format, new Object[] { entry.getHeader().getLogfileName(),

String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),

entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });

//组装数据结果

if (eventType == EventType.INSERT || eventType == EventType.DELETE || eventType == EventType.UPDATE) {

String schemaName = entry.getHeader().getSchemaName();

String tableName = entry.getHeader().getTableName();

List<Map<String, BinlogValue>> rows = parseEntry(entry);

InnerBinlogEntry innerBinlogEntry = new InnerBinlogEntry();

innerBinlogEntry.setEntry(entry);

innerBinlogEntry.setEventType(eventType);

innerBinlogEntry.setSchemaName(schemaName);

innerBinlogEntry.setTableName(tableName.toLowerCase());

innerBinlogEntry.setRows(rows);

innerBinlogEntryList.add(innerBinlogEntry);

} else {

logger.info(" 存在 INSERT INSERT UPDATE 操作之外的SQL [" + eventType.toString() + “]”);

}

continue;

}

}

return innerBinlogEntryList;

}

private static List<Map<String, BinlogValue>> parseEntry(Entry entry) {

List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();

try {

String schemaName = entry.getHeader().getSchemaName();

String tableName = entry.getHeader().getTableName();

RowChange rowChage = RowChange.parseFrom(entry.getStoreValue());

EventType eventType = rowChage.getEventType();

// 处理每个Entry中的每行数据

for (RowData rowData : rowChage.getRowDatasList()) {

StringBuilder rowlog = new StringBuilder(“rowlog schema[” + schemaName + “], table[” + tableName + “], event[” + eventType.toString() + “]”);

Map<String, BinlogValue> row = new HashMap<String, BinlogValue>();

List beforeColumns = rowData.getBeforeColumnsList();

List afterColumns = rowData.getAfterColumnsList();

beforeColumns = rowData.getBeforeColumnsList();

if (eventType == EventType.DELETE) {//delete

for(Column column : beforeColumns) {

BinlogValue binlogValue = new BinlogValue();

binlogValue.setValue(column.getValue());

binlogValue.setBeforeValue(column.getValue());

row.put(column.getName(), binlogValue);

}

} else if(eventType == EventType.UPDATE) {//update

for(Column column : beforeColumns) {

BinlogValue binlogValue = new BinlogValue();

binlogValue.setBeforeValue(column.getValue());

row.put(column.getName(), binlogValue);

}

for(Column column : afterColumns) {

BinlogValue binlogValue = row.get(column.getName());

if(binlogValue == null) {

binlogValue = new BinlogValue();

}

binlogValue.setValue(column.getValue());

row.put(column.getName(), binlogValue);

}

} else { // insert

for(Column column : afterColumns) {

BinlogValue binlogValue = new BinlogValue();

binlogValue.setValue(column.getValue());

binlogValue.setBeforeValue(column.getValue());

row.put(column.getName(), binlogValue);

}

}

rows.add(row);

String rowjson = JacksonUtil.obj2str(row);

logger.info(“########################### Data Parse Result ###########################”);

logger.info(rowlog + " , " + rowjson);

logger.info(“########################### Data Parse Result ###########################”);

logger.info(“”);

}

} catch (InvalidProtocolBufferException e) {

throw new RuntimeException(“parseEntry has an error , data:” + entry.toString(), e);

}

return rows;

}

private static void printLog(Message message, long batchId, int size) {

long memsize = 0;

for (Entry entry : message.getEntries()) {

memsize += entry.getHeader().getEventLength();

}

String startPosition = null;

String endPosition = null;

if (!CollectionUtils.isEmpty(message.getEntries())) {

startPosition = buildPositionForDump(message.getEntries().get(0));

endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));

}

SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);

logger.info(context_format, new Object[] {batchId, size, memsize, format.format(new Date()), startPosition, endPosition });

}

private static String buildPositionForDump(Entry entry) {

long time = entry.getHeader().getExecuteTime();

Date date = new Date(time);

SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);

return entry.getHeader().getLogfileName() + “:” + entry.getHeader().getLogfileOffset() + “:” + entry.getHeader().getExecuteTime() + “(” + format.format(date) + “)”;

}

}

  • DateUtils

时间工具类,代码如下所示。

package io.mykit.canal.demo.utils;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.Date;

public class DateUtils {

private static final String FORMAT_PATTERN = “yyyy-MM-dd HH:mm:ss”;

private static SimpleDateFormat sdf = new SimpleDateFormat(FORMAT_PATTERN);

public static Date parseDate(String datetime) throws ParseException{

if(datetime != null && !“”.equals(datetime)){

return sdf.parse(datetime);

}

return null;

}

public static String formatDate(Date datetime) throws ParseException{

if(datetime != null ){

return sdf.format(datetime);

}

return null;

}

public static Long formatStringDateToLong(String datetime) throws ParseException{

if(datetime != null && !“”.equals(datetime)){

Date d = sdf.parse(datetime);

return d.getTime();

}

return null;

}

public static Long formatDateToLong(Date datetime) throws ParseException{

if(datetime != null){

return datetime.getTime();

}

return null;

}

}

  • InnerBinlogEntry

Binlog实体类,代码如下所示。

package io.mykit.canal.demo.utils;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

public class InnerBinlogEntry {

/**

  • canal原生的Entry

*/

private Entry entry;

/**

  • 该Entry归属于的表名

*/

private String tableName;

/**

  • 该Entry归属数据库名

*/

private String schemaName;

/**

  • 该Entry本次的操作类型,对应canal原生的枚举;EventType.INSERT; EventType.UPDATE; EventType.DELETE;

*/

private EventType eventType;

private List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();

public Entry getEntry() {

return entry;

}

public void setEntry(Entry entry) {

this.entry = entry;

}

public String getTableName() {

return tableName;

}

public void setTableName(String tableName) {

this.tableName = tableName;

}

public EventType getEventType() {

return eventType;

}

public void setEventType(EventType eventType) {

this.eventType = eventType;

}

public String getSchemaName() {

return schemaName;

}

public void setSchemaName(String schemaName) {

this.schemaName = schemaName;

}

public List<Map<String, BinlogValue>> getRows() {

return rows;

}

public void setRows(List<Map<String, BinlogValue>> rows) {

this.rows = rows;

}

}

  • JacksonUtil

Json工具类,代码如下所示。

package io.mykit.canal.demo.utils;

import java.io.IOException;

import org.codehaus.jackson.JsonGenerationException;

import org.codehaus.jackson.JsonParseException;

import org.codehaus.jackson.map.JsonMappingException;

import org.codehaus.jackson.map.ObjectMapper;

public class JacksonUtil {

private static ObjectMapper mapper = new ObjectMapper();

public static String obj2str(Object obj) {

String json = null;

try {

json = mapper.writeValueAsString(obj);

} catch (JsonGenerationException e) {

e.printStackTrace();

} catch (JsonMappingException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

return json;

}

public static T str2obj(String content, Class valueType) {

try {

return mapper.readValue(content, valueType);

} catch (JsonParseException e) {

e.printStackTrace();

} catch (JsonMappingException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

return null;

}

}

同步程序的实现

准备好实体类和工具类后,我们就可以编写同步程序来实现MySQL数据库中的数据实时同步到Solr索引库了,我们在io.mykit.canal.demo.main包中常见MykitCanalDemoSync类,代码如下所示。

package io.mykit.canal.demo.main;

import io.mykit.canal.demo.bean.Book;

import io.mykit.canal.demo.utils.BinlogValue;

import io.mykit.canal.demo.utils.CanalDataParser;

import io.mykit.canal.demo.utils.DateUtils;

import io.mykit.canal.demo.utils.InnerBinlogEntry;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry;

import com.alibaba.otter.canal.protocol.Message;

import org.apache.solr.client.solrj.SolrServer;

import org.apache.solr.client.solrj.impl.HttpSolrServer;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

import java.text.ParseException;

import java.util.List;

import java.util.Map;

public class SyncDataBootStart {

private static Logger logger = LoggerFactory.getLogger(SyncDataBootStart.class);

public static void main(String[] args) throws Exception {

String hostname = “192.168.175.100”;

Integer port = 11111;

String destination = “example”;

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
img
img
img
img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加V获取:vip1024b (备注Java)
img

最后

我想问下大家当初选择做程序员的初衷是什么?有思考过这个问题吗?高薪?热爱?

既然入了这行就应该知道,这个行业是靠本事吃饭的,你想要拿高薪没有问题,请好好磨练自己的技术,不要抱怨。有的人通过培训可以让自己成长,有些人可以通过自律强大的自学能力成长,如果你两者都不占,还怎么拿高薪?

架构师是很多程序员的职业目标,一个好的架构师是不愁所谓的35岁高龄门槛的,到了那个时候,照样大把的企业挖他。为什么很多人想进阿里巴巴,无非不是福利待遇好以及优质的人脉资源,这对个人职业发展是有非常大帮助的。

如果你也想成为一名好的架构师,那或许这份Java核心架构笔记你需要阅读阅读,希望能够对你的职业发展有所帮助。

中高级开发必知必会:

一个人可以走的很快,但一群人才能走的更远。如果你从事以下工作或对以下感兴趣,欢迎戳这里加入程序员的圈子,让我们一起学习成长!

AI人工智能、Android移动开发、AIGC大模型、C C#、Go语言、Java、Linux运维、云计算、MySQL、PMP、网络安全、Python爬虫、UE5、UI设计、Unity3D、Web前端开发、产品经理、车载开发、大数据、鸿蒙、计算机网络、嵌入式物联网、软件测试、数据结构与算法、音视频开发、Flutter、IOS开发、PHP开发、.NET、安卓逆向、云计算

.(img-nlithakA-1712462886219)]
[外链图片转存中…(img-uzS3ge16-1712462886220)]
[外链图片转存中…(img-yTNanVfJ-1712462886220)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加V获取:vip1024b (备注Java)
[外链图片转存中…(img-V15VSIuY-1712462886221)]

最后

我想问下大家当初选择做程序员的初衷是什么?有思考过这个问题吗?高薪?热爱?

既然入了这行就应该知道,这个行业是靠本事吃饭的,你想要拿高薪没有问题,请好好磨练自己的技术,不要抱怨。有的人通过培训可以让自己成长,有些人可以通过自律强大的自学能力成长,如果你两者都不占,还怎么拿高薪?

架构师是很多程序员的职业目标,一个好的架构师是不愁所谓的35岁高龄门槛的,到了那个时候,照样大把的企业挖他。为什么很多人想进阿里巴巴,无非不是福利待遇好以及优质的人脉资源,这对个人职业发展是有非常大帮助的。

如果你也想成为一名好的架构师,那或许这份Java核心架构笔记你需要阅读阅读,希望能够对你的职业发展有所帮助。

中高级开发必知必会:

[外链图片转存中…(img-3dcKHjo6-1712462886221)]

一个人可以走的很快,但一群人才能走的更远。如果你从事以下工作或对以下感兴趣,欢迎戳这里加入程序员的圈子,让我们一起学习成长!

AI人工智能、Android移动开发、AIGC大模型、C C#、Go语言、Java、Linux运维、云计算、MySQL、PMP、网络安全、Python爬虫、UE5、UI设计、Unity3D、Web前端开发、产品经理、车载开发、大数据、鸿蒙、计算机网络、嵌入式物联网、软件测试、数据结构与算法、音视频开发、Flutter、IOS开发、PHP开发、.NET、安卓逆向、云计算

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/935976
推荐阅读
相关标签
  

闽ICP备14008679号