当前位置:   article > 正文

spring boot kafka_手把手教你实战SpringCloudStream&集成kafka

springboot kafka stream

v2-00fbc200509367b5c32f5a946f4993e3_1440w.jpg?source=172ae18b

一、关于Spring-Cloud-Stream

  Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

  在这里我先放一张官网的图:

v2-4973741d0d93a3924fc548a973a80636_b.jpg

应用程序通过Spring Cloud Stream注入到输入和输出通道与外界进行通信。根据此规则我们很容易的实现消息传递,订阅消息与消息中转。并且当需要切换消息中间件时,几乎不需要修改代码,只需要变更配置就行了。

  在用例图中 Inputs代表了应用程序监听消息 、outputs代表发送消息、binder的话大家可以理解为将应用程序与消息中间件隔离的抽象,类似于三层架构下利用dao屏蔽service与数据库的实现的原理。

  springcloud默认提供了rabbitmq与kafka的实现。

二、springcloud集成kafka

1、添加gradle依赖:

  1. dependencies{
  2. compile('org.springframework.cloud:spring-cloud-stream')
  3. compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')
  4. compile('org.springframework.kafka:spring-kafka')
  5. }

2、定义一个接口:

  spring-cloud-stream已经给我们定义了最基本的输入与输出接口,他们分别是 Source,Sink, Processor

Sink接口:

  1. package org.springframework.cloud.stream.messaging;
  2. import org.springframework.cloud.stream.annotation.Input;
  3. import org.springframework.messaging.SubscribableChannel;
  4. public interface Sink {
  5. String INPUT = "input";
  6. @Input("input")
  7. SubscribableChannel input();
  8. }

Source接口:

  1. package org.springframework.cloud.stream.messaging;
  2. import org.springframework.cloud.stream.annotation.Output;
  3. import org.springframework.messaging.MessageChannel;
  4. public interface Source {
  5. String OUTPUT = "output";
  6. @Output("output")
  7. MessageChannel output();
  8. }

Processor接口:

  1. package org.springframework.cloud.stream.messaging;
  2. public interface Processor extends Source, Sink {
  3. }

  这里面Processor这个接口既定义输入通道又定义了输出通道。同时我们也可以自己定义通道接口,代码如下:

  1. package com.bdqn.lyrk.shop.channel;
  2. import org.springframework.cloud.stream.annotation.Input;
  3. import org.springframework.cloud.stream.annotation.Output;
  4. import org.springframework.messaging.MessageChannel;
  5. import org.springframework.messaging.SubscribableChannel;
  6. public interface ShopChannel {
  7. /**
  8. * 发消息的通道名称
  9. */
  10. String SHOP_OUTPUT = "shop_output";
  11. /**
  12. * 消息的订阅通道名称
  13. */
  14. String SHOP_INPUT = "shop_input";
  15. /**
  16. * 发消息的通道
  17. *
  18. * @return
  19. */
  20. @Output(SHOP_OUTPUT)
  21. MessageChannel sendShopMessage();
  22. /**
  23. * 收消息的通道
  24. *
  25. * @return
  26. */
  27. @Input(SHOP_INPUT)
  28. SubscribableChannel recieveShopMessage();
  29. }

3、定义服务类

  1. package com.bdqn.lyrk.shop.server;
  2. import com.bdqn.lyrk.shop.channel.ShopChannel;
  3. import org.springframework.cloud.stream.annotation.StreamListener;
  4. import org.springframework.messaging.Message;
  5. import org.springframework.messaging.MessageChannel;
  6. import org.springframework.messaging.support.MessageBuilder;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import javax.annotation.Resource;
  10. @RestController
  11. public class ShopService {
  12. @Resource(name = ShopChannel.SHOP_OUTPUT)
  13. private MessageChannel sendShopMessageChannel;
  14. @GetMapping("/sendMsg")
  15. public String sendShopMessage(String content) {
  16. boolean isSendSuccess = sendShopMessageChannel.
  17. send(MessageBuilder.withPayload(content).build());
  18. return isSendSuccess ? "发送成功" : "发送失败";
  19. }
  20. @StreamListener(ShopChannel.SHOP_INPUT)
  21. public void receive(Message<String> message) {
  22. System.out.println(message.getPayload());
  23. }
  24. }

这里面大家注意 @StreamListener。这个注解可以监听输入通道里的消息内容,注解里面的属性指定我们刚才定义的输入通道名称,而MessageChannel则可以通过

输出通道发送消息。使用@Resource注入时需要指定我们刚才定义的输出通道名称

4、定义启动类

  1. package com.bdqn.lyrk.shop;
  2. import com.bdqn.lyrk.shop.channel.ShopChannel;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.cloud.stream.annotation.EnableBinding;
  6. @SpringBootApplication
  7. @EnableBinding(ShopChannel.class)
  8. public class ShopServerApplication {
  9. public static void main(String[] args) {
  10. SpringApplication.run(ShopServerApplication.class, args);
  11. }
  12. }

5、定义application.yml文件

  1. spring:
  2. application:
  3. name: shop-server
  4. cloud:
  5. stream:
  6. bindings:
  7. #配置自己定义的通道与哪个中间件交互
  8. shop_input: #ShopChannel里InputOutput的值
  9. destination: zhibo #目标主题
  10. shop_output:
  11. destination: zhibo
  12. default-binder: kafka #默认的binder是kafka
  13. kafka:
  14. bootstrap-servers: localhost:9092 #kafka服务地址
  15. consumer:
  16. group-id: consumer1
  17. producer:
  18. key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
  19. value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
  20. client-id: producer1
  21. server:
  22. port: 8100

 这里是重头戏,我们必须指定所有通道对应的消息主题,同时指定默认的binder为kafka,紧接着定义Spring-kafka的外部化配置,在这里指定producer的序列化类为ByteArraySerializer

启动程序成功后,我们访问 http://localhost:8100/sendMsg?content=2 即可得到如下结果

v2-5a8340d3ce6befc4b25a7f1819d3ca14_b.png
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/1022086
推荐阅读
相关标签
  

闽ICP备14008679号