赞
踩
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.54</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
public class User {
private Integer id;
private String nickname;
private String password;
private Integer sex;
private String birthday;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getNickname() {
return nickname;
}
public void setNickname(String nickname) {
this.nickname = nickname;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public Integer getSex() {
return sex;
}
public void setSex(Integer sex) {
this.sex = sex;
}
public String getBirthday() {
return birthday;
}
public void setBirthday(String birthday) {
this.birthday = birthday;
}
}
package com.allen.kafka;
import com.allen.entity.User;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @ClassName: MySimpleProducer
* @Author: AllenSun
* @Date: 2022/12/23 下午1:01
*/
public class MySimpleProducer {
private final static String TOPIC_NAME="replicatedTopic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
// (1)设置参数
Properties properties=new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.19.11:9092,192.168.19.11:9093,192.168.19.11:9094");
// 把发送的key从字符串序列化为字节数组
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 把发送消息value从字符串序列化为字节数组
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// (2)创建生产消息的客户端,传入参数
Producer<String,String> producer=new KafkaProducer<String, String>(properties);
User user=new User();
user.setId(1);
user.setNickname("zhangsan");
user.setPassword("admin");
user.setSex(1);
user.setBirthday("2018-11-11");
// (3)创建消息
// key:作用是决定了往哪个分区上发,value:具体要发送的消息内容
// 如果未指定分区,就会通过业务key的hash运算,算出消息往哪个分区上发
ProducerRecord<String,String> producerRecord=new ProducerRecord<String,String>(TOPIC_NAME,
user.getId().toString(), JSON.toJSONString(user));
// (4)发送消息,得到消息发送的元数据并输出
// 等待消息发送成功的同步阻塞方法
RecordMetadata metadata=producer.send(producerRecord).get();
System.out.println("同步方法发送消息结果:"+"topic->"+metadata.topic()+"|partition->"+metadata.partition()+"|offset->"+metadata.offset());
}
}
上述方法没有指定分区,所以会根据参数key值进行hash计算,算出要发给哪个分区
如果我们对key值进行随意的修改
ProducerRecord<String,String> producerRecord=new ProducerRecord<String,String>(TOPIC_NAME,
"keyString", "valueString");
然后再进行执行测试,可以看到这次的消息都发到0号分区里了
不指定分区的方式
ProducerRecord<String,String> producerRecord=new ProducerRecord<String,String>(TOPIC_NAME,
"keyString", "valueString");
指定分区的方式
ProducerRecord<String,String> producerRecord=new ProducerRecord<String,String>(TOPIC_NAME,0,
"keyString", "valueString");
这样所有的消息都会往partition0上发,就不会再使用默认的hash和key计算分区号了
生产者同步发消息,在收到kafka的ack告知发送成功之前,会一致处于阻塞状态,阻塞超过3s的时间如果还没有收到消息,生产者就会进行重试发送消息,重试的次数是3次。
package com.allen.kafka;
import com.alibaba.fastjson.JSON;
import com.allen.entity.User;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @ClassName: MySimpleProducer
* @Author: AllenSun
* @Date: 2022/12/23 下午1:01
*/
public class MySimpleProducer02 {
private final static String TOPIC_NAME="replicatedTopic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
// (1)设置参数
Properties properties=new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.19.11:9092,192.168.19.11:9093,192.168.19" +
".11:9094");
// 把发送的key从字符串序列化为字节数组
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 把发送消息value从字符串序列化为字节数组
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// (2)创建生产消息的客户端,传入参数
Producer<String,String> producer=new KafkaProducer<String, String>(properties);
User user=new User();
user.setId(1);
user.setNickname("zhangsan");
user.setPassword("admin");
user.setSex(1);
user.setBirthday("2018-11-11");
// (3)创建消息
// key:作用是决定了往哪个分区上发,value:具体要发送的消息内容
// 如果未指定分区,就会通过业务key的hash运算,算出消息往哪个分区上发
ProducerRecord<String,String> producerRecord=new ProducerRecord<String,String>(TOPIC_NAME,
user.getId().toString(), JSON.toJSONString(user));
// (4)发送消息,得到消息发送的元数据并输出--同步发消息
// 等待消息发送成功的同步阻塞方法
try {
// 等待消息发送成功的同步阻塞方法
RecordMetadata metadata=producer.send(producerRecord).get();
// 阻塞
System.out.println("同步方法发送消息结果:"+"topic->"+metadata.topic()+"|partition->"+metadata.partition()+"|offset->"+metadata.offset());
} catch (InterruptedException e) {
e.printStackTrace();
// 1、记录日志,预警系统 +1
// 2、设置时间间隔1s,同步的方式再次发送,如果还不行,就日志预警+人工介入
Thread.sleep(1000);
try {
// 等待消息发送成功的同步阻塞方法
RecordMetadata metadata=producer.send(producerRecord).get();
} catch (Exception e1) {
// 人工介入了
}
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
异步发送,生产者发送完消息后就可以执行之后的业务,broker在收到消息后异步调用生产者提供的callback回调方法。
package com.allen.kafka;
import com.alibaba.fastjson.JSON;
import com.allen.entity.User;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @ClassName: MySimpleProducer
* @Author: AllenSun
* @Date: 2022/12/23 下午1:01
*/
public class MySimpleProducer02 {
private final static String TOPIC_NAME="replicatedTopic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
// (1)设置参数
Properties properties=new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.19.11:9092,192.168.19.11:9093,192.168.19" +
".11:9094");
// 把发送的key从字符串序列化为字节数组
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 把发送消息value从字符串序列化为字节数组
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// (2)创建生产消息的客户端,传入参数
Producer<String,String> producer=new KafkaProducer<String, String>(properties);
User user=new User();
user.setId(1);
user.setNickname("zhangsan");
user.setPassword("admin");
user.setSex(1);
user.setBirthday("2018-11-11");
// (3)创建消息
// key:作用是决定了往哪个分区上发,value:具体要发送的消息内容
// 如果未指定分区,就会通过业务key的hash运算,算出消息往哪个分区上发
ProducerRecord<String,String> producerRecord=new ProducerRecord<String,String>(TOPIC_NAME,
user.getId().toString(), JSON.toJSONString(user));
// (5)异步发送消息
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e!=null){
System.err.println("发送消息失败:"+e.getStackTrace());
}
if(recordMetadata!=null){
System.out.println("异步方法发送消息结果:"+"topic->"+recordMetadata.topic()+"|partition->"+recordMetadata.partition()+"|offset->"+recordMetadata.offset());
}
}
});
Thread.sleep(1000000000L);
}
}
在实际的使用中,同步发送消息用的更多,异步发送当消息发出后不等ack返回就直接往下执行了,有可能会出现消息丢失。
如果生产者发送消息后没有收到ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试,重试的次数3次。会有三个参数配置:
(1)-ack=0:生产者发送消息后,不用等任何分区读取信息,就可以直接返回ack。这种效率是最高的,但是也最容易丢失消息
(2)-ack=1:等1个分区收到消息后,也就是多副本之间的leader收到消息,并且把消息写入到本地的log文件中,才会返回ack给生产者。性能和安全性是最均衡的
(3)-ack=-1 / all:等所有分区收到消息后,也就是要等leader收到消息且写入本地log,且所有follower同步完消息写入本地log,才可以返回ack。里面有默认的配置min.insync.replicas=2(默认为1,推荐配置大于等于2)这种模式的安全性最高,但是效率也是最慢的
// 发出消息持久化机制参数
properties.put(ProducerConfig.ACKS_CONFIG,"1");
// 发送失败会重试,重试3次
properties.put(ProducerConfig.RETRIES_CONFIG,"3");
// 重试间隔设置,默认重试间隔100ms
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);
// 设置发送消息的本地缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// kafka本地线程会从缓冲区取数据,批量发送到broker,批量发送大小默认是16384,即16k,batch满16kb就发送出去
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// 默认是0,就是消息必须立即被发送,但这样会影响性能
// 一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch
// 如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去
// 如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
properties.put(ProducerConfig.LINGER_MS_CONFIG,10);
package com.allen.kafka;
import com.alibaba.fastjson.JSON;
import com.allen.entity.User;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @ClassName: MySimpleProducer
* @Author: AllenSun
* @Date: 2022/12/23 下午1:01
*/
public class MySimpleProducer03 {
private final static String TOPIC_NAME="replicatedTopic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
// (1)设置参数
Properties properties=new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.19.11:9092,192.168.19.11:9093,192.168.19" +
".11:9094");
// 发出消息持久化机制参数
properties.put(ProducerConfig.ACKS_CONFIG,"1");
// 发送失败会重试,默认重试间隔100ms
properties.put(ProducerConfig.RETRIES_CONFIG,"3");
// 重试间隔设置
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);
// 设置发送消息的本地缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// kafka本地线程会从缓冲区取数据,批量发送到broker,批量发送大小默认是16384,即16k,batch满16kb就发送出去
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// 默认是0,就是消息必须立即被发送,但这样会影响性能
properties.put(ProducerConfig.LINGER_MS_CONFIG,10);
// 把发送的key从字符串序列化为字节数组
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 把发送消息value从字符串序列化为字节数组
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// (2)创建生产消息的客户端,传入参数
Producer<String,String> producer=new KafkaProducer<String, String>(properties);
int msgNum = 5;
final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
for (int i = 1; i <= 5; i++) {
User user=new User();
user.setId(1);
user.setNickname("zhangsan");
user.setPassword("admin");
user.setSex(1);
user.setBirthday("2018-11-11");
// (3)创建消息
// key:作用是决定了往哪个分区上发,value:具体要发送的消息内容
// 如果未指定分区,就会通过业务key的hash运算,算出消息往哪个分区上发
ProducerRecord<String,String> producerRecord=new ProducerRecord<String,String>(TOPIC_NAME,
user.getId().toString(), JSON.toJSONString(user));
// (5)异步发送消息
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e!=null){
System.err.println("发送消息失败:"+e.getStackTrace());
}
if(recordMetadata!=null){
System.out.println("异步方法发送消息结果:"+"topic->"+recordMetadata.topic()+"|partition->"+recordMetadata.partition()+"|offset->"+recordMetadata.offset());
}
countDownLatch.countDown();
}
});
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.close();
}
}
package com.allen.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* @ClassName: MyConsumer
* @Author: AllenSun
* @Date: 2022/12/23 下午8:52
*/
public class MySimpleConsumer01 {
private final static String TOPIC_NAME = "replicatedTopic";
private final static String CONSUMER_GROUP_NAME = "replicatedGroup";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.19.11:9092,192.168.19.11:9093,192.168.19" +
".11:9904");
// 消费分组名
properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// 消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> record:records) {
System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),
record.offset(),record.key(),record.value());
}
}
}
}
先启动消费者,然后启动生产者往kafka发送5条数据
可以看到生产者发送了5个消息,3个发到了partition1,2个发到了partition0
设置自动提交参数
// 是否自动提交offset,默认就是true
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// 自动提交offset的间隔时间
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
消费者poll到消息后,默认情况下,会自动向broker的_consumer_offsets主题提交当前主题-分区消费的偏移量。
自动提交会丢消息:因为如果消费者还没消费完poll下来的消息就自动提交不了偏移量,那么测试消费者挂了,于是下一个消费者会从已提交的offsets的下一个位置开始消费消息。之前未被消费的消息就丢失掉了
(1)设置手动提交参数
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
(2)在消费完消息后进行手动提交
1-手动同步提交
在消费完消息后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,执行之后的逻辑
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> record:records) {
System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),
record.offset(),record.key(),record.value());
}
// 上面所有的消息消费完
if(records.count()>0){
// 手动同步提交offset,当前线程会阻塞直到offset提交成功
// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
consumer.commitAsync();// 阻塞,提交成功
}
}
2-手动异步提交
在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> record:records) {
System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),
record.offset(),record.key(),record.value());
}
// 上面所有的消息消费完
if(records.count()>0){
// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面程序逻辑
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e!=null){
System.err.println("Commit failed for "+ map);
System.err.println("Commit failed exception:"+ e.getStackTrace());
}
}
});
}
}
(1)默认情况下,消费者一次会poll 500 条消息
// 消费者建立了与broker之间的长连接,开始poll消息,默认一次poll 500 条消息
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
(2)代码中设置了长轮询的时间是1000毫秒
while (true) {
// 如果1s内每隔1s内没有poll到任何消息,则继续去poll消息,循环往复,直到poll到消息
// 如果超出了1s,则此次长轮询结束
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> record:records) {
System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),
record.offset(),record.key(),record.value());
}
(3)流程描述
1-如果一次poll到500条消息,就直接执行for循环遍历所有消息
2-如果这一次没有poll到500条消息,且时间在1s内,那么长轮询继续poll,要么到500条,要么到1s
3-如果多次poll都没达到500条消息,且1s时间到了,那么直接执行for循环
(4)从消费组中踢出较弱的消费者
如果两次poll的间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被踢出消费组,触发rebalance机制,会造成性能开销,可以通过设置这个参数,让一次poll的消息条数少一点
// 可以根据消费速度的快慢来设置,因为如果两次poll的时间如果超过了30s的时间间隔
// kafka会认为其消费能力过弱,将其踢出消费组
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,30*1000);
(5)消费者的健康状态检查
消费者每隔1s向kafka集群发送心跳,集群发现如果有超过10s没有续约的消费者,将其踢出消费组,触发该消费者的rebalance机制,把该分区交给消费组里的其他消费者进行消费。
// 消费者发送心跳的时间间隔
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,1000);
// kafka如果超过10s没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10*1000);
从分区的最近offset开始消费
// 消费者指定分区
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
// 消费者指定分区
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
// 消费者指定分区,并且从头开始消费
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
// 消费者指定分区
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
// 指定offset消费
consumer.seek(new TopicPartition(TOPIC_NAME,0),10);
// 消费者指定分区
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
// 指定offset消费
consumer.seek(new TopicPartition(TOPIC_NAME,0),10);
新消费组中的消费者在启动以后,默认会从当前分区的最后一条消息的offset+1开始消费(消费新消息)。可以通过以下的设置,让新的消费者第一次从头开始消费,之后开始消费新消息(最后消费的位置的偏移量+1)
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
(1)Latest:默认的,消费新消息
(2)earliest:第一次从头开始消费,之后开始消费新消息(最后消费的位置的偏移量+1)
(1)引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.7</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.allen</groupId>
<artifactId>kafka_springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka_springboot</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.54</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
(2)yml配置文件
server:
port: 8088
spring:
kafka:
bootstrap-servers: 192.168.19.11:9092,192.168.19.11:9093,192.168.19.11:9904
producer:
retries: 3 #设置大于0的值,则客户端会把发送失败的记录重新发送
batch-size: 16384
buffer-memory: 33554432
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false #是否为自动提交offset
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500
listener:
ack-mode: manual_immediate #listner负责ack,每调用一次,就立即commit
# redis:
# host: 192.168.19.11
(3)生产者Controller
package com.allen.kafka_springboot.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName: MyKafkaController
* @Author: AllenSun
* @Date: 2022/12/24 上午12:42
*/
@RestController
@RequestMapping("/msg")
public class MyKafkaController {
private final static String TOPIC_NAME="springbootTopic";
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@RequestMapping("/send")
public String sendMessage() {
kafkaTemplate.send(TOPIC_NAME,0,"key","this is message");
return "send success!";
}
}
(4)消费者
package com.allen.kafka_springboot.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* @ClassName: MyConsumer
* @Author: AllenSun
* @Date: 2022/12/24 上午12:48
*/
@Component
public class MyConsumer {
@KafkaListener(topics = "javaTopic",groupId = "replicatedGroup")
public void listenGroup(ConsumerRecord<String,String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// 手动提交offset
ack.acknowledge();
}
}
package com.allen.kafka_springboot.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* @ClassName: MyConsumer
* @Author: AllenSun
* @Date: 2022/12/24 上午12:48
*/
@Component
public class MyConsumer {
@KafkaListener(groupId = "replicatedGroup",topicPartitions = {
@TopicPartition(topic = "javaTopic",partitions = {"0","1"}),
@TopicPartition(topic = "replicatedTopic",partitions = "0",partitionOffsets = @PartitionOffset(partition = "1"
,initialOffset = "100"))
},concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费者数,建议小于等于分区总数
public void listenGroup01(ConsumerRecord<String,String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// 手动提交offset
ack.acknowledge();
}
}
Kafka集群中的broker在zk中创建临时序号节点,序号最小的节点(最先创建的节点)将作为集群的controller,负责管理整个集群中的所有分区和副本的状态:
(1)当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本
(2)当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息
(3)当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到。
前提是:消费者没有指明分区消费。当消费组里消费者和分区的关系发生变化,那么就会触发rebalance机制。这个机制会重新调整消费者消费哪个分区。
在触发rebalance机制之前,消费者消费哪个分区由三种策略:
(1)range:通过公示来计算某个消费者消费哪个分区
(2)轮询:大家轮着消费
(3)sticky:在触发了rebalance后,在消费者消费的原分区不变的基础上进行调整
HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW,consumer最多只能消费到HW所在的位置,另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能
(1)发送方:ack是1或者-1/all 可以防止消息丢失,如果要做到99.9999%,ack设成all,把min.insync.replicas配置成分区备份数
(2)消费方:把自动提交改为手动提交
一条消息被消费者消费多次。如果为了消息的不重复消费,而把生产端的重试机制关闭、消费端的手动提交改成自动提交,这样反而会出现消息丢失,那么可以直接在防治消息丢失的手段上再加上消费消息时的幂等性保证,就能解决消息的重复消费问题。
幂等性如何保证:
(1)mysql插入业务id作为主键,主键是唯一的,所以一次只能插入一条
(2)使用redis或zk的分布式锁(主流的方案)
(1)发送方:在发送时把ack不能设置0,关闭重试,使用同步发送,等到发送成功再发送下一条。确保消息是按顺序发送的。
(2)接收方:消息是发送到一个分区中,只能有一个消费组的消费者来接收信息。因此,kafka的顺序消费会牺牲掉性能。
消息积压会导致很多问题,如果磁盘被打满、生产端发消息导致kafka性能过慢,就容易出现服务雪崩,就需要有相应的手段:
(1)方案一:在一个消费者中启动多个线程,让多个线程同时消费。(提升一个消费者的消费能力)
(2)方案二:如果方案一还不够的话,这个时候可以启动多个消费者,多个消费者部署在不同的服务器上。其实多个消费者部署在同一个服务器上也可以提高消费能力(充分利用服务器的CPU资源)
(3)方案三:让一个消费者去把收到的消息往另一个topic上发,另一个topic设置多个分区和多个消费者,进行具体的业务消费
延迟队列的应用场景:在订单创建成功后如果超过30min没有付款,则需要取消订单,此时可以用延迟队列来实现
(1)创建多个topic,每个topic表示延时的间隔
1-topic_5s:延时5s执行的队列
2-topic_1m:延时1分钟执行的队列
3-topic_30m:延时30分钟执行的队列
(2)消息发送者发送消息到相应的topic,并带上消息的发送时间
(3)消费者订阅相应的topic,消费时轮询消费整个topic中的消息
1-如果消息的发送时间,和消费的当前时间超过预设的值,比如30分钟
2-如果消息的发送时间,和消费的当前时间没有超过预设的值,则不消费当前的offset及之后的offset的所有消息都消费
3-下次继续消费该offset处的消息,判断时间是否已满足预设值
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。