赞
踩
此文章只展示 spring boot整合 kafka的操作,不讲原理
首先认识Kafka的名词
1、Producer:生产者,向kafka发送消息的角色
2、Consumer:消费者,从Kafka中获取消息的角色
3、Consumer Group:消费者组:消费者组 中有多个消费者,消费者一定属于某个消费组,一个分区的消息只能由一个消费者组中的一个消费者消费
4、Broker:一个Kafka服务器就是 broker,一个Kafka集群有多个broker组成,一个broker可以有多个Topic(主题)
5、Topic:主题,生产者和消费者分别向 Kafka的主题发送和接收消息,类似 rabbitmq的队列
6、Partition:分区,非常大的数据发送到一个Topic中,此时效率性能很低,可以将这个topic分到不同的broker上,并且每个broker上一个topic分为不同的区(partition),每个分区是有序的队列(分区有序,不能保证全局有序),通过不同的broker和不同的partition,发送接收消息,提高性能。
7、Replica:副本,为保证集群中某个节点发送故障,节点上的分区数据不丢失,Kafka提供了副本机制,一个Topic的每个分区都有若干个副本,一个leader 和多个follower
8、Leader:每个分区多个副本的主角色,生产者发送的对象,消费者消费的对象都是leader
9、Follower:每个分区多个副本的副角色,当主角色leader挂了,某个Follower会成为新的leader,它实时同步leader的数据,保持一致性。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=127.0.0.1:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 声明事务
spring.kafka.producer.transaction-id-prefix=kafka_tx.
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50
注入 KafkaTemplate 这个kafka的操作工具对象,send方法,参数为:主题,和消息内容
,将消息内容发送到对应的 topic上。
@Controller
public class TestController {
@Autowired(required = false)
private KafkaTemplate<Object,Object> template;
@ResponseBody
@RequestMapping("/kafka")
public void demo(){
template.send("topic-input","11111");
}
}
@KafkaListener注解的使用
监听订阅的主题,获取消息
此处的@KafkaListener注解中的id 表示消费组,topics代表监听的主题
@Component
@Slf4j
public class KafkaCustom {
@KafkaListener(id = "webGroup",topics = "topic-input")
public void getKafkaInfo(String input){
log.info("--topic-input--" + input);
}
}
测试结果:
调用接口,发送主题消息,消费者成功获取该主题的消息内容
注意:此时的模式为 点对点,一个消息只能由一个消费者接收
包括:
1、获取分组下的表述信息
2、创建消费者
3、获取topic的lag
4、创建topic,指定partition
5、删除topic
6、列出所有topic名称
7、获取指定topic的分区数
8、修改指定topic的分区数
9、询Topic的配置信息
10、修改Topic的配置信息
11、修改Topic的配置信息
package com.springboot.kafka.service;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.*;
@Service
public class KafkaService {
@Autowired
private ConsumerFactory<Long, String> consumerFactory;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private KafkaProperties kafkaProperties;
/**
* 获取分组下的表述信息
**/
private long[] getDescribe(String topic) {
long[] describe = new long[3];
Consumer<Long, String> consumer = createConsumer();
List<PartitionInfo> partitionInfos = kafkaTemplate.partitionsFor(topic);
List<TopicPartition> tp = new ArrayList<>();
partitionInfos.forEach(str -> {
TopicPartition topicPartition = new TopicPartition(topic, str.partition());
tp.add(topicPartition);
long logEndOffset = consumer.endOffsets(tp).get(topicPartition);
consumer.assign(tp);
//consumer.position(topicPartition);
long currentOffset = consumer.position(topicPartition);
//System.out.println("logEndOffset : " + logEndOffset + ", currentOffset : "+ currentOffset);
describe[0] += currentOffset;
describe[1] += logEndOffset;
describe[2] = describe[1] - describe[0];
tp.clear();
});
//System.out.println(Arrays.toString(describe));
return describe;
}
/**
* 创建消费者
**/
private Consumer<Long, String> createConsumer() {
return this.consumerFactory.createConsumer();
}
/**
* 获取topic的lag
* @param topic
* @return
*/
public Long getLag(String topic) {
return getDescribe(topic)[2];
}
/**
* 创建topic,指定partition
* @param topicName
* @param numPar
* @return
*/
public boolean createToipc(String topicName, int numPar) {
AdminClient client = AdminClient.create(this.kafkaProperties.buildAdminProperties());
if(client !=null) {
try {
Collection<NewTopic> newTopics = new ArrayList<>(1);
newTopics.add(new NewTopic(topicName, numPar, (short) 1));
client.createTopics(newTopics);
}
catch (Throwable e) {
e.printStackTrace();
return false;
}
finally {
client.close();
}
}
return true;
}
/**
* 删除topic
* @param topic
* @return
* @throws Exception
*/
public boolean deleteTopic(String topic) {
AdminClient client = AdminClient.create(this.kafkaProperties.buildAdminProperties());
// 服务端server.properties需要设置delete.topic.enable=true,才可以使用同步删除,否则只是将主题标记为删除
try {
client.deleteTopics(Arrays.asList(topic));
}
catch (Throwable e) {
e.printStackTrace();
return false;
}
finally {
client.close();
}
return true;
}
/**
* 列出所有topic名称
* @return
*/
public String listTopics() {
AdminClient client = AdminClient.create(this.kafkaProperties.buildAdminProperties());
String r = "";
if (client != null) {
try {
ListTopicsResult result = client.listTopics();
Set<String> topics = result.names().get();
r = topics.toString();
}
catch (Throwable e) {
e.printStackTrace();
}
finally {
client.close();
}
}
return r;
}
/**
* 获取指定topic的分区数
* @param topic
* @return
*/
public int getPartition(String topic) {
AdminClient client = AdminClient.create(this.kafkaProperties.buildAdminProperties());
int num = 0;
try {
TopicDescription description = client.describeTopics(Arrays.asList(topic)).all().get().get(topic);
//r = description.toString();
num = description.partitions().size();
}
catch (Throwable e) {
e.printStackTrace();
}
finally {
client.close();
}
return num;
}
/**
* 修改指定topic的分区数
* @param topic
* @param numPartitions
* @return:如果指定的新分区数小于现有分区数,不成功,返回false
*/
public boolean updatePartitions(String topic, Integer numPartitions) {
AdminClient client = AdminClient.create(this.kafkaProperties.buildAdminProperties());
NewPartitions newPartitions = NewPartitions.increaseTo(numPartitions);
Map<String, NewPartitions> map = new HashMap<>(1, 1);
map.put(topic, newPartitions);
try {
client.createPartitions(map).all().get();
}
catch (Throwable e) {
e.printStackTrace();
return false;
}
finally {
client.close();
}
return true;
}
/**
* 查询Topic的配置信息
*/
public void describeConfig() {
AdminClient client = AdminClient.create(this.kafkaProperties.buildAdminProperties());
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test1");
Collection<ConfigResource> coll = new ArrayList<ConfigResource>();
coll.add(configResource);
DescribeConfigsResult result = client.describeConfigs(coll);
try {
Map<ConfigResource, Config> map = result.all().get();
map.forEach((key, value) ->
System.out.println("name: " + key.name() + ", desc: " + value));
}
catch (Throwable e) {
e.printStackTrace();
}
finally {
client.close();
}
}
/**
* 修改Topic的配置信息
*/
public void incrementalAlterConfig() {
// 指定ConfigResource的类型及名称
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "MyTopic");
Collection<ConfigResource> coll = new ArrayList<ConfigResource>();
coll.add(configResource);
// 配置项同样以ConfigEntry形式存在,只不过增加了操作类型
// 以及能够支持操作多个配置项,相对来说功能更多、更灵活
Collection<AlterConfigOp> configs = new ArrayList<AlterConfigOp>();
configs.add(new AlterConfigOp(
new ConfigEntry("preallocate", "false"),
AlterConfigOp.OpType.SET
));
AdminClient client = AdminClient.create(this.kafkaProperties.buildAdminProperties());
Map<ConfigResource, Collection<AlterConfigOp>> configMaps = new HashMap<>();
configMaps.put(configResource, configs);
AlterConfigsResult result = client.incrementalAlterConfigs(configMaps);
try {
System.out.println(result.all().get());
}
catch (Throwable e) {
e.printStackTrace();
}
finally {
client.close();
}
}
/**
* 修改Topic的配置信息
*/
public void alterConfig() {
// 指定ConfigResource的类型及名称
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test1");
// 配置项以ConfigEntry形式存在
Collection<ConfigEntry> coll = new ArrayList<ConfigEntry>();
coll.add(new ConfigEntry("preallocate", "true"));
Config config = new Config(coll);
AdminClient client = AdminClient.create(this.kafkaProperties.buildAdminProperties());
Map<ConfigResource, Config> configMaps = new HashMap<>();
configMaps.put(configResource, config);
AlterConfigsResult result = client.alterConfigs(configMaps);
try {
System.out.println(result.all().get());
}
catch (Throwable e) {
e.printStackTrace();
}
finally {
client.close();
}
}
}
方式一、
@RequestMapping("/kafka")
@ResponseBody
public String sendMessage2( ) {
template.send("topic-test", "111111").addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
System.out.println("消息发送到的topic:" + topic);
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
System.out.println("消息发送到的分区:" + partition);
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("消息在分区内的offset:" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
return "end~";
}
结果:
方式二:异步获取
@RequestMapping("/kafka/callbackTwo")
@ResponseBody
public void sendMessage3() {
template.send("topic-test", "22222").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ex.getMessage());
}
@Override
public void onSuccess(SendResult<Object, Object> result) {
System.out.println("发送消息成功:主题:" + result.getRecordMetadata().topic() + "-分区:"
+ result.getRecordMetadata().partition() + "-偏移量:" + result.getRecordMetadata().offset());
}
});
}
结果:
kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:
① 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
② 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;
③ patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;
我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区,
配置自定义分区
# 自定义分区器
spring.kafka.producer.properties.partitioner.class=com.springboot.kafka.config.SelfPartitioner
代码:
package com.springboot.kafka.config;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 自定义分区器
*/
public class SelfPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 0就是指定的分区数
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
主要是在配置文件中开启事务
spring.kafka.producer.transaction-id-prefix=kafka_tx.
spring.kafka.producer.retries=3
spring.kafka.producer.acks=-1
不使用事务实例:
@RequestMapping("/kafka/transactiotn")
@ResponseBody
public void sendMessage7() {
// 不声明事务:后面报错但前面消息已经发送成功了
template.send("topic1", "test executeInTransaction");
throw new RuntimeException("fail");
}
这个是不声明事务的,后面报错,消息也会发出去
事务代码1:
使用事务必须在配置文件中配置开始事务,才可以生效
@RequestMapping("/kafka/transactiotn")
@ResponseBody
public void sendMessage7() {
// 声明事务:后面报错消息不会发出去
template.executeInTransaction(operations -> {
operations.send("topic1", "test executeInTransaction");
throw new RuntimeException("fail");
});
}
声明事务后,报错时消息不会发出去
事务代码2
使用事务必须在配置文件中配置开始事务,才可以生效
@RequestMapping("/kafka/transactiotn")
@ResponseBody
@Transactional(rollbackFor = RuntimeException.class)
public void sendMessage7() {
// 不声明事务:后面报错但前面消息已经发送成功了
template.send("topic1", "test executeInTransaction");
throw new RuntimeException("fail");
}
使用 注解@Transactional(rollbackFor = RuntimeException.class) 也是可以的
注意:一旦开启事务回滚,每个生产者发送消息的接口 都要有@Transactional注解,声明事务,否则会报错
监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。
/**
* @Title 指定topic、partition、offset消费
* @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
* @Author long.yuan
* @Date 2020/3/22 13:38
* @Param [record]
* @return void
**/
@KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
@TopicPartition(topic = "topic1", partitions = { "0" }),
@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?, ?> record) {
System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
}
属性介绍:
① id:消费者ID;
② groupId:消费组ID;
③ topics:监听的topic,可监听多个;
④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。
注意:topics和topicPartitions不能同时使用;
开启批量消费,一个消费者监听 多个主题
# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
topics中,监听多个 topic
@KafkaListener(id = "consumer1",groupId = "lsgroup",topics = {"topic1","topic2"})
public void getKafkaInfo3(List<ConsumerRecord<?,?>> records){
System.out.println("批量消费:records.size==" + records.size());
for (ConsumerRecord<?,?> record : records) {
System.out.println("---" + record.value());
}
}
@RequestMapping("/kafka")
@ResponseBody
public String sendMessage2() {
template.send("topic1","11111");
template.send("topic2","22222");
return "end~";
}
首先写个异常处理配置类
异常处理器的message.getPayload()也可以拿到各条消息的信息
@Configuration
@Slf4j
public class kafkaConfig {
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString());
return null;
}
};
}
}
在kafka监听注解中使用上述的拦截实例
@Component
@Slf4j
public class ErrorListener11 {
@KafkaListener(id = "err", topics = "topic1", errorHandler = "consumerAwareErrorHandler")
public void errorListener(String data) {
log.info("topic.quick.error receive : " + data);
throw new RuntimeException("fail");
}
}
结果
或者【这个不会打印;log】
@KafkaListener(id = "err", topics = "topic1", errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) {
log.info("111----"+record.toString());
throw new RuntimeException("简单消费-模拟异常");
}
@KafkaListener(topics = {"topic2","topic3"},errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
System.out.println("批量消费一次...");
throw new Exception("批量消费-模拟异常");
}
此处同时监听两个topic的消息,
依旧可以获取,对应topic的消息体
消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。
在配置类中配置监听器容器工厂中的 消息过滤器----测试过滤消息体为奇数的topic
package com.springboot.kafka.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Configuration
@Slf4j
public class kafkaConfig {
@Autowired
ConsumerFactory consumerFactory;
/**
* 消费者异常处理拦截器
* @return
*/
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
// 相应 topic的消息体
log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString());
return null;
}
};
}
// 消息过滤器
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// 被过滤的消息将被丢弃
factory.setAckDiscarded(true);
// 消息过滤策略
factory.setRecordFilterStrategy(consumerRecord -> {
// 测试,过滤奇数
if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
return false;
}
//返回true消息则被过滤
return true;
});
return factory;
}
}
监听器中使用此过滤器
// 消息过滤监听
@KafkaListener(topics = {"topic4"},containerFactory = "filterContainerFactory")
public void onMessage6(ConsumerRecord<?, ?> record) {
System.out.println(record.value());
}
生产者
for (int i = 1;i<11;i++ ) {
template.send("topic4",String.valueOf(i));
}
结果 :过滤了奇数的消息体,只保留了偶数的消息
A从topicA中获取消息,经过处理后,转发到TopicB,再由B消费者监听处理消息
使用注解:@SendTo()
@KafkaListener(topics = "topic1")
@SendTo("topic2")
public String getKafkaInfo2(String input){
log.info("--topic1--" +input);
return input + "转发到2";
}
@KafkaListener(topics = "topic2")
public void getKafkaInfo1(String input){
log.info("--topic2--" + input);
}
测试:
template.send("topic1","11111");
结果:
默认情况下:项目运行后,监听器就开始启动。下面开始实现,定时启动或停止监听器 使用KafkaListenerEndpointRegistry
① 禁止监听器自启动;
在配置类中 配置禁止自动开启
// 监听器容器工厂
// 设置消息过滤器
// 设置禁止 监听器自启动
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// 被过滤的消息将被丢弃
factory.setAckDiscarded(true);
// 设置禁止 监听器自启动
factory.setAutoStartup(false);
// 消息过滤策略
factory.setRecordFilterStrategy(consumerRecord -> {
// 测试,过滤奇数
if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
return false;
}
//返回true消息则被过滤
return true;
});
return factory;
}
② 创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;
首先在 启动类加上注解 @EnableScheduling 开启定时任务
然后 编写定时任务
根据监听器的Id进行 开启和关闭监听器
package com.springboot.kafka.service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class CronTimer {
/**
* @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
* 而是会被注册在KafkaListenerEndpointRegistry中,
* 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
**/
@Autowired
private KafkaListenerEndpointRegistry registry;
// 监听器
@KafkaListener(id="timingConsumer",topics = "topic1",containerFactory = "filterContainerFactory")
public void onMessage1(ConsumerRecord<?, ?> record){
System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
// 定时启动监听器 每5秒钟开启
@Scheduled(cron = "*/5 * * * * ?")
public void startListener() {
System.out.println("启动监听器...");
// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
if (!registry.getListenerContainer("timingConsumer").isRunning()) {
registry.getListenerContainer("timingConsumer").start();
}
//registry.getListenerContainer("timingConsumer").resume();
}
// 定时停止监听器
@Scheduled(cron = "*/4 * * * * ? ")
public void shutDownListener() {
System.out.println("关闭监听器...");
registry.getListenerContainer("timingConsumer").pause();
}
}
结果
可在配置文件中配置 spring.kafka.producer.acks的值
要是发送不成功,在配置重试次数后,还是不成功,生产者有回调方法,在发送失败中在进行处理
1、手动提交Ack模式
首先关闭自动提交,并设置consumer的消费模式
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
监听器:
手动提交:ack.acknowledge();
消息延迟:ack.nack(1000);
@KafkaListener( topics = "topic11")
public String listen(String input, Acknowledgment ack) {
log.info("input value: {}", input);
if ("kl".equals(input)) {
// 手动提交
ack.acknowledge();
}
return "successful";
}
2、上述的是全局配置,如果不想全局配置手动确认消费的话,可以这样
在配置类中设置,手动确认 manual
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
@Bean
public ConcurrentKafkaListenerContainerFactory<?,?> filterContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// 被过滤的消息将被丢弃
factory.setAckDiscarded(true);
// 设置消费者手动提交确认
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// 设置消费者消息延迟5秒
factory.getContainerProperties().setAckTime(5000);
// 设置禁止 监听器自启动
factory.setAutoStartup(false);
// // 消息过滤策略
// factory.setRecordFilterStrategy(consumerRecord -> {
// // 测试,过滤奇数
// if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
// return false;
// }
// //返回true消息则被过滤
// return true;
// });
return factory;
}
在 监听器中设置这个配置类
@KafkaListener(containerFactory = "filterContainerFactory" ,
topics = "topic11",
autoStartup="true")
public String listen(String input, Acknowledgment ack) {
log.info("input value: {}", input);
if ("kl".equals(input)) {
// 手动提交
ack.acknowledge();
}
return "successful";
}
默认情况下 @KafkaListener的参数 autoStartup = “true” ,也就是自动开启消费,
我们可以通过 KafkaListenerEndpointRegistry进行干预他的声明周期:
start():开启
pause() :停止
resume():继续
@GetMapping("/send/{input}")
@Transactional(rollbackFor = RuntimeException.class)
public void sendFoo(@PathVariable String input) throws Exception {
ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
template.send(record);
}
@Autowired
private KafkaListenerEndpointRegistry registry;
@GetMapping("/stop/{listenerID}")
public void stop(@PathVariable String listenerID){
registry.getListenerContainer(listenerID).pause();
}
@GetMapping("/resume/{listenerID}")
public void resume(@PathVariable String listenerID){
registry.getListenerContainer(listenerID).resume();
}
@GetMapping("/start/{listenerID}")
public void start(@PathVariable String listenerID){
registry.getListenerContainer(listenerID).start();
}
@KafkaListener(id = "webGroup", topics = "topic-kl",autoStartup = "false")
public String listen(String input) {
log.info("input value: {}", input);
return "successful";
}
最初,监听器的autoStartup 为false,表示不开启消费,这个时候,发送消息时,是消费不了消息的。‘
当我们调用 start方法,开启消费时,这个时候发送消息时可以接收的
当我们调用 stop方法,停止消费时,这个时候发送消息时不可以消费的
当我们调用 resume方法时,继续消费,这个时候会将上个停止消费发送的消息接收。
可以设置当消费消息出现异常的时候,重试这个消息,并设置重试达到多少次次数后,让消息进入预定好的 topic中,也就是死信队列。 死信队列的Topic的规则是,业务Topic名字+“.DLT”。
参考
同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。这也是kafka用来实现一个topic消息的广播和单播的手段,如果需要实现广播,一个consumer group内只放一个消费者即可,要实现单播,将所有的消费者放到同一个consumer group即可。
kafka原理
window10下安装 zookeeper和kafka[版本一直!]
kafka其他操作可以参考这个的专栏【后续有时间的话,把这个操作一遍!!!】
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。