赞
踩
1、Spark Streaming 的高级 API(基于 Receiver 的 KafkaUtils.createStream)容易发生数据重复读取,官方已经不推荐使用。
2、Spark Streaming 的低级 API KafkaUtils.createDirectStream 需要自己保存 offset。
保存方式有两大类:一类是 Spark 自带的 checkpoint(),但由于其可移植性较差(修改代码后通常无法从旧的 checkpoint 恢复),会影响代码升级,因此生产中很少使用;第二类是将偏移量保存到外部存储中,如 Redis、HBase、ZooKeeper、MySQL、Oracle 等。本文将偏移量保存到 Redis 中。
1、ConsumerStrategies.Subscribe():自动模式,由 Kafka 自动为消费者分配分区
2、ConsumerStrategies.Assign():手动模式,由程序手动指定要消费的分区
二者尽量不要在同一个消费者组中混合使用,否则容易发生问题;生产中尽量使用自动模式。
具体差别请参考:
https://www.jianshu.com/p/b09c28d45b82
https://www.cnblogs.com/dongxiao-yang/p/7200971.html
App
package com.xxy.app
import com.xxy.utils.JedisConnPool
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.{Jedis, Transaction}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try
object RedisApp {

  /**
   * Prefix of the Redis keys that hold offsets for one (consumer group, topic) pair.
   * Full key layout: `offset_<groupId>_<topic>_<partition>`; the value is the next offset to read
   * (the `untilOffset` written by [[saveOffsets]]).
   *
   * NOTE(review): the layout is ambiguous when group ids contain "_" (group "a" + topic "b_c" and
   * group "a_b" + topic "c" share a prefix); changing it would orphan already-stored offsets.
   */
  private def offsetKeyPrefix(groupId: String, topic: String): String =
    s"offset_${groupId}_${topic}_"

  /** Escapes Redis glob metacharacters so that `s` is matched literally by KEYS. */
  private def globEscape(s: String): String = {
    val escaped = new StringBuilder
    s.foreach { c =>
      if ("*?[]\\".indexOf(c) >= 0) escaped.append('\\')
      escaped.append(c)
    }
    escaped.toString
  }

  /**
   * Parses the partition number out of an offset key.
   *
   * @return None when the key does not start with `prefix` or the remainder is not a
   *         non-negative integer, e.g. a key of another topic whose name merely starts with this
   *         one ("t" vs "t_x"). Such keys are skipped instead of being misread as partition 0.
   */
  private def partitionOf(key: String, prefix: String): Option[Int] =
    if (key.startsWith(prefix)) Try(key.substring(prefix.length).toInt).toOption.filter(_ >= 0)
    else None

  /**
   * Loads the offsets previously stored in Redis for `groupId` on each of `topics`.
   *
   * @param topics  topics to look up
   * @param groupId consumer group id used in the key layout
   * @return starting offset per partition. Partitions without a stored key are absent, so for them
   *         the consumer falls back to the group's committed offset in Kafka or `auto.offset.reset`.
   * @throws IllegalStateException if a stored value is not a number
   */
  def getOffset(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long] = {
    // Mutable so offsets can be added as they are discovered.
    val fromOffset = mutable.Map[TopicPartition, Long]()
    val jedis: Jedis = JedisConnPool.getConnections()
    try {
      topics.foreach { topic =>
        val prefix = offsetKeyPrefix(groupId, topic)
        // KEYS scans the whole keyspace and blocks Redis while doing so; acceptable for a one-off
        // startup lookup, but consider SCAN on a large shared instance.
        jedis.keys(globEscape(prefix) + "*").asScala.foreach { key =>
          partitionOf(key, prefix).foreach { partition =>
            // The key may have been removed between KEYS and GET; treat it as absent.
            Option(jedis.get(key)).foreach { raw =>
              val offset = Try(raw.trim.toLong).getOrElse(
                throw new IllegalStateException(s"Redis key $key holds a non-numeric offset: $raw"))
              println(s"$partition:::$offset")
              fromOffset.put(new TopicPartition(topic, partition), offset)
            }
          }
        }
      }
    } finally {
      // Return the connection to the pool even if a lookup throws.
      jedis.close()
    }
    fromOffset
  }

  /**
   * Writes `untilOffset` (the next offset to read) of every range in a single MULTI/EXEC transaction,
   * so all partitions of a batch are recorded together.
   */
  private def saveOffsets(groupId: String, offsetRanges: Array[OffsetRange]): Unit = {
    val jedis: Jedis = JedisConnPool.getConnections()
    try {
      val transaction: Transaction = jedis.multi()
      try {
        offsetRanges.foreach { range =>
          transaction.set(offsetKeyPrefix(groupId, range.topic) + range.partition, range.untilOffset.toString)
        }
        transaction.exec()
      } finally {
        transaction.close()
      }
    } finally {
      jedis.close()
    }
  }

