当前位置:   article > 正文

canal RabbitMQ dynamicTopic 使用记录_canal.mq.dynamictopic参数说明

canal.mq.dynamictopic参数说明

canal RabbitMQ 

参考:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

目前最新版本1.1.5 开发版,官网主推的是kafka 和 rocketMQ ,rabbitMQ 的配置不多。

canal.properties

  1. rabbitmq.host = 127.0.0.1:5672
  2. rabbitmq.virtual.host = data
  3. rabbitmq.exchange = canal
  4. rabbitmq.username = guest
  5. rabbitmq.password = guest

关于RabbitMQ 配置动态topic :

  1. # mq config
  2. canal.mq.topic=default
  3. # dynamic topic route by schema or table regex
  4. canal.mq.dynamicTopic=.*\\..*
  5. canal.mq.partition=0

官网关于 dynamicTopic 的说明:

canal.mq.dynamicTopic 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔

  • 例子1:test\\.test 指定匹配的单表,发送到以test_test为名字的topic上
  • 例子2:.*\\..* 匹配所有表,则每个表都会发送到各自表名的topic上
  • 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
  • 例子4:test\\..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
  • 例子5:test,test1\\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值

为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

  • 例子1: test:test\\.test 指定匹配的单表,发送到以test为名字的topic上
  • 例子2: test:.*\\..* 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
  • 例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
  • 例子4:testA:test\\..* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
  • 例子5:test0:test,test1:test1\\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值

这段文字没明白,看实际MQ生产者代码逻辑(CanalRabbitMQProducer.java)

  1. if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
  2. // 动态topic
  3. Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
  4. destination.getTopic(),
  5. destination.getDynamicTopic());
  6. for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
  7. final String topicName = entry.getKey();//这里改成不替换. ,方便使用rabbitmq 的topic , "." 是单词分隔符 。实际源码是 entry.getKey().replace(".","_")
  8. final com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
  9. template.submit(() -> send(destination, topicName, messageSub));
  10. }
  11. template.waitForResult();
  12. } else {
  13. send(destination, destination.getTopic(), message);
  14. }

 真实处理dynamicTopic的类是:MQMessageUtils.java

  1. String schemaName = entry.getHeader().getSchemaName();
  2. String tableName = entry.getHeader().getTableName();
  3. if (StringUtils.isEmpty(schemaName) || StringUtils.isEmpty(tableName)) {
  4. put2MapMessage(messages, message.getId(), defaultTopic, entry);
  5. } else {
  6. Set<String> topics = matchTopics(schemaName + "." + tableName, dynamicTopicConfigs);
  7. if (topics != null) {
  8. for (String topic : topics) {
  9. put2MapMessage(messages, message.getId(), topic, entry);
  10. }
  11. } else {
  12. topics = matchTopics(schemaName, dynamicTopicConfigs);
  13. if (topics != null) {
  14. for (String topic : topics) {
  15. put2MapMessage(messages, message.getId(), topic, entry);
  16. }
  17. } else {
  18. put2MapMessage(messages, message.getId(), defaultTopic, entry);
  19. }
  20. }
  21. }

topic 要么就是schemaName、schemaName.tableName 、defaultTopic (canal.mq.topic 配置

匹配topic 在 matchTopics() 方法,

  1. private static Set<String> matchTopics(String name, String dynamicTopicConfigs) {
  2. String[] router = StringUtils.split(StringUtils.replace(dynamicTopicConfigs, ",", ";"), ";");
  3. Set<String> topics = new HashSet<>();
  4. for (String item : router) {
  5. int i = item.indexOf(":");
  6. if (i > -1) {
  7. String topic = item.substring(0, i).trim();
  8. String topicConfigs = item.substring(i + 1).trim();
  9. if (matchDynamicTopic(name, topicConfigs)) {
  10. topics.add(topic);
  11. // 匹配了一个就退出
  12. break;
  13. }
  14. } else if (matchDynamicTopic(name, item)) {
  15. // 匹配了一个就退出
  16. topics.add(name.toLowerCase());
  17. break;
  18. }
  19. }
  20. return topics.isEmpty() ? null : topics;
  21. }
matchDynamicTopic() 只是匹配,返回boolean类型值。这里看出":"的含义就是,匹配schema.table 到指定的topic 。 (topic:.*\..* )

默认情况 canal.mq.dynamicTopic=.*\\..* 

根据代码: final String topicName = entry.getKey().replace('.', '_');   ,匹配的是 schemaName_tableName

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/830230
推荐阅读
相关标签
  

闽ICP备14008679号