Official documentation: https://github.com/alibaba/canal
canal's main purpose is to parse MySQL incremental logs (binlog) and provide incremental data subscription and consumption.
In its early days, Alibaba ran dual data centers in Hangzhou and the US and needed cross-data-center synchronization, which was mainly implemented with business-level triggers that captured incremental changes. Starting in 2010, teams gradually switched to parsing database logs to obtain incremental changes for synchronization, which gave rise to a large number of incremental database subscription and consumption businesses.
Businesses built on log-based incremental subscription and consumption include database mirroring, real-time database backup, index building and real-time index maintenance, business cache refreshing, and incremental data processing with business logic.
canal currently supports source-side MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.
MySQL master-slave replication principle
The master writes data changes to its binary log (binlog); the slave's I/O thread copies the master's binlog events into its own relay log; the slave's SQL thread then replays those events so the slave stays consistent with the master.
How canal works
canal emulates the MySQL slave interaction protocol: it pretends to be a MySQL slave and sends a dump request to the master; the master pushes the binlog to canal, which parses the byte stream into structured change events.
This article explains how to sync MySQL to Redis in two ways: directly with a CanalClient, and through MQ.
1. MySQL configuration
Edit MySQL's my.ini/my.cnf to enable binlog-based master-slave replication:
- log-bin=mysql-bin
- binlog-format=ROW
- server_id=108
After changing the file, restart the MySQL server. You can verify that binlog is enabled with show variables like 'log_bin'; which should report ON.
Manually create a canal account, or simply use the root account:
- drop user 'canal'@'%';
- CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
- grant all privileges on *.* to 'canal'@'%';
- flush privileges;
After creating the account, check the canal user's row in the mysql.user table: the privilege columns should all be Y, which means the account was created and granted successfully.
2. CanalServer configuration
Edit the contents of the instance.properties configuration file under \conf\example:
- canal.instance.master.address=192.168.0.108:3306
- canal.instance.dbUsername=root
- canal.instance.dbPassword=root
Run \bin\startup.bat, then check the log file \logs\example\example.log; if it shows "start successful....", the server started successfully.
3. CanalClient
Core JAR dependencies:
- <dependencies>
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.client</artifactId>
- <version>1.1.0</version>
- </dependency>
-
- <dependency>
- <groupId>redis.clients</groupId>
- <artifactId>jedis</artifactId>
- <version>2.9.0</version>
- </dependency>
- </dependencies>
RedisUtil:
- import redis.clients.jedis.Jedis;
-
- public class RedisUtil {
-
- private static Jedis jedis = null;
-
- public static synchronized Jedis getJedis() {
- if (jedis == null) {
- jedis = new Jedis("127.0.0.1", 6379);
- }
- return jedis;
- }
- public static boolean existKey(String key) {
- return getJedis().exists(key);
- }
- public static void delKey(String key) {
- getJedis().del(key);
- }
- public static String stringGet(String key) {
- return getJedis().get(key);
- }
- public static String stringSet(String key, String value) {
- return getJedis().set(key, value);
- }
- public static void hashSet(String key, String field, String value) {
- getJedis().hset(key, field, value);
- }
- }
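A single shared Jedis connection like the one above is not thread-safe, so it is only suitable for a demo. Below is a minimal sketch of a pooled variant, assuming the same local Redis instance; the class name RedisPoolUtil and the use of try-with-resources are illustrative assumptions, not part of the original code.
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
-
- public class RedisPoolUtil {
-
-     // one shared, thread-safe connection pool for the whole application
-     private static final JedisPool POOL = new JedisPool("127.0.0.1", 6379);
-
-     public static void stringSet(String key, String value) {
-         // borrow a connection and return it to the pool automatically
-         try (Jedis jedis = POOL.getResource()) {
-             jedis.set(key, value);
-         }
-     }
-
-     public static void delKey(String key) {
-         try (Jedis jedis = POOL.getResource()) {
-             jedis.del(key);
-         }
-     }
- }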
CanalClient:
- import com.alibaba.fastjson.JSONObject;
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.protocol.CanalEntry.*;
- import com.alibaba.otter.canal.protocol.Message;
- import java.net.InetSocketAddress;
- import java.util.List;
-
- public class CanalClient {
-
- public static void main(String args[]) {
- // connect to the CanalServer
- CanalConnector connector = CanalConnectors.newSingleConnector(
-         new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
- int batchSize = 100;
- try {
- connector.connect();
- connector.subscribe("cacal.user_table"); // subscribe to user_table in the cacal database
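- // roll back any batches that were fetched but not acknowledged, so they are re-delivered from the last confirmed position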
- connector.rollback();
- while (true) {
- // fetch up to batchSize entries without auto-acknowledging
- Message message = connector.getWithoutAck(batchSize);
- long batchId = message.getId();
- int size = message.getEntries().size();
-
- if (batchId == -1 || size == 0) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } else {
- printEntry(message.getEntries());
- }
- // acknowledge that the batch has been processed
- connector.ack(batchId);
- // connector.rollback(batchId); // on processing failure, roll back this batch
- }
- } finally {
- connector.disconnect();
- }
- }
-
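- // Walk the binlog entries and route each row change to the matching Redis operation by event type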
- private static void printEntry(List<Entry> entrys) {
- for (Entry entry : entrys) {
- if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
- continue;
- }
- RowChange rowChage = null;
- try {
- rowChage = RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
- }
- EventType eventType = rowChage.getEventType();
- System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
- eventType));
-
- for (RowData rowData : rowChage.getRowDatasList()) {
- if (eventType == EventType.DELETE) {
- redisDelete(rowData.getBeforeColumnsList());
- } else if (eventType == EventType.INSERT) {
- redisInsert(rowData.getAfterColumnsList());
- } else {
- // update: log the before and after images, then write the after image to Redis
- System.out.println("-------> before");
- printColumn(rowData.getBeforeColumnsList());
- System.out.println("-------> after");
- printColumn(rowData.getAfterColumnsList());
- redisUpdate(rowData.getAfterColumnsList());
- }
- }
- }
- }
-
- private static void printColumn(List<Column> columns) {
- for (Column column : columns) {
- System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
- }
- }
-
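- // The Redis key is "canal:user:" plus the value of the first column, which assumes the first column of user_table is the primary key (id)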
- private static void redisInsert(List<Column> columns) {
- JSONObject json = new JSONObject();
- for (Column column : columns) {
- json.put(column.getName(), column.getValue());
- }
- if (columns.size() > 0) {
- RedisUtil.stringSet("canal:user:" + columns.get(0).getValue(), json.toJSONString());
- }
- }
-
- private static void redisUpdate(List<Column> columns) {
- JSONObject json = new JSONObject();
- for (Column column : columns) {
- json.put(column.getName(), column.getValue());
- }
- if (columns.size() > 0) {
- RedisUtil.stringSet("canal:user:" + columns.get(0).getValue(), json.toJSONString());
- }
- }
-
- private static void redisDelete(List<Column> columns) {
- JSONObject json = new JSONObject();
- for (Column column : columns) {
- json.put(column.getName(), column.getValue());
- }
- if (columns.size() > 0) {
- RedisUtil.delKey("canal:user:" + columns.get(0).getValue());
- }
- }
- }
Create the cacal database and the user_table table; inserts, updates, and deletes will all be synced to Redis. For example, after inserting a row whose id is 1, the Redis key canal:user:1 holds that row as a JSON string.
Canal supports two MQ targets, Kafka and RocketMQ; this article uses Kafka.
1. Kafka environment setup
Start ZooKeeper and run ZooInspector; the detailed installation steps were covered in an earlier post:
- Unzip kafka_2.13-2.4.0 and rename the directory to kafka
-
- Edit the configuration in server.properties:
-
- log.dirs=D:\MyTools\Kafka\logs
- Open a command prompt in D:\MyTools\Kafka and run:
- .\bin\windows\kafka-server-start.bat .\config\server.properties
2. Canal configuration changes
- 1. Edit example/instance.properties:
- canal.mq.topic=zb-topic
- 2. Edit canal.properties:
- # tcp, kafka, RocketMQ
- canal.serverMode = kafka
- canal.mq.servers = 127.0.0.1:9092
3. Write the consumer code
In MQ mode canal publishes each change as a flat JSON message by default; that is the format the listener below parses.
- @RestController
- public class KafkaController {
-
- @Autowired
- private RedisUtils redisUtils;
-
- // Consume canal's flat JSON messages from Kafka and sync them to Redis;
- // each message carries "type", "database", "table" and a "data" array of changed rows
- @KafkaListener(topics = "zb-topic")
- public void receive(ConsumerRecord<?, ?> consumer) {
- System.out.println("topic: " + consumer.topic() + ", key: " + consumer.key()
-         + ", partition: " + consumer.partition() + ", offset: " + consumer.offset()
-         + ", value: " + consumer.value());
- String json = (String) consumer.value();
- JSONObject jsonObject = JSONObject.parseObject(json);
- String sqlType = jsonObject.getString("type");
- JSONArray data = jsonObject.getJSONArray("data");
- JSONObject userObject = data.getJSONObject(0);
- String id = userObject.getString("id");
- String database = jsonObject.getString("database");
- String table = jsonObject.getString("table");
- String key = database + "_" + table + "_" + id;
- if ("UPDATE".equals(sqlType) || "INSERT".equals(sqlType)) {
- redisUtils.setString(key, userObject.toJSONString());
- return;
- }
- if ("DELETE".equals(sqlType)) {
- redisUtils.deleteKey(key);
- }
- }
- }
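The RedisUtils bean injected above is not shown in the original article. Below is a minimal sketch of what it could look like, assuming a Spring Boot project with spring-kafka and spring-boot-starter-data-redis on the classpath and using StringRedisTemplate; the method names setString and deleteKey come from the controller, everything else is an assumption.
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.stereotype.Component;
-
- @Component
- public class RedisUtils {
-
-     @Autowired
-     private StringRedisTemplate stringRedisTemplate;
-
-     // store the row JSON under the given key (INSERT / UPDATE)
-     public void setString(String key, String value) {
-         stringRedisTemplate.opsForValue().set(key, value);
-     }
-
-     // remove the key when the row is deleted (DELETE)
-     public void deleteKey(String key) {
-         stringRedisTemplate.delete(key);
-     }
- }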