  /** Word-count job over a Kafka direct stream whose offsets are persisted in Redis. */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("offSet Redis").setMaster("local[2]")
    val context: SparkContext = new SparkContext(conf)
    context.setLogLevel("WARN")
    val ssc: StreamingContext = new StreamingContext(context, Seconds(10))

    val topics = Array("offset-redis-01")
    val kafkaParams: Map[String, Object] = Map(
      "bootstrap.servers" -> "node1:9092,node2:9092,node3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "offSet-Redis-Test",
      // auto.offset.reset, applied per partition that has no starting offset:
      //   earliest: resume from the committed offset if there is one, otherwise read from the beginning
      //   latest:   resume from the committed offset if there is one, otherwise read only new records
      //   none:     throw if any partition has no committed offset
      // Here starting offsets come from Redis, so this only matters for partitions missing from
      // Redis (assuming nothing else commits offsets to Kafka for this group).
      "auto.offset.reset" -> "latest",
      // Offsets are tracked in Redis, never auto-committed back to Kafka.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val groupId: String = kafkaParams("group.id").toString

    val offsets = getOffset(topics, groupId)
    // PreferConsistent spreads Kafka partitions evenly across the available executors.
    val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
    val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(topics, kafkaParams, offsets)
    val kafkaInputStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)

    kafkaInputStream.foreachRDD { rdd =>
      // Offset ranges are only available on the RDD produced directly by the direct stream.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      if (!rdd.isEmpty()) {
        // Process the batch first and record offsets only afterwards (at-least-once). The Redis
        // connection is opened after the job, so a failing job cannot leak a connection left in MULTI.
        val result: RDD[(String, Long)] = rdd.map(_.value).flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _)
        // println runs on the executors; with local[2] that output lands on the driver console.
        result.foreach(println)
        saveOffsets(groupId, offsetRanges)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
JedisConnPool
package com.xxy.utils
import java.io.FileInputStream
import java.util.Properties
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
object JedisConnPool {

  /** Classpath resource holding the Redis connection settings. */
  private val resourceName = "jedis.properties"

  // Resolve the config as a URL and read it through a stream. A FileInputStream on
  // `URL.getPath` breaks when the file is packaged inside a jar ("...jar!/jedis.properties")
  // or when the path contains URL-escaped characters such as %20.
  private val resourceUrl: java.net.URL =
    Option(Thread.currentThread().getContextClassLoader.getResource(resourceName))
      .getOrElse(throw new IllegalStateException(s"$resourceName not found on the classpath"))

  /** Path of the loaded configuration resource (kept public for callers that log it). */
  val path: String = resourceUrl.getPath

  /** Raw settings loaded from `jedis.properties`; the stream is always closed after loading. */
  val properties: Properties = {
    val props = new Properties()
    val in = resourceUrl.openStream()
    try props.load(in) finally in.close()
    props
  }

  /** Returns the trimmed value of a mandatory key, failing with a clear message when it is absent. */
  private def required(key: String): String =
    Option(properties.getProperty(key))
      .map(_.trim)
      .getOrElse(throw new IllegalArgumentException(s"Missing required property '$key' in $resourceName"))

  val host: String = required("redis.host")

  /**
   * Optional password. A missing or blank `redis.auth` becomes null so that no AUTH with an empty
   * password is attempted (Jedis only authenticates when the password is non-null).
   */
  val auth: String = Option(properties.getProperty("redis.auth")).filter(_.trim.nonEmpty).orNull

  val port: Int = required("redis.port").toInt

  /** Connection/socket timeout in milliseconds; `redis.timeout` overrides the 10000 ms default. */
  private val timeout: Int =
    Option(properties.getProperty("redis.timeout")).map(_.trim.toInt).getOrElse(10000)

  /** Pool settings. */
  val config = new JedisPoolConfig
  // Maximum number of connections the pool may hand out.
  config.setMaxTotal(required("redis.maxConn").toInt)
  // Maximum number of idle connections kept in the pool.
  config.setMaxIdle(required("redis.maxIdle").toInt)

  /** Shared connection pool built from the settings above. */
  val pool = new JedisPool(config, host, port, timeout, auth)

  /** Borrows a connection from the pool; callers must `close()` it to return it. */
  def getConnections(): Jedis = pool.getResource
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。