当前位置:   article > 正文

canal整合rabbitmq_canal1.1.6 + rabbit

canal1.1.6 + rabbit

canal1.1.5好像就开始支持rabbitmq了,然后我下载的是1.1.6,为啥要整合rabbitmq,首先其他mq我也
不会啊,其次各有所需对吧。
在这里插入图片描述

首先要修改canal.properties文件

## tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 修改为rabbitmq 
canal.serverMode = rabbitMQ

## 配置上我们的rabbitmq信息
rabbitmq.host = 8.142.188.187
rabbitmq.virtual.host = /
rabbitmq.exchange = canal_exchange
rabbitmq.username = admin
rabbitmq.password = admin
rabbitmq.deliveryMode =
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

再修改instance.properties文件

# 链接数据库的信息
canal.instance.master.address=127.0.0.1:3306

# username/password 链接数据库的账号密码
canal.instance.dbUsername=root
canal.instance.dbPassword=root

# mq config  这里是rabbitmq的routerkey
canal.mq.topic=canal_key
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

修改这几项基本就OK了

rabbitmq配置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CanalConfig {

    @Bean
    Queue queue(){
        return  new Queue("canal_queue");
    }

    @Bean
    DirectExchange directExchange(){
        return new DirectExchange("canal_exchange");
    }

    @Bean
    Binding binding(){
        return BindingBuilder.bind(queue()).to(directExchange()).with("canal_key");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

这样我每次变动数据库,都会把变动的信息投递给rabbitmq了
在这里插入图片描述
在这里插入图片描述
这里收到的消息都是ASCLL码,所以要转一下

@RabbitListener(queues = "canal_queue")
    public void  getMsg(Message message, Channel channel, String msg){
        String[]chars=msg.split(",");
        StringBuffer stringBuffer = new StringBuffer();
        for(int i=0;i<chars.length;i++){
            stringBuffer.append((char)Integer.parseInt(chars[i]));
        }
        JSONObject jsonObject = JSONObject.parseObject(stringBuffer.toString());
        System.out.println(jsonObject);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

在这里插入图片描述
在这里插入图片描述
打完收工

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号