赞
踩
Spark Streaming应用与实战系列包括以下六部分内容:
本篇为第一部分,包括背景与架构改造、通过代码实现具体细节并运行项目、对Streaming监控的介绍以及解决实际问题、对项目做压测与相关的优化。
1.1 问题描述
有一块业务主要是做爬虫抓取与数据输出,通过大数据这边提供的SOA服务入库到HBase,架构大致如下:
以对于以上的架构存在一些问题,我们可以看见数据在Dubbox服务阶段处理后直接通过HBase API入库了HBase,中间并没做任何缓冲,要是HBase出现了问题整个集群都完蛋,没法写入数据,数据还丢失,HBase这边压力也相当大,针对这一点,对入库HBase这个阶段做了一些改造。
1.2 架构改造
改造后的架构,爬虫通过接口服务,入库到Kafka,Spark streaming去消费kafka的数据,入库到HBase.核心组件如下图所示:
为什么不直接入库到HBase,这样做有什么好处?
1.3 为什么选择Kafka和Spark streaming
然后就开始写代码了,总体思路就是:
2.1 初始化与配置加载
下面是一些接收参数,加载配置,获取配置中的topic,还有初始化配置,代码如下:
- //接收参数
- val Array(kafka_topic, timeWindow, maxRatePerPartition) = args
-
- //加载配置
- val prop: Properties = new Properties()
- prop.load(this.getClass().getResourceAsStream("/kafka.properties"))
-
- val groupName = prop.getProperty("group.id")
-
- //获取配置文件中的topic
- val kafkaTopics: String = prop.getProperty("kafka.topic." + kafka_topic)
- if (kafkaTopics == null || kafkaTopics.length <= 0) {
- System.err.println("Usage: KafkaDataStream <kafka_topic> is number from kafka.properties")
- System.exit(1)
- }
-
- val topics: Set[String] = kafkaTopics.split(",").toSet
-
- val kafkaParams = scala.collection.immutable.Map[String, String](
- "metadata.broker.list" -> prop.getProperty("bootstrap.servers"),
- "group.id" -> groupName,
- "auto.offset.reset" -> "largest")
-
- val kc = new KafkaCluster(kafkaParams)
-
- //初始化配置
- val sparkConf = new SparkConf()
- .setAppName(KafkaDataStream.getClass.getSimpleName + topics.toString())
- .set("spark.yarn.am.memory", prop.getProperty("am.memory"))
- .set("spark.yarn.am.memoryOverhead", prop.getProperty("am.memoryOverhead"))
- .set("spark.yarn.executor.memoryOverhead", prop.getProperty("executor.memoryOverhead"))
- .set("spark.streaming.kafka.maxRatePerPartition", maxRatePerPartition) //此处为每秒每个partition的条数
- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .set("spark.reducer.maxSizeInFlight", "1m")
-
- val sc = new SparkContext(sparkConf)
- val ssc = new StreamingContext(sc, Seconds(timeWindow.toInt)) //多少秒处理一次请求
只是需要注意一下,这里的KafkaCluster,需要把源码拷贝过来,修改一下,因为里面有些方法是私有的。copy过来后改为public即可。
2.2 链接ZK
注意:这里的ZKStringSerializer,需要把源码拷贝过来,修改一下。
- //zk
- val zkClient = new ZkClient(prop.getProperty("zk.connect"), Integer.MAX_VALUE, 100000, ZKStringSerializer)
- val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
2.3 组装fromOffsets
组装fromOffsets,createDirectStream接收的是一个map的结构,所以可以支持多个topic的消费。
- var fromOffsets: Map[TopicAndPartition, Long] = Map() //多个partition的offset
-
-
- //支持多个topic : Set[String]
- topics.foreach(topicName => {
-
- //去brokers中获取partition数量,注意:新增partition后需要重启
- val children = zkClient.countChildren(ZkUtils.getTopicPartitionsPath(topicName))
- for (i <- 0 until children) {
-
- //kafka consumer 中是否有该partition的消费记录,如果没有设置为0
- val tp = TopicAndPartition(topicName, i)
- val path: String = s"${new ZKGroupTopicDirs(groupName, topicName).consumerOffsetDir}/$i"
- if (zkClient.exists(path)) {
- fromOffsets += (tp -> zkClient.readData[String](path).toLong)
- } else {
- fromOffsets += (tp -> 0)
- }
- }
- })
2.4 通过createDirectStream接受数据
使用KafkaUtils里面的createDirectStream方法去消费kafka数据,createDirectStream使用的是kafka简单的Consumer API,所以需要自己去管理offset,我们把offset写入到zk中,这样也方便了一些监控软件读取记录。
- //创建Kafka持续读取流,通过zk中记录的offset
- val messages: InputDStream[(String, String)] =
- KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
2.5 入库
入库HBase:
-
- //数据操作
- messages.foreachRDD(rdd => {
- val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-
- //data 处理
- rdd.foreachPartition(partitionRecords => {
- //TaskContext 上下文
- val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId)
- logger.info(s"${offsetRange.topic} ${offsetRange.partition} ${offsetRange.fromOffset} ${offsetRange.untilOffset}")
-
- //TopicAndPartition 主构造参数第一个是topic,第二个是Kafka partition id
- val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition)
- val either = kc.setConsumerOffsets(groupName, Map((topicAndPartition, offsetRange.untilOffset))) //是
- if (either.isLeft) {
- logger.info(s"Error updating the offset to Kafka cluster: ${either.left.get}")
- }
-
- partitionRecords.foreach(data => {
- HBaseDao.insert(data)
- })
- })
-
- })
插入数据到具体HBase数据库:
- /**
- *
- * 插入数据到 HBase
- *
- * 参数( tableName , json ) ):
- *
- * Json格式:
- * {
- * "rowKey": "00000-0",
- * "family:qualifier": "value",
- * "family:qualifier": "value",
- * ......
- * }
- *
- * @param data
- * @return
- */
- def insert(data: (String, String)): Boolean = {
-
- val t: HTable = getTable(data._1) //HTable
- try {
- val map: mutable.HashMap[String, Object] = JsonUtils.json2Map(data._2)
- val rowKey: Array[Byte] = String.valueOf(map.get("rowKey")).getBytes //rowKey
- val put = new Put(rowKey)
-
- for ((k, v) <- map) {
- val keys: Array[String] = k.split(":")
- if (keys.length == 2){
- put.addColumn(keys(0).getBytes, keys(1).getBytes, String.valueOf(v).getBytes)
- }
- }
-
- Try(t.put(put)).getOrElse(t.close())
- true
- } catch {
- case e: Exception =>
- e.printStackTrace()
- false
- }
- }
2.6 运行并查看结果
运行命令:
/opt/cloudera/parcels/CDH/bin/spark-submit --master yarn-client --class com.xiaoxiaomo.streaming.KafkaDataStream hspark-1.0.jar 1 3 1000
运行后可以去spark UI中去查看相关运行情况,UI中具体细节见下文。
这部分主要在代码运行起来的情况下来看一下任务的运行情况主要是streaming的监控界面,以及我们怎么去通过监控界面发现问题和解决问题。
3.1 监控
官网中指出,spark中专门为SparkStreaming程序的监控设置了额外的途径,当使用StreamingContext时,在WEB UI中会出现一个”Streaming”的选项卡:
在此选项卡内,统计的内容展示如下:
Spark streaming 处理速度为3s一次,每次1000条。
Kafka product 每秒1000条数据,与上面spark consumer消费者恰好相等。结果:数据量大导致积压,这个过程中active Batches会越变越大。
因为忽略了实际的Processing time:
这其中包括接受的记录数量,每一个batch内处理的记录数,处理时间,以及总共消耗的时间。在上述参数之中最重要的两个参数分别是Porcessing Time 以及 Scheduling Delay:
如果PT比SD大,或者SD持续上升,这就表明此系统不能对产生的数据实时响应,换句话来说就是,出现了处理时延,每个batch time 内的处理速度小于数据的产生速度。
在这种情况下,读者需要想法减少数据的处理速度,即需要提升处理效率。
3.2 问题发现
在我做压测的时候, Spark streaming 处理速度为3s一次,每次1000条。
Kafka product每秒1000条数据, 与上面spark consumer消费者恰好相等。于是就会数据量大导致积压,这个过程中active Batches会越变越大。最后发现了一个问题:
当压测峰值过后Input Size=0 events,时间仍然不减,奇怪!
查看摸个具体stage:
从图中, 我们可以看到Spark总共调度分发了两批次task set, 每个task set的处理(含序列化和压缩之类的工作)都不超过100毫秒,那么该Stage何来消耗4s呢?慢着,貌似这两批次的task set分发的时间相隔得有点长啊,隔了4秒左右。为什么会隔这么就才调度一次呢?
此处要引入一个配置项“spark.locality.wait”(默认等待3s),它配置了本地化调度降级所需要的时间。这里概要补充下Spark本地化调度的知识,Spark的task一般都会分发到它所需数据的那个节点,这称之为”NODE_LOCAL”,但在资源不足的情况下,数据所在节点未必有资源处理task,因此Spark在等待了“spark.locality.wait”所配置的时间长度后,会退而求其次,分发到数据所在节点的同一个机架的其它节点上,这是“RACK_LOCAL”。当然,也有更惨的,就是再等了一段“spark.locality.wait”的时间长度后,干脆随便找一台机器去跑task,这就是“ANY”策略了。
官网解释:How long to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well.点击查看来源。
而从上例看到, 即使用最差的”ANY”策略进行调度,task set的处理也只是花了100毫秒,因此,没必要非得为了”NODE_LOCAL”策略的生效而去等待那么长的时间,特别是在流计算这种场景上。所以把“spark.locality.wait”果断调小,从1秒到500毫秒,最后干脆调到100毫秒算了。
spark-submit –master yarn-client –conf spark.driver.memory=256m –class com.KafkaDataStream –num-executors 1 –executor-memory 256m –executor-cores 2 –conf spark.locality.wait=100ms hspark-1.0.jar
调了之后的处理时间为0.7s:
具体耗时如下:
对项目做压测与相关的优化,主要从内存(executor-memory和driver-memory)、num-executors、executor-cores,以及代码层面做一些测试和改造。
4.1 压测
spark-submit –master yarn-client –conf spark.driver.memory=256m –class com.xiaoxiaomo.KafkaDataStream –num-executors 1 –executor-memory 256m –executor-cores 2 –conf spark.locality.wait=100ms hspark.jar 3 1000
Spark streaming 处理速度为3s一次,每次1000条。
Kafka product 每秒1000条数据, 与上面spark consumer消费者恰好相等。结果:数据量大导致积压,这个过程中active Batches会越变越大。
调整Kafka product 每秒600条数据,存在积压,但已经不严重:
调整Kafka product 每秒500条数据,为消费者50%,测试结果显示正常,等待时间很稳定:
但是。此时每秒吞吐量为500显然不够,通过调整间歇实际等,发现并没有变化:
spark-submit –master yarn-client –conf spark.driver.memory=256m –class com.xiaoxiaomo.KafkaDataStream –num-executors 1 –executor-memory 256m –executor-cores 2 –conf spark.locality.wait=100ms hspark.jar 2 2000 Spark streaming 处理速度为2s一次,每次2000条
Kafka product 每秒500条数据,可以看见没有在指定时间内消费完数据,照成数据积压,并发下降了。
4.2 分析原因
分析原因,发现大部分耗时都在处理数据这样一阶段,如下图所示:
4.3 调整参数
调整 executor-cores:
调整executor内存,并发没有增长,无效:
调整am内存,并发没有增长,无效:
4.4 代码调整
发现现在主要还是在处理数据的时候消耗时间一直没有减少,而处理数据查看后发现是一条一条的往HBase里面插入的,修改为批量插入,重新构建了json.性能猛增!!修改前的代码:
- /**
- *
- * 插入数据到 HBase
- *
- * 参数( tableName , json ) ):
- *
- * Json格式:
- * {
- * "rowKey": "00000-0",
- * "family:qualifier": "value",
- * "family:qualifier": "value",
- * ......
- * }
- *
- * @param data
- * @return
- */
- def insert(data: (String, String)): Boolean = {
- val t: HTable = getTable(data._1) //HTable
- try {
- val map: mutable.HashMap[String, Object] = JsonUtils.json2Map(data._2)
- val rowKey: Array[Byte] = String.valueOf(map.get("rowKey")).getBytes //rowKey
- val put = new Put(rowKey)
- for ((k, v) <- map) {
- val keys: Array[String] = k.split(":")
- if (keys.length == 2){
- put.addColumn(keys(0).getBytes, keys(1).getBytes, String.valueOf(v).getBytes)
- }
- }
- Try(t.put(put)).getOrElse(t.close())
- true
- } catch {
- case e: Exception =>
- e.printStackTrace()
- false
- }
- }
修改后的代码:
- //数据操作
- messages.foreachRDD(rdd => {
- val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
- //data 处理
- rdd.foreachPartition(partitionRecords => {
- //TaskContext 上下文
- val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId)
- logger.debug(s"${offsetRange.topic} ${offsetRange.partition} ${offsetRange.fromOffset} ${offsetRange.untilOffset}")
- //TopicAndPartition 主构造参数第一个是topic,第二个是Kafka partition id
- val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition)
- val either = kc.setConsumerOffsets(groupName, Map((topicAndPartition, offsetRange.untilOffset))) //是
- if (either.isLeft) {
- logger.info(s"Error updating the offset to Kafka cluster: ${either.left.get}")
- }
- /** 解析PartitionRecords数据 */
- if (offsetRange.topic != null) {
- HBaseDao.insert(offsetRange.topic, partitionRecords)
- }
- })
- })
插入数据到HBase:
- /**
- *
- * 插入数据到 HBase
- *
- * 参数( tableName , [( tableName , json )] ):
- *
- * Json格式:
- * {
- * "r": "00000-0",
- * "f": "family",
- * "q": [
- * "qualifier",
- * "qualifier"
- * ...
- * ],
- * "v": [
- * "value",
- * "value"
- * ...
- * ],
- * }
- *
- * @return
- */
- def insert(tableName: String, array: Iterator[(String, String)]): Boolean = {
- try {
- /** 操作数据表 && 操作索引表 */
- val t: HTable = getTable(tableName) //HTable
- val puts: util.ArrayList[Put] = new util.ArrayList[Put]()
- /** 遍历Json数组 */
- array.foreach(json => {
- val jsonObj: JSONObject = JSON.parseObject(json._2)
- val rowKey: Array[Byte] = jsonObj.getString("r").getBytes
- val family: Array[Byte] = jsonObj.getString("f").getBytes
- val qualifiers: JSONArray = jsonObj.getJSONArray("q")
- val values: JSONArray = jsonObj.getJSONArray("v")
- val put = new Put(rowKey)
- for (i <- 0 until qualifiers.size()) {
- put.addColumn(family, qualifiers.getString(i).getBytes, values.getString(i).getBytes)
- }
- puts.add(put)
- })
- Try(t.put(puts)).getOrElse(t.close())
- true
- } catch {
- case e: Exception =>
- e.printStackTrace()
- logger.error(s"insert ${tableName} error ", e)
- false
- }
- }
4.5 运行
刚测试时给它相对很小的内存跑一跑:
- [root@xiaoxiaomo.com ~]# /opt/cloudera/parcels/CDH/bin/spark-submit \
- --master yarn-client --num-executors 1 \
- --driver-memory 256m --conf spark.yarn.driver.memoryOverhead=256 \
- --conf spark.yarn.am.memory=256m --conf spark.yarn.am.memoryOverhead=256 \
- --executor-memory 256m --conf spark.yarn.executor.memoryOverhead=256 \
- --executor-cores 1 \
- --class com.creditease.streaming.KafkaDataStream hspark-1.0.jar 1 3 30000
五六万的插入没什么压力,但是到10万的时候,就有些卡顿了!!
当然是需要增大内存的,修改配置,都增加一倍:
- [root@xiaoxiaomo.com ~]# /opt/cloudera/parcels/CDH/bin/spark-submit \
- --master yarn-client --num-executors 2 \
- --driver-memory 512m --conf spark.yarn.driver.memoryOverhead=512 \
- --conf spark.yarn.am.memory=512m --conf spark.yarn.am.memoryOverhead=512 \
- --executor-memory 512m --conf spark.yarn.executor.memoryOverhead=512 \
- --executor-cores 1 \
- --class com.creditease.streaming.KafkaDataStream hspark-1.0.jar 1 3 30000
查看插入数据量,能看到修改后插入数据10万是没有什么压力的:
当我们再继续加大压力测试的时候,性能下降:
查看统计信息:
5.1 设置WALog
put.setDurability(Durability.SKIP_WAL)/* 跳过写WALog /
关闭WALog后写入能到20万,但是发现还是不是特别稳定,有时耗时还是比较长的,发现此阶段正在做Compaction!!!
HBase是一种Log-Structured Merge Tree架构模式,用户数据写入先写WAL,再写缓存,满足一定条件后缓存数据会执行flush操作真正落盘,形成一个数据文件HFile。随着数据写入不断增多,flush次数也会不断增多,进而HFile数据文件就会越来越多。然而,太多数据文件会导致数据查询IO次数增多,因此HBase尝试着不断对这些文件进行合并,这个合并过程称为Compaction。
Compaction会从一个region的一个store中选择一些hfile文件进行合并。合并说来原理很简单,先从这些待合并的数据文件中读出KeyValues,再按照由小到大排列后写入一个新的文件中。之后,这个新生成的文件就会取代之前待合并的所有文件对外提供服务。
HBase根据合并规模将Compaction分为了两类:MinorCompaction和MajorCompaction。
更多Compaction信息参考:http://hbasefly.com/2016/07/13/hbase-compaction-1/
5.2 调整压缩
通常生产环境会关闭自动major_compact(配置文件中hbase.hregion.majorcompaction设 为0),选择一个晚上用户少的时间窗口手工major_compact。
手动:major_compact ‘testtable’
如果hbase更新不是太频繁,可以一个星期对所有表做一次 major_compact,这个可以在做完一次major_compact后,观看所有的storefile数量,如果storefile数量增加到 major_compact后的storefile的近二倍时,可以对所有表做一次major_compact,时间比较长,操作尽量避免高锋期。
查看统计信息
Compact触发条件:
周期为:Hbase.server.thread.wakefrequencyhbase.server.compactchecker.interval.multiplier触发compaction,后面还有一些其他的条件也可以在源码里面看看
条件的验证逻辑就是在这个时间范围:mcTime= 7-70.5天,7+70.5天=3.5-10.5;
是否有文件修改具体逻辑可见RatioBasedCompactionPolicy#isMajorCompaction方法。
5.3 Split
通过上面的截图我们可以看到,该表只有一个region,写入数据都集中到了一台服务器,这个远远没有发挥出HBase集群的能力呀,手动拆分吧!
拆分后:
这是Spark Streaming系列博客的最后一部分,主要讲一下我自己对Spark Streaming任务的一些划分,还有一个Spark Streaming任务的邮件监控。
6.1 Streaming 任务的划分
当Spark Streaming开发完成,测试完成之后,就发布上线了,Spark Streaming任务的划分,以及时间窗口调试多少这些都是更具业务划分的。
那一个Streaming消费者到底去对应哪些topic呢?还有为什么这么划分,以及这样划分有什么好处呢?
6.2 Streaming任务的监控
对于Spark Streaming job的监控,自带的Streaming UI能看到具体的一些流量,时间等信息,但是缺少了一个通知,于是简单的开发了一个。在监控这一块也想了不少方案,比如监控pid,通过shell去监控,或者直接调用源码里面的方法,都尝试过,有的要么没达到预期的效果,要么有的不是很好维护开发成本高。
最终选了一个比较简单的,但是又能达到一定效果的,通过py爬虫,到原始的streaming UI界面去获取到具体的信息,来监控,到达阈值就发送邮件,总体步骤如下:
具体代码:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。