赞
踩
分布式事件流平台。希望不仅仅是存储数据,还能够数据存储、数据分析、数据集成等功能。消息队列(把数据从一方发给另一方),消息生产好了但是消费方不一定准备好了(读写不一致),就需要一个中间商来存储信息,kafka就是中间商
架构图如下:
名称 | 解释 |
Broker | 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群 |
Topic | Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic |
Producer | 消息生产者,向Broker发送消息的客户端 |
Consumer | 消息消费者,从Broker读取消息的客户端 |
ConsumerGroup | 每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息 |
Partition | 物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的 |
offset | partition中每条消息的唯一编号 |
消息生产者,向broker发送消息,也称为发布者
读取消息的客户端
一个consumer group由多个consumer组成,消费者组可以消费某个分区中的所有消息,消费的消息不会立马被删除。也称为订阅者
逻辑上的区分,通过topic将消息进行分类,不同topic会被订阅该topic的消费者消费
特点:topic的一个分区只能被consumer group的一个consumer消费;同一条消息可以被多个消费者组消费,但同一个分区只能被某个消费者组中的一个消费者消费。
问题:topic消息非常多,消息会被保存在log日志文件中,文件过大
解决:分区
将一个topic中的消息分区来存储,有序序列,真正存放消息的消息队列
分区中的每条消息都有唯一的编号,用来唯一标识这一条消息(message)
每个分区都可以设置自己对应的副本(replication-factor参数),有一个主副本(leader)、多个从副本(follower)
单一职责。leader负责和生产消费者交互,follower负责副本拷贝,副本是为了保证消息存储安全性,当其中一个leader挂掉,则会从follower中选举出新的leader,提高了容灾能力,但是副本也会占用存储空间
动态集合,保存正在同步的副本集,是与leader同步的副本。如果某个副本不能正常同步数据或落后的数据比较多,会从副本集中把节点中剔除,当追赶上来了在重新加入。kafka默认的follower副本能够落后leader副本的最长时间间隔是10S
参数设置:replica.lag.time.max.ms
生产者生产好消息之后调用send()方法发送到broker端,broker将收到的消息存储的对应topic中的patition中,而broker中的消息实际上是存储在了commit-log文件中,消费者监听定时循环拉取消息
- package com.example;
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import java.util.Properties;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
-
-
- public class MyProductor {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- //kafka的配置
- Properties properties = new Properties();
-
- //kafka服务器地址和端口
- properties.put("bootstrap.servers", "localhost:9092");
-
- //Producer的压缩算法使用的是GZIP
- //为什么要压缩?
- properties.put("compression.type","gzip");
-
- //指定发送消息的key和value的序列化类型
- properties.put("key.serializer", "org.apache.kafka.common,serialization.StringSerializer");
- properties.put("value.serializer", "org.apache.kafka.common,serialization.StringSerializer");
- //补充:为什么要序列化/反序列化?
-
- //实例化一个生产者对象,指定发送的主题、key、value、分区号等
- KafkaProducer<Object, Object> producer = new KafkaProducer<>(properties);
-
- //发送100条消息
- for (int i = 0; i < 100; i++) {
- //调用send方法,向kafka发送数据,并返回一个Future对象,通过该对象来获取结果
- Future<RecordMetadata> result = producer.send(new ProducerRecord<>("my-topic", Integer.toString(i),
- Integer.toString(i)));
- RecordMetadata recordMetadata = result.get();
- }
-
- //关闭生产者对象
- producer.close();
- }
- }
指定生产消息要达到的kafka服务器地址,压缩方式、序列化方式
Producer生产的每个消息都经过GZIP压缩,在传输的过程中能够节省网络传输带宽和Broker磁盘占用
数据在网络传输过程中都是以字节流的形式传输的,在生产者发送消息的时候需要将消息先进行序列化
生产者在发送消息前会对请求的消息进行拦截,起到过滤和处理的作用。
我们可以自定义拦截器,拦截器中定义自己需要的逻辑,满足个性化配置。比方说对消息进行加密解密、消息格式转换、消息路由等等
数据在网络传输过程中都是以字节流的形式传输的,在生产者发送消息的时候需要将消息先进行序列化
通过上面的操作生产者已经知道该往哪个主题、哪个分区发送这条消息了。
①、如果消息发送成功:broker收到消息之后会返回一个Future类型RecordMetadata对象,可以通过该对象来获取发送的结果,对象中记录了此条消息发送到的topic、partition、offset。
②、消息发送失败:错误消息。在收到错误消息之后会有尝试机制,尝试重新发送消息
但直接使用send(msg)会出现问题,调用之后会立即返回,如果因为网络等外界因素影响导致消息没有发送到broker,出现生产者程序丢失数据问题,只能通过处理返回的Future对象处理才能感知到。
对应的解决方案是我们可以使用send(msg,callbakc)的方式发哦是那个消息并设置回调函数
在发送消息后,会立即调用回调函数来处理发送结果,回调函数中定义了处理逻辑
前文中提到生产者发送到broker的消息都是基于topic进行分类的(逻辑上),而topic中的消息是以partition为单位存储的(物理上),每条消息都有自己的offset
每个partition都有一个commit log文件
如果commitlog文件很大的话可能导致一台服务器无法承担所有的数据量,机器无法存储,分区之后可以把不同的分区放在不同的机器上,相当于是分布式存储
每一个partition都对应了一个commit log文件,日志文件中存储了消息等信息,新到达的消息以追加的方式写入分区的末尾,然后以先入先出的顺序读取。
如果不停的一致向日志文件中写入消息,日志文件大小也是有上限的,所以kafka会定期的清理磁盘,有两种方式:
注意的是,Follower副本的数据同步是异步进行的,即Follower副本不需要等待数据同步完成才返回成功响应。这样可以提高消息的处理速度和吞吐量。但也意味着,在数据同步过程中,Follower副本可能会滞后于Leader副本一段时间,这个时间间隔称为追赶(lag)。Kafka提供了配置参数来控制同步和追赶的速度,以平衡数据的一致性和性能的需求。
- package com.example;
-
- import org.apache.kafka.clients.consumer.*;
- import org.apache.kafka.common.TopicPartition;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Map;
- import java.util.Properties;
-
-
- public class Consumer {
- public static void main(String[] args) {
- Properties properties = new Properties();
- //要连接的kafka服务器
- properties.put("bootstrap.servers", "localhost:9092");
- //标识当前消费者所属的小组
- properties.put("group.id", "test");
-
- //---------位移提交(自动提交)----------
- //为true,自动定期地向服务器提交偏移量(offset)
- properties.put("enable.auto.commit", "true");
- //自动提交offset的间隔,默认是5000ms(5s)
- properties.put("auto.commit.interval.ms", "1000");
- //每隔固定实践消费者就会把poll获取到的最大偏移量进行自动提交
- //出现的问题:如果刚提交了offset,还没到5s,2s的时候就发生了均衡,导致分区会重新划分,此时offset是不准确的
-
-
- //key和value反序列化
- properties.put("key.serializer", "org.apache.kafka.common,serialization.StringSerializer");
- properties.put("value.serializer", "org.apache.kafka.common,serialization.StringSerializer");
-
- KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);
- //指定consumer消费的主题(订阅多个)
- consumer.subscribe(Arrays.asList("my-topic", "bar"));
-
- //轮询向服务器定时请求数据
- while (true) {
- //拉取数据
- ConsumerRecords<Object, Object> records = consumer.poll(100);
- for (ConsumerRecord<Object, Object> record : records) {
- //同步提交:提交当前轮询的最大offset
- consumer.commitSync();
- //如果失败还会进行重试
- //优点:提交成功准确率上升;缺点:降低程序吞吐量
-
- System.out.printf("offset=%d,key=%s,value=%s%n", record.offset(), record.key(), record.value());
-
- //异步提交并定义回调
- //优点:提高程序吞吐量(不需要等待请求响应,程序可以继续往下执行)
- //缺点:当提交失败的时候不会自动重试;
- consumer.commitAsync(new OffsetCommitCallback() {
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
- Exception exception) {
- if (exception != null) {
- System.out.println("错误处理");
- offsets.forEach((x, y) -> System.out.printf(
- "topic = %s,partition = %d, offset = %s \n", x.topic(), x.partition(), y.offset()));
- }
- }
- });
- }
- }
- }
- }
配置要消费消息的kafka服务器、消费者所在的消费组、offset是自动提交还是手动提交
enable.auto.commit和auto.commit.interval.ms参数为是否自动提交参数
- enable.auto.commit=true:自动定期地向服务器提交偏移量(offset)
- auto.commit.interval.ms:动提交offset的间隔,默认是5000ms(5s)
逻辑:每隔固定实践消费者就会把poll获取到的最大偏移量进行自动提交
出现的问题:如果刚提交了offset,还没到5s,2s的时候就发生了均衡,导致分区会重新划分,此时offset是不准确的,所以我们也可以配置手动提交的方式,具体的手动提交方式在下面第四步会讲到
调用subscribe()方法可以订阅多个主题
通过poll()方法设置定时拉取消息的时间间隔,消费者会循环的从kafka服务器拉取消息
前文中提到我们可以通过收到的方式提交offset,而手动提交又分为了两种,同步提交和异步提交。下面我直接上代码观看更直观
①、同步提交:如果失败还会进行重试,保证了提交成功准确率上升,但缺点是降低程序吞吐量,会发生阻塞
consumer.commitSync();
②、异步提交并回调:提高程序吞吐量(不需要等待请求响应,程序可以继续往下执行),不会阻塞,但缺点是当提交失败的时候不会自动重试;
- consumer.commitAsync(new OffsetCommitCallback() {
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {
- if (exception != null) {
- System.out.println("错误处理");
- offsets.forEach((x, y) -> System.out.printf(
- "topic = %s,partition = %d, offset = %s \n", x.topic(), x.partition(), y.offset()));
- }
- }
- });
topic中的partition收到生产者发送的消息后,broker会向生产者发送一个ack确认,如果收到则继续发送,没收到则重新发送。
需要注意的是,Kafka只能保证在单个分区内的消息顺序。如果一个主题有多个分区,那么多个分区之间的消息顺序无法保证。消费者可能会并行消费多个分区,并且不同分区的消息到达消费者的顺序可能会不同。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。