Overview:
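When Spark Streaming starts, it first queries Redis or HBase;
Redis or HBase returns the stored result, which is a set of topic, partition, and offset combinations. That result is wrapped in a collection.Map[TopicPartition, Long] and returned to Spark Streaming;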
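As a sketch of the lookup step, the snippet below turns what a Redis hash might return into the offset map. The original does not say how offsets are laid out in Redis or HBase, so this assumes a hypothetical layout: one hash per consumer group, with the field "topic:partition" and the offset stored as a decimal string. TP is a stand-in for Kafka's org.apache.kafka.common.TopicPartition (built with new TopicPartition(topic, partition)), so the example runs with only the Scala standard library. Malformed entries are skipped rather than failing startup.

```scala
import scala.util.Try

// Stand-in for org.apache.kafka.common.TopicPartition so the sketch has no Kafka dependency.
final case class TP(topic: String, partition: Int)

object StoredOffsets {
  // Hypothetical layout: field "topic:partition" -> offset as a decimal string.
  // Splits on the last ':' so topic names containing ':' still parse.
  def parseField(field: String, value: String): Option[(TP, Long)] = {
    val sep = field.lastIndexOf(':')
    if (sep <= 0) None // no separator, or an empty topic name
    else for {
      partition <- Try(field.substring(sep + 1).toInt).toOption
      offset    <- Try(value.trim.toLong).toOption
      if partition >= 0 && offset >= 0
    } yield TP(field.substring(0, sep), partition) -> offset
  }

  // Builds the Map[TopicPartition, Long]-shaped result, dropping entries that do not parse.
  def parse(fields: Map[String, String]): Map[TP, Long] =
    fields.toList.flatMap { case (f, v) => parseField(f, v) }.toMap
}
```

An empty result here corresponds to the "empty or absent" case described next, where Kafka is consumed without explicit offsets.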
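Spark Streaming connects to Kafka with createDirectStream and chooses the ConsumerStrategy based on the result read from Redis or HBase. The ConsumerStrategy itself is built with Subscribe. Specifically, if the collection.Map[TopicPartition, Long] is empty or does not exist, Kafka is consumed without specifying offsets; if it is non-empty, Kafka is consumed starting from the specified offsets. Parts of the source code are explained below: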
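createDirectStream takes three parameters:
ssc: the Spark Streaming context (StreamingContext);
locationStrategy: the source documentation recommends passing LocationStrategies.PreferConsistent in most cases;
consumerStrategy: the source documentation recommends passing ConsumerStrategies.Subscribe in most cases.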
```scala
def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
  ): InputDStream[ConsumerRecord[K, V]] = {
  // Per-partition rate settings are read from the SparkConf...
  val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf)
  // ...and the call is delegated to the four-argument overload.
  createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc)
}
```
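The three-argument overload above only adds a DefaultPerPartitionConfig built from the SparkConf. That config supplies per-partition rate limits: spark.streaming.kafka.maxRatePerPartition (default 0, meaning no limit) and, in newer Spark versions, spark.streaming.kafka.minRatePerPartition. The snippet below is a simplified sketch of the per-batch cap those settings imply, assuming backpressure is disabled. It models the arithmetic only and is not Spark's actual code path; RateCap and its parameter names are hypothetical.

```scala
object RateCap {
  // Simplified model (backpressure off): the cap per partition per batch is
  // maxRatePerPartition (records/second) times the batch length in seconds,
  // floored at minRatePerPartition. A max rate <= 0 means "no cap".
  def maxRecordsPerPartition(maxRatePerPartition: Long,
                             minRatePerPartition: Long,
                             batchMillis: Long): Option[Long] =
    if (maxRatePerPartition <= 0) None // unlimited: read up to the latest available offset
    else {
      val secsPerBatch = batchMillis.toDouble / 1000
      Some(math.max((secsPerBatch * maxRatePerPartition).toLong, minRatePerPartition))
    }
}
```

For example, maxRatePerPartition = 1000 with a 5-second batch caps each partition at 5000 records per batch. This also bounds how far a restart from stored offsets can jump ahead in a single batch.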
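Subscribe accepts two or three parameters:
topics: the Kafka topics to subscribe to;
kafkaParams: the Kafka consumer configuration;
offsets: optional; when it is passed, Kafka is consumed starting from the specified offsets.
The three-argument overload is shown below. The two-argument overload passes an empty offsets map instead.
```scala
// jl = java.lang, ju = java.util (import aliases used in the Spark source)
def Subscribe[K, V](
    topics: Iterable[jl.String],
    kafkaParams: collection.Map[String, Object],
    offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
  // Convert the Scala collections into the Java collections the Kafka consumer expects
  new Subscribe[K, V](
    new ju.ArrayList(topics.asJavaCollection),
    new ju.HashMap[String, Object](kafkaParams.asJava),
    new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}
```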
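The "empty or absent" rule from the overview maps onto these two Subscribe overloads. The snippet below is a minimal sketch of that branch in plain Scala. It uses a hypothetical StartPlan type and a TP stand-in for TopicPartition so it runs without Spark or Kafka. In a real job, each case would be replaced by the ConsumerStrategies.Subscribe call noted in its comment.

```scala
// Stand-in for org.apache.kafka.common.TopicPartition.
final case class TP(topic: String, partition: Int)

// Hypothetical description of where the stream should start.
sealed trait StartPlan
// In a real job: ConsumerStrategies.Subscribe[K, V](topics, kafkaParams).
// The consumer then starts from the group's committed offsets, or from auto.offset.reset if none exist.
case object UseKafkaDefaults extends StartPlan
// In a real job: ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, offsets).
final case class StartFrom(offsets: Map[TP, Long]) extends StartPlan

object StartPlanner {
  // None models "nothing stored yet"; Some(empty) models "a record exists but lists no partitions".
  // Both fall back to Kafka's own starting position.
  def plan(stored: Option[Map[TP, Long]]): StartPlan = stored match {
    case Some(offsets) if offsets.nonEmpty => StartFrom(offsets)
    case _                                 => UseKafkaDefaults
  }
}
```

Using Option keeps "absent" and "empty" distinguishable for logging, even though both lead to the same strategy.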
Consuming Kafka from Spark Streaming yields an InputDStream[ConsumerRecord[K, V]]. Each ConsumerRecord carries the topic, partition, offset, and other information:
```java
/**
 * Creates a record to be received from a specified topic and partition
 *
 * @param topic The topic this record is received from
 * @param partition The partition of the topic this record is received from
 * @param offset The offset of this record in the corresponding Kafka partition
 * @param timestamp The timestamp of the record.
 * @param timestampType The timestamp type
 * @param checksum The checksum (CRC32) of the full record
 * @param serializedKeySize The length of the serialized key
 * @param serializedValueSize The length of the serialized value
 * @param key The key of the record, if one exists (null is allowed)
 * @param value The record contents
 * @param headers The headers of the record.
 */
public ConsumerRecord(String topic,
                      int partition,
                      long offset,
                      long timestamp,
                      TimestampType timestampT
```
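The topic, partition, and offset of each ConsumerRecord must be written back to Redis or HBase so that the next start can resume. The snippet below is a minimal sketch of that bookkeeping in plain Scala. Rec is a hypothetical stand-in for the three ConsumerRecord fields, TP stands in for TopicPartition, and the "topic:partition" field layout is likewise a hypothetical choice. The value saved is the highest processed offset plus one, because the offsets passed to Subscribe mark where consumption resumes. In a real job, the same numbers are usually taken from rdd.asInstanceOf[HasOffsetRanges].offsetRanges (the untilOffset of each OffsetRange) inside foreachRDD, rather than by scanning records.

```scala
// Hypothetical stand-in for the ConsumerRecord fields this step needs.
final case class Rec(topic: String, partition: Int, offset: Long)
// Stand-in for org.apache.kafka.common.TopicPartition.
final case class TP(topic: String, partition: Int)

object OffsetTracker {
  // Next offset to read per partition = highest processed offset + 1.
  // Each group is non-empty, so max is safe.
  def nextOffsets(batch: Seq[Rec]): Map[TP, Long] =
    batch.groupBy(r => TP(r.topic, r.partition)).map { case (tp, recs) =>
      tp -> (recs.map(_.offset).max + 1L)
    }

  // Hypothetical storage layout: field "topic:partition" -> offset as a decimal string.
  def toFields(offsets: Map[TP, Long]): Map[String, String] =
    offsets.map { case (tp, off) => s"${tp.topic}:${tp.partition}" -> off.toString }
}
```

Writing these values only after a batch's output has succeeded gives at-least-once behaviour. Storing them before processing risks skipping records on failure.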