当前位置:   article > 正文

使用Canal搭配RocketMQ进行数据同步

使用Canal搭配RocketMQ进行数据同步

RocketMQ不进行叙述,我这里使用的默认端口号9876

1.Canal下载:链接
在这里插入图片描述

2.修改MySQL的配置文件my.cnf

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
  • 1
  • 2
  • 3
  • 4

使用命令重启mysql

service mysqld restart
  • 1

使用sql查询日志是否开启

SHOW VARIABLES like 'log_bin'
  • 1

在这里插入图片描述

3.解压canal,目录为。
在这里插入图片描述
需要设置服务端配置文件中的MySQL配置,这样Canal才能知道需要监听哪个库、哪个表的日志文件。
一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,本篇就用 example
实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加
实例的名称。

修改canal.deployer-1.1.5\conf\example\instance.properties配置文件

# url
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root

# 监听的表,可以指定,多个用逗号分割,这里正则是监听所有
canal.instance.filter.regex=.*\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=mysql\\.slave_.*

# 监听topic
canal.mq.topic=ceshi_topic
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

修改conf的canal.properties

# tcp, kafka, rocketMQ, rabbitMQ ,这里选择rocketMQ模式
canal.serverMode = rocketMQ
# 解析器的线程数,默认是注释掉的,不打开则会出现阻塞或者不进行解析的情况
canal.instance.parser.parallelThreadSize = 16
# 这个是配置instance,默认好像就是example,要配置多个用逗号隔开
canal.destinations = example
# rocketmq配置,设置生产组
rocketmq.producer.group = ceshi_producer_group
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

因为是在win环境下测试
于是bin目录下找到startup.bat启动
在这里插入图片描述
报了个致命错误,本机装的jdk17于是很多地方容易出问题

cmd重设下环境(仅此次cmd生效)再启动

set JAVA_HOME=D:\Program Files\java\jdk1.8.0_221_bushuyvyuanwenjian

set Path=D:\Program Files\java\jdk1.8.0_221_bushuyvyuanwenjian\bin

set CLASSPATH=.;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar

startup.bat
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

启动成功
在这里插入图片描述
java添加消息实体类

@Data
public class CanalBinlog {
    //数据
    private List<Map<String,Object>> data;
    //数据库名称
    private String database;

    private long es;
    //递增,从1开始
    private int id;
    //是否是DDL语句
    private boolean isDdl;

    //表结构的字段类型
    private String mysqlType;
    //UPDATE语句,旧数据
    private String old;
    //主键名称
    private List<String> pkNames;
    //sql语句
    private String sql;

    private String sqlType;
    //表名
    private String table;

    private long ts;
    //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
    private String type;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

java添加消息消费者

@Component
@RocketMQMessageListener(topic = "ceshi_topic", consumerGroup = "ceshi_consumer_group",selectorExpression="*")
public class RocketMQTool implements RocketMQListener<String> {

    private Gson gson=new Gson();
    @Override
    public void onMessage(String mqjson){
    	CanalBinlog canalBinlog=gson.fromJson(mqjson,CanalBinlog.class);
        System.out.println(mqjson);
        System.out.println(canalBinlog);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

修改数据库见打印

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/445356
推荐阅读
相关标签
  

闽ICP备14008679号