赞
踩
Kafka 性能优化与问题深究
一.Kafka深入探究
Kafka是一个分布式高吞吐量的消息系统,这里提下 Kafka 写数据的大致方式:先写操作系统的页缓存(Page Cache),然后由操作系统自行决定何时刷到磁盘。
因此 Kafka 达到高吞吐、低延迟的原因主要有以下 4 点:
PS: 使用页缓存而非堆内存还有一个好处:
Linux 总会把系统中还没被应用使用的内存挪来给 Page Cache,在命令行输入free,或者 cat /proc/meminfo ,“Cached”的部分就是 Page Cache。Page Cache 中每个文件是一棵 Radix 树(又称 PAT 位树, 一种多叉搜索树),节点由 4k 大小的 Page 组成,可以通过文件的偏移量(如 0x1110001)快速定位到某个Page。
当写操作发生时,它只是将数据写入 Page Cache 中,并将该页置上 dirty 标志。
当读操作发生时,它会首先在 Page Cache 中查找,如果有就直接返回,没有的话就会从磁盘读取文件写入 Page Cache 再读取。
可见,只要生产者与消费者的速度相差不大,消费者会直接读取之前生产者写入Page Cache的数据,大家在内存里完成接力,根本没有磁盘访问。
而比起在内存中维护一份消息数据的传统做法,这既不会重复浪费一倍的内存,Page Cache 又不需要 GC (可以放心使用60G内存了),而且即使 Kafka 重启了,Page Cache 还依然在。
就是当 Kafka broker 的进程崩溃时,堆内存的数据会丢失,但是页缓存的数据依然存在,重启 Kafka broker 后可以继续提供服务。
Kafka作为一个消息系统,kafka的consumer group概念引申出两个概念。作为队列时,consumer group支持多个进程的集合同时分担数据处理。作为发布-订阅模式时,kafka支持发布消息到多个consumer groups。
kafka模式的优点在于每个topic都有这两个特征-既可以扩展处理能力又可以支持多个订阅者---不需要选择这个却不得不舍弃那个。
kafka要比传统消息系统有更强的次序保证。通过并行处理机制-partition-在topic内部,kafka可以提供次序保证以及consumer 进程组之间负载均衡。这是通过分配topic的每个partition给consumer group中某个consumer,这样可以保证一个partition只会被一个consumer消费。这样就保证了consumer是partition唯一的消费者,即可以获得有序的消息。由于有多个partitions存在,也就需要多个consumer实例来保证负载均衡。注意,同一个consumer group中consumers实例的个数不能多于partitions的数目。
Fig1.Kafka整体架构图
Fig2. kafka Cluster 图
一个典型的 Kafka Cluster(集群)中包含:
1.管理 broker 与 consumer 的动态加入与离开。(Producer 不需要管理,随便一台计算机都可以作为Producer 向 Kakfa Broker 发消息)
2.触发负载均衡,当 broker 或 consumer 加入或离开时会触发负载均衡算法,使得一个 consumer group 内的多个 consumer 的消费负载平衡。
3.维护消费关系及每个 partition 的消费信息。
1.2.1 producer 工作流程
Fig3. Producer发送消息流程
Kafka的客户端发送数据到服务器,一般都是要经过缓冲的,也就是说,你通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的。
Producer的默认配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("buffer.memory", 33554432); //缓存区内存32M
props.put("batch.size", 16384); //批量发送消息的大小为16KB
props.put("linger.ms", 0); //batch发送时间
props.put("max.request.size", 1048576); //最大的请求大小为1M
props.put("acks", "1"); //持久化机制
props.put("retries", 0); //重试机制
props.put("max.block.ms", 6000);//阻塞时长为60s
重要参数解释:
避免一个Batch迟迟凑不满,导致消息一直积压在内存里发送不出去的情况。
2."linger.ms":避免一个Batch迟迟凑不满,导致消息一直积压在内存里发送不出去的情况比如正常情况下13ms凑够一个batch,那么设置就可以为20ms,哪怕遇到低峰时期,13ms凑不满一个Batch,还是会在20ms之后强制Batch发送出去。
acks = 0 : KafkaProducer在客户端,只要把消息发送出去,不管那条数据有没有在哪怕Partition Leader上落到磁盘,直接就认为这个消息发送成功了。
acks = all 或 -1: Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。(必须跟ISR列表里至少有2个以上的副本配合使用)。
acks = 1: 写入本地日志即可,Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。是上述二者的折衷方案,也是默认值。
1.2.2 如何保证宕机的时候数据不丢失?
Kafka的高可用架构:多副本冗余几乎是现在任何一个优秀的分布式系统都一般要具备的功能。
Broker是整个Kafka集群的核心引擎,负责消息的存储转发,并对外提供服务。我们可以看到,Kafka集群可以非常简单的通过增删Broker,实现整个集群的扩缩容。Kafka对外提供服务的基本单位是Topic,那么实现Topic级别的平行扩展能力,也就实现了应用级的平行扩展能力。Kafka的Broker集群中,每台机器上都存储了一些Partition,也就存放了Topic的一部分数据,每个Partition都有多个副本,其中一个副本叫做leader,其他的副本叫做follower,这样就实现了Topic的数据分布式存储在一个Broker集群上。
如上图所示,假设一个Topic拆分为了3个Partition,分别是Partition0,Partiton1,Partition2,此时每个Partition都有2个副本。这样的多副本冗余机制,可以保证任何一台机器挂掉,都不会导致数据彻底丢失,因为起码还是有副本在别的机器上的。
1.2.3多副本之间数据如何同步?
任何一个Partition,只有Leader是对外提供读写服务的
也就是说,如果有一个客户端往一个Partition写入数据,此时一般就是写入这个Partition的Leader副本。
然后Leader副本接收到数据之后,Follower副本会不停的给他发送请求尝试去拉取最新的数据,拉取到自己本地后,写入磁盘中。如下图所示:
1.2.4如何判断Follower 与Leader是否保持同步状态?
如果说某个Follower所在的Broker因为JVM FullGC之类的问题,导致自己卡顿了,无法及时从Leader拉取同步数据,那么是不是会导致Follower的数据比Leader要落后很多?
ISR全称是“In-Sync Replicas”,也就是保持同步的副本,他的含义就是,跟Leader始终保持同步的Follower有哪些。只要Follower一直及时从Leader同步数据,就可以保证他们是处于同步的关系的。
所以每个Partition都有一个ISR,这个ISR里一定会有Leader自己,因为Leader肯定数据是最新的,然后就是那些跟Leader保持同步的Follower,也会在ISR里。
1.2.5 Producer端 模拟实测
根据对实际生产环境进行过车数据的测试,模拟producer端以1000nMesg/s的数据量进行生产发送,其中一条过程数据的所占内存空间大小大概为250Byte。、
VehiclePass{deviceIndexCode='1', ip='10.194.224.111', port=0, crossingIndexCode='4', crossingId='0', directionIndex=1, laneNo=8, reciveTime='20190827183327', passTime='20190827183327', city='上海', quantity=1234567, ttlNum=9999},过车数据占用内存大小:230Byte
综上计算:
当producer创建20个topic时,则1s生产1000*20*250/1024=5M,batch.size =16K时,则一个batch中包含65条数据,1s要发送305个batch。当出现网络延迟或者producer端。
所以要适当的提高batch.size的大小,可以适当的调整为32K,,这样相当于一个batch包含130条数据,每13ms发送一次batch;同时buffer.memory 相应调整为64M。
分析结论:
参数 | 默认值 | 推荐值 | 说明 |
num.network.threads | 3 | 默认值 | server用来处理网络请求的网络线程数目;一般你不需要更改这个属性。 |
num.io.threads | 8 | 默认值 | server用来处理请求的I/O线程的数目;这个线程数目至少要等于硬盘的个数。 |
queued.max.requests | 500 | 默认值 | 在网络线程停止读取新请求之前,可以排队等待I/O线程处理的最大请求个数。 |
Fig.4. Broker 网络流程
其中num.network.threads是控制上图中的Processor Thread的个数, num.io.threads是控制API Thread的个数,queued.max_requests是控制Request Channel队列的容量。
1.3.3 实测分析
num.network.threads、num.io.threads和queued.max.requests 这三个参数是kafka网络模型的相关参数,所以这里一起测试。 模拟一定的压力,使得API threads线程处理不过来,request channel队列阻塞,性能开始下降。此时增大queue.max.requests或者增加API threads,查看性能情况 。
1G网卡下,10分区,2k数据,发送1000000, ack都为1, 3个Producer与3个Consumer一起跑, Consumer每次从队列开始读取数据。
测试数据如下:
num.network.threads | num.io.threads | queued.max.requests | Producer nMsg.sec | Consumer nMsg.sec |
3 | 8 | 1 | 48768.5443 | 48985.088 |
3 | 8 | 250 | 50942.3841 | 52804.523 |
3 | 8 | 500 | 51303.0474 | 55146.401 |
3 | 8 | 1000 | 51956.0971 | 54461.271 |
3 | 1 | 1 | 50699.6045 | 48399.7216 |
从queued.max.requests值由1增加到1000,可以看出通过调大queue个数,性能可以稍稍提高6%左右。
修改num.io.threads个数,性能影响不大。
10G网卡 下,100分区,2k数据,发送1000000, ack都为1, 10个Producer与10个Consumer一起跑, Consumer每次从队列开始读取数据,Producer BatchSize 为512M
num.network.threads | num.io.threads | queued.max.requests | Producer nMsg.sec | Consumer nMsg.sec |
3 | 8 | 1 | 203500.2035 | 139385.4383 |
3 | 8 | 1000 | 228206.2985 | 139567.3412 |
50 | 8 | 1 | 187441.4246 | 137869.9555 |
50 | 8 | 1000 | 229042.6019 | 151199.0081 |
取queued.max.requests的极端情况分别为1和1000,在不同的压力下, Producer性能能提升10%。 如果把num.network.threads调大,这两者的差距就更大达到22%
从数据得到的结论:
1.从上面的测试来看,3个线程的压力下,就算queued.max.requests设置为1,broker也能很快处理,不会造成性能剧烈下降。
2.10G 网卡下, queued.max.requests设置为1 与设置为1000比较,能提升22%。
3.按照目前的压力来看,用默认值就可以满足业务要求,发现性能瓶颈可以调大这三个参数就可以。
取queued.max.requests的极端情况分别为1和1000,在不同的压力下, Producer性能能提升10%。 如果把num.network.threads调大,这两者的差距就更大达到22%
从数据得到的结论:
1.从上面的测试来看,3个线程的压力下,就算queued.max.requests设置为1,broker也能很快处理,不会造成性能剧烈下降。
2.10G 网卡下, queued.max.requests设置为1 与设置为1000比较,能提升22%。
3.按照目前的压力来看,用默认值就可以满足业务要求,发现性能瓶颈可以调大这三个参数就可以。
1.4.1 Consumer Java实现
public static void main(String[] args){
String topicName = "test-topic";
String groupId = "test-group";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); //必须指定
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");//自动提交offset
props.put("auto.commit.interval.ms", "1000");//自动提交时间间隔
props.put("auto.offset.reset", "earliest");//重新读取offset位置
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);//轮询1000次
for(ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(),record.key(),record.value());
}
}
} finally {
consumer.close();
}
}
消费者 & 消费者组
kafka中有一个概念叫做consumer group,每个group 去订阅对应的topic,topic的每条消息只能发送到订阅它的消费者组的其中一个实例上,并且每个消费者至多使用一个消费者组来标示自己。
Fig.5. 单Comsumer Group多consumer
而当某个topic 被多个消费者组订阅,而每个组仅有一个消费者时,每条消息就会被广播到每个消费者上。
Fig.6. 多Comsumer Group
这里需要注意下,还有个叫做独立消费者(standalone consumer)的概念,对于consumer group 是以group 为单位进行消息消费的,而standalone 会单独的执行消费,以consumer 实例为单位进行消费的。
1.4.2 group 状态机 & group管理协议
coordinator 实现组的管理,依赖的主要是consumer group的状态,仅有 Empty(组内没有任何active consumer)、PreparingRebalance(group 正在准备进行rebalance)、AwaitingSync(所有组员已经加入组并等待leader consumer发送分区的分配方案)、Stable(group开始正常消费)、Dead(该group 已经被废弃)这五个状态,那他们是如何轮转的可以简单的看一下状态机.
Fig.7. group 状态机
就整个过程来说,可以大致分为加入组阶段、状态同步阶段。
加入组阶段:当明确group的coordinator之后,组内成员需要显式的发送JoinGroup请求(主要包括 订阅信息、成员id等元数据信息)给对应的coordinator,然后coordinator选择对应的consumer 作为leader,然后再给其他成员产生响应(一个空数组)。
1.4.3 offset & broker 中的offset
在Kafka 里面存在两个offset的概念,一个指的是consumer 中的offset,一个是broker中的offset.
concumer offset 用来记录当前消费了多少条消息,这个offset的状态是由consumer group来维护的,通过检查点机制对于offset的值进行持久化(内部就是一个map)
broker offset 消息在broker 端的位移值,根据之前说过的几个概念可以大致的理解为一个<topic,partition,offset>可以唯一的标示到一条消息。
1.4.4 _consumer_offset topic & zookeeper 位移管理
因为新版本和旧版本Kafka 所采用的offset保存策略是不同的,旧版本中主要依赖于Zookeeper,当kafka 在数量很大的消费发生时,zookeeper读写会异常的频繁,导致很容易成为整个Kafka系统的瓶颈。所以新版本(0.9版本之后)对这种方式作出了重大更新,不再依赖于Zookeeper 来进行状态的保存,而是在broker 端直接开一个内部使用的topic,也就是_consumer_offsets topic,并且kafka 为了兼容老版本的consumer 还提供了 offsets.storage=kafka这样一个适配参数。
1.4.5 Rebalance & 场景剖析
最后要说的一点就是consumer 端的Rebalance 过程(rebalance是针对consumer group来说的,如果是standalone consumer 则没有这个概念),rebalance也就是如何达成一致来分配订阅topic的所有分区。这个rebalance的代价还是不小的,我们是需要避免高频的rebalance的。
常见的rebalance 场景有:新成员加入组、组内成员崩溃(这种场景无法主动通知,需要被动的检测才行,并且需要一个session.timeout 才检测到)、成员主动离组。
consumer 是可以执行任意次rebalance的,为了区分两次rebalance上的数据(防止无效或者延迟的offset提交),consumer 设计了一个叫做rebalance generation的标示。
对应常见的rebalance请求有:
JoinGroup:consumer 请求加入组
SyncGroup:group leader把分配方案同步给组内所有成员
Heartbeat:consumer 定期向coordination汇报心跳表示自己还存活
LeaveGroup:consumer 主动通知coordinator该consumer即将离组
DescribeGroup:查看组的所有信息。
场景:I/O 线程发送消息之前,producer 崩溃, 则 producer 的内存缓冲区的数据将丢失。
解决方案:
consumer 端:不是严格意义的丢失,其实只是漏消费了。
场景:设置了 auto.commit.enable=true ,当 consumer fetch 了一些数据但还没有完全处理掉的时候,刚好到 commit interval 触发了提交 offset 操作,接着 consumer 挂掉。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。
解决方案:同步和异步组合提交
enable.auto.commit=false 关闭自动提交位移,在消息被完整处理之后再手动提交位移。
//设置手动提交消息偏移
properties.put("enable.auto.commit","false");
//一次拉取的最大消息条数
properties.put("max.poll.records",100);、consumer.subscribe(Collections.singletonList("Demo3"));
int count = 0;
try {
while (true){
ConsumerRecords<String,String> records = consumer.poll(10);
for(ConsumerRecord<String ,String> record : records){
count ++;
System.out.println(record.topic() + "," + record.partition() + "," + record.offset() + "," + record.key() + "," + record.value());
}
consumer.commitAsync();//只管发送提交请求无需等待broker返回
}
} finally {
try {
consumer.commitSync();//由poll()方法返回的最新偏移量,提交成功后马上返回,否则跑出异常。每处理一次消息提交一次offset。
} finally {
consumer.close();
}
//consumer.close();
}
对于offset的commit,Kafka Consumer Java Client支持两种模式:由KafkaConsumer自动提交,或者是用户通过调用commitSync、commitAsync方法的方式完成offset的提交。
在多partition多consumer的场景下自动提交总会发生一些不可控的情况。所以消费者API也为我们提供了另外一种提交偏移量的方式。开发者可以在程序中自己决定何时提交,而不是基于时间间隔。
1. 自动提交
最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是5s。消费者每次获取新数据时都会先把上一次poll()方法返回的最大偏移量提交上去。
可能造成的问题:数据重复读
假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s,所以在这 3s内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的。
2. 手动提交
(1) 同步提交
// 把auto.commit.offset设为false,让应用程序决定何时提交偏移量
props.put("auto.commit.offset", false);
try{
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for(ConsumerRecord<String, String> record : records) {
// 假设把记录内容打印出来就算处理完毕
System.out.println("value = " + record.value() + ", topic = " + record.topic() +
", partition = " + record.partition() + ", offset = " + record.offset());
}
try{
// 只要没有发生不可恢复的错误,commitSync() 方法会一直尝试直至提交成功
// 如果提交失败,我们也只能把异常记录到错误日志里
consumer.commitSync();
}catch(CommitFailedException e) {
System.err.println("commit failed!" + e.getMessage());
}
}}finally {
consumer.close();}
(2) 异步提交
手动提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡,会增加重复消息的数量。
这个时候可以使用异步提交,只管发送提交请求,无需等待 broker 的响应。
// 把auto.commit.offset设为false,让应用程序决定何时提交偏移量
props.put("auto.commit.offset", false);
try{
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for(ConsumerRecord<String, String> record : records) {
System.out.println("value = " + record.value() + ", topic = " + record.topic() +
", partition = " + record.partition() + ", offset = " + record.offset());
}
// 提交最后一个偏移量,然后继续做其他事情。
consumer.commitAsync();
}}finally {
consumer.close();}
在成功提交或碰到无法恢复的错误之前,commitSync()会一直重试,但是commitAsync()不会,这也是commitAsync()不好的一个地方。它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。
假设我们发出一个请求用于提交偏移量2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量3000。如果commitAsync()重新尝试提交偏移量2000,它有可能在偏移量3000之后提交成功。这个时候如果发生再均衡,就会出现重复消息。
commitAsync()也支持回调,在broker作出响应时会执行回调:
// 把auto.commit.offset设为false,让应用程序决定何时提交偏移量
props.put("auto.commit.offset", false);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("value = " + record.value() + ", topic = " + record.topic() + ", partition = "
+ record.partition() + ", offset = " + record.offset());
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if(offsets != null) {
System.out.println("commit offset successful!");
}
if(exception != null) {
System.out.println("commit offset fail!" + exception.getMessage());
}
}
});
}} finally {
consumer.close();}
可以在回调中重试失败的提交,以下为思路: 使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试。如果序列号比较大,说明有一个新的提交已经发送出去了,应该停止重试。
(3) 同步和异步组合提交
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("value = " + record.value() + ", topic = " + record.topic() + ", partition = "
+ record.partition() + ", offset = " + record.offset());
}
// 如果一切正常,我们使用 commitAsync() 方法来提交
// 这样速度更快,而且即使这次提交失败,下一次提交很可能会成功
consumer.commitAsync();
}}catch (Exception e) {
e.printStackTrace();}finally {
try {
// 使用 commitSync() 方法会一直重试,直到提交成功或发生无法恢复的错误
// 确保关闭消费者之前成功提交了偏移量
consumer.commitSync();
}finally {
consumer.close();
}}
broker failover机制
Follower 还没有来得及同步数据,Leader 挂了,然后选举某个 Follower 成 Leader 之后,这就会丢了一些数据。
解决方法:
1.给 Topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 Partition 必须有至少 2 个副本。
2.在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 Leader 至少感知到有至少一个 Follower 还跟自己保持联系,没掉队,这样才能确保 Leader 挂了还有一个 Follower 吧。
kafka 集群启动后,所有的 broker 都会被 controller 监控,一旦有 broker 宕机,ZK 的监听机制会通知到 controller, controller 拿到挂掉 broker 中所有的 partition,以及它上面的存在的 leader,然后从 partition的 ISR 中选择一个 follower 作为 leader,更改 partition 的 follower 和 leader 状态。
3.在 Producer 端设置 acks=all:要求每条数据,必须是写入所有 Replica 之后,才能认为是写成功了
4.在 Producer 端设置 retries=MAX:一旦写入失败,就无限重试。
Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
解决方法:
1.很可能是之前在server.properties配置文件夹里面和执行命令的zookeeper目录不一致。--zookeeper的值需要带上根目录,否则就会报这样的错误。例如配置文件里面写的连接目录是zookeeper.connect=master:2181,slave1:2181,slave3:2181/kafka,但是在执行命令时少写了kafka目录,写成一下
--zookeeper master:2181,slave1:2181,slave3:2181。就会报上述的错误,因此,务必要保证zookeeper的目录一致。
当Topic成功创建时,会输出Created topic “mobilePhone”,如上图。
注意:replication-factor不能大于broker数。
2.出现这个问题的原因是kafka没有启动的情况下想去create tpoic,所以应该是先去进到kafka安装目录的bin路径下,执行./kafka-server-start.sh ../config/server.properties 打开kafka即可!
cd /usr/kafka/bin
./kafka-topics.sh --delete --zookeeper master:2181,slave1:2181,slave3:2181/kafka --topic mobilePhone
step1:
如果需要被删除topic 此时正在被程序 produce和consume,则这些生产和消费程序需要停止。
因为如果有程序正在生产或者消费该topic,则该topic的offset信息一致会在broker更新。调用kafka delete命令则无法删除该topic。
同时,需要设置 auto.create.topics.enable = false,默认设置为true。
step2:
server.properties 设置 delete.topic.enable=true
如果没有设置 delete.topic.enable=true,则调用kafka 的delete命令无法真正将topic删除,而是显示(marked for deletion)
step3:
调用命令删除topic:
./bin/kafka-topics --delete --zookeeper 【zookeeper server:port】 --topic 【topic name】
step4:
删除kafka存储目录(server.properties文件log.dirs配置,默认为"/data/kafka-logs")相关topic的数据目录。
注意:如果kafka 有多个 broker,且每个broker 配置了多个数据盘(比如 /data/kafka-logs,/data1/kafka-logs ...),且topic也有多个分区和replica,则需要对所有broker的所有数据盘进行扫描,删除该topic的所有分区数据。
step5:
找一台部署了zk的服务器,使用命令:
bin/zkCli.sh -server 【zookeeper server:port】
登录到zk shell,然后找到topic所在的目录:ls /brokers/topics,找到要删除的topic,然后执行命令:
rmr /brokers/topics/【topic name】
即可,此时topic被彻底删除。
如果topic 是被标记为 marked for deletion,则通过命令 ls /admin/delete_topics,找到要删除的topic,然后执行命令:
rmr /admin/delete_topics/【topic name】
备注:
网络上很多其它文章还说明,需要删除topic在zk上面的消费节点记录、配置节点记录,比如:
rmr /consumers/【consumer-group】
rmr /config/topics/【topic name】
其实正常情况是不需要进行这两个操作的,
step6:
完成之后,调用命令:
./bin/kafka-topics.sh --list --zookeeper 【zookeeper server:port】
查看现在kafka的topic信息。正常情况下删除的topic就不会再显示。
但是,如果还能够查询到删除的topic,则重启zk和kafka即可。
出现问题:
kafka 发送消息卡顿 60s:Kafka 在 producer 调用 send 发送数据的时候卡住一分钟,精确的一分钟,然后函数返回,没有抛出异常
解决方案:
1.Try set the bootstrap.servers to 127.0.0.1:9092
2.发现是没有配置 host
把 kafka 的 broker list 从主机名改成了 ip
3.设置
props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, “0”);
这可能并不是IP或主机名的问题
4.引入jar包的版本问题,原来引入的kafka的版本是<version>0.10.0.0</version>修改之后的版本是<version>0.10.1.0</version>
5.因为producer端缓存区满了,,消息放入缓存的速度大于发送的速度,导致阻塞时长,因为默认max.block.ms是60s.
出现问题:
Kafka offset is out of range错误原因
使用Kafka的过程中,发现consumer在启动的时候常常会抛出“offset is out of range”的错误,然后将offset设置为0,然后正常运行。
原因如下:
当一个consumer启动的时候,它不知道从哪个offset开始读取数据,于是它使用一个最大的长整数,这样会引发OffsetOutOfRangeException,当接收到这个错误后,consumer会根据配置中“autooffset.reset”的值来重置offset的值。
最终解决方案
(1)调整以下2个参数,减低replica从leader同步数据的速度:
message.max.bytes=10000000
replica.fetch.max.bytes=10737418
num.replica.fetchers=2
此类场景为kafka的consumer会从broker里面取出一批数据,�给消费线程进行消费。
由于取出的一批消息数量太大,consumer在session.timeout.ms时间之内没有消费完成consumer coordinator 会由于没有接受到心跳而挂掉。
出现问题:
INFO[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator coordinatorDead 529: kafka-example|NTI|Marking the coordinator 2147483646 dead.
[rhllor]DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator sendGroupMetadataRequest 465: kafka-example|NTI|Issuing group metadata request to broker 1
[rhllor]ERROR[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator handle 550: kafka-example|NTI|Error ILLEGAL_GENERATION occurred while committing offsets for group new-message-1
日志的意思大概是coordinator挂掉了,然后自动提交offset失败,然后重新分配partition给客户端;由于自动提交offset失败,导致重新分配了partition的客户端又重新消费之前的一批数据;接着consumer重新消费,又出现了消费超时,无限循环下去。
解决方案:
1.提高partition的数量,从而提高了consumer的并行能力,从而提高数据的消费能力
2.对于单partition的消费线程,增加了一个固定长度的阻塞队列和工作线程池进一步提高并行消费的能力
3.enable.auto.commit设置成false,使用同步异步组合的方式进行offset提交。
测试过程中报错:
14:06:29.751[thread-36]错误O.A.K.C.C.I.ConsumerCoordinator-[consumer clientid=consumer-3,group id=group id dynamic feature interval]带偏移量的偏移量提交min5_feature_result-6=offsetandmetadata_offset=128783,metadata='',min5_feature_result-7=offsetandmetadata_offset=242999,元数据='',min5_feature结果-4=offsetandmetadata偏移量=42803,metadata='',min5_feature结果-5=offsetandmetadata偏移量=87464,metadata='',min5_feature结果-8=offsetandmetadata偏移量=0,metadata='',min5_feature结果-9=offsetandmetadata偏移量=0,metadata='',min5_feature结果-2=offsetandmetadata offset=70240,metadata='',min5_feature result-3=offsetandmetadata offset=275868,metadata='',min5_feature result-0=offsetandmetadata offset=401720,metadata='',min5_feature result-1=offsetandmetadata offset=139135,metadata=''失败 org.apache.kafka.clients.consumer.commitfailedException:无法完成提交,
问题分析:
因为组已重新平衡并将分区分配给另一个成员,这意味着随后调用poll()之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多时间处理消息,这可以通过增加会话超时或减少poll()中使用max.poll.records返回的批的最大大小来实现。
解决方案:
1.增大会话时间:默认:10000,现改为60000
session.timeout.ms:
当使用Kafkagroup管理用法时,这个超时时间用来检测consumer是否失效。consumer通过发送心跳信息给broker,用来表明自己还有效。如果broker在这个超时时间内没有收到来自consumer的心跳信息,则broker会从consumergroup中移除这个consumer,并重新进行负载均衡。注意,这个值必须在broker配置的允许范围之内:即group.min.session.timeout.ms和group.max.session.timeout.ms之间。(6000-300000之间)
2.减少max.poll.records返回的批的最大大小:默认500,改为:
max.poll.records:一次单独调用poll()可以返回的消息的最大条数。一次单独调用poll()可以返回的消息的最大条数。
个人认为:
3.应相应增大heartbeat.interval.ms;默认3000,改为 session.timeout.ms=session.timeout.ms/3
当使用Kafka的group管理用法时,consumer协作器两次心跳之间的时间间隔。心跳链接用来保证consumer的会话依然活跃,以及在新consumer加入consumergroup时可以重新进行负载均衡。这个值要比session.timeout.ms小,但是一般要比session.timeout.ms的1/3要大。这个值可以适当的减小,以控制重负载均衡的时间。
出现问题:
[2019-08-14 15:50:45,442] FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
kafka.common.KafkaException: Failed to acquire lock on file .lock in /usr/lib/LOCALCLUSTER/kafka/kafka-logs2. A Kafka instance in another process or thread is using this directory.
分析问题:
错误日志意思为:Kafka-logs2日志文件在另一个进程中已被占用
解决方法:
ps -ef|grep kafka,杀掉使用该目录的进程即可;
分析问题:
message.max.size(默认是1048567及1M)
Exception in thread "main" org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {test-peng-0=51285860} whose size is larger than the fetch size 1048576 and hence cannot be ever returned. Increase the fetch size on the client (using max.partition.fetch.bytes), or decrease the maximum message size the broker will allow (using message.max.bytes).
解决方案:
重新设置message.max.bytes",33554432 ,是为了限制batch在一次request中发送太大的信息量。
2.4.3.1启动时内存不足
##There is insufficient memory for the Java Runtime Environment to continue.
#Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory.
#An error report file with more information is saved as:
#//hs_err_pid6500.logOpenJDK
64-BitServer VM warning:INFO: os::commit_memory(0x00000000bad30000,
986513408,0)failed; error='Cannotallocate memory'(errno=12)
原因:kafka启动脚本kafka-server-start.sh中指定了kafka启动时需要的最小内存,默认为1G
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
虚拟机分配的虚拟内存在1G以下时就会出现该错误。
解决方法:修改脚本kafka-server-start.sh中的最小启动内存,设置为较小值。
2.4.3.2.启动时出现oom
[ FATAL ] Fatal error during KafkaServerStable startup. Prepare to shutdown
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at kafka.log.SkimpyOffsetMap.(OffsetMap.scala:42)
..........
==========================================================
原因:kafka启动时分配的内存过小导致
解决方法:同样脚本kafka-server-start.sh中的最小启动内存,设置为较大值。
2.4.3.3.kafka 启动时报java.io.IOException: No space left on device
发现Kafka所在磁盘已满,清理磁盘
Linux 查看磁盘空间:
df
df 以磁盘分区为单位查看文件系统,可以获取硬盘被占用了多少空间,目前还剩下多少空间等信息。
例如,我们使用df -h命令来查看磁盘信息, -h 选项为根据大小适当显示:
显示内容参数说明:
Filesystem:文件系统
Size: 分区大小
Used: 已使用容量
Avail: 还可以使用的容量
Use%: 已用百分比
Mounted on: 挂载点
解决方案:
清理数据,释放空间:
linux删除某个文件下的所有文件:
进入这个文件夹
然后用命令 rm -rf *。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。