当前位置:   article > 正文

11SparkStreaming消费kafka以及offset提交_sparkstreaming消费kafka数据, 提交offset

sparkstreaming消费kafka数据, 提交offset

park streaming流式处理kafka中的数据,第一步是先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据,2.直接从kafka读取数据。

一、Receiver方式消费kafka

这种方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于所有的接收器,receiver从kafka接收来的数据会存储在spark的executor内存中(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),之后spark streaming提交的job会处理这些数据。
在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。
Receiver方式如下图:
在这里插入图片描述
使用Receiver方式消费kafka需要注意的点:

  • 1、Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量(只是增加数据拉取的并行度,不是数据处理)。不会增加Spark处理数据的并行度。
  • 2、可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据,之后可以利用union来统一成一个Dstream
  • 3、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER

二、Direct方式消费kafka

在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
Direct方式如下图:
在这里插入图片描述
使用Direct方式的优势:

  • 1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以Direct方式在Kafka partition和RDD partition之间,有一个一对一的映射关系
  • 2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复
  • 3、一次且仅一次的事务机制:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。(当然,offset自己记录的话,可以利用checkpoint、数据库或文件记录或者回写到zookeeper中或者调用API写入Kafka的topic中进行记录

三、消费kafka时offset提交

首先是思路,有好几种方式:

  • 1、将offset手动维护到外部介质中,如Zookeeper、mysql、redis。
  • 2、将offset通过foreachRDD的方式维护到kafka中。
  • 3、通过监听的方式将offset维护到kafka中,当然也可以是Zookeeper、mysql、redis等外部介质。
    在这里插入图片描述
    在这里插入图片描述
    建议使用监听的方式进行维护,因为kafka和SparkStreaming中维护的分区的对应关系,直接使用foreachRDD的当时也可以,但是这样会将业务逻辑置于RDD中进行处理,丧失了SparkStreaming特有的算子特性,例如窗口算子之类的,使用监听方式,继承Listene中的onBatchCompleted方法,在该方法中实现寿佛那个维护offset即可。
    注意:手动维护offset的时候,最好做一下任务是否有报错的判断,防止丢数,当任务有失败时,不提交offset。

四、手动维护offset简单代码演示

维护到zookeeper
  • 使用到了commitAsync() api直接操作Zookeeper

在Kafka 0.10+版本中,offset的默认存储由ZooKeeper移动到了一个自带的topic中,名为__consumer_offsets。Spark Streaming也专门提供了commitAsync() API用于提交offset。使用方法如下。

stream.foreachRDD {
    rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 确保结果都已经正确且幂等地输出了
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

维护到zookeeper

  • 使用到了KafkaCluster 中的api直接操作Zookeeper
package com.bigdata.spark
 
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import org.apache.spark.streaming.kafka.{
   HasOffsetRanges, KafkaCluster, KafkaUtils}
import org.apache.spark.streaming.{
   Seconds, StreamingContext}
 
import scala.collection.mutable
 
/**
  * 
  *   kafka对接sparkstreaming,手动维护offset到zookeeper
  */
object KafkaStreaming {
   
 
  def main(args: Array[String]): Unit = {
   
 
    //初始化ssc
    val conf: SparkConf = new SparkConf().setAppName("").setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
    ssc.sparkContext.setLogLevel("ERROR")
 
 
    //kafka参数
    val brokers = "linux1:9092,linux2:9092,linux3:9092"
    val topic = "first"
    val group = "bigdata"
    val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
 
    val kafkaParams = Map(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization)
 
 
    //创建KafkaCluster对象,维护offset
    val cluster = new KafkaCluster(kafkaParams)
 
    //获取初始偏移量
    val fromOffset: Map[TopicAndPartition, Long] = getOffset(cluster, group, topic)
 
    //创建流
    val kafkaS
本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号