赞
踩
目录
kafka的基本数据单元,由字节数组组成。可以理解成数据库的一条数据。
批次就是一组消息,把同一个主题和分区的消息分批次写入kafka,可以减少网络开销,提高效率;批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。
topic主题,kafka通过主题进行分类。主题可以理解成数据库的表或者文件系统里的文件夹。
partition分区可以理解成一个FIFO的消息队列。(同一个分区的消息保证顺序消费)
主题可以被分为若干分区,一个主题通过分区将消息存储在kafka集群中,提供横向扩展的能力。消息以追加的方式写入分区,每个分区保证先入先出的顺序读取。在需要严格保证消息顺序消费的场景下,可以将partition设置为1,即主题只有一个分区。
主题的分区策略有如下几种:
偏移量。
生产者offset:每个分区都有一个offset,叫做生产者的offset,可以理解为当前这个分区队列的最大值,下一个消息来的时候,就会将消息写入到offset这个位置。
消费者offset:每个消费者消费分区中的消息时,会记录消费的位置(offset),下一次消费时就会从这个位置开始消费。
broker为一个独立的kafka服务器;一个kafka集群里有多个broker。
broker接收来自生产者的消息,为消息设置偏移量,并将消息保存到磁盘。同时,broker为消费者提供服务,对读取分区的请求做出响应,返回已经保存到磁盘上的消息。(单个broker可以轻松处理数千个分区以及每秒百万级的消息量)。
集群中同一个主题的同一个分区,会在多个broker上存在;其中一个broker上的分区被称为首领分区,用于与生产者和消费者交互,其余broker上的分区叫做副本分区,用于备份分区数据,防止broker宕机导致消息丢失。
每个集群都有一个broker是集群控制器,作用如下:
生产者生产消息,消息被发布到一个特定的主题上。默认情况下,kafka会将消息均匀地分布到主题的所有分区上。分区策略有如下几种:
消费者通过偏移量来区分已经读过的消息,从而消费消息。消费者是消费组的一部分,消费组可以保证每个分区只能被一个消费者使用,避免重复消费。
- public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
- Map<String, Object> configs = new HashMap<>();
- // 设置连接Kafka的初始连接⽤到的服务器地址
- // 如果是集群,则可以通过此初始连接发现集群中的其他broker
- configs.put("bootstrap.servers", "node1:9092");
- // 设置key和value的序列化器
- configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
- configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- configs.put("acks", "1");
- KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
- // 用于封装Producer的消息
- ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
- "topic_1", // 主题名称
- 0, // 分区编号,现在只有⼀个分区,所以是0
- 0, // 数字作为key
- "message 0" // 字符串作为value
- );
- // 发送消息,同步等待消息的确认
- // producer.send(record).get(3_000, TimeUnit.MILLISECONDS);
-
- // 使用回调异步等待消息的确认
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- System.out.println(
- "主题:" + metadata.topic() + "\n"
- + "分区:" + metadata.partition() + "\n"
- + "偏移量:" + metadata.offset() + "\n"
- + "序列化的key字节:" + metadata.serializedKeySize() + "\n"
- + "序列化的value字节:" + metadata.serializedValueSize() + "\n"
- + "时间戳:" + metadata.timestamp()
- );
- } else {
- System.out.println("有异常:" + exception.getMessage());
- }
- }
- });
- // 关闭连接
- producer.close();
- }

