当前位置:   article > 正文

springboot整合springcloud stream kafka_spring cloud stream 函数式编程整合 kafka

spring cloud stream 函数式编程整合 kafka

1.导包

  <!-- Spring Cloud Stream starter for the Kafka binder. No <version> is declared here, so it is
       presumably supplied by dependency management (e.g. the spring-cloud-dependencies BOM);
       confirm against the project's parent POM. -->
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  </dependency>
  <!-- Spring Boot web starter; the test controller in step 4 is a @RestController. -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>

2.配置

单个binder

  # Single-binder setup: one Kafka binder configured under spring.cloud.stream.kafka.binder,
  # and two bindings that both point at the same topic.
  spring:
    cloud:
      stream:
        kafka:
          binder:
            brokers: ip:port              # placeholder; replace with the real broker host:port
            auto-create-topics: true
        bindings:
          output-channel:
            binder: kafka
            destination: topic_test
            content-type: application/json
          input-channel:
            binder: kafka
            destination: topic_test
            content-type: application/json

 多个binder:

  # Multi-binder setup: two independently configured Kafka binders (kafka1, kafka2).
  # The binding names under spring.cloud.stream.bindings are the same strings as the
  # channel-name constants declared in the MyChannel interface.
  server:
    port: 8088
  spring:
    cloud:
      stream:
        binders:
          kafka1: # custom binder name, referenced by the `binder` property of the bindings below
            type: kafka
            environment:
              spring:
                cloud:
                  stream:
                    kafka:
                      binder:
                        brokers: ip:port # fill in your own broker address
                        auto-create-topics: true # create topics automatically
          kafka2: # custom binder name, referenced by the `binder` property of the bindings below
            type: kafka
            environment:
              spring:
                cloud:
                  stream:
                    kafka:
                      binder:
                        brokers: ip:port # fill in your own broker address
                        auto-create-topics: true # create topics automatically
        bindings:
          my_input_local_channel: # consumer
            binder: kafka1
            destination: local_topic_stu
            group: group1
          my_output_local_channel: # producer
            binder: kafka1
            destination: local_topic_stu
            contentType: application/json
          my_input_dev_channel: # consumer
            binder: kafka2
            destination: dev_topic_stu
            group: group1
          my_output_dev_channel: # producer
            binder: kafka2
            contentType: application/json
            destination: dev_topic_stu

3.编写通道代码

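下面以多个 binder 的配置为例来实现。注意:以下代码使用的是基于注解的编程模型(@EnableBinding、@Input/@Output、@StreamListener),并非标题所说的函数式编程模型;这些注解自 Spring Cloud Stream 3.1 起已被标记为过时,并在 4.0 中移除。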

  1. public interface MyChannel {
  2. String DEV_INPUT_CHANNEL_NAME = "my_input_dev_channel";
  3. String DEV_OUTPUT_CHANNEL_NAME = "my_output_dev_channel";
  4. String LOCAL_INPUT_CHANNEL_NAME = "my_input_local_channel";
  5. String LOCAL_OUTPUT_CHANNEL_NAME = "my_output_local_channel";
  6. @Input(MyChannel.DEV_INPUT_CHANNEL_NAME)
  7. SubscribableChannel getDevMessage();
  8. @Output(MyChannel.DEV_OUTPUT_CHANNEL_NAME)
  9. MessageChannel pushDevMsg();
  10. @Input(MyChannel.LOCAL_INPUT_CHANNEL_NAME)
  11. SubscribableChannel getLocalMessage();
  12. @Output(MyChannel.LOCAL_OUTPUT_CHANNEL_NAME)
  13. MessageChannel pushLocalMsg();
  14. }
  15. //消息体类
  16. @Data
  17. @AllArgsConstructor
  18. @NoArgsConstructor
  19. public class MessageDto implements Serializable {
  20. private static final long serialVersionUID = -7968206925064164353L;
  21. private String id;
  22. private String type;
  23. private String data;
  24. }
  25. //消息发送
  26. @Slf4j
  27. @EnableBinding(MyChannel.class)
  28. public class MessageSendService {
  29. @Resource
  30. private MyChannel channel;
  31. public boolean pushLocalMsg(MessageDto msg) {
  32. log.info("local发送消息:{}", JSONUtil.toJsonStr(msg));
  33. return channel.pushLocalMsg().send(MessageBuilder.withPayload(msg).build());
  34. }
  35. public boolean pushDevMsg(MessageDto msg) {
  36. log.info("dev发送消息:{}", JSONUtil.toJsonStr(msg));
  37. return channel.pushDevMsg().send(MessageBuilder.withPayload(msg).build());
  38. }
  39. }
  40. //消息接收
  41. @Slf4j
  42. @EnableBinding(MyChannel.class)
  43. public class MessageReceiverService {
  44. @StreamListener(MyChannel.LOCAL_INPUT_CHANNEL_NAME)
  45. public void consumerLocalMessage(Message<MessageDto> msg) {
  46. log.info("local接收到消息:{}", JSONUtil.toJsonStr(msg.getPayload()));
  47. }
  48. @StreamListener(MyChannel.DEV_INPUT_CHANNEL_NAME)
  49. public void consumerDevMessage(Message<MessageDto> msg) {
  50. log.info("dev:接收到消息:{}", JSONUtil.toJsonStr(msg.getPayload()));
  51. }
  52. }

4.编写测试controller

  1. @RestController
  2. public class MessageController {
  3. @Resource
  4. private MessageSendService messageSendService;
  5. @PostMapping("/sendMsg/{profile}")
  6. public String sendMsg(@PathVariable String profile, @RequestBody MessageDto dto) {
  7. dto.setId(UUID.fastUUID().toString(true));
  8. if (Objects.equals(profile, "dev")) {
  9. messageSendService.pushDevMsg(dto);
  10. } else {
  11. messageSendService.pushLocalMsg(dto);
  12. }
  13. return "success";
  14. }
  15. }

5.测试结果

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/1022087
推荐阅读
相关标签
  

闽ICP备14008679号