赞
踩
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
文档资料
修改 my.cnf
中配置
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
查看授权情况
select * from mysql.user where User = 'canal'\G
*************************** 1. row ***************************
Host: %
User: canal
Select_priv: Y
Repl_slave_priv: Y
Repl_client_priv: Y
下载解压
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
配置修改
conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 100
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
启动服务
# 启动
bash bin/startup.sh
# 关闭
bash bin/stop.sh
# 重启
bash bin/restart.sh
# 查看日志
tail -f logs/canal/canal.log
可以直接消费数据;也可以先放到MQ中,再消费数据
直接通过Canal客户端消费数据
依赖
<!-- canal -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
客户端代码
package com.example.demo;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * Minimal Canal client that connects to a single Canal server, polls for
 * batches of change entries and prints every changed column to standard output.
 */
public class SimpleCanalClientExample {

    /**
     * Connects to the Canal server and polls for changes until the calling
     * thread is interrupted or an exception is thrown.
     *
     * <p>The connector is always disconnected on exit.
     */
    public static void run() {
        // Connection settings
        String hostname = "127.0.0.1";
        int port = 11111;
        String destination = "example";
        String username = "";
        String password = "";

        // Create the connector
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(hostname, port),
                destination,
                username,
                password
        );

        int batchSize = 1000;

        try {
            connector.connect();
            // Subscription filter; presumably matches every schema.table - confirm against Canal docs
            connector.subscribe(".*\\..*");
            connector.rollback();

            while (true) {
                // Fetch up to batchSize entries without acknowledging them yet
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    // Nothing received: wait before polling again
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop polling so the
                        // caller can shut the client down cooperatively.
                        Thread.currentThread().interrupt();
                        break;
                    }
                } else {
                    System.out.printf("message[batchId=%s, size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // confirm the batch
                // connector.rollback(batchId); // on processing failure, roll the batch back instead
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Prints the binlog position, schema, table, event type and changed columns
     * of every entry, skipping transaction begin/end markers.
     *
     * @param entries entries of one fetched batch
     * @throws RuntimeException if an entry's store value cannot be parsed as a {@code RowChange}
     */
    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            final RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException(
                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType));

            // Row changes: a DELETE prints the "before" columns, every other
            // event type prints the "after" columns.
            for (RowData rowData : rowChange.getRowDatasList()) {
                printColumn(eventType == EventType.DELETE
                        ? rowData.getBeforeColumnsList()
                        : rowData.getAfterColumnsList());
            }
        }
    }

    /**
     * Prints each column as {@code name : value}, one per line.
     *
     * @param columns columns of a single row image
     */
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue());
        }
    }

    public static void main(String[] args) {
        run();
    }
}
修改MySQL中的数据,就可以监听到
采用docker会简单很多
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
或者 docker-compose.yml
version: "3.7"
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    hostname: rabbitmq
    restart: always
    ports:
      - "15672:15672"
      - "5672:5672"
    volumes:
      - ./app/rabbitmq/data/:/var/lib/rabbitmq/
      - ./app/rabbitmq/log/:/var/log/rabbitmq/
    environment:
      - RABBITMQ_DEFAULT_USER=root
      - RABBITMQ_DEFAULT_PASS=123456
管理界面:http://localhost:15672/
canal.properties
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
# canal.serverMode = tcp
canal.serverMode = rabbitMQ
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = root
rabbitmq.password = 123456
instance.properties
# mq config
# canal.mq.topic=example
canal.mq.topic=canal-routing-key
重启canal服务端
SpringBoot 项目结构
$ tree -I target
.
├── pom.xml
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── example
    │   │           └── demo
    │   │               ├── Application.java
    │   │               ├── config
    │   │               │   ├── CanalProvider.java
    │   │               │   └── RabbitConfig.java
    │   │               ├── constant
    │   │               │   ├── EventTypeConstant.java
    │   │               │   └── RabbitConstant.java
    │   │               ├── consumer
    │   │               │   ├── CanalConsumer.java
    │   │               │   └── SimpleCanalClientExample.java
    │   │               └── controller
    │   │                   └── IndexController.java
    │   └── resources
    │       ├── application.yml
    │       ├── static
    │       └── templates
    └── test
        └── java
            └── com
                └── example
                    └── demo
                        └── ApplicationTests.java
依赖 pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置 application.yml
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: 123456
    # 消息确认配置项
    # 确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    # 确认消息已发送到队列(Queue)
    publisher-returns: true
