赞
踩
之前我们已经整合过Spring Cloud Stream 3.0版本与Kafka、RabbitMQ中间件,简直不要太好,直接让我们不用再关心底层MQ如何集与消息收发。但是从Spring Cloud 2020版本开始,Spring Cloud Stream的版本升级至3.1.0以上版本,自此版本开始@StreamListener上面就增加@Deprecated注解,不赞成使用,有可能接下来的版本会删除掉。传说是有利于使用Project Reactor提供的事件流抽象(如Flux和Mono),命令函数在每个单独的事件上触发,而reactive函数只触发一次。故今天我们分享一期Spring Cloud Stream 3.1+整合Kafka,各位看官敬请鉴赏。
新版提倡用函数式进行发送和消费信息
定义返回类型为Supplier, Function or Consumer的bean提供消息发送和消费的bean 看看绑定名称命名规则
input - + -in- +
output - + -out- +
在配置文件中指定spring.cloud.function.definition/spring.cloud.stream.function.definition的名称后会把这个bean绑定到对应的消费者和提供者上。
比如 inputChannel bean绑定了inputChannel-in-0通道,outputChannel bean绑定了outputChannel-out-0通道:
spring: kafka: bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092 cloud: stream: kafka: binder: brokers: ${spring.kafka.bootstrap-servers} binders: kafkahub: type: kafka environment: spring: cloud: stream: kafka: ${spring.cloud.stream.kafka.binder} default-binder: kafkahub function: definition: inputChannel,outputChannel bindings: inputChannel-in-0: binder: kafkahub destination: test-kafka-topic group: test-kafka-group content-type: text/plain outputChannel-out-0: binder: kafkahub destination: test-kafka-topic content-type: text/plain producer: partition-count: 3 #分区数目
此时消息生产者为:
@Resource
private StreamBridge streamBridge;
@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){
boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());
return send;
}
此时消息消费者为:
@Configuration public class KafkaChannel { @Resource private StreamBridge streamBridge; /** * inputChannel 消费者 * @author senfel * @date 2024/6/18 15:26 * @return java.util.function.Consumer<java.lang.String> */ @Bean public Consumer<Message<String>> inputChannel(){ return message -> { System.out.println("接收到消息Payloa:" + message.getPayload()); System.out.println("接收到消息Header:" + message.getHeaders()); }; }
}
我们简单进行一下演示即可,kafka环境可以看我之前的博文搭建。
主要演示功能:
正常情况下生产者发送消息到kafka,消费者监听消息并消费成功
异常情况下消费者消费失败,立即将异常消息投递到另一个topic上,兜底topic消费者消费
本次全部采用自动ack模式,如果需要手动ack参照之前的博文配置即可,注意在消费者端加上手动ack逻辑。
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.12.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>cce-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>seata-demo-order</name> <description>Demo project for Spring Boot</description> <properties> <java.version>8</java.version> <spring-cloud.version>Hoxton.SR12</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> <version>3.2.4</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
spring: #kafka kafka: bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092 cloud: stream: kafka: # kafka配置 binder: brokers: ${spring.kafka.bootstrap-servers} auto-add-partitions: true #自动分区 auto-create-topics: true #自动创建主题 replication-factor: 3 #副本 min-partition-count: 3 #最小分区 bindings: outputChannel-out-0: producer: # 无限制重发不产生消息丢失 retries: Integer.MAX_VALUE #acks =0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低 #acks =1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中 #acks = all 、 -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长 #可以设置的值为:all, -1, 0, 1 acks: all min: insync: replicas: 3 #感知副本数 inputChannel-in-0: consumer: concurrency: 1 #消费者数量 max-concurrency: 5 #最大消费者数量 recovery-interval: 3000 #3s 重连 auto-rebalance-enabled: true #主题分区消费者组成员自动平衡 auto-commit-offset: false #手动提交偏移量 enable-dlq: true # 开启 dlq队列 dlq-name: test-kafka-topic.dlq deserializationExceptionHandler: sendToDlq #异常加入死信 binders: # 与外部mq组件绑定 kafkahub: type: kafka environment: spring: cloud: stream: kafka: ${spring.cloud.stream.kafka.binder} default-binder: kafkahub #默认绑定 function: # 定义channel名字,每个channel又可以作为生产者(in)与消费者(out) definition: inputChannel;outputChannel;dlqChannel bindings: # 通道绑定 inputChannel-in-0: binder: kafkahub destination: test-kafka-topic group: test-kafka-group content-type: text/plain consumer: maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3 backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行 backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2 backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s outputChannel-out-0: binder: kafkahub destination: test-kafka-topic content-type: text/plain producer: partition-count: 3 #分区数目 dlqChannel-in-0: binder: kafkahub destination: test-kafka-topic.dlq group: test-kafka-group content-type: text/plain consumer: maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3 backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行 backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2 backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s dlqChannel-out-0: binder: kafkahub destination: test-kafka-topic.dlq content-type: text/plain producer: partition-count: 3 #分区数目
import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import javax.annotation.Resource; import java.util.function.Consumer; /** * KafkaCustomer * @author senfel * @version 1.0 * @date 2024/6/18 15:22 */ @Configuration public class KafkaChannel { @Resource private StreamBridge streamBridge; /** * inputChannel 消费者 * @author senfel * @date 2024/6/18 15:26 * @return java.util.function.Consumer<java.lang.String> */ @Bean public Consumer<Message<String>> inputChannel(){ return message -> { System.out.println("接收到消息:" + message.getPayload()); System.out.println("接收到消息:" + message.getHeaders()); if(message.getPayload().contains("9")){ boolean send = streamBridge.send("dlqChannel-out-0", MessageBuilder.withPayload("kafka异常消息发送到dlq测试:"+message).build()); System.err.println("向dlqChannel发送消息:"+send); } }; } /** * dlqChannel 死信消费者 * @author senfel * @date 2024/6/18 15:26 * @return java.util.function.Consumer<java.lang.String> */ @Bean public Consumer<Message<String>> dlqChannel(){ return message -> { System.out.println("死信dlqChannel接收到消息:" + message.getPayload()); System.out.println("死信dlqChannel接收到消息:" + message.getHeaders()); }; } }
@Resource
private StreamBridge streamBridge;
@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){
boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());
return send;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。