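mysql + canal + RabbitMQ + Spring Boot: listening for and synchronizing database changes
Local machine: Mac
mysql: 5.7, on a test machine I set up myself
canal: canal.deployer-1.1.5
MQ: Docker image rabbitmq:3.7.7-management (includes the web management UI)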
Run show variables like '%log_bin%'; to check whether MySQL's binlog is enabled.
If it is not, enable it manually. If you are unsure how, a quick search online will turn up the steps.
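For reference, enabling the binlog usually comes down to a few lines in the MySQL option file. The fragment below is a minimal sketch based on the settings canal's own quick start recommends; the file location and the server_id value are assumptions and depend on your installation.

```ini
[mysqld]
# Turn on the binary log (the base name "mysql-bin" is a common choice)
log-bin=mysql-bin
# canal parses row-level events, so ROW format is needed
binlog-format=ROW
# Any id that is unique among the servers and canal instances involved
server_id=1
```

MySQL has to be restarted for these settings to take effect.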
Once it is enabled, log_bin should show ON:
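In RabbitMQ, create a new Exchange.
Choose the consumer account according to your own needs. This setup is only for my own testing and experiments, so I simply use admin (guest is another option).
The account must have permissions on the vhost, otherwise later steps will be affected.
canal version 1.1.5
canal.properties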
## switch the server mode to rabbitMQ
canal.serverMode = rabbitMQ
# adjust the rabbitMQ settings to match
rabbitmq.host =127.0.0.1
rabbitmq.virtual.host =my_vhost
rabbitmq.exchange =exchange_user_sync
rabbitmq.username =admin
rabbitmq.password =admin
rabbitmq.deliveryMode =
instance.properties
# table regex: which databases and tables to capture; the default captures everything
canal.instance.filter.regex=test.user,test.sbc_user
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
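To make the filter setting more concrete, here is a rough Java sketch of how a comma-separated list of regular expressions can be checked against a schema.table name. It only approximates the idea: canal has its own filter implementation, and the names CanalFilterSketch and accepts are made up for illustration.

```java
import java.util.regex.Pattern;

final class CanalFilterSketch {

    /**
     * Returns true if "schema.table" fully matches at least one of the
     * comma-separated patterns. Approximation only, not canal's actual code.
     */
    static boolean accepts(String filterRegex, String schema, String table) {
        String fullName = schema + "." + table;
        for (String pattern : filterRegex.split(",")) {
            // Each entry is treated as a regular expression over the whole name
            if (Pattern.matches(pattern.trim(), fullName)) {
                return true;
            }
        }
        return false;
    }
}
```

With the value used above, accepts("test.user,test.sbc_user", "test", "user") is true, while a table such as test.orders is rejected. A pattern like test\\..* (written the same way in the properties file and in a Java string literal) would accept every table in the test database.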
Then start canal.
pom.xml
<properties>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <!-- Version conflicts cause errors, so the Spring Cloud version may need to be switched -->
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
application.properties
server.port=8080
spring.application.name=mla
spring.rabbitmq.host= localhost
spring.rabbitmq.port= 5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=my_vhost
spring.cloud.stream.bindings.sms-service-message-output.destination=exchange_user_sync
spring.cloud.stream.bindings.sms-service-message-input.destination=exchange_user_sync
Related classes
package com.mla.test.demo.MQ;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * Declares the output and input channels. The channel names match the
 * spring.cloud.stream.bindings.* keys in the configuration.
 *
 * @author malaian
 * @datetime 2021-08-04 11:30
 **/
@EnableBinding
public interface MessageBinding {

    // Name of the sending (output) channel
    String Q_SMS_SERVICE_MESSAGE_SEND = "sms-service-message-output";

    // Name of the receiving (input) channel
    String Q_SMS_SERVICE_MESSAGE_SEND_A = "sms-service-message-input";

    /**
     * Channel used to send messages.
     */
    @Output(Q_SMS_SERVICE_MESSAGE_SEND)
    MessageChannel subOutput();

    /**
     * Channel used to receive messages.
     */
    @Input(Q_SMS_SERVICE_MESSAGE_SEND_A)
    SubscribableChannel subInput();
}
Implementation
package com.mla.test.demo.MQ;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * Listens on the input channel and logs every message that canal publishes.
 *
 * @author malaian
 * @datetime 2021-08-04 13:35
 **/
@Slf4j
@Component
@EnableBinding(MessageBinding.class)
public class MessageImpl {

    // Called once per message; the payload is canal's JSON text
    @StreamListener(MessageBinding.Q_SMS_SERVICE_MESSAGE_SEND_A)
    public void subGoodsInfoStock(String json) {
        log.info("Received message: {}", json);
    }
}
Once everything is in place, start the application.
mysql: insert a row
The message consumer receives:
After formatting the data:
{
    "data": [
        {
            "user_id": "7",
            "pass_word1": "99999",
            "user_name": "000000"
        }
    ],
    "database": "test",
    "es": 1628066207000,
    "id": 148,
    "isDdl": false,
    "mysqlType": {
        "user_id": "bigint(20)",
        "pass_word1": "varchar(255)",
        "user_name": "varchar(255)"
    },
    "old": null,
    "pkNames": [
        "user_id"
    ],
    "sql": "",
    "sqlType": {
        "user_id": -5,
        "pass_word": 12,
        "user_name": 12
    },
    "table": "user",
    "ts": 1628066210647,
    "type": "INSERT"
}
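Many of the fields are self-explanatory from their names: database and table identify where the change happened, type is the operation, data holds the row values, pkNames lists the primary key columns, and mysqlType and sqlType describe the column types.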
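A few fields are less obvious. The numbers in sqlType are java.sql.Types codes, and es and ts are epoch timestamps in milliseconds (as far as I understand canal's flat message format, es is the time of the binlog event and ts is the time canal built the message). The sketch below decodes them using only the JDK; the class and method names are hypothetical.

```java
import java.sql.JDBCType;
import java.time.Instant;

final class CanalFieldSketch {

    /** Maps a java.sql.Types code from "sqlType" to its JDBC type name. */
    static String jdbcTypeName(int sqlTypeCode) {
        return JDBCType.valueOf(sqlTypeCode).getName();
    }

    /** Converts an epoch-millisecond value such as "es" or "ts" to an Instant (UTC). */
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    /** Milliseconds between the "es" and "ts" values of one message. */
    static long lagMillis(long es, long ts) {
        return ts - es;
    }
}
```

For the message above, jdbcTypeName(-5) gives BIGINT and jdbcTypeName(12) gives VARCHAR, which lines up with the bigint(20) and varchar(255) entries in mysqlType, and lagMillis(1628066207000L, 1628066210647L) is 3647.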