赞
踩
canal1.1.5好像就开始支持rabbitmq了,然后我下载的是1.1.6,为啥要整合rabbitmq,首先其他mq我也
不会啊,其次各有所需对吧。
首先要修改canal.properties文件
## tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 修改为rabbitmq
canal.serverMode = rabbitMQ
## 配置上我们的rabbitmq信息
rabbitmq.host = 8.142.188.187
rabbitmq.virtual.host = /
rabbitmq.exchange = canal_exchange
rabbitmq.username = admin
rabbitmq.password = admin
rabbitmq.deliveryMode =
再修改instance.properties文件
# 链接数据库的信息
canal.instance.master.address=127.0.0.1:3306
# username/password 链接数据库的账号密码
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# mq config 这里是rabbitmq的routerkey
canal.mq.topic=canal_key
修改这几项基本就OK了
rabbitmq配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CanalConfig {
@Bean
Queue queue(){
return new Queue("canal_queue");
}
@Bean
DirectExchange directExchange(){
return new DirectExchange("canal_exchange");
}
@Bean
Binding binding(){
return BindingBuilder.bind(queue()).to(directExchange()).with("canal_key");
}
}
这样我每次变动数据库,都会把变动的信息投递给rabbitmq了
这里收到的消息都是ASCLL码,所以要转一下
@RabbitListener(queues = "canal_queue")
public void getMsg(Message message, Channel channel, String msg){
String[]chars=msg.split(",");
StringBuffer stringBuffer = new StringBuffer();
for(int i=0;i<chars.length;i++){
stringBuffer.append((char)Integer.parseInt(chars[i]));
}
JSONObject jsonObject = JSONObject.parseObject(stringBuffer.toString());
System.out.println(jsonObject);
}
打完收工
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。