赞
踩
<!-- Spring Cloud Stream binder for Apache Kafka -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!-- Spring MVC / embedded web server, used by the REST controller -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
单个binder:
# Single Kafka binder: both bindings use the same broker and the same topic.
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: ip:port            # placeholder; replace with the broker's host:port
          auto-create-topics: true
      bindings:
        output-channel:
          binder: kafka
          destination: topic_test
          content-type: application/json
        input-channel:
          binder: kafka
          destination: topic_test
          content-type: application/json
多个binder:
server:
  port: 8088

spring:
  cloud:
    stream:
      binders:
        kafka1: # custom binder name; referenced by the `binder` key of the bindings below
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip:port # placeholder; set to your own broker address
                      auto-create-topics: true # create topics automatically
        kafka2: # custom binder name; referenced by the `binder` key of the bindings below
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip:port # placeholder; set to your own broker address
                      auto-create-topics: true # create topics automatically
      bindings:
        my_input_local_channel: # consumer
          binder: kafka1
          destination: local_topic_stu
          group: group1
        my_output_local_channel: # producer
          binder: kafka1
          destination: local_topic_stu
          contentType: application/json
        my_input_dev_channel: # consumer
          binder: kafka2
          destination: dev_topic_stu
          group: group1
        my_output_dev_channel: # producer
          binder: kafka2
          contentType: application/json
          destination: dev_topic_stu
下面以多个binder的配置为例来实现:
- public interface MyChannel {
-
- String DEV_INPUT_CHANNEL_NAME = "my_input_dev_channel";
- String DEV_OUTPUT_CHANNEL_NAME = "my_output_dev_channel";
-
- String LOCAL_INPUT_CHANNEL_NAME = "my_input_local_channel";
- String LOCAL_OUTPUT_CHANNEL_NAME = "my_output_local_channel";
-
- @Input(MyChannel.DEV_INPUT_CHANNEL_NAME)
- SubscribableChannel getDevMessage();
-
- @Output(MyChannel.DEV_OUTPUT_CHANNEL_NAME)
- MessageChannel pushDevMsg();
-
-
- @Input(MyChannel.LOCAL_INPUT_CHANNEL_NAME)
- SubscribableChannel getLocalMessage();
-
- @Output(MyChannel.LOCAL_OUTPUT_CHANNEL_NAME)
- MessageChannel pushLocalMsg();
- }
-
- //消息体类
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- public class MessageDto implements Serializable {
- private static final long serialVersionUID = -7968206925064164353L;
-
- private String id;
- private String type;
- private String data;
- }
-
- //消息发送
- @Slf4j
- @EnableBinding(MyChannel.class)
- public class MessageSendService {
-
- @Resource
- private MyChannel channel;
-
-
- public boolean pushLocalMsg(MessageDto msg) {
- log.info("local发送消息:{}", JSONUtil.toJsonStr(msg));
- return channel.pushLocalMsg().send(MessageBuilder.withPayload(msg).build());
- }
-
-
- public boolean pushDevMsg(MessageDto msg) {
- log.info("dev发送消息:{}", JSONUtil.toJsonStr(msg));
- return channel.pushDevMsg().send(MessageBuilder.withPayload(msg).build());
- }
- }
-
- //消息接收
- @Slf4j
- @EnableBinding(MyChannel.class)
- public class MessageReceiverService {
-
- @StreamListener(MyChannel.LOCAL_INPUT_CHANNEL_NAME)
- public void consumerLocalMessage(Message<MessageDto> msg) {
- log.info("local接收到消息:{}", JSONUtil.toJsonStr(msg.getPayload()));
- }
-
-
- @StreamListener(MyChannel.DEV_INPUT_CHANNEL_NAME)
- public void consumerDevMessage(Message<MessageDto> msg) {
- log.info("dev:接收到消息:{}", JSONUtil.toJsonStr(msg.getPayload()));
- }
-
- }
- @RestController
- public class MessageController {
-
- @Resource
- private MessageSendService messageSendService;
-
- @PostMapping("/sendMsg/{profile}")
- public String sendMsg(@PathVariable String profile, @RequestBody MessageDto dto) {
- dto.setId(UUID.fastUUID().toString(true));
-
- if (Objects.equals(profile, "dev")) {
- messageSendService.pushDevMsg(dto);
- } else {
- messageSendService.pushLocalMsg(dto);
- }
- return "success";
- }
- }
5.测试结果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。