赞
踩
一、添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot + Kafka Streams demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.edu.tju</groupId>
    <artifactId>springbootkafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- The Spring Boot parent supplies managed versions for the dependencies below,
         which is why they declare no <version> of their own. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.6</version>
    </parent>

    <dependencies>
        <!-- REST endpoint support for the demo controller. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- KafkaTemplate, @KafkaListener and @EnableKafkaStreams. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- Kafka Streams DSL (StreamsBuilder, KStream, Serdes). -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <!-- NOTE: org.apache.kafka:kafka_2.11:2.0.0 (the Scala broker module) was removed.
             The application only uses client-side APIs, and pinning the broker module to
             2.0.0 does not match the Kafka client version managed by the Spring Boot parent. -->
    </dependencies>
</project>
二、通过application.yml配置kafka stream
spring:
  kafka:
    # Shared broker address: inherited by the producer (KafkaTemplate), the
    # @KafkaListener consumers and Kafka Streams. Setting it only under
    # streams/consumer would leave the producer on the default localhost:9092.
    bootstrap-servers: xx.xx.xx.xx:9092
    streams:
      application-id: test-kafka-stream
      # auto-startup: true
      # Passed through to Kafka Streams as default.key.serde, default.value.serde
      # and default.timestamp.extractor.
      properties:
        default:
          key:
            serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          value:
            serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          timestamp:
            extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
    consumer:
      group-id: abc
server:
  port: 9011
或者定义一个Bean来配置kafka stream
/**
 * Builds the Kafka Streams configuration programmatically and registers it under
 * the default streams-config bean name.
 *
 * <p>Sets the application id, the broker address, String serdes for both key and
 * value, and the wall-clock timestamp extractor.
 *
 * @return the streams configuration wrapping the assembled properties
 */
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    // Key and value use the same serde class, so resolve its name once.
    final String stringSerdeClassName = Serdes.String().getClass().getName();
    final String timestampExtractorClassName = WallclockTimestampExtractor.class.getName();

    final Map<String, Object> streamsSettings = new HashMap<>();
    streamsSettings.put(StreamsConfig.APPLICATION_ID_CONFIG, "abc");
    streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx:9092");
    streamsSettings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeClassName);
    streamsSettings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeClassName);
    streamsSettings.put(
            StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, timestampExtractorClassName);

    return new KafkaStreamsConfiguration(streamsSettings);
}
三、启动类加@EnableKafkaStreams
package cn.edu.tju;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafkaStreams;
/**
 * Application entry point. {@code @EnableKafkaStreams} is declared here, on the
 * same class as {@code @SpringBootApplication}.
 */
@SpringBootApplication
@EnableKafkaStreams
public class Start {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Start.class, args);
    }
}
四、定义kafka stream bean
/**
 * Reads {@code testTopic3}, upper-cases every value via {@link #uppercaseValue}
 * and writes the result to {@code testTopic2} using String serdes.
 *
 * <p>NOTE(review): the {@code @Bean} annotation below is commented out, so this
 * method is not registered. {@code kStream2} subscribes to the same source topic;
 * presumably only one of the two is meant to be active at a time.
 *
 * @param streamsBuilder builder the topology is attached to
 * @return the source stream (not the mapped one)
 */
//@Bean
public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
    KStream<String, String> source = streamsBuilder.stream("testTopic3");

    KStream<String, String> uppercased = source.map(this::uppercaseValue);
    uppercased.to("testTopic2", Produced.with(Serdes.String(), Serdes.String()));

    return source;
}
/**
 * Maps a record to one with the same key and an upper-cased value.
 *
 * <p>Upper-casing uses {@code Locale.ROOT} so the result does not depend on the
 * JVM's default locale. A {@code null} value is passed through unchanged
 * instead of raising a {@code NullPointerException}.
 *
 * @param key   record key, forwarded as-is
 * @param value record value; may be {@code null}
 * @return a new key/value pair holding the original key and the upper-cased value
 */
private KeyValue<String, String> uppercaseValue(String key, String value) {
    if (value == null) {
        return new KeyValue<>(key, null);
    }
    // Fully qualified to avoid touching the file's import list.
    return new KeyValue<>(key, value.toUpperCase(java.util.Locale.ROOT));
}
/**
 * Reads {@code testTopic3}, prints each value to standard output, appends the
 * suffix {@code "hahah"} to it and writes the result to {@code testTopic2}.
 *
 * <p>No serdes are passed explicitly here, so the configured defaults apply.
 *
 * @param streamsBuilder builder the topology is attached to
 * @return the source stream (not the mapped one)
 */
@Bean
public KStream<Object, Object> kStream2(StreamsBuilder streamsBuilder) {
    KStream<Object, Object> source = streamsBuilder.stream("testTopic3");

    KStream<Object, Object> suffixed = source.map((key, value) -> {
        System.out.println("从流中抽取到了消息:" + value);
        return new KeyValue<>(key, value + "hahah");
    });
    suffixed.to("testTopic2");

    return source;
}
五、定义controller用来生产消息，定义service用来消费消息
package cn.edu.tju.controller;
import cn.edu.tju.domain.Student;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller that publishes the path segment of a request to Kafka.
 */
@RestController
public class DemoController {

    /** Topic every incoming message is written to. */
    private static final String TARGET_TOPIC = "testTopic3";

    /** Fixed record key used for all published messages. */
    private static final String RECORD_KEY = "test";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends {@code info} to {@code testTopic3} under the key {@code "test"}.
     *
     * <p>The value returned by {@code send} is discarded, so {@code "ok"} is
     * returned without checking whether the record was delivered.
     *
     * @param info message payload taken from the request path
     * @return the literal string {@code "ok"}
     */
    @RequestMapping("/addMessage/{info}")
    public String addMessage(@PathVariable("info") String info) {
        kafkaTemplate.send(TARGET_TOPIC, RECORD_KEY, info);
        return "ok";
    }
}
package cn.edu.tju.service;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
 * Kafka listeners that print every received message to standard output.
 */
@Service
public class MyConsumer {

    /**
     * Handles records from {@code testTopic}.
     *
     * @param content message payload
     */
    @KafkaListener(topics = "testTopic")
    public void processMessage(String content) {
        final String line = "received......" + content;
        System.out.println(line);
    }

    /**
     * Handles records from {@code testTopic2}.
     *
     * @param content message payload
     */
    @KafkaListener(topics = "testTopic2")
    public void processMessage2(String content) {
        final String line = "received[2]......" + content;
        System.out.println(line);
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。