RabbitMQ配置类 RabbitConfig.java
package com.example.demo.config; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(); template.setConnectionFactory(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } /** * template.setMessageConverter(new Jackson2JsonMessageConverter()); * 这段和上面这行代码解决RabbitListener循环报错的问题 */ @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } }
Canal消息生产者:
package com.example.demo.config; import com.example.demo.constant.RabbitConstant; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Canal消息提供者,canal-server生产的消息通过RabbitMQ消息队列发送 */ @Configuration public class CanalProvider { /** * 队列 */ @Bean public Queue canalQueue() { /** * durable:是否持久化,默认false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在;暂存队列:当前连接有效 * exclusive:默认为false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除 */ return new Queue(RabbitConstant.CanalQueue, true); } /** * 交换机,这里使用直连交换机 */ @Bean DirectExchange canalExchange() { return new DirectExchange(RabbitConstant.CanalExchange, true, false); } /** * 绑定交换机和队列,并设置匹配键 */ @Bean Binding bindingCanal() { return BindingBuilder.bind(canalQueue()).to(canalExchange()).with(RabbitConstant.CanalRouting); } }
Canal消息消费者:
package com.example.demo.consumer;

import com.example.demo.constant.EventTypeConstant;
import com.example.demo.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * Consumes Canal change messages from the Canal queue.
 */
@Component
@RabbitListener(queues = RabbitConstant.CanalQueue)
public class CanalConsumer {

    /**
     * Handles one Canal message that has been converted to a map.
     *
     * <p>Messages flagged as DDL are ignored. For other messages the
     * {@code database.table} name is printed, followed by the event type when
     * it is INSERT, UPDATE or DELETE.
     *
     * @param msg message payload; a {@code null} payload is logged and ignored
     */
    @RabbitHandler
    public void process(Map<String, Object> msg) {
        System.out.println("收到canal消息:" + msg);
        // Example payload:
        // {data=[{id=21, name=Mini+1, age=30}], database=data, es=1684302821000, id=2, isDdl=false,
        //  mysqlType={id=int unsigned, name=varchar(20), age=tinyint unsigned}, old=[{name=Mini}],
        //  pkNames=[id], sql=, sqlType={id=4, name=12, age=-6}, table=tb_user, ts=1684302821452, type=UPDATE}

        if (msg == null) {
            return;
        }

        // Skip DDL events. Boolean.TRUE.equals is null-safe, so a message
        // without an "isDdl" key no longer fails while unboxing null.
        if (Boolean.TRUE.equals(msg.get("isDdl"))) {
            return;
        }

        // Database name
        String database = (String) msg.get("database");

        // Table name
        String table = (String) msg.get("table");

        // Event type: INSERT/UPDATE/DELETE
        String type = (String) msg.get("type");

        // Column values of each changed row; not used by this demo, kept to
        // show where a real handler would read them.
        List<?> data = (List<?>) msg.get("data");

        System.out.printf("%s.%s", database, table);

        // Only the listed event types are handled
        if (EventTypeConstant.INSERT.equalsIgnoreCase(type)) {
            System.out.println("INSERT");
        } else if (EventTypeConstant.UPDATE.equalsIgnoreCase(type)) {
            System.out.println("UPDATE");
        } else if (EventTypeConstant.DELETE.equalsIgnoreCase(type)) {
            System.out.println("DELETE");
        } else {
            // Other event types are ignored
        }
    }
}
数据变化事件类型
package com.example.demo.constant;
/**
 * Names of the data-change event types.
 *
 * <p>This class only holds constants, so it is final and cannot be instantiated.
 */
public final class EventTypeConstant {
    /** Event type name for inserts. */
    public static final String INSERT = "INSERT";
    /** Event type name for updates. */
    public static final String UPDATE = "UPDATE";
    /** Event type name for deletes. */
    public static final String DELETE = "DELETE";

    private EventTypeConstant() {
        // constants holder; no instances
    }
}
配置
package com.example.demo.constant;
/**
 * RabbitMQ names shared by the Canal queue declaration and the consumer.
 *
 * <p>This class only holds constants, so it is final and cannot be instantiated.
 * The constant names are kept as-is because other classes reference them.
 */
public final class RabbitConstant {
    /** Name of the queue that receives Canal messages. */
    public static final String CanalQueue = "canal-queue";
    /** Name of the exchange used for Canal messages. */
    public static final String CanalExchange = "canal-exchange";
    /** Routing key that binds the exchange to the queue. */
    public static final String CanalRouting = "canal-routing-key";

    private RabbitConstant() {
        // constants holder; no instances
    }
}
参考
Canal实战,一套带走
SpringBoot整合 Canal、RabbitMQ 监听数据变更-微信
SpringBoot整合 Canal、RabbitMQ 监听数据变更-掘金
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。