赞
踩
参考:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
目前最新版本1.1.5 开发版,官网主推的是kafka 和 rocketMQ ,rabbitMQ 的配置不多。
canal.properties
- rabbitmq.host = 127.0.0.1:5672
- rabbitmq.virtual.host = data
- rabbitmq.exchange = canal
- rabbitmq.username = guest
- rabbitmq.password = guest
关于RabbitMQ 配置动态topic :
- # mq config
- canal.mq.topic=default
- # dynamic topic route by schema or table regex
- canal.mq.dynamicTopic=.*\\..*
- canal.mq.partition=0
官网关于 dynamicTopic 的说明:
canal.mq.dynamicTopic 表达式说明
canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔
- 例子1:test\\.test 指定匹配的单表,发送到以test_test为名字的topic上
- 例子2:.*\\..* 匹配所有表,则每个表都会发送到各自表名的topic上
- 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
- 例子4:test\\..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
- 例子5:test,test1\\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值
为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table
- 例子1: test:test\\.test 指定匹配的单表,发送到以test为名字的topic上
- 例子2: test:.*\\..* 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
- 例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
- 例子4:testA:test\\..* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
- 例子5:test0:test,test1:test1\\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值
这段文字没明白,看实际MQ生产者代码逻辑(CanalRabbitMQProducer.java)
- if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
- // 动态topic
- Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
- destination.getTopic(),
- destination.getDynamicTopic());
-
- for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
- final String topicName = entry.getKey();//这里改成不替换. ,方便使用rabbitmq 的topic , "." 是单词分隔符 。实际源码是 entry.getKey().replace(".","_")
- final com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
-
- template.submit(() -> send(destination, topicName, messageSub));
- }
-
- template.waitForResult();
- } else {
- send(destination, destination.getTopic(), message);
- }

真实处理dynamicTopic的类是:MQMessageUtils.java
- String schemaName = entry.getHeader().getSchemaName();
- String tableName = entry.getHeader().getTableName();
-
- if (StringUtils.isEmpty(schemaName) || StringUtils.isEmpty(tableName)) {
- put2MapMessage(messages, message.getId(), defaultTopic, entry);
- } else {
- Set<String> topics = matchTopics(schemaName + "." + tableName, dynamicTopicConfigs);
- if (topics != null) {
- for (String topic : topics) {
- put2MapMessage(messages, message.getId(), topic, entry);
- }
- } else {
- topics = matchTopics(schemaName, dynamicTopicConfigs);
- if (topics != null) {
- for (String topic : topics) {
- put2MapMessage(messages, message.getId(), topic, entry);
- }
- } else {
- put2MapMessage(messages, message.getId(), defaultTopic, entry);
- }
- }
- }

topic 要么就是schemaName、schemaName.tableName 、defaultTopic (canal.mq.topic 配置
)
匹配topic 在 matchTopics() 方法,
- private static Set<String> matchTopics(String name, String dynamicTopicConfigs) {
- String[] router = StringUtils.split(StringUtils.replace(dynamicTopicConfigs, ",", ";"), ";");
- Set<String> topics = new HashSet<>();
- for (String item : router) {
- int i = item.indexOf(":");
- if (i > -1) {
- String topic = item.substring(0, i).trim();
- String topicConfigs = item.substring(i + 1).trim();
- if (matchDynamicTopic(name, topicConfigs)) {
- topics.add(topic);
- // 匹配了一个就退出
- break;
- }
- } else if (matchDynamicTopic(name, item)) {
- // 匹配了一个就退出
- topics.add(name.toLowerCase());
- break;
- }
- }
- return topics.isEmpty() ? null : topics;
- }

matchDynamicTopic() 只是匹配,返回boolean类型值。这里看出":"的含义就是,匹配schema.table 到指定的topic 。 (topic:.*\..* )
默认情况 canal.mq.dynamicTopic=.*\\..*
根据代码: final String topicName = entry.getKey().replace('.', '_'); ,匹配的是 schemaName_tableName
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。