当前位置:   article > 正文

Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理_canel 做实时数据推送到rabbitmq

canel 做实时数据推送到rabbitmq

前言

关于Canal的介绍及原理不在此赘述,可自行查阅。笔者在使用Canal同步Mysql实时操作记录至RabbitMQ的过程中,也翻阅了一些大牛们的文章,可能是我使用的Canal版本与文中版本不一致,出现了一些问题,在此总结记录一下可行的方案。
注:本文使用的Canal为 v1.1.7

一、Mysql数据库开启bin_log

  • 先查看目标数据库是否开启bin_log
SHOW VARIABLES LIKE 'log_bin'
  • 1

如结果中,log_bin的值为OFF则未开启,为ON则已开启。
在这里插入图片描述

  • 如未开启,可编辑Mysql配置文件:/etc/my.cnf
[mysqld]
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 选择ROW模式
server_id=1 # 配置MySQL replaction需要定义,不和Canal的slaveId重复即可
  • 1
  • 2
  • 3
  • 4

重启MySQL ,再次通过上一步查看配置是否生效。

二、数据库创建新用户

  • 创建专用于数据同步的新用户
-- 创建一个新用户,名称可自行定义
create user canal@'%' IDENTIFIED by 'canal';
-- 为新用户授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
-- 刷新缓存中的用户数据
FLUSH PRIVILEGES;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

三、配置RabhitMQ

以下使用的名称均可自行定义,保证唯一即可

1. 添加交换机

在这里插入图片描述

2. 添加队列

在这里插入图片描述

3. 绑定交换机与队列,设置 Routing key

在这里插入图片描述

四、下载、配置、运行Canal(windows环境)

1. 下载服务端

  • 可到以下地址下载所需版本的包:github-alibaba-canal
    本文使用较新的 v1.1.7
    在这里插入图片描述

  • 选择下载 canal.deployer-1.1.7.tar.gz
    在这里插入图片描述

2. 配置

  • 解压下载包,获得如下文件。
    在这里插入图片描述
  • 编辑:conf\canal.properties(仅列出需要修改的配置项)
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rabbitMQ
##################################################
######### 		    RabbitMQ	     #############
##################################################
# host 无需添加端口号
rabbitmq.host = 192.168.0.2
# 填写 / 即可
rabbitmq.virtual.host = /
# RabbitMQ的用户名、密码
rabbitmq.username = admin
rabbitmq.password = 123456
# 上文配置的交换机(exchange)名称:Name
rabbitmq.exchange = canal.exchange
# 交换机类型:Type
rabbitmq.deliveryMode = direct

# 以下两个字段为自行添加,否则会报空指针异常
# 队列(queue)名称:Name
rabbitmq.queue = canal.queue
# 绑定队列-交换机时的路由秘钥:Routing key
rabbitmq.routingKey = canal.routing.key
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 编辑:conf\example\instance.properties(仅列出需要修改的配置项)
# 目标数据库地址
canal.instance.master.address=192.168.0.1:3306
# 目标数据库用户名密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123

# 表过滤正则表达式(按需修改)
# 全库全表 : .*\\..*
# 指定库所有表:  库名\..*   例:test\..*
# 单表:  库名.表名  例:test.user
# 多规则组合使用:  库名1\..*,库名2.表名1,库名3.表名2 (逗号分隔)  例 test\..*,test2.user1,test3.user2 (逗号分隔)
canal.instance.filter.regex=.*\\..*
# canal.instance.filter.regex=project.sys_user,project.sys_role
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

3. 运行

windows环境下直接运行bin\startup.bat,linux环境下执行bin\startup.sh
执行启动脚本后,查看日志信息logs\canal\canal.log,出现如下信息,表示启动成功。

[main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
  • 1

五、测试

对监听的数据库表做修改操作,至RabbitMQ控制台的队列中查看是否插入消息。
如下,即成功插入实时操作数据。
在这里插入图片描述

六、项目中监听处理

  • 创建一个maven项目

在这里插入图片描述

  • pom.xml中引入spring-boot-starter-amqp依赖,此包集成了对RabbitMQ的支持。
	<!-- RabbitMQ 集成支持 -->
	<dependency>
	    <groupId>org.springframework.boot</groupId>
	    <artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>
	
	<!-- fastjson 解析数据 -->
	<dependency>
	    <groupId>com.alibaba</groupId>
	    <artifactId>fastjson</artifactId>
	    <version>2.0.9.graal</version>
	</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 修改配置文件application.yml(此处已按个人偏好,文件类型改为yaml),配置RabbitMQ。
spring:
  rabbitmq:
    host: 192.168.0.2
    port: 5672
    username: admin
    password: 123456
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • binLog数据实体类BinLogEntity
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Data
public class BinLogEntity {
    /**
     * 数据库
     */
    private String database;

    /**
     * 表
     */
    private String table;

    /**
     * 操作类型
     */
    private String type;

    /**
     * 操作数据
     */
    private JSONArray data;

    /**
     * 变更前数据
     */
    private JSONArray old;

    /**
     * 主键名称
     */
    private JSONArray pkNames;

    /**
     * 执行sql语句
     */
    private String sql;
    
    private Long es;
    private String gtid;
    private Long id;
    private Boolean isDdl;
    private JSONObject mysqlType;
    private JSONObject sqlType;
    private Long ts;

    public <T> List<T> getData(Class<T> clazz) {
        if (this.data == null || this.data.size() == 0) {
            return null;
        }
        return this.data.toJavaList(clazz);
    }

    public <T> List<T> getOld(Class<T> clazz) {
        if (this.old == null || this.old.size() == 0) {
            return null;
        }
        return this.old.toJavaList(clazz);
    }

    public List<String> getPkNames() {
        if (this.pkNames == null || this.pkNames.size() == 0) {
            return null;
        }
        List<String> pkNames = new ArrayList<>();
        for (Object pkName : this.pkNames){
            pkNames.add(pkName.toString());
        }
        return pkNames;
    }

    public Map<String, String> getMysqlType() {
        if(this.mysqlType == null){
            return null;
        }
        Map<String, String> mysqlTypeMap = new HashMap<>();
        this.mysqlType.forEach((k, v) -> {
            mysqlTypeMap.put(k, v.toString());
        });
        return mysqlTypeMap;
    }

    public Map<String, Integer> getSqlType() {
        if(this.sqlType == null){
            return null;
        }
        Map<String, Integer> sqlTypeMap = new HashMap<>();
        this.sqlType.forEach((k, v) -> {
            sqlTypeMap.put(k, Integer.valueOf(v.toString()));
        });
        return sqlTypeMap;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 操作数据实体类
@Data
public class User implements Serializable {
	private static final long serialVersionUID = 1L;

	/**
	 * ID
	 */
	private Long id;

	/**
	 * 姓名
	 */
	private String name;

	/**
	 * 年龄
	 */
	private Integer age;

	/**
	 * 电话
	 */
	private String phone;

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 监听类CanalListener
import com.alibaba.fastjson.JSON;
import com.example.canalclient.entity.BinLogEntity;
import com.example.canalclient.entity.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * 监听数据库数据变化时RabbitMQ发送的信息
 */
@Component
public class CanalListener {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "canal.queue", durable = "true"),
                    exchange = @Exchange(value = "canal.exchange"),
                    key = "canal.routing.key"
            )
    })
	public void handleDataChange(@Payload Message message) {
        // 获取消息内容
        String content = new String(message.getBody(), StandardCharsets.UTF_8);
        // 反序列化
        BinLogEntity binLog = JSON.parseObject(content, BinLogEntity.class);
        // 获取操作数据
        User user = binLog.getData(User.class).get(0);
        User oldUser = binLog.getOld(User.class).get(0);

        System.out.println("数据库:" + binLog.getDatabase());
        System.out.println("表:" + binLog.getTable());
        System.out.println("操作类型:" + binLog.getType());
        System.out.println("主键:" + JSON.toJSONString(binLog.getPkNames()));
        System.out.println("数据:" + JSON.toJSONString(User));
        System.out.println("原数据:" + JSON.toJSONString(User));
        System.out.println("MysqlType:" + JSON.toJSONString(binLog.getMysqlType()));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 打印结果(修改操作)
数据库:project
表:sys_user
操作类型:UPDATE
主键:["id"]
数据:{
	"id": 1,
	"name": "张三",
	"age": 21,
	"phone": 13333333333
}
原数据:{
	"age": 20,
	"phone": 12222222222
}
MysqlType:{
	"id": "bigint unsigned",
	"name": "varchar(50)",
	"age": "int(3) unsigned",
	"phone": "varchar(50)"
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

至此,已实现对目标数据库实时操作数据进行监听,可根据不同的操作类型,采取相应的业务处理。

七、参考文章

Canal+Msql+RabbitMq数据库同步配置,看这一篇就够了

使用canal同步mysql数据库信息到RabbitMQ

Canal配置connector.subscribe和canal.instance.filter.regex遇到的坑

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/593422
推荐阅读
相关标签
  

闽ICP备14008679号