赞
踩
上节完成了如下的内容:
Spark Streaming 集成Kafka,允许从Kafka中读取一个或者多个Topic的数据,一个Kafka Topic包含一个或者多个分区,每个分区中的消息顺序存储,并使用offset来标记消息位置,开发者可以在Spark Streaming应用中通过offset来控制数据的读取位置。
Offsets 管理对于保证流式应用在整个生命周期中数据的连贯性是非常重要的,如果在应用停止或者报错退出之前将Offset持久化保存,该消息就会丢失,那么Spark Streaming就没有办法从上次停止或保存的位置继续消费Kafka中的消息。
Spark Streaming 与 Kafka 整合时,允许获取其消费的Offset,具体方法如下:
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter => val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
}
注意:对 HashOffsetRanges的类型转换只有在对 createDirectStream 调用的第一个方法中完成时才会成功,而不是在随后的方法链中。RDD分区和Kafka分区之间的对应关系在Shuffle或重分区后会丧失,如 reduceByKey 或 window。
在 Spark Streaming程序失败的情况下,Kafka交付语义取决于 如何、何时存储偏移量,Spark输出操作的语义为 at-least-once。
如果要实现EOS语义(Exactly Once Semantics),必须在幂等的输出之后存储偏移量或者将存储偏移量与输出放在一个事务中。可以按照增加可靠性(和代码复杂度)的顺序使用以下选项来存储偏移量。
CheckPoint 是 Spark Streaming 运行过程中的元数据和每RDDs的数据状态保存到一个持久化系统中,当然这里面也包含了Offset,一般是 HDFS、S3,如果应用程序或者集群挂了,可以迅速恢复。
如果 Spark Streaming程序代码变了,重新打包执行就会出现反序列化异常的问题。
这是因为 CheckPoint 首次持久化时会将整个Jar包序列化,以便重启时恢复,重新打包后,新旧逻辑不一致,导致报错。
要解决这个问题,只能将HDFS上的CheckPoint文件删除,但这样也会同时删除Kafka的Offset信息。
默认情况下,消费者定期自动提交偏移量,它将偏移量存储子啊一个特殊的Kafka主题中(_consumer_offsets),但在某些情况下,这将导致问题,因为消息可能已经被消费者从Kafka拉取了,但是还没有处理。
可以将 enable.auto.commit 设置为 false,在 Spark Streaming程序输出结果后,手动提交偏移。
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 在输出操作完成之后,手工提交偏移量;此时将偏移量提交到 Kafka 的消息队列中
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
与 HasOffsetRanges 一样,只有在 createDirectStream 的结果上调用时,转换到 CanCommitOffsets 才会成功,而不是在转换之后,commitAsync调用是线程安全的,但必须在输出之后执行。
Offsets可以通过多种方式来管理,但是一般来说遵循下面的步骤:
要想将 Offset 保存到外部的存储中,关键实现以下几个功能:
package icu.wzk
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object kafkaDStream2 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf()
.setAppName("KafkaDStream2")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
val kafkaParams: Map[String, Object] = getKafkaConsumerParameters("wzkicu")
val topics: Array[String] = Array("spark_streaming_test01")
// 从指定位置获取Kafka数据
val offsets: collection.Map[TopicPartition, Long] = Map(
new TopicPartition("spark_streaming_test01",0) -> 100,
// 我这里只有一个分区 你可以多创建几个
// new TopicPartition("spark_streaming_test01", 1) -> 200,
// new TopicPartition("spark_streaming_test01", 2) -> 300,
)
// 从Kafka中获取数据
val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
)
// DStream 输出
dstream.foreachRDD {
(rdd, time) => {
println(s"=========== rdd.count = ${rdd.count()}, time = $time ==============")
}
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition {
iter => val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic}, ${o.partition}, ${o.fromOffset}, ${o.untilOffset}")
}
}
ssc.start()
ssc.awaitTermination()
}
private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "h121.wzk.icu:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
}
}
运行结果如下图所示:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。