赞
踩
上节完成了如下的内容:
Spark Streaming 集成Kafka,允许从Kafka中读取一个或者多个Topic的数据,一个Kafka Topic包含一个或者多个分区,每个分区中的消息顺序存储,并使用offset来标记消息位置,开发者可以在Spark Streaming应用中通过offset来控制数据的读取位置。
Offsets 管理对于保证流式应用在整个生命周期中数据的连贯性是非常重要的,如果在应用停止或者报错退出之前将Offset持久化保存,该消息就会丢失,那么Spark Streaming就没有办法从上次停止或保存的位置继续消费Kafka中的消息。
Spark Streaming 可以通过 KafkaUtils.createDirectStream 直接与 Kafka 集成。这种方式不会依赖于 ZooKeeper,而是直接从 Kafka 分区中读取数据。
在这种直接方式下,Spark Streaming 依赖 Kafka 的 API 来管理和存储消费者偏移量(Offsets),默认情况下偏移量保存在 Kafka 自身的 __consumer_offsets 主题中。
Redis 作为一个高效的内存数据库,常用于存储 Spark Streaming 中的 Kafka 偏移量。
通过手动管理偏移量,你可以在每批次数据处理后,将当前批次的 Kafka 偏移量存储到 Redis 中。这样,在应用程序重新启动时,可以从 Redis 中读取最后处理的偏移量,从而从正确的位置继续消费 Kafka 数据。
应用启动时,从 Redis 中读取上次处理的偏移量,并从这些偏移量开始消费 Kafka 数据。
通过 Spark Streaming 处理从 Kafka 消费到的数据。
每处理完一批数据后,将最新的偏移量存储到 Redis 中。这样,如果应用程序崩溃或重启,可以从这个位置继续消费。
<!-- jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
服务器上我们需要有:
Redis服务启动
Kafka服务启动
编写代码,实现的主要逻辑如下所示:
package icu.wzk
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object KafkaDStream3 {
def main(args: Array[String]): Unit = {
Logger.getLogger("args").setLevel(Level.ERROR)
val conf = new SparkConf()
.setAppName("KafkaDStream3")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
val groupId: String = "wzkicu"
val topics: Array[String] = Array("spark_streaming_test01")
val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupId)
// 从 Kafka 获取 Offsets
val offsets: Map[TopicPartition, Long] = OffsetsRedisUtils.getOffsetsFromRedis(topics, groupId)
// 创建 DStream
val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
)
// DStream 转换&输出
dstream.foreachRDD {
(rdd, time) =>
if (!rdd.isEmpty()) {
// 处理消息
println(s"====== rdd.count = ${rdd.count()}, time = $time =======")
// 将 Offsets 保存到 Redis
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
OffsetsRedisUtils.saveOffsetsToRedis(offsetRanges, groupId)
}
}
ssc.start()
ssc.awaitTermination()
}
private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "h121.wzk.icu:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
}
}
代码中我们封装了一个工具类:
package icu.wzk
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
import scala.collection.mutable
object OffsetsRedisUtils {
private val config = new JedisPoolConfig
private val redisHost = "h121.wzk.icu"
private val redisPort = 6379
config.setMaxTotal(30)
config.setMaxIdle(10)
private val pool= new JedisPool(config, redisHost, redisPort, 10000)
private val topicPrefix = "kafka:topic"
private def getKey(topic: String, groupId: String, prefix: String = topicPrefix): String = s"$prefix:$topic:$groupId"
private def getRedisConnection: Jedis = pool.getResource
// 从Redis中获取Offsets
def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
val jedis: Jedis = getRedisConnection
val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map {
topic =>
import scala.collection.JavaConverters._
jedis.hgetAll(getKey(topic, groupId))
.asScala
.map {
case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong
}
}
jedis.close()
offsets.flatten.toMap
}
// 将 Offsets 保存到 Redis
def saveOffsetsToRedis(ranges: Array[OffsetRange], groupId: String): Unit = {
val jedis: Jedis = getRedisConnection
ranges
.map(range => (range.topic, range.partition -> range.untilOffset))
.groupBy(_._1)
.map {
case (topic, buffer) => (topic, buffer.map(_._2))
}
.foreach {
case (topic, partitionAndOffset) =>
val offsets: Array[(String, String)] = partitionAndOffset.map(elem => (elem._1.toString, elem._2.toString))
import scala.collection.JavaConverters._
jedis.hmset(getKey(topic, groupId), offsets.toMap.asJava)
}
jedis.close()
}
}
我们启动后,如图所示:
这里我使用Redis查看当前的存储情况:
可以看到当前已经写入了,我们继续启动 KafkaProducer工具,继续写入数据。
可以看到,已经统计到数据了。
我们继续查看当前的Redis中的数据:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。