赞
踩
2022-05-25 14:46:03.697 ERROR 17108 --- [ask-scheduler-1] o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds
。。。
org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer:
。。。
Caused by: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'Evad.consumer-group-evad.errors.recoverer' defined in null。。。
yml配置文件中定义的两个通道:evad_input和devilvan_input,却共用了一个topic:Evad,导致绑定失败。
spring:
application:
name: devilvan-kafka
cloud:
stream:
default-binder: kafka
bindings:
evad_input:
destination: Evad
binder: kafka
group: consumer-group-evad
content-type: text/plain
evad_output:
destination: Evad
binder: kafka
content-type: text/plain
devilvan_input:
# 一个通道只能唯一对应一个topic,否则会报binder
destination: Evad
binder: kafka
# 一个消费者组可以被多个通道使用
group: consumer-group-evad
content-type: text/plain
devilvan_output:
destination: Evad
binder: kafka
content-type: text/plain
新定义一个Topic:Evad05,使devilvan通道对应topic,区别于evad通道对应的topic
spring:
application:
name: devilvan-kafka
cloud:
stream:
default-binder: kafka
bindings:
evad_input:
destination: Evad
binder: kafka
group: consumer-group-evad
content-type: text/plain
evad_output:
destination: Evad
binder: kafka
content-type: text/plain
devilvan_input:
# 一个通道只能唯一对应一个topic,否则会报binder
destination: Evad05
binder: kafka
# 一个消费者组可以被多个通道使用
group: consumer-group-evad
content-type: text/plain
devilvan_output:
destination: Evad05
binder: kafka
content-type: text/plain
@PostMapping("sendEvadMessage")
public ResultMessage<String> sendEvadMessage(@RequestBody String message) {
ResultMessage<String> resultMessage = new ResultMessage<>();
sender.sendEvadMessage(message);
resultMessage.setData(message);
return resultMessage.success();
}
@PostMapping("sendDevilvanMessage")
public ResultMessage<String> sendDevilvanMessage(@RequestBody String message) {
ResultMessage<String> resultMessage = new ResultMessage<>();
sender.sendDevilvanMessage(message);
resultMessage.setData(message);
return resultMessage.success();
}
public interface EvadChannel {
String EVAD_INPUT = "evad_input";
String EVAD_OUTPUT = "evad_output";
String DEVILVAN_INPUT = "devilvan_input";
String DEVILVAN_OUTPUT = "devilvan_output";
/**
* 缺省接收消息通道
* @return channel 返回缺省信息接收通道
*/
@Input(EVAD_INPUT)
MessageChannel receiveEvadMessage();
/**
* 缺省发送消息通道
* @return channel 返回缺省信息发送通道
*/
@Output(EVAD_OUTPUT)
MessageChannel sendEvadMessage();
/**
* 缺省接收消息通道
* @return channel 返回缺省信息接收通道
*/
@Input(DEVILVAN_INPUT)
MessageChannel receiveDevilvanMessage();
/**
* 缺省发送消息通道
* @return channel 返回缺省信息发送通道
*/
@Output(DEVILVAN_OUTPUT)
MessageChannel sendDevilvanMessage();
}
@Slf4j
@Component
public class EvadMessageSender {
@Autowired
private EvadChannel channel;
/**
* 消息发送到默认通道:缺省通道对应缺省主题
*
* @param message
*/
public void sendEvadMessage(String message) {
channel.sendEvadMessage().send(MessageBuilder.withPayload(message).build());
}
/**
* 消息发送到默认通道:缺省通道对应缺省主题
*
* @param message
*/
public void sendDevilvanMessage(String message) {
channel.sendDevilvanMessage().send(MessageBuilder.withPayload(message).build());
}
}
@Slf4j
@Configuration
@EnableBinding(value = EvadChannel.class)
public class EvadReceiveListener {
@StreamListener(EvadChannel.EVAD_INPUT)
public void receiveEvadMessage(Message<String> message) {
log.info("{} 订阅消息:通道 = " + EvadChannel.EVAD_INPUT + ",data = {}",
DateUtil.now(), message.getPayload());
}
@StreamListener(EvadChannel.DEVILVAN_INPUT)
public void receiveDevilvanMessage(Message<String> message) {
log.info("{} 订阅消息:通道 = " + EvadChannel.DEVILVAN_INPUT + ",data = {}",
DateUtil.now(), message.getPayload());
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。