赞
踩
1.1.1 主要功能与应用场景
Kafka 作为一个分布式流式处理平台,提供了三个关键的功能:
其主要的应用场景:
1.1.2 相比于其他 MQ 的优缺点
Kafka的设计目标是高吞吐量,所以他的一些特性都和这个目标相关,如
和其他消息队列的对比:
消息中间件 / 特性 | Kafka | MsgBroker | RabbitMQ | RocketMq |
---|---|---|---|---|
消息推送模式 | pull | push | 多协议,均支持 | 多协议,均支持 |
持久化能力 | 磁盘(文件系统page cache) | DB | 磁盘 | 磁盘 |
事务型消息 | 支持(version>0.11) | 支持 | 支持 | 支持 |
消息是否有序 | 有序,采用追加写、offset读 | 无序 | 有序 | 有序 |
数据可靠性 | replica机制,容错容灾.(依赖ZK中维护的元数据来实现的 ) | 可靠 | 保证数据不丢,slave备份 | 可靠,同步刷盘,同/异步复制 |
应用场景 | 高吞吐量,分布式规模流式数据处理 | 金融及支付高可靠性,重点强调事务型 | 高可用 | 高吞吐,高可靠 |
属于 Topic 的一部分。一个 Topic 可以有多个 Partition(即多副本机制) ,并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上。
同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过ofiset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。
一个topic下,不同的 broker 上多个partition的追加写入:
tips:kafka 中 ,Producer 在发布消息时可以选择指定要发送的 Partition,或者使用默认的分区策略让 Kafka 自动选择 Partition。
多副本机制是什么?
Kafka 为 Partition 引入了多副本(Replica)机制,来实现故障自动转移。Partition 中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。
多副本架构:
生产者和消费者只与 leader 副本交互。
follower 保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。
多分区 以及多副本机制有什么好处?
默认情况下,当 leader 副本发生故障时,只有在ISR 集合中的副本才有资格被选举为新的 leader。
ISR 与 HW 和LEO 也有紧密的关系。Kafka 通过ISR的同步机制及优化策略,用 HW & LEO 的方式很好地确保了数据不丢失以及吞吐率。而ISR的管理最终都会反馈到 ZK 上。
ISR相关参数:
说明:
Leader和Follower之间的消息同步过程,以及HW、LEO的变化:
生产者客户端的整体架构:
Kafka有很多版本的生产者客户端:
核心对象:
public class ProducerRecords<K, V>{
private final String topic; //主题
private final Integer partition; //分区号
private final Headers headers; //消息头部
private final K key; //键
private final V value; //值
private final Long timestamp;//消息的时间戳
//省略其他成员方法和构造方法
}
说明:
1、KafkaProducer:
KafkaProducer 是 Kafka 客户端库提供的原生生产者实现,它提供了更底层的 API 来操作 Kafka 生产者。通过使用 KafkaProducer,您可以更细粒度地控制生产者的配置和行为,包括设置生产者的属性、自定义分区策略、实现自定义的消息发送等。
优点:
缺点:
2、使用 KafkaTemplate:
KafkaTemplate 是 Spring Kafka 提供的高级封装,它简化了 Kafka 生产者的使用,并提供了更易于理解和使用的 API。通过使用 KafkaTemplate,您可以不必关心生产者的初始化、线程安全等细节,而是直接使用 KafkaTemplate 提供的方法来发送消息。
优点:
缺点:
KafkaProducer对外提供了两个 send()方法:
public Future<RecordMetadata> send(ProducerRecord<K, V> record)
public Future<RecordMetadata> send(ProducerRecord<K, V> record,Callback callback)
KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。
常见的可重试异常有:NetworkException、LeaderNotAvailableException, UnknownTopicOrPart廿ionException、 NotEnoughReplicasException、NotCoordinatorException 等。重试配置:org.apache.kafka.clients.producer.ProducerConfig#RETRIES_CONFIG。
不可重试的异常,如 RecordTooLargeException(发送的消息太大) 异常,KafkaProducer对此不会进行任何重试, 直接抛出异常。
分区器的作用就是为消息分配分区。
如果消息 ProducerRecord 中指定了 partition字段, 那么就不需要分区器, 因为partition代表的就是所要发往的分区号。
如果消息ProducerRecord中没有 指定partition字段,那么就需要依赖分区器,根据key 这个字段来计算partition的值。
Kafka 中提供的默认分区器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
#kafka配置,更多配置请参考:KafkaProperties
spring.kafka:
#公共参数,其他的timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms保持默认值
properties:
#指定producer在发送批量消息前等待的时间,当设置此参数后,即便没有达到批量消息的指定大小(batch-size),到达时间后生产者也会发送批量消息到broker。默认情况下,生产者的发送消息线程只要空闲了就会发送消息,即便只有一条消息。设置这个参数后,发送线程会等待一定的时间,这样可以批量发送消息增加吞吐量,但同时也会增加延迟。
linger.ms: 50 #默认值:0毫秒,当消息发送比较频繁时,增加一些延迟可增加吞吐量和性能。
#这个参数指定producer在一个TCP connection可同时发送多少条消息到broker并且等待broker响应,设置此参数较高的值可以提高吞吐量,但同时也会增加内存消耗。另外,如果设置过高反而会降低吞吐量,因为批量消息效率降低。设置为1,可以保证发送到broker的顺序和调用send方法顺序一致,即便出现失败重试的情况也是如此。
#注意:当前消息符合at-least-once,自kafka1.0.0以后,为保证消息有序以及exactly once,这个配置可适当调大为5。
max.in.flight.requests.per.connection: 1 #默认值:5,设置为1即表示producer在connection上发送一条消息,至少要等到这条消息被broker确认收到才继续发送下一条,因此是有序的。
producer:
#这个参数可以是任意字符串,它是broker用来识别消息是来自哪个客户端的。在broker进行打印日志、衡量指标或者配额限制时会用到。
clientId: ${spring.application.name} #方便kafkaserver打印日志定位请求来源
bootstrap-servers: 127.0.0.1:8080 #kafka服务器地址,多个以逗号隔开
#acks=0:生产者把消息发送到broker即认为成功,不等待broker的处理结果。这种方式的吞吐最高,但也是最容易丢失消息的。
#acks=1:生产者会在该分区的leader写入消息并返回成功后,认为消息发送成功。如果群首写入消息失败,生产者会收到错误响应并进行重试。这种方式能够一定程度避免消息丢失,但如果leader宕机时该消息没有复制到其他副本,那么该消息还是会丢失。另外,如果我们使用同步方式来发送,延迟会比前一种方式大大增加(至少增加一个网络往返时间);如果使用异步方式,应用感知不到延迟,吞吐量则会受异步正在发送中的数量限制。
#acks=all:生产者会等待所有副本成功写入该消息,这种方式是最安全的,能够保证消息不丢失,但是延迟也是最大的。
acks: all #默认值:1
#当生产者发送消息收到一个可恢复异常时,会进行重试,这个参数指定了重试的次数。在实际情况中,这个参数需要结合retry.backoff.ms来使用,建议总的重试时间比集群重新选举leader的时间长,这样可以避免生产者过早结束重试导致失败。
#另外需注意,当开启重试时,若未设置max.in.flight.requests.per.connection=1,则可能出现发往同一个分区的两批消息的顺序出错,比如,第一批发送失败了,第二批成功了,然后第一批重试成功了,此时两者的顺序就颠倒了。
retries: 2 #发送失败时重试多少次,0=禁用重试(默认值)
retry-backoff-ms: 1000 #重试等待间隔
#默认情况下消息是不压缩的,此参数可指定采用何种算法压缩消息,可取值:none,snappy,gzip,lz4。snappy压缩算法由Google研发,这种算法在性能和压缩比取得比较好的平衡;相比之下,gzip消耗更多的CPU资源,但是压缩效果也是最好的。通过使用压缩,我们可以节省网络带宽和Kafka存储成本。
compressionType: "none" #如果不开启压缩,可设置为none(默认值),比较大的消息可开启。
#当多条消息发送到一个分区时,Producer会进行批量发送,这个参数指定了批量消息大小的上限(以字节为单位)。当批量消息达到这个大小时,Producer会一起发送到broker;但即使没有达到这个大小,生产者也会有定时机制来发送消息,避免消息延迟过大。
batch-size: 16384 #默认16K,值越小延迟越低,但是吞吐量和性能会降低。0=禁用批量发送
#这个参数设置Producer暂存待发送消息的缓冲区内存的大小,如果应用调用send方法的速度大于Producer发送的速度,那么调用会阻塞一定(max.block.ms)时间后抛出异常。
buffer-memory: 33554432 #缓冲区默认大小32M
一条消息的流转,会经过 Producer / Broker / Consumer ,所以消息的丢失,可能出现在这三个位置,我们分情况讨论。具体的实现,可参考:消息可靠性方案的实现
1. Producer 是否等待 TopicLeader 的消息回执(ack:默认为1)
acks = 1: Leader 写入消息到本地日志,则请求被认为成功。如果此时 Leader 应答请求后挂掉了,消息会丢失。
acks = 0: Producer 执行发送请求后立即返回,不等待 Leader 的确认,消息会丢失。
acks = -1:分区 Leader 必须等待消息被成功写入到所有的ISR副本(In-Sync Replace)中才认为请求成功。该种情况下配合 Producer 的消息重试机制 来保证消息可靠性。
2. Producer 进程意外停止导致的消息丢失
为了提升发送效率,减少IO次数,Producer 在发送数据时,可以将多个消息缓存在本地 buffer 中进行合并后发送(主要通过在 KafkaProducer 对象中配置 buffer.memory 、batch.size 、linger.ms 参数来控制)。
可以看到存在本地 Buffer 的消息,在 Producer 进程突然结束后,会出现消息丢失的情况。如果有必要,可以适当调小上面提及的参数,使发送频率变高,减少丢失数据。
3. Producer 网络问题导致的消息丢失
由于 Producer 的网络问题,在经过 retries 和 retry.backoff.ms 后,依然发送失败,那么将使用其他的方案进行重试发送。
1. Leader 切换
Broker 的消息丢失,最容易出现在 Partition Leader 切换时,我们可以通过如下的一些参数来保证 Broker 消息的可靠性:
1. Consumer 按时间间隔自动提交 offset
使用 enable.auto.commit 和 auto.commit.interval.ms 控制异步 offset 提交,可能会出现消息丢失或者重复消费。
2. Consumer 手动提交 offset
手动提交 offset 并不能 100% 保证消息一定不丢失,(比如,消费某几条消息失败后,导致 offset 没有提交,那么默认情况下,消费者会进行重试,如果重试还是失败,消费者将继续消费下一批消息,如果后续的消费成功提交 offset ,那么消费失败的消息将会丢失),需要配合异常重试、死信队列、死信队列消息处理机制,才能保证 Consumer 100% 不丢失消息。
消息不唯一,可能会出现在 Producer 端多次发送,在 Consumer 端多次消费,下面分情况讨论:
1. Producer 客户端通过 Producer ID(PID)和 Sequence Number 保证消息唯一
0.11.0 后的版本,引入了 Producer ID(PID)和 Sequence Number 实现 Producer 的幂等,保证一个消息只被投递一次。
2. Consumer 添加持久化的去重判断
Consumer 消息的消费和业务紧密相关,所以需建立持久化的唯一标识,来避免消息重复消费。
Kafka 只能为我们保证每个 Partition(分区) 中的消息有序, 同一个 Topic 下的多个 Partition 内的消息在消费时,不保证有序性.
为了保证消息的顺序消费, 有如下几种方案:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。