当前位置:   article > 正文

rabbitmq使用整合mysql_Canal+RabbitMQ数据异构填坑指南

canal 和 rabbitmq的转换器关系

此文章主要补充Canal整合RabbitMQ时填过的坑。

1、完整数据处理流程

d7c1a7160c60

用户中心数据处理流程

这里核心处理点,在于Canal的部署和RabbitMQ的整合。

Canal主要模拟MySQL的Slave,实现主从复制,拉取Mysql的binlog进行解析转换成相应的数据,获取到数据后,将数据推送到RabbitMQ(默认支持的是RocketMQ和Kafka,新版加入对RabbitMQ的支持)。

Canal整合RabbitMQ主要采用的是 路由模式。

2、Canal 配置及部署

2.1、Canal Admin 安装及配置

新版本Canal已经支持在线管理,我们可以先安装一个canal-admin。参考:Canal Admin QuickStart

canal-admin的核心模型主要有:

instance,对应canal-server里的instance,一个最小的订阅mysql的队列

server,对应canal-server,一个server里可以包含多个instance

集群,对应一组canal-server,组合在一起面向高可用HA的运维

简单解释:

instance因为是最原始的业务订阅诉求,它会和 server/集群 这两个面向资源服务属性的进行关联,比如instance A绑定到server A上或者集群 A上,

有了任务和资源的绑定关系后,对应的资源服务就会接收到这个任务配置,在对应的资源上动态加载instance,并提供服务。动态加载的过程,有点类似于之前的autoScan机制,只不过基于canal-admin之后可就以变为远程的web操作,而不需要在机器上运维配置文件

将server抽象成资源之后,原本canal-server运行所需要的canal.properties/instance.properties配置文件就需要在web ui上进行统一运维,每个server只需要以最基本的启动配置 (比如知道一下canal-admin的manager地址,以及访问配置的账号、密码即可)

集群模式配置

先配置一个zk地址和集群名称。

d7c1a7160c60

集群模式

配置canal.properties

2.1. 选择主配置

d7c1a7160c60

集群模式主配置

2.2. 设置canal.properties,可以先载入模板,然后进行修改。

d7c1a7160c60

canal.properties

canal.properties核心配置:

...

# register ip to zookeeper

canal.register.ip = 10.8.158.4

canal.port = 11111

canal.metrics.pull.port = 11112

# canal instance user/passwd

canal.user = canal

canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config

canal.admin.manager = 10.8.158.4:8089

canal.admin.port = 11110

canal.admin.user = admin

canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers = 10.8.158.4:2181

# flush data to zk

canal.zookeeper.flush.period = 1000

canal.withoutNetty = false

# tcp, kafka, RocketMQ

canal.serverMode = rabbitmq

...

# table meta tsdb info

canal.instance.tsdb.enable = false

...

# 一下几项均为 1.1.5 新版本新增支持 rabbitmq 的配置

rabbitmq.host = 10.224.45.9:25674

rabbitmq.virtual.host = /

# 指定 rabbitmq 上的 exchange 名称, "新建 `Exchange`" 步骤新建的名称

rabbitmq.exchange = usercenter

# 连接 rabbitmq 的用户名

rabbitmq.username = guest

# 连接 rabbitmq 的密码

rabbitmq.password = guest

...

3、 Canal Server配置及部署

canal-server接入canal-admin,参考:Canal Admin ServerGuide

3.1. 核心配置

root@dreamson-QiTianM425-N000:/tmp/canal-1.1.5/conf# cat canal_local.properties

# register ip

canal.register.ip = 10.8.158.4

# canal admin config

canal.admin.manager = 10.8.158.4:8089

canal.admin.port = 11110

canal.admin.user = admin

canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

# admin auto register

canal.admin.register.auto = true

canal.admin.register.cluster = usercenter

3.2. 启动canal-server,注册canal-admin

root@dreamson-QiTianM425-N000:/tmp/canal-1.1.5# bash bin/startup.sh local

注册后在admin中的展现。

d7c1a7160c60

server注册admin

4.Instance实例配置

4.1. 新增instance实例

Instance实例启动后,即可监听mysql的binlog,并将数据推到RabbitMQ。推送的规则参考:Canal Kafka/RocketMQ QuickStart

RabbitMQ的配置的topic,事实上是设置到routeKey上。

d7c1a7160c60

新增instance实例

d7c1a7160c60

instance列表

5. 最后我们调试一下

MQ监控处理

@RabbitListener(bindings = {

@QueueBinding(

value = @Queue(value = "usercenter", durable = "true"),

exchange = @Exchange(value = "usercenter", type = ExchangeTypes.TOPIC),

key = "usercenter"

)

}, concurrency = "10")

public void test(String message) {

log.info("test接收到消息。message:{}", message);

}

修改表数据

d7c1a7160c60

修改数据库数据

MQ监控输入日志如下

2021-02-04 11:34:53.037 INFO [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.18278394004589313","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test","ts":1612409677905,"type":"INSERT"}

2021-02-04 11:34:53.050 INFO [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.461750433278969","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test_1","ts":1612409677905,"type":"INSERT"}

2021-02-04 11:34:53.051 INFO [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.12820304849017694","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test_2","ts":1612409677905,"type":"INSERT"}

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/445429
推荐阅读
相关标签
  

闽ICP备14008679号