赞
踩
Kafka 是一种高吞吐量的分布式发布订阅消息系统,用于数据的缓冲。具有高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性。
作用一:消息系统。具备冗余存储、缓冲、异步通信、扩展性、可恢复性等功能。
作用二:存储系统:Kafka有消息持久化和多副本机制。将消息持久化到磁盘,可以把它作为长期的数据存储系统来使用
作用三:流式处理平台。Kafka 可以和流式处理框架进行集成。比如像Spark Streaming和Flink。提供了窗口、连接、变换和聚合等各类操作。
生产者、Broker、消费者、ZK;
注意:Zookeeper中保存Broker id和消费者offsets等信息,但是没有生产者信息。
整个生产者客户端由两个线程协调运行,这两个线程分别是main线程和Sender线程。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用后缓存到消息累加器(RecordAccumlator,也称为消息收集器)中。Sender线程负载从RecordAccumulator中获取消息并将其发送到Kafka集群中。
请求从Sender线程发往至Kafka集群之前还会保存到InFlightRequests中,其中保存的具体形式为Map<NodeId,Deque<Request>>。
InFlightRequests可以通过配置参数来限制缓存的broker的连接数(客户端和Node之间的连接),默认为5.
InFlightRequests还可以决定leastLoadedNode(当前在InFlightRequests中负载最小的node),发送请求时会优先发送leastLoadedNode
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once)=幂等性 + 至少一次(ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2)。
Kafka机器数量=2(峰值生产速度副本数/100)+ 1
ACK=0 表示生产者在成功写入消息之前不会等待任何来自服务器的响应.
ACK=1 表示只要集群的leader分区副本接收到了消息,就会向生产者发送一个成功响应的ack,此时生产者接收到ack之后就可以认为该消息是写入成功的.
ACK=-1 表示只有所有参与复制的节点(ISR列表的副本)全部收到消息时,生产者才会接收到来自服务器的响应.
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
一般我们设置成2个或3个,很多企业设置为2个。
副本的优势:提高可靠性;副本劣势:增加了网络IO传输
分区重分配Kafka是基于zookeeper的,controller的选择也是在zookeeper上完成的。 Kafka 当前选举控制器的规则是:Kafka 集群中第一个启动的 broker 通过在 ZooKeeper 里创建一个临时节点 /controller 让自己成为 controller 控制器。
分区leader副本的选举由控制器负责具体实施。当创建分区(创建主题或增加分区都有创建分区的动作),或分区上线(比如分区中原先的leader 副本下线,此时分区需要选举一个新的leader上线来对外提供服务)的时候,都需要执行leader的选举动作。基本策略是按照AR集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中 。一个分区的AR集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。
GroupCoordinator是负责执行消费者的分区分配和再均衡操作,在初始阶段,当消费者未保存与消费组对应的GroupCoordinator节点信息时,需要通过向集群中负载最小的节点发送请求来寻找,Kafka通过消费组的groupId的哈希值计算__consumer_offsets中的分区编号,找到分区后,再寻找分区leader副本所在的broker节点,该节点就是对应的GroupCoordinator,消费者最终的分区分配方案以及组内消费者所提交的消费位移信息都会发送给次分区leader副本所在的broker节点。
GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,这个选举的算法很简单,当消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader,如果当前leader退出消费组,则会挑选以HashMap结构保存的消费者节点数据中,第一个键值对来作为leader。
默认保存7天;生产环境建议3天
每天总数据量100g,每天产生1亿条日志, 10000万/24/60/60=1150条/每秒钟
平均每秒钟:1150条
低谷每秒钟:50条
高峰每秒钟:1150条(2-20倍)=2300条-23000条
每条日志大小:0.5k-2k(取1k)
每秒多少数据量:2.0M-20MB
每天的数据量100g2个副本3天/70%
公司自己开发的监控器;
开源的监控器:KafkaManager、KafkaMonitor、KafkaEagle
1)创建一个只有1个分区的topic
2)测试这个topic的producer吞吐量和consumer吞吐量。
3)假设他们的值分别是Tp和Tc,单位可以是MB/s。
4)然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc)
例如:producer吞吐量=20m/s;consumer吞吐量=50m/s,期望吞吐量100m/s;
分区数=100 / 20 =5分区
如何根据数据量确定Kafka分区个数、Kafka的分区是不是越多越好、Kafak生产者分发策略,消费者负载均衡 09_kafka每个分区数据量有多大_啊策策的博客-CSDN博客
分区数一般设置为:3-10个
通常情况:多少个日志类型就多少个Topic。也有对日志类型进行合并的。
ISR(In-Sync Replicas),副本同步队列。ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。有replica.lag.max.messages(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列。
任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。
在 Kafka内部存在两种默认的分区分配策略:Range和 RoundRobin。
Range是默认策略。Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
例如:我们有10个分区,两个消费者(C1,C2),3个消费者线程,10 / 3 = 3而且除不尽。
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C2-1 将消费 7, 8, 9 分区
第一步:将所有主题分区组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode进行排序,最后按照轮询的方式发给每一个消费线程。
1)Flume记录
2)日志有记录
3)短期没事
为了提高发送数据的效率,可以将数据缓存在Buffer中,一次性发送大量数据,那么如果Buffer中数据没有及时处理Producer就因为某些原因退出(OOM、被Kill等)就可能导致数据丢失。或者生产数据过快,导致Buffer装满了还没有发送,也可能导致丢失。
避免或者缓解方式:
Producer同步数据发送,或者采用阻塞的带一定容量上限的线程池
扩大Buffer大小,防止Buffer装满,但是如果程序被异常退出,则数据会丢失
生产的数据在Producer本地落盘,然后再使用另一个程序来发送到Kafka,例如Filebeat、Logstash等
如果主分片在第一个步骤断电,数据在内存中,Producer收不到ACK会重试,知道达到重试阈值。
这个过程需要权衡配置acks的要求,如果为0,则表示Producer直接写入不等待ACK,数据很可能丢失;如果为1,则表示Producer写入数据到系统Cache,数据在Producer可能丢失,如果为-1,表示要等待所有从分片确认信息,数据则不回丢失
Consumer会从Kafka中接收并处理消息,然后记录offset,提交offset有两种方式,分为手动提交和自动提交,如果是自动提交,就可能存在数据丢失的情况,就是在数据还没有消费成功就已经提交了offset,如果这时Consumer被退出,数据就丢失了。
避免方法:
采用手动提交方式可以避免出现上述问题,但会造成数据被重复消费
幂等性+ack-1+事务
Kafka数据重复,可以再下一级:SparkStreaming、redis或者hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值;
1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)
2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。
零拷贝会经过JVM堆吗?
将数据直接从磁盘文件复制到网卡设备中,不需要经由应用程序之手。减少了内核和用户模式的上下文切换。底层通过sendfile 方法实现。
传统IO需要四步。读两步:磁盘到Read Buffer,读缓冲区到用于程序。写两步:应用程序写数据到写缓冲区Socket Buffer,写缓冲区写到网卡设备中。
零拷贝技术通过DMA技术将文件内容复制到内核模式的Read Buffer中,和传统IO不同的是,不需要再到用户态走一圈,不再需要额外的Socket Buffer。DMA engine直接将数据从内核模式中传递到网卡设备中。
应用程序空间,用户态。应用程序存放数据就是在堆咯,所以,不会经过JVM堆
1、零拷贝:避免了传统IO四步操作,采用DMA 技术,用DMA引擎直接将数据从内核模式传递到网卡设备中
Producer 生产的数据持久化到 broker,采用 mmap 文件映射,实现顺序的快速写入
Customer 从 broker 读取数据,采用 sendfile,将磁盘文件读到 OS 内核缓冲区后,转到 NIO buffer进行网络发送,减少 CPU 消耗
2、 页缓存:将磁盘的数据缓存到内存中,将对磁盘的访问变成对内存的访问
3、 顺序追加:消息落到磁盘中,采用顺序追加,不支持随机访问
4、 分区机制:partition ,实现横向扩展,并行处理
kafka对于消息体的大小默认为单条最大值是1M但是在我们应用场景中, 常常会出现一条消息大于1M,如果不对kafka进行配置。则会出现生产者无法将消息推送到kafka或消费者无法去消费kafka里面的数据, 这时我们就要对kafka进行以下配置:server.properties
replica.fetch.max.bytes: 1048576 broker可复制的消息的最大字节数, 默认为1M
message.max.bytes: 1000012 kafka 会接收单个消息size的最大限制, 默认为1M左右
注意:message.max.bytes必须小于等于replica.fetch.max.bytes,否则就会导致replica之间数据同步失败。
保证数据没有被引用(没人消费他)
日志清理保存的策略只有delete和compact两种
log.cleanup.policy=delete启用删除策略
log.cleanup.policy=compact启用压缩策略
CRC32:4个字节。消息的CRC校验码。
magic:1个字节。模数标识,与消息格式有关,取值为0或1。当magic为0时,消息的offset使用绝对offset且消息格式中没有timestamp部分;当magic为1时,消息的offset使用相对offset且消息格式中存在timestamp部分。
attributes:1个字节。0~2位表示消息使用的压缩类型,0(000)->无压缩 1(001)->gzip 压缩 2(010)->snappy 压缩 3(011)-> lzo 压缩。第3位表示时间戳类型,0->创建时间 1->追加时间。
timestamp:8个字节,时间戳。
key length:消息key的长度。
key:消息的key。
value length:消息的内容长度。
value:消息的内容。
一个主题会有多个分区,那么就会有多个topic-partition 的文件夹。每个分区的日志会切分为多个LogSegment。每个LogSegment 的.log 日志文件都会有两个对应的索引文件。偏移量索引文件(.index 为后缀)和时间戳索引文件(以.timeindex为后缀的文件)。
单分区内有序;多分区,分区与分区间无序;
1、一个分区,消费者将消息全部写入一个分区中,一个消费者进行消费。
2、自定义分区器Partitioner ,重写partition 方法,将消息顺序追加到K个分区,然后在消费者写K个内存队列,相同分区号的数据都存到一个内存Queue中,N个线程分别消费一个内存队列即可
Java序列化器太重,会增加额外的数据来保证安全传输。在大数据场景下,一般不会使用java原生的序列化器。
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧, 消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成相应的对象。
综上所述:生产者和消费者的序列化器必须是相同的,否则可能就会出现乱码的情况
1)Broker参数配置(server.properties)
1、日志保留策略配置
保留三天,也可以更短 (log.cleaner.delete.retention.ms)
log.retention.hours=72
2、Replica相关配置
default.replication.factor:1 默认副本1个
3、网络通信延时
replica.socket.timeout.ms:30000 #当集群之间网络不稳定时,调大该参数
replica.lag.time.max.ms= 600000# 如果网络不好,或者kafka集群压力较大,会出现副本丢失,然后会频繁复制副本,导致集群压力更大,此时可以调大该参数
2)Producer优化(producer.properties)
compression.type:none
#默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力。
3)Kafka内存调整(kafka-server-start.sh)
默认内存1个G,生产环境尽量不要超过6个G。
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
一、1个Topic(主题)只创建1个Partition(分区),这样生产者的所有数据都发送到了一个Partition(分区),保证了消息的消费顺序。
二、生产者在发送消息的时候指定要发送到哪个Partition(分区)。
(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值;
在Producer往Kafka插入数据时,控制同一Key分发到同一Partition,并且设置参数max.in.flight.requests.per.connection=1,也即同一个链接只能发送一条消息,如此便可严格保证Kafka消息的顺序
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后
面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition
值,也就是常说的 round-robin 算法。
第二步:
自定义分区器Partitioner ,重写partition 方法,将消息顺序追加到K个分区,然后在消费者写K个内存队列,相同分区号的数据都存到一个内存Queue中,N个线程分别消费一个内存队列即可
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。