赞
踩
park streaming流式处理kafka中的数据,第一步是先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据,2.直接从kafka读取数据。
这种方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于所有的接收器,receiver从kafka接收来的数据会存储在spark的executor内存中(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),之后spark streaming提交的job会处理这些数据。
在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。
Receiver方式如下图:
使用Receiver方式消费kafka需要注意的点:
在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
Direct方式如下图:
使用Direct方式的优势:
首先是思路,有好几种方式:
在Kafka 0.10+版本中,offset的默认存储由ZooKeeper移动到了一个自带的topic中,名为__consumer_offsets。Spark Streaming也专门提供了commitAsync() API用于提交offset。使用方法如下。
stream.foreachRDD {
rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 确保结果都已经正确且幂等地输出了
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
package com.bigdata.spark
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import org.apache.spark.streaming.kafka.{
HasOffsetRanges, KafkaCluster, KafkaUtils}
import org.apache.spark.streaming.{
Seconds, StreamingContext}
import scala.collection.mutable
/**
*
* kafka对接sparkstreaming,手动维护offset到zookeeper
*/
object KafkaStreaming {
def main(args: Array[String]): Unit = {
//初始化ssc
val conf: SparkConf = new SparkConf().setAppName("").setMaster("local[*]")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
ssc.sparkContext.setLogLevel("ERROR")
//kafka参数
val brokers = "linux1:9092,linux2:9092,linux3:9092"
val topic = "first"
val group = "bigdata"
val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
val kafkaParams = Map(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> group,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization)
//创建KafkaCluster对象,维护offset
val cluster = new KafkaCluster(kafkaParams)
//获取初始偏移量
val fromOffset: Map[TopicAndPartition, Long] = getOffset(cluster, group, topic)
//创建流
val kafkaS
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。