消费者主要有KafkaConsumer对象,用于消费消息。Kafka不支持消息的推送,我们可以通过消息拉取(poll)方式实现消息的消费。KafkaConsumer主要参数如下:
- public static void main(String[] args) {
- Map<String, Object> configs = new HashMap<>();
- // 指定bootstrap.servers属性作为初始化连接Kafka的服务器。
- // 如果是集群,则会基于此初始化连接发现集群中的其他服务器。
- configs.put("bootstrap.servers", "node1:9092");
- // key和value的反序列化器
- configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
- configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- configs.put("group.id", "consumer.demo");
- // 创建消费者对象
- KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);
-
- final Pattern pattern = Pattern.compile("topic_[0-9]");
- // 消费者订阅主题或分区
- // consumer.subscribe(pattern);
- // consumer.subscribe(pattern, new ConsumerRebalanceListener() {
- final List<String> topics = Arrays.asList("topic_1");
- consumer.subscribe(topics, new ConsumerRebalanceListener() {
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- partitions.forEach(tp -> {
- System.out.println("剥夺的分区:" + tp.partition());
- });
- }
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- partitions.forEach(tp -> {
- System.out.println(tp.partition());
- });
- }
- });
- // 拉取订阅主题的消息
- final ConsumerRecords<Integer, String> records = consumer.poll(3_000);
- // 获取topic_1主题的消息
- final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");
- // 遍历topic_1主题的消息
- topic1Iterable.forEach(record -> {
- System.out.println("========================================");
- System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
- System.out.println("消息的key:" + record.key());
- System.out.println("消息的值:" + record.value());
- System.out.println("消息的主题:" + record.topic());
- System.out.println("消息的分区号:" + record.partition());
- System.out.println("消息的偏移量:" + record.offset());
- });
- // 关闭消费者
- consumer.close();
- }

- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- </dependencies>
- spring:
- kafka:
- bootstrap-servers: node1:9092 # 用于建立初始连接的broker地址
- producer:
- key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- batch-size: 16384 # 默认的批处理记录数
- buffer-memory: 33554432 # 32MB的总发送缓存
- consumer:
- key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- group-id: spring-kafka-02-consumer # consumer的消费组id
- enable-auto-commit: true # 是否自动提交消费者偏移量
- auto-commit-interval: 100 # 每隔100ms向broker提交一次偏移量
- auto-offset-reset: earliest # 如果该消费者的偏移量不存在,则自动设置为最早的偏移量
- @Configuration
- public class KafkaConfig {
- @Bean
- public NewTopic topic1() {
- return new NewTopic("ntp-01", 5, (short) 1);
- }
- @Bean
- public NewTopic topic2() {
- return new NewTopic("ntp-02", 3, (short) 1);
- }
- }
- @RestController
- public class KafkaSyncProducerController {
- @Autowired
- private KafkaTemplate template;
-
- @RequestMapping("send/sync/{message}")
- public String sendSync(@PathVariable String message) {
- ListenableFuture future = template.send(new ProducerRecord<Integer, String>("topic-spring-02", 0, 1, message));
- try {
- // 同步等待broker的响应
- Object o = future.get();
- SendResult<Integer, String> result = (SendResult<Integer, String>) o;
- System.out.println(result.getRecordMetadata().topic() + result.getRecordMetadata().partition() + result.getRecordMetadata().offset());
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- return "success";
- }
- }
-
- @RestController
- public class KafkaAsyncProducerController {
- @Autowired
- private KafkaTemplate<Integer, String> template;
-
- @RequestMapping("send/async/{message}")
- public String asyncSend(@PathVariable String message) {
- ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic-spring-02", 0, 3, message);
- ListenableFuture<SendResult<Integer, String>> future = template.send(record);
- // 添加回调,异步等待响应
- future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>(){
- @Override
- public void onFailure(Throwable throwable) {
- System.out.println("发送失败: " + throwable.getMessage());
- }
-
- @Override
- public void onSuccess(SendResult<Integer, String> result) {
- System.out.println("发送成功:" + result.getRecordMetadata().topic() + "\t" + result.getRecordMetadata().partition() + "\t" + result.getRecordMetadata().offset());
- }
- });
- return "success";
- }
- }

- @Component
- public class MyConsumer {
-
- @KafkaListener(topics = "topic-spring-02")
- public void onMessage(ConsumerRecord<Integer, String> record) {
- Optional<ConsumerRecord<Integer, String>> optional = Optional.ofNullable(record);
- if (optional.isPresent()) {
- System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value());
- }
- }
- }
以上内容为个人学习理解,如有问题,欢迎在评论区指出。
部分内容截取自网络,如有侵权,联系作者删除。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。