当前位置:   article > 正文

SpringCloudStream整合Kafka,解决两个通道对应同一个topic报错问题。

SpringCloudStream整合Kafka,解决两个通道对应同一个topic报错问题。

总结

  1. 一个通道(如:evad_input)只能唯一对应一个topic,否则会报错
  2. 消费者组则可以被多个通道共同使用

在这里插入图片描述

报错日志

2022-05-25 14:46:03.697 ERROR 17108 --- [ask-scheduler-1] o.s.cloud.stream.binding.BindingService  : Failed to create consumer binding; retrying in 30 seconds
。。。
org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
。。。
Caused by: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'Evad.consumer-group-evad.errors.recoverer' defined in null。。。
  • 1
  • 2
  • 3
  • 4
  • 5

问题所在

yml配置文件中定义的两个通道:evad_input和devilvan_input,却共用了一个topic:Evad,导致绑定失败。

配置文件

spring:
  application:
    name: devilvan-kafka
  cloud:
    stream:
      default-binder: kafka
      bindings:
        evad_input:
          destination: Evad
          binder: kafka
          group: consumer-group-evad
          content-type: text/plain
        evad_output:
          destination: Evad
          binder: kafka
          content-type: text/plain
        devilvan_input:
          # 一个通道只能唯一对应一个topic,否则会报binder
          destination: Evad
          binder: kafka
          # 一个消费者组可以被多个通道使用
          group: consumer-group-evad
          content-type: text/plain
        devilvan_output:
          destination: Evad
          binder: kafka
          content-type: text/plain
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

解决方法

新定义一个Topic:Evad05,使devilvan通道对应topic,区别于evad通道对应的topic

修改后

spring:
  application:
    name: devilvan-kafka
  cloud:
    stream:
      default-binder: kafka
      bindings:
        evad_input:
          destination: Evad
          binder: kafka
          group: consumer-group-evad
          content-type: text/plain
        evad_output:
          destination: Evad
          binder: kafka
          content-type: text/plain
        devilvan_input:
          # 一个通道只能唯一对应一个topic,否则会报binder
          destination: Evad05
          binder: kafka
          # 一个消费者组可以被多个通道使用
          group: consumer-group-evad
          content-type: text/plain
        devilvan_output:
          destination: Evad05
          binder: kafka
          content-type: text/plain
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

代码

1. XXXController(生产消息的控制器)

    @PostMapping("sendEvadMessage")
    public ResultMessage<String> sendEvadMessage(@RequestBody String message) {
        ResultMessage<String> resultMessage = new ResultMessage<>();
        sender.sendEvadMessage(message);
        resultMessage.setData(message);
        return resultMessage.success();
    }

    @PostMapping("sendDevilvanMessage")
    public ResultMessage<String> sendDevilvanMessage(@RequestBody String message) {
        ResultMessage<String> resultMessage = new ResultMessage<>();
        sender.sendDevilvanMessage(message);
        resultMessage.setData(message);
        return resultMessage.success();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

2. 自定义通道

public interface EvadChannel {
    String EVAD_INPUT = "evad_input";
    String EVAD_OUTPUT = "evad_output";
    String DEVILVAN_INPUT = "devilvan_input";
    String DEVILVAN_OUTPUT = "devilvan_output";

    /**
     * 缺省接收消息通道
     * @return channel 返回缺省信息接收通道
     */
    @Input(EVAD_INPUT)
    MessageChannel receiveEvadMessage();

    /**
     * 缺省发送消息通道
     * @return channel 返回缺省信息发送通道
     */
    @Output(EVAD_OUTPUT)
    MessageChannel sendEvadMessage();

    /**
     * 缺省接收消息通道
     * @return channel 返回缺省信息接收通道
     */
    @Input(DEVILVAN_INPUT)
    MessageChannel receiveDevilvanMessage();

    /**
     * 缺省发送消息通道
     * @return channel 返回缺省信息发送通道
     */
    @Output(DEVILVAN_OUTPUT)
    MessageChannel sendDevilvanMessage();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

3. EvadMessageSender(通过通道发送消息)

@Slf4j
@Component
public class EvadMessageSender {
    @Autowired
    private EvadChannel channel;

    /**
     * 消息发送到默认通道:缺省通道对应缺省主题
     *
     * @param message
     */
    public void sendEvadMessage(String message) {
        channel.sendEvadMessage().send(MessageBuilder.withPayload(message).build());
    }

    /**
     * 消息发送到默认通道:缺省通道对应缺省主题
     *
     * @param message
     */
    public void sendDevilvanMessage(String message) {
        channel.sendDevilvanMessage().send(MessageBuilder.withPayload(message).build());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

4. EvadReceiveListener(订阅/消费者)

@Slf4j
@Configuration
@EnableBinding(value = EvadChannel.class)
public class EvadReceiveListener {
    @StreamListener(EvadChannel.EVAD_INPUT)
    public void receiveEvadMessage(Message<String> message) {
        log.info("{}    订阅消息:通道 = " + EvadChannel.EVAD_INPUT + ",data = {}",
                DateUtil.now(), message.getPayload());
    }

    @StreamListener(EvadChannel.DEVILVAN_INPUT)
    public void receiveDevilvanMessage(Message<String> message) {
        log.info("{}    订阅消息:通道 = " + EvadChannel.DEVILVAN_INPUT + ",data = {}",
                DateUtil.now(), message.getPayload());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/840805
推荐阅读
相关标签
  

闽ICP备14008679号