赞
踩
RocketMQ不进行叙述,我这里使用的默认端口号9876
1.Canal下载:链接
2.修改MySQL的配置文件my.cnf
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
使用命令重启mysql
service mysqld restart
使用sql查询日志是否开启
SHOW VARIABLES like 'log_bin'
3.解压canal,目录为。
需要设置服务端配置文件中的MySQL配置,这样Canal才能知道需要监听哪个库、哪个表的日志文件。
一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,本篇就用 example
实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加
实例的名称。
修改canal.deployer-1.1.5\conf\example\instance.properties配置文件
# url
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# 监听的表,可以指定,多个用逗号分割,这里正则是监听所有
canal.instance.filter.regex=.*\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=mysql\\.slave_.*
# 监听topic
canal.mq.topic=ceshi_topic
修改conf的canal.properties
# tcp, kafka, rocketMQ, rabbitMQ ,这里选择rocketMQ模式
canal.serverMode = rocketMQ
# 解析器的线程数,默认是注释掉的,不打开则会出现阻塞或者不进行解析的情况
canal.instance.parser.parallelThreadSize = 16
# 这个是配置instance,默认好像就是example,要配置多个用逗号隔开
canal.destinations = example
# rocketmq配置,设置生产组
rocketmq.producer.group = ceshi_producer_group
因为是在win环境下测试
于是bin目录下找到startup.bat启动
报了个致命错误,本机装的jdk17于是很多地方容易出问题
cmd重设下环境(仅此次cmd生效)再启动
set JAVA_HOME=D:\Program Files\java\jdk1.8.0_221_bushuyvyuanwenjian
set Path=D:\Program Files\java\jdk1.8.0_221_bushuyvyuanwenjian\bin
set CLASSPATH=.;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar
startup.bat
启动成功
java添加消息实体类
@Data
public class CanalBinlog {
//数据
private List<Map<String,Object>> data;
//数据库名称
private String database;
private long es;
//递增,从1开始
private int id;
//是否是DDL语句
private boolean isDdl;
//表结构的字段类型
private String mysqlType;
//UPDATE语句,旧数据
private String old;
//主键名称
private List<String> pkNames;
//sql语句
private String sql;
private String sqlType;
//表名
private String table;
private long ts;
//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
private String type;
}
java添加消息消费者
@Component
@RocketMQMessageListener(topic = "ceshi_topic", consumerGroup = "ceshi_consumer_group",selectorExpression="*")
public class RocketMQTool implements RocketMQListener<String> {
private Gson gson=new Gson();
@Override
public void onMessage(String mqjson){
CanalBinlog canalBinlog=gson.fromJson(mqjson,CanalBinlog.class);
System.out.println(mqjson);
System.out.println(canalBinlog);
}
}
修改数据库见打印
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。