赞
踩
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- server:
- port: 8080
- spring:
- kafka:
- bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:90947
- producer: # ⽣产者
- retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送
- batch-size: 16384
- buffer-memory: 33554432
- acks: 1
- # 指定消息key和消息体的编解码⽅式
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- consumer: # 消费者
- group-id: default-group
- enable-auto-commit: false
- auto-offset-reset: earliest
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- max-poll-records: 500
- listener:
- # 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交
- # RECORD
- # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
- # BATCH
- # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交
- # TIME
- # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交
- # COUNT
- # TIME | COUNT 有⼀个条件满⾜时提交
- # COUNT_TIME
- # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交
- # MANUAL
- # ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种
- # MANUAL_IMMEDIATE
- ack-mode: MANUAL_IMMEDIATE
- redis:
- host: 172.16.253.21
- package com.qf.kafka.spring.boot.demo.controller;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- @RestController
- @RequestMapping("/msg")
- public class MyKafkaController {
- private final static String TOPIC_NAME = "my-replicated-topic";
- @Autowired
- private KafkaTemplate<String,String> kafkaTemplate;
- @RequestMapping("/send")
- public String sendMessage(){
- kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");
- return "send success!";
- }
- }
- package com.qf.kafka.spring.boot.demo.consumer;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.support.Acknowledgment;
- import org.springframework.stereotype.Component;
- @Component
- public class MyConsumer {
- @KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
- public void listenGroup(ConsumerRecord<String, String> record,
- Acknowledgment ack) {
- String value = record.value();
- System.out.println(value);
- System.out.println(record);
- //⼿动提交offset
- ack.acknowledge();
- }
- }
- @KafkaListener(groupId = "testGroup", topicPartitions = {
- @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
- @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets =
- @PartitionOffset(partition = "1", initialOffset = "100")) },concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费数,建议⼩于等于分区总数
- public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack)
- {
- String value = record.value();
- System.out.println(value);
- System.out.println(record);
- //⼿动提交offset
- ack.acknowledge();
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。