
Spark: Consuming Kafka Data and Saving Offsets to Redis


Main contents:

  • A Scala implementation of Spark Streaming that consumes Kafka data and saves offsets to Redis, so the application maintains offsets itself
  • An explanation of parts of the relevant source code

How Spark Streaming maintains offsets itself

[Figure: spark-offset-saved.png, the offset-maintenance flow described in the steps below]

  1. When the Spark Streaming application starts, it first queries Redis or HBase;

  2. Redis or HBase returns the query result, packaged as a collection.Map[TopicPartition, Long] (combinations of topic, partition, and offset), which is handed back to Spark Streaming (the first sketch after this list shows this read path);

  3. Spark Streaming connects to Kafka via createDirectStream, and its ConsumerStrategy is determined by the result of the Redis/HBase query; the strategy is built with Subscribe. Concretely, if the collection.Map[TopicPartition, Long] is empty or absent, Kafka is consumed without specifying offsets; if it is non-empty, consumption resumes from the specified offsets (see the sketch at the end of this item). Part of the relevant source code is explained below:

    createDirectStream takes three parameters:

    • ssc: the StreamingContext
    • locationStrategy: the source code recommends passing LocationStrategies.PreferConsistent
    • consumerStrategy: the source code recommends passing ConsumerStrategies.Subscribe
    def createDirectStream[K, V](
          ssc: StreamingContext,
          locationStrategy: LocationStrategy,
          consumerStrategy: ConsumerStrategy[K, V]
        ): InputDStream[ConsumerRecord[K, V]] = {
         
        val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf)
        createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc)
      }

    Subscribe can be called with two or three parameters:

    • topics: the Kafka topics to subscribe to
    • kafkaParams: the Kafka configuration
    • offsets: optional; if passed, Kafka is consumed starting from the specified offsets
    def Subscribe[K, V](
          topics: Iterable[jl.String],
          kafkaParams: collection.Map[String, Object],
          offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
         
        new Subscribe[K, V](
          new ju.ArrayList(topics.asJavaCollection),
          new ju.HashMap[String, Object](kafkaParams.asJava),
          new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
      }
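
    Putting these together, here is a minimal sketch of the branching in step 3. It assumes ssc, topics, and kafkaParams are already defined, and that fromOffsets is the collection.Map[TopicPartition, Long] recovered from Redis in step 2:

    import org.apache.spark.streaming.kafka010._

    // No recovered offsets: subscribe without offsets, so the starting
    // position falls back to auto.offset.reset in kafkaParams.
    // Otherwise: resume from the offsets recovered from Redis.
    val consumerStrategy =
      if (fromOffsets == null || fromOffsets.isEmpty)
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      else
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent, consumerStrategy)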
  4. Consuming Kafka yields an InputDStream[ConsumerRecord[K, V]], where each ConsumerRecord carries the topic, partition, offset, and other metadata (the sketches after this list show how these offsets are persisted back to Redis):

        /**
         * Creates a record to be received from a specified topic and partition
         *
         * @param topic The topic this record is received from
         * @param partition The partition of the topic this record is received from
         * @param offset The offset of this record in the corresponding Kafka partition
         * @param timestamp The timestamp of the record.
         * @param timestampType The timestamp type
         * @param checksum The checksum (CRC32) of the full record
         * @param serializedKeySize The length of the serialized key
         * @param serializedValueSize The length of the serialized value
         * @param key The key of the record, if one exists (null is allowed)
         * @param value The record contents
         * @param headers The headers of the record.
         */
        public ConsumerRecord(String topic,
                              int partition,
                              long offset,
                              long timestamp,
                              TimestampType timestampType,
                              Long checksum,
                              int serializedKeySize,
                              int serializedValueSize,
                              K key,
                              V value,
                              Headers headers) { ... }
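
The Spark source above does not show steps 1 and 2 (recovering offsets from Redis) or the write-back path. Below is a minimal Scala sketch using Jedis; the key scheme (one Redis hash per topic under kafka:offset:<topic>, with field = partition and value = offset) and the helper names readOffsets/saveOffsets are illustrative assumptions, not a fixed API:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis
import scala.collection.JavaConverters._

// Steps 1 and 2: read all stored partition offsets for a topic. An empty map
// means "no stored offsets", which selects the offset-free Subscribe above.
def readOffsets(jedis: Jedis, topic: String): collection.Map[TopicPartition, Long] =
  jedis.hgetAll(s"kafka:offset:$topic").asScala.map {
    case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong
  }.toMap

// Persist the end offset (untilOffset) of every processed range.
def saveOffsets(jedis: Jedis, ranges: Array[OffsetRange]): Unit =
  ranges.foreach { r =>
    jedis.hset(s"kafka:offset:${r.topic}", r.partition.toString, r.untilOffset.toString)
  }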
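
Finally, the offsets have to be written back once each batch has been processed. Note that casting to HasOffsetRanges only works on the stream returned directly by createDirectStream, before any transformation. Continuing the sketch above, and assuming a Redis endpoint of localhost:6379:

import org.apache.spark.streaming.kafka010.HasOffsetRanges

stream.foreachRDD { rdd =>
  // Capture this batch's topic/partition/offset ranges.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // Persist untilOffset only after processing succeeds, so a crash in
  // between replays the unfinished batch on restart (at-least-once).
  val jedis = new Jedis("localhost", 6379) // assumed endpoint
  try saveOffsets(jedis, ranges) finally jedis.close()
}

Because the offset is committed after the data is processed, a failure between processing and the hset replays that batch on restart; this gives at-least-once semantics, and exactly-once additionally requires idempotent or transactional processing.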