赞
踩
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
spring.application.name=springboot-kafka-02 server.port=8080 # 用于建立初始连接的broker地址 spring.kafka.bootstrap-servers=localhost:9092 # producer用到的key和value的序列化类 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 默认的批处理记录数 spring.kafka.producer.batch-size=16384 # 32MB的总发送缓存 spring.kafka.producer.buffer-memory=33554432 # consumer用到的key和value的反序列化类 spring.kafka.consumer.key- deserializer=org.apache.kafka.common.serialization.IntegerDeserializer spring.kafka.consumer.value- deserializer=org.apache.kafka.common.serialization.StringDeserializer # consumer的消费组id spring.kafka.consumer.group-id=spring-kafka-02-consumer # 是否自动提交消费者偏移量 spring.kafka.consumer.enable-auto-commit=true # 每隔100ms向broker提交一次偏移量 spring.kafka.consumer.auto-commit-interval=100 # 如果该消费者的偏移量不存在,则自动设置为最早的偏移量 spring.kafka.consumer.auto-offset-reset=earliest
- @SpringBootApplication
- public class SpringbootkafkaApplication {
- public static void main(String[] args) {
- SpringApplication.run(SpringbootkafkaApplication.class, args);
- }
- }
同步等待broker的响应
- @RestController
- public class KafkaSyncProducerController {
-
- @Autowired
- private KafkaTemplate<Integer,String> template;
-
- @RequestMapping("send/sync/{message}") public String sendSync(@PathVariable String message) {
- ListenableFuture<SendResult<Integer, String>> future = template.send(new ProducerRecord<Integer, String>(
- "topic_1",
- 0,
- 1,
- message
- ));
- try {
- SendResult<Integer, String> result = future.get();
- RecordMetadata metadata = result.getRecordMetadata();
- System.out.println(metadata.serializedKeySize());
- System.out.println(metadata.serializedValueSize());
- System.out.println(metadata.topic());
- System.out.println(metadata.partition());
- System.out.println(metadata.offset());
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- return "success";
- }
-
- }
添加回调,异步等待响应
- @RestController
- public class KafkaASyncProducerController {
-
- @Autowired
- private KafkaTemplate<Integer,String> template;
-
- @RequestMapping("send/sync/{message}") public String sendSync(@PathVariable String message) {
- ListenableFuture<SendResult<Integer, String>> future = template.send(new ProducerRecord<Integer, String>(
- "topic_1",
- 0,
- 1,
- message
- ));
- future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
- @Override
- public void onFailure(Throwable ex) {
- System.out.println("消息发送失败!"+ ex);
- }
-
- @Override
- public void onSuccess(SendResult<Integer, String> result) {
- RecordMetadata metadata = result.getRecordMetadata();
- System.out.println(metadata.serializedKeySize());
- System.out.println(metadata.serializedValueSize());
- System.out.println(metadata.topic());
- System.out.println(metadata.partition());
- System.out.println(metadata.offset());
- System.out.println("消息发送成功!");
- }
- });
- return "success";
- }
-
- }
- @Component
- public class MyConsumer {
-
- @KafkaListener(topics="topic_1")
- public void onMessage(ConsumerRecord<Integer,String> record){
- System.out.println(record.topic());
- System.out.println(record.offset());
- System.out.println(record.partition());
- System.out.println(record.serializedKeySize());
- System.out.println(record.serializedValueSize());
- System.out.println(record.key());
- System.out.println(record.value());
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。