When I first used Spark Streaming for stream processing last August, I ran into a problem: the job reads messages from Kafka and computes over them continuously, but under heavier load some batches take noticeably long to process while most batches finish quickly, so the processing times of all batches fall into two extremes, either very long or very short.
As the chart above shows, the processing-time curve has multiple sharp spikes, whereas what we would normally expect is a continuous, smooth curve.
First, some background on my environment. The cluster runs HDP 2.6.3, with Spark 2.x (built against Scala 2.11) and Kafka 0.10; Spark, Kafka, and HDFS share the same cluster resources. The business requirement is to trigger a batch every 5 minutes for aggregation, and each batch must finish within those 5 minutes. Judging from the processing-time chart, the stalls lasted only a few tens of seconds and did not threaten the 5-minute deadline, so at first I looked into the cause only intermittently and did not pursue it. Recently, however, the project began requiring statistics at 1-minute granularity, and this problem could well keep batches from finishing on time. After a rather laborious investigation I finally found the cause, and I am writing the process down in the hope that it helps others who hit a similar issue.
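For context, the job has roughly the following shape. This is only a minimal sketch of a Kafka 0.10 direct stream with a 5-minute batch interval, not the actual production code: the broker addresses and the aggregation logic are placeholders, and only the topic name (pingan) and group id are taken from the log below.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object StreamingJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("online-pingan-spark-testtt")
    // One batch every 5 minutes, as required by the business side
    val ssc = new StreamingContext(conf, Minutes(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:6667,broker2:6667", // placeholder brokers
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "online-pingan-spark-testtt",         // group id seen in the log below
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Direct stream over the "pingan" topic (the topic that appears in the log below)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("pingan"), kafkaParams))

    // Placeholder for the real per-batch aggregation
    stream.map(_.value).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

The warning below, taken from the executor logs of one of the slow batches, is what eventually pointed to the cause.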
19/06/28 20:00:05 WARN TaskSetManager: Lost task 0.0 in stage 510.0 (TID 54064, HDP-datanode02, executor 11): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-online-pingan-spark-testtt pingan 4 849148846 after polling for 5000
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:100)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:92)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
The message is clear enough: the task failed to fetch any records from Kafka after polling for 5000 ms, so it is failed and then retried. That explained the stalls. Previously, when a task could not fetch messages it would simply block until the data finally arrived, roughly 40 seconds later, and then complete successfully; now a task that gets nothing within 5 seconds fails and retries instead of waiting that long.
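For reference, the assertion in the trace comes from CachedKafkaConsumer.get in the spark-streaming-kafka-0-10 connector. The following is a simplified paraphrase from memory, not a verbatim copy (details vary slightly across Spark 2.x releases); it only shows where the poll timeout and the assertion fit in:

// Simplified paraphrase of CachedKafkaConsumer.get; `timeout` is the value of
// spark.streaming.kafka.consumer.poll.ms passed down by KafkaRDD.
def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
  if (offset != nextOffset) {   // consumer is not positioned at the requested offset
    seek(offset)
    poll(timeout)               // one consumer.poll(timeout); results go into `buffer`
  }
  if (!buffer.hasNext()) {
    poll(timeout)               // buffer empty: poll once more, waiting at most `timeout` ms
  }
  // If the broker still returned nothing, fail the task with exactly the
  // assertion seen in the warning above instead of polling again.
  assert(buffer.hasNext(),
    s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
  buffer.next()
}

In other words, each record fetch gets at most one extra poll of `timeout` milliseconds; if Kafka cannot serve the data within that window, the task dies and Spark's retry machinery takes over.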
We can mitigate the problem for now by changing two parameters. First, lower spark.streaming.kafka.consumer.poll.ms to 3000 ms or less, so that a poll that returns nothing times out and is retried after 3 seconds. Second, raise spark.task.maxFailures from its default of 4 to 10 to allow more retry attempts. After these two changes the problem was essentially gone: most of the batches that stalled would reconnect after the retry, read their messages quickly, and complete successfully. This is only a stopgap, though; the root cause is the instability of the Kafka cluster itself. Our final recommendation is to deploy the Kafka cluster separately from the compute and storage clusters, so that CPU- and IO-intensive workloads no longer destabilize the message queue.
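As a sketch, both settings can be applied on the SparkConf before the StreamingContext is created; the configuration keys are the standard Spark ones, while the app name is a placeholder and the 1-minute interval simply reflects the new requirement described above. They can equally be passed as --conf options to spark-submit.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf()
  .setAppName("online-pingan-spark-testtt")               // placeholder app name
  // Give up on a Kafka poll after 3 s instead of blocking for tens of seconds
  .set("spark.streaming.kafka.consumer.poll.ms", "3000")
  // Default is 4; allow more retries so a transiently unresponsive broker
  // does not fail the whole stage
  .set("spark.task.maxFailures", "10")

// 1-minute batches per the new requirement
val ssc = new StreamingContext(conf, Minutes(1))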