赞
踩
基本/核心概念
Broker
Kafka的服务端程序,可以认为一个mq节点就是一个broker
broker存储topic的数据
Producer生产者
创建消息Message,然后发布到MQ中
该角色将消息发布到Kafka的topic中
Consumer消费者:
消费队列里面的消息
ConsumerGroup消费者组
同个topic, 广播发送给不同的group,一个group中只有一个consumer可以消费此消息
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,主题的意思
Partition分区
kafka数据存储的基本单元,topic中的数据分割为一个或多个partition,每个topic至少有一个partition,是有序的
一个Topic的多个partitions, 被分布在kafka集群中的多个server上
消费者数量 <=小于或者等于Partition数量
Replication 副本(备胎)
同个Partition会有多个副本replication ,多个副本的数据是一样的,当其他broker挂掉后,系统可以主动用副本提供服务
默认每个topic的副本都是1(默认是没有副本,节省资源),也可以在创建topic的时候指定
如果当前kafka集群只有3个broker节点,则replication-factor最大就是3了,如果创建副本为4,则会报错
ReplicationLeader、ReplicationFollower
Partition有多个副本,但只有一个replicationLeader负责该Partition和生产者消费者交互
ReplicationFollower只是做一个备份,从replicationLeader进行同步
ReplicationManager
负责Broker所有分区副本信息,Replication 副本状态切换
offset
每个consumer实例需要为他消费的partition维护一个记录自己消费到哪里的偏移offset
kafka把offset保存在消费端的消费者组里
kafka特点
多订阅者
一个topic可以有一个或者多个订阅者
每个订阅者都要有一个partition,所以订阅者数量要少于等于partition数量
高吞吐量、低延迟: 每秒可以处理几十万条消息
高并发:几千个客户端同时读写
容错性:多副本、多分区,允许集群中节点失败,如果副本数据量为n,则可以n-1个节点失败
扩展性强:支持热扩展
实例代码(jdk11+kafka2.8)
package net.xdclass.kafkatest;
import org.apache.kafka.clients.admin.*;
import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
* @Author NJUPT wly
* @Date 2021/8/14 12:15 上午
* @Version 1.0
*/
public class KafkaAdminTest {
private static final String TOPIC_NAME = "xdclass-sp-topic-1";
/**
* 设置admin客户端
*/
public static AdminClient initAdminClient(){
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
return AdminClient.create(properties);
}
@Test
public void createTopicTest(){
AdminClient adminClient = initAdminClient();
//指定分区数据,副本数量
NewTopic newTopic = new NewTopic(TOPIC_NAME,5,(short) 1);
CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
try {
//future 等待创建
result.all().get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
@Test
public void listTopicTest() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
ListTopicsResult listTopicsResult = adminClient.listTopics(options);
Set<String> topics = listTopicsResult.names().get();
for (String name : topics){
System.out.println(name);
}
}
@Test
public void delTopicTest() throws ExecutionException, InterruptedException {
AdminClient adminClient =initAdminClient();
DeleteTopicsResult result = adminClient.deleteTopics(Collections.singletonList("xdclass-sp-topic"));
result.all().get();
}
@Test
public void detailTopicTest() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(TOPIC_NAME));
Map<String,TopicDescription> stringTopicDescriptionMap = result.all().get();
Set<Map.Entry<String,TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
entries.forEach((entry)-> System.out.println("name:"+entry.getKey()+",des"+entry.getValue()));
}
@Test
public void incrPartitionTest() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
NewPartitions newPartitions = NewPartitions.increaseTo(5);
Map<String,NewPartitions> map = new HashMap<>();
map.put(TOPIC_NAME,newPartitions);
CreatePartitionsResult result = adminClient.createPartitions(map);
result.all().get();
}
}
package net.xdclass.kafkatest;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.*;
/**
* @Author NJUPT wly
* @Date 2021/8/15 10:43 上午
* @Version 1.0
*/
public class KafkaConsumerTest {
private static final String TOPIC_NAME = "xdclass-sp-topic-1";
public static Properties getProperties(){
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"xdclass-g-1");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
// properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
return properties;
}
@Test
public void simpleConsumerTest(){
Properties properties = getProperties();
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Collections.singleton(TOPIC_NAME));
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records){
System.err.printf("topic=%s,offset=%d,key=%s %n",record.topic(),record.offset(),record.key(),record.value());
}
// kafkaConsumer.commitSync();
if (!records.isEmpty()){
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception == null){
System.err.println("手动提交success"+offsets.toString());
} else {
System.err.println("手动提交fail"+offsets.toString());
}
}
});
}
}
}
}
package net.xdclass.kafkatest;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* @Author NJUPT wly
* @Date 2021/8/14 4:12 下午
* @Version 1.0
*/
public class KafkaProductTest {
private static final String TOPIC_NAME = "xdclass-sp-topic-1";
public static Properties getProperties(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
// properties.put("bootstrap.server","127.0.0.1:9092");
properties.put("acks","all");
properties.put("retries",0);
properties.put("batch",16384);
properties.put("linger.ms",1);
properties.put("buffer.memory",33554432);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
return properties;
}
@Test
public void testSend(){
Properties properties = getProperties();
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
for (int i=0 ; i<3 ; i++){
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME,"xdclass-key"+i,"xdclass-value"+i));
try {
RecordMetadata metadata = future.get();
System.out.println("发送状态"+metadata.toString());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
producer.close();
}
@Test
public void testSendCallBack(){
Properties properties = getProperties();
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
for (int i=0 ; i<3 ; i++){
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass-key" + i, "xdclass-value" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
System.out.println("发送状态"+recordMetadata.toString());
} else {
e.printStackTrace();
}
}
});
try {
RecordMetadata metadata = future.get();
System.out.println("发送状态"+metadata.toString());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
producer.close();
}
@Test
public void testSendPartition(){
Properties properties = getProperties();
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
for (int i=0 ; i<3 ; i++){
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME,4,"xdclass-key"+i,"xdclass-value"+i));
try {
RecordMetadata metadata = future.get();
System.out.println("发送状态"+metadata.toString());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
producer.close();
}
@Test
public void testSendP(){
Properties properties = getProperties();
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,"net.xdclass.kafkatest.config.PartitionerT");
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
for (int i=0 ; i<3 ; i++){
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass" + i, "xdclass-value" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
System.out.println("发送状态"+recordMetadata.toString());
} else {
e.printStackTrace();
}
}
});
try {
RecordMetadata metadata = future.get();
System.out.println("发送状态"+metadata.toString());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
producer.close();
}
}
logging:
config: classpath:logback.xml
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
retries: 1
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
transaction-id-prefix: xdclass-tran-
consumer:
auto-commit-interval: 1S
auto-offset-reset: earliest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual_immediate
concurrency: 4
package net.xdclass.kafkatest;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* @Author NJUPT wly
* @Date 2021/8/16 1:05 下午
* @Version 1.0
*/
@Component
public class LIstener {
@KafkaListener(topics = {"user.register.topic"},groupId = "xdclass-test-gp2")
public void Onmessage(ConsumerRecord<?,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC)String topic){
System.out.println("消费"+record.topic()+"-"+record.partition()+"-"+record.value());
ack.acknowledge();
}
}
package net.xdclass.kafkatest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author NJUPT wly
* @Date 2021/8/16 10:36 上午
* @Version 1.0
*/
@RestController
public class UserController {
private static final String TOPIC_NAME = "user.register.topic";
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
@GetMapping("/api/v1/{num}")
public void sendMessage(@PathVariable("num")String num){
kafkaTemplate.send(TOPIC_NAME,"这是一个消息"+num).addCallback(success->{
assert success != null;
String topic = success.getRecordMetadata().topic();
int partition = success.getRecordMetadata().partition();
long offset = success.getRecordMetadata().offset();
System.out.println(topic+partition+offset);
},failure->{
System.out.println("fail");
});
}
@GetMapping("/api/v1/tran")
public void sendMessage(int num){
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {
@Override
public Object doInOperations(KafkaOperations<String, Object> kafkaOperations) {
kafkaOperations.send(TOPIC_NAME,"这是一个消息"+num);
return true;
}
});
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>net.xdclass</groupId>
<artifactId>kafkatest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafkatest</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.5.0</version>
</plugin>
</plugins>
</build>
</project>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。