当前位置:   article > 正文

Spark Streaming应用与实战全攻略_sparkstreaming应用

sparkstreaming应用

Spark Streaming应用与实战系列包括以下六部分内容:

  • 背景与架构改造
  • 通过代码实现具体细节,并运行项目
  • 对Streaming监控的介绍以及解决实际问题
  • 对项目做压测与相关的优化
  • Streaming持续优化之HBase
  • 管理Streaming任务

本篇为第一部分,包括背景与架构改造、通过代码实现具体细节并运行项目、对Streaming监控的介绍以及解决实际问题、对项目做压测与相关的优化。

一、背景与架构改造

1.1 问题描述

有一块业务主要是做爬虫抓取与数据输出,通过大数据这边提供的SOA服务入库到HBase,架构大致如下:

图片描述
架构改造之前

以对于以上的架构存在一些问题,我们可以看见数据在Dubbox服务阶段处理后直接通过HBase API入库了HBase,中间并没做任何缓冲,要是HBase出现了问题整个集群都完蛋,没法写入数据,数据还丢失,HBase这边压力也相当大,针对这一点,对入库HBase这个阶段做了一些改造。

1.2 架构改造

改造后的架构,爬虫通过接口服务,入库到Kafka,Spark streaming去消费kafka的数据,入库到HBase.核心组件如下图所示:

图片描述
架构改造图

为什么不直接入库到HBase,这样做有什么好处?

  • 缓解了HBase这边峰值的压力,并且流量可控;
  • HBase集群出现问题或者挂掉,都不会照成数据丢失的问题;
  • 增加了吞吐量。

1.3 为什么选择Kafka和Spark streaming

  • 由于Kafka它简单的架构以及出色的吞吐量;
  • Kafka与Spark streaming也有专门的集成模块;
  • Spark的容错,以及现在技术相当的成熟。

二、通过代码实现具体细节,并运行项目

然后就开始写代码了,总体思路就是:

  • put数据构造json数据,写入Kafka;
  • Spark Streaming任务启动后首先去Zookeeper中去读取offset,组装成fromOffsets;
  • Spark Streaming 获取到fromOffsets后通过KafkaUtils.createDirectStream去消费Kafka的数据;
  • 读取Kafka数据返回一个InputDStream的信息,foreachRDD遍历,同时记录读取到的offset到zk中;
  • 写入数据到HBase。

图片描述
详细一点的架构图

2.1 初始化与配置加载

下面是一些接收参数,加载配置,获取配置中的topic,还有初始化配置,代码如下:

  1. //接收参数
  2. val Array(kafka_topic, timeWindow, maxRatePerPartition) = args
  3. //加载配置
  4. val prop: Properties = new Properties()
  5. prop.load(this.getClass().getResourceAsStream("/kafka.properties"))
  6. val groupName = prop.getProperty("group.id")
  7. //获取配置文件中的topic
  8. val kafkaTopics: String = prop.getProperty("kafka.topic." + kafka_topic)
  9. if (kafkaTopics == null || kafkaTopics.length <= 0) {
  10. System.err.println("Usage: KafkaDataStream <kafka_topic> is number from kafka.properties")
  11. System.exit(1)
  12. }
  13. val topics: Set[String] = kafkaTopics.split(",").toSet
  14. val kafkaParams = scala.collection.immutable.Map[String, String](
  15. "metadata.broker.list" -> prop.getProperty("bootstrap.servers"),
  16. "group.id" -> groupName,
  17. "auto.offset.reset" -> "largest")
  18. val kc = new KafkaCluster(kafkaParams)
  19. //初始化配置
  20. val sparkConf = new SparkConf()
  21. .setAppName(KafkaDataStream.getClass.getSimpleName + topics.toString())
  22. .set("spark.yarn.am.memory", prop.getProperty("am.memory"))
  23. .set("spark.yarn.am.memoryOverhead", prop.getProperty("am.memoryOverhead"))
  24. .set("spark.yarn.executor.memoryOverhead", prop.getProperty("executor.memoryOverhead"))
  25. .set("spark.streaming.kafka.maxRatePerPartition", maxRatePerPartition) //此处为每秒每个partition的条数
  26. .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  27. .set("spark.reducer.maxSizeInFlight", "1m")
  28. val sc = new SparkContext(sparkConf)
  29. val ssc = new StreamingContext(sc, Seconds(timeWindow.toInt)) //多少秒处理一次请求

只是需要注意一下,这里的KafkaCluster,需要把源码拷贝过来,修改一下,因为里面有些方法是私有的。copy过来后改为public即可。

2.2 链接ZK

注意:这里的ZKStringSerializer,需要把源码拷贝过来,修改一下。

  1. //zk
  2. val zkClient = new ZkClient(prop.getProperty("zk.connect"), Integer.MAX_VALUE, 100000, ZKStringSerializer)
  3. val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

2.3 组装fromOffsets

组装fromOffsets,createDirectStream接收的是一个map的结构,所以可以支持多个topic的消费。

  1. var fromOffsets: Map[TopicAndPartition, Long] = Map() //多个partition的offset
  2. //支持多个topic : Set[String]
  3. topics.foreach(topicName => {
  4. //去brokers中获取partition数量,注意:新增partition后需要重启
  5. val children = zkClient.countChildren(ZkUtils.getTopicPartitionsPath(topicName))
  6. for (i <- 0 until children) {
  7. //kafka consumer 中是否有该partition的消费记录,如果没有设置为0
  8. val tp = TopicAndPartition(topicName, i)
  9. val path: String = s"${new ZKGroupTopicDirs(groupName, topicName).consumerOffsetDir}/$i"
  10. if (zkClient.exists(path)) {
  11. fromOffsets += (tp -> zkClient.readData[String](path).toLong)
  12. } else {
  13. fromOffsets += (tp -> 0)
  14. }
  15. }
  16. })

2.4 通过createDirectStream接受数据

使用KafkaUtils里面的createDirectStream方法去消费kafka数据,createDirectStream使用的是kafka简单的Consumer API,所以需要自己去管理offset,我们把offset写入到zk中,这样也方便了一些监控软件读取记录。

  1. //创建Kafka持续读取流,通过zk中记录的offset
  2. val messages: InputDStream[(String, String)] =
  3. KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

2.5 入库

入库HBase:

  1. //数据操作
  2. messages.foreachRDD(rdd => {
  3. val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  4. //data 处理
  5. rdd.foreachPartition(partitionRecords => {
  6. //TaskContext 上下文
  7. val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId)
  8. logger.info(s"${offsetRange.topic} ${offsetRange.partition} ${offsetRange.fromOffset} ${offsetRange.untilOffset}")
  9. //TopicAndPartition 主构造参数第一个是topic,第二个是Kafka partition id
  10. val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition)
  11. val either = kc.setConsumerOffsets(groupName, Map((topicAndPartition, offsetRange.untilOffset))) //
  12. if (either.isLeft) {
  13. logger.info(s"Error updating the offset to Kafka cluster: ${either.left.get}")
  14. }
  15. partitionRecords.foreach(data => {
  16. HBaseDao.insert(data)
  17. })
  18. })
  19. })

插入数据到具体HBase数据库:

  1. /**
  2. *
  3. * 插入数据到 HBase
  4. *
  5. * 参数( tableName , json ) ):
  6. *
  7. * Json格式:
  8. * {
  9. * "rowKey": "00000-0",
  10. * "family:qualifier": "value",
  11. * "family:qualifier": "value",
  12. * ......
  13. * }
  14. *
  15. * @param data
  16. * @return
  17. */
  18. def insert(data: (String, String)): Boolean = {
  19. val t: HTable = getTable(data._1) //HTable
  20. try {
  21. val map: mutable.HashMap[String, Object] = JsonUtils.json2Map(data._2)
  22. val rowKey: Array[Byte] = String.valueOf(map.get("rowKey")).getBytes //rowKey
  23. val put = new Put(rowKey)
  24. for ((k, v) <- map) {
  25. val keys: Array[String] = k.split(":")
  26. if (keys.length == 2){
  27. put.addColumn(keys(0).getBytes, keys(1).getBytes, String.valueOf(v).getBytes)
  28. }
  29. }
  30. Try(t.put(put)).getOrElse(t.close())
  31. true
  32. } catch {
  33. case e: Exception =>
  34. e.printStackTrace()
  35. false
  36. }
  37. }

2.6 运行并查看结果

运行命令:

/opt/cloudera/parcels/CDH/bin/spark-submit --master yarn-client --class com.xiaoxiaomo.streaming.KafkaDataStream hspark-1.0.jar 1 3 1000

运行后可以去spark UI中去查看相关运行情况,UI中具体细节见下文。

图片描述
Streaming Statistics数据统计图

图片描述
Completed Batches

三、对Streaming监控的介绍以及解决实际问题

这部分主要在代码运行起来的情况下来看一下任务的运行情况主要是streaming的监控界面,以及我们怎么去通过监控界面发现问题和解决问题。

3.1 监控

官网中指出,spark中专门为SparkStreaming程序的监控设置了额外的途径,当使用StreamingContext时,在WEB UI中会出现一个”Streaming”的选项卡:

图片描述
WEB UI中的“Streaming”选项卡

在此选项卡内,统计的内容展示如下:

图片描述
Streaming 状态图

Spark streaming 处理速度为3s一次,每次1000条。

Kafka product 每秒1000条数据,与上面spark consumer消费者恰好相等。结果:数据量大导致积压,这个过程中active Batches会越变越大。

因为忽略了实际的Processing time:

图片描述
Active Batches

图片描述
Completed Batches

图片描述
Streaming Batches对应的趋势图

这其中包括接受的记录数量,每一个batch内处理的记录数,处理时间,以及总共消耗的时间。在上述参数之中最重要的两个参数分别是Porcessing Time 以及 Scheduling Delay:

  • Porcessing Time用来统计每个batch内处理数据所消费的时间
  • Scheduling Delay用来统计在等待被处理所消费的时间

如果PT比SD大,或者SD持续上升,这就表明此系统不能对产生的数据实时响应,换句话来说就是,出现了处理时延,每个batch time 内的处理速度小于数据的产生速度。 
在这种情况下,读者需要想法减少数据的处理速度,即需要提升处理效率。

3.2 问题发现

在我做压测的时候, Spark streaming 处理速度为3s一次,每次1000条。

Kafka product每秒1000条数据, 与上面spark consumer消费者恰好相等。于是就会数据量大导致积压,这个过程中active Batches会越变越大。最后发现了一个问题:

图片描述
Streaming Batches对应的趋势图

当压测峰值过后Input Size=0 events,时间仍然不减,奇怪!

图片描述
Streaming Batches一些异常情况图

查看摸个具体stage:

图片描述
Streaming具体的stage信息

从图中, 我们可以看到Spark总共调度分发了两批次task set, 每个task set的处理(含序列化和压缩之类的工作)都不超过100毫秒,那么该Stage何来消耗4s呢?慢着,貌似这两批次的task set分发的时间相隔得有点长啊,隔了4秒左右。为什么会隔这么就才调度一次呢?

此处要引入一个配置项“spark.locality.wait”(默认等待3s),它配置了本地化调度降级所需要的时间。这里概要补充下Spark本地化调度的知识,Spark的task一般都会分发到它所需数据的那个节点,这称之为”NODE_LOCAL”,但在资源不足的情况下,数据所在节点未必有资源处理task,因此Spark在等待了“spark.locality.wait”所配置的时间长度后,会退而求其次,分发到数据所在节点的同一个机架的其它节点上,这是“RACK_LOCAL”。当然,也有更惨的,就是再等了一段“spark.locality.wait”的时间长度后,干脆随便找一台机器去跑task,这就是“ANY”策略了。

图片描述
Streaming 源码

官网解释:How long to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well.点击查看来源。

而从上例看到, 即使用最差的”ANY”策略进行调度,task set的处理也只是花了100毫秒,因此,没必要非得为了”NODE_LOCAL”策略的生效而去等待那么长的时间,特别是在流计算这种场景上。所以把“spark.locality.wait”果断调小,从1秒到500毫秒,最后干脆调到100毫秒算了。

spark-submit –master yarn-client –conf spark.driver.memory=256m –class com.KafkaDataStream –num-executors 1 –executor-memory 256m –executor-cores 2 –conf spark.locality.wait=100ms hspark-1.0.jar

调了之后的处理时间为0.7s:

图片描述
Streaming Completed Batches正常

具体耗时如下:

图片描述
Streaming 具体耗时信息图

四、对项目做压测与相关的优化

对项目做压测与相关的优化,主要从内存(executor-memory和driver-memory)、num-executors、executor-cores,以及代码层面做一些测试和改造。

4.1 压测

spark-submit –master yarn-client –conf spark.driver.memory=256m –class com.xiaoxiaomo.KafkaDataStream –num-executors 1 –executor-memory 256m –executor-cores 2 –conf spark.locality.wait=100ms hspark.jar 3 1000

Spark streaming 处理速度为3s一次,每次1000条。

Kafka product 每秒1000条数据, 与上面spark consumer消费者恰好相等。结果:数据量大导致积压,这个过程中active Batches会越变越大。

调整Kafka product 每秒600条数据,存在积压,但已经不严重:

图片描述
Kafka product 每秒600条数据,存在积压

调整Kafka product 每秒500条数据,为消费者50%,测试结果显示正常,等待时间很稳定:

图片描述
Kafka product 每秒500条数据,正常

但是。此时每秒吞吐量为500显然不够,通过调整间歇实际等,发现并没有变化:

spark-submit –master yarn-client –conf spark.driver.memory=256m –class com.xiaoxiaomo.KafkaDataStream –num-executors 1 –executor-memory 256m –executor-cores 2 –conf spark.locality.wait=100ms hspark.jar 2 2000 Spark streaming 处理速度为2s一次,每次2000

Kafka product 每秒500条数据,可以看见没有在指定时间内消费完数据,照成数据积压,并发下降了。

图片描述
Kafka product 每秒500条数据,没有在指定时间内消费完

4.2 分析原因

分析原因,发现大部分耗时都在处理数据这样一阶段,如下图所示:

图片描述
Streaming 时间分析图

4.3 调整参数

调整 executor-cores:

  • executor-cores 2 并发上升至700/s
  • executor-cores 3 并发上升至750/s

图片描述
调整executor-cores后

调整executor内存,并发没有增长,无效:

  • executor-memory 512m
  • conf spark.yarn.executor.memoryOverhead=512

调整am内存,并发没有增长,无效:

  • am-memory 512m
  • conf spark.yarn.am.memoryOverhead=512

4.4 代码调整

发现现在主要还是在处理数据的时候消耗时间一直没有减少,而处理数据查看后发现是一条一条的往HBase里面插入的,修改为批量插入,重新构建了json.性能猛增!!修改前的代码:

  1. /**
  2. *
  3. * 插入数据到 HBase
  4. *
  5. * 参数( tableName , json ) ):
  6. *
  7. * Json格式:
  8. * {
  9. * "rowKey": "00000-0",
  10. * "family:qualifier": "value",
  11. * "family:qualifier": "value",
  12. * ......
  13. * }
  14. *
  15. * @param data
  16. * @return
  17. */
  18. def insert(data: (String, String)): Boolean = {
  19. val t: HTable = getTable(data._1) //HTable
  20. try {
  21. val map: mutable.HashMap[String, Object] = JsonUtils.json2Map(data._2)
  22. val rowKey: Array[Byte] = String.valueOf(map.get("rowKey")).getBytes //rowKey
  23. val put = new Put(rowKey)
  24. for ((k, v) <- map) {
  25. val keys: Array[String] = k.split(":")
  26. if (keys.length == 2){
  27. put.addColumn(keys(0).getBytes, keys(1).getBytes, String.valueOf(v).getBytes)
  28. }
  29. }
  30. Try(t.put(put)).getOrElse(t.close())
  31. true
  32. } catch {
  33. case e: Exception =>
  34. e.printStackTrace()
  35. false
  36. }
  37. }

修改后的代码:

  1. //数据操作
  2. messages.foreachRDD(rdd => {
  3. val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  4. //data 处理
  5. rdd.foreachPartition(partitionRecords => {
  6. //TaskContext 上下文
  7. val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId)
  8. logger.debug(s"${offsetRange.topic} ${offsetRange.partition} ${offsetRange.fromOffset} ${offsetRange.untilOffset}")
  9. //TopicAndPartition 主构造参数第一个是topic,第二个是Kafka partition id
  10. val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition)
  11. val either = kc.setConsumerOffsets(groupName, Map((topicAndPartition, offsetRange.untilOffset))) //是
  12. if (either.isLeft) {
  13. logger.info(s"Error updating the offset to Kafka cluster: ${either.left.get}")
  14. }
  15. /** 解析PartitionRecords数据 */
  16. if (offsetRange.topic != null) {
  17. HBaseDao.insert(offsetRange.topic, partitionRecords)
  18. }
  19. })
  20. })

插入数据到HBase:

  1. /**
  2. *
  3. * 插入数据到 HBase
  4. *
  5. * 参数( tableName , [( tableName , json )] ):
  6. *
  7. * Json格式:
  8. * {
  9. * "r": "00000-0",
  10. * "f": "family",
  11. * "q": [
  12. * "qualifier",
  13. * "qualifier"
  14. * ...
  15. * ],
  16. * "v": [
  17. * "value",
  18. * "value"
  19. * ...
  20. * ],
  21. * }
  22. *
  23. * @return
  24. */
  25. def insert(tableName: String, array: Iterator[(String, String)]): Boolean = {
  26. try {
  27. /** 操作数据表 && 操作索引表 */
  28. val t: HTable = getTable(tableName) //HTable
  29. val puts: util.ArrayList[Put] = new util.ArrayList[Put]()
  30. /** 遍历Json数组 */
  31. array.foreach(json => {
  32. val jsonObj: JSONObject = JSON.parseObject(json._2)
  33. val rowKey: Array[Byte] = jsonObj.getString("r").getBytes
  34. val family: Array[Byte] = jsonObj.getString("f").getBytes
  35. val qualifiers: JSONArray = jsonObj.getJSONArray("q")
  36. val values: JSONArray = jsonObj.getJSONArray("v")
  37. val put = new Put(rowKey)
  38. for (i <- 0 until qualifiers.size()) {
  39. put.addColumn(family, qualifiers.getString(i).getBytes, values.getString(i).getBytes)
  40. }
  41. puts.add(put)
  42. })
  43. Try(t.put(puts)).getOrElse(t.close())
  44. true
  45. } catch {
  46. case e: Exception =>
  47. e.printStackTrace()
  48. logger.error(s"insert ${tableName} error ", e)
  49. false
  50. }
  51. }

4.5 运行

刚测试时给它相对很小的内存跑一跑:

  1. [root@xiaoxiaomo.com ~]# /opt/cloudera/parcels/CDH/bin/spark-submit \
  2. --master yarn-client --num-executors 1 \
  3. --driver-memory 256m --conf spark.yarn.driver.memoryOverhead=256 \
  4. --conf spark.yarn.am.memory=256m --conf spark.yarn.am.memoryOverhead=256 \
  5. --executor-memory 256m --conf spark.yarn.executor.memoryOverhead=256 \
  6. --executor-cores 1 \
  7. --class com.creditease.streaming.KafkaDataStream hspark-1.0.jar 1 3 30000

五六万的插入没什么压力,但是到10万的时候,就有些卡顿了!!

图片描述
yarn 容器、cpu、内存大小

图片描述
五六万的插入没什么压力

当然是需要增大内存的,修改配置,都增加一倍:

  1. [root@xiaoxiaomo.com ~]# /opt/cloudera/parcels/CDH/bin/spark-submit \
  2. --master yarn-client --num-executors 2 \
  3. --driver-memory 512m --conf spark.yarn.driver.memoryOverhead=512 \
  4. --conf spark.yarn.am.memory=512m --conf spark.yarn.am.memoryOverhead=512 \
  5. --executor-memory 512m --conf spark.yarn.executor.memoryOverhead=512 \
  6. --executor-cores 1 \
  7. --class com.creditease.streaming.KafkaDataStream hspark-1.0.jar 1 3 30000

图片描述
yarn 容器、cpu、内存大小

图片描述
90000的插入没什么压力

查看插入数据量,能看到修改后插入数据10万是没有什么压力的:

图片描述
查看插入数据量,能看到修改后插入数据10万是没有什么压力的

当我们再继续加大压力测试的时候,性能下降:

图片描述
当我们再继续加大压力测试的时候,性能下降

查看统计信息:

图片描述

查看统计信息

五、Streaming持续优化之HBase

5.1 设置WALog

put.setDurability(Durability.SKIP_WAL)/* 跳过写WALog /

关闭WALog后写入能到20万,但是发现还是不是特别稳定,有时耗时还是比较长的,发现此阶段正在做Compaction!!!

图片描述
查看streaming统计,发现耗时不稳定

图片描述
HBase界面统计信息

HBase是一种Log-Structured Merge Tree架构模式,用户数据写入先写WAL,再写缓存,满足一定条件后缓存数据会执行flush操作真正落盘,形成一个数据文件HFile。随着数据写入不断增多,flush次数也会不断增多,进而HFile数据文件就会越来越多。然而,太多数据文件会导致数据查询IO次数增多,因此HBase尝试着不断对这些文件进行合并,这个合并过程称为Compaction。

Compaction会从一个region的一个store中选择一些hfile文件进行合并。合并说来原理很简单,先从这些待合并的数据文件中读出KeyValues,再按照由小到大排列后写入一个新的文件中。之后,这个新生成的文件就会取代之前待合并的所有文件对外提供服务。

HBase根据合并规模将Compaction分为了两类:MinorCompaction和MajorCompaction。

  • Minor Compaction是指选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,在这个过程中不会处理已经Deleted或Expired的Cell。一次Minor Compaction的结果是更少并且更大的StoreFile。
  • Major Compaction是指将所有的StoreFile合并成一个StoreFile,这个过程还会清理三类无意义数据:被删除的数据、TTL过期数据、版本号超过设定版本号的数据。另外,一般情况下,Major Compaction时间会持续比较长,整个过程会消耗大量系统资源,对上层业务有比较大的影响。因此线上业务都会将关闭自动触发Major Compaction功能,改为手动在业务低峰期触发。

更多Compaction信息参考:http://hbasefly.com/2016/07/13/hbase-compaction-1/

5.2 调整压缩

通常生产环境会关闭自动major_compact(配置文件中hbase.hregion.majorcompaction设 为0),选择一个晚上用户少的时间窗口手工major_compact。

手动:major_compact ‘testtable’ 
如果hbase更新不是太频繁,可以一个星期对所有表做一次 major_compact,这个可以在做完一次major_compact后,观看所有的storefile数量,如果storefile数量增加到 major_compact后的storefile的近二倍时,可以对所有表做一次major_compact,时间比较长,操作尽量避免高锋期。 

图片描述
查看统计信息

Compact触发条件:

  • memstore flush之后触发
  • 客户端通过shell或者API触发
  • 后台线程CompactionChecker定期触发

图片描述
查看统计信息

图片描述
查看统计信息

周期为:Hbase.server.thread.wakefrequencyhbase.server.compactchecker.interval.multiplier触发compaction,后面还有一些其他的条件也可以在源码里面看看

条件的验证逻辑就是在这个时间范围:mcTime= 7-70.5天,7+70.5天=3.5-10.5;

是否有文件修改具体逻辑可见RatioBasedCompactionPolicy#isMajorCompaction方法

5.3 Split

通过上面的截图我们可以看到,该表只有一个region,写入数据都集中到了一台服务器,这个远远没有发挥出HBase集群的能力呀,手动拆分吧!

图片描述
通过hbase ui界面拆分Region

拆分后:

图片描述
Region拆分后

六、管理Streaming任务

这是Spark Streaming系列博客的最后一部分,主要讲一下我自己对Spark Streaming任务的一些划分,还有一个Spark Streaming任务的邮件监控。

6.1 Streaming 任务的划分

当Spark Streaming开发完成,测试完成之后,就发布上线了,Spark Streaming任务的划分,以及时间窗口调试多少这些都是更具业务划分的。

  • kafka 一个topic对应HBase里面的一张表
  • Kafka topic 里面的partition(3-5个不等)
  • Streaming job即Kafka消费者,消费一个或多个Kafka topic

那一个Streaming消费者到底去对应哪些topic呢?还有为什么这么划分,以及这样划分有什么好处呢?

  • 因为kafka topic对应了业务中的具体HBase表,然后就通过监控HBase表插入流量来判断该表插入情况
  • 对于HBase表数据的插入量划分了5种,插入量特别大、插入条数多每条数据量不大、每次插入数据量少数据大、比较均匀、插入少不频繁
  • 对于插入量特别大,比如该表都占了插入总量的10%、20%的这种就独立出来一张表对应一个streaming消费者
  • 插入条数多每条数据量不大,就是把插入比较频繁的可以放在一起,这时候可以调小timeWindow
  • 每次插入数据量少数据大,就是可以看见插入每次都是1000条,2000条,有些时间间隔,就可以调大timeWindow时间间隔,maxRatePerPartition设置大一点
  • 比较均匀就好办了,很好设置参数
  • 插入少不频繁,可以调大timeWindow到几秒,甚至太少,太不频繁可以继续调大
  • 好处大家应该也看出来了吧,资源的合理利用,对streaming的优化,timeWindow、maxRatePerPartition对应不同表,增加和控制了并发量

6.2 Streaming任务的监控

对于Spark Streaming job的监控,自带的Streaming UI能看到具体的一些流量,时间等信息,但是缺少了一个通知,于是简单的开发了一个。在监控这一块也想了不少方案,比如监控pid,通过shell去监控,或者直接调用源码里面的方法,都尝试过,有的要么没达到预期的效果,要么有的不是很好维护开发成本高。

最终选了一个比较简单的,但是又能达到一定效果的,通过py爬虫,到原始的streaming UI界面去获取到具体的信息,来监控,到达阈值就发送邮件,总体步骤如下:

  • 通过job name在yarn 8088界面/cluster/apps/RUNNING找到ApplicationMasterURL地址
  • 然后通过该地址到streaming界面监控具体Streaming job的Scheduling Delay、Processing Time值

图片描述
yarn 8088界面/cluster/apps/RUNNING

具体代码:

图片描述
Python 监控爬虫 邮件通知

参考:http://www.th7.cn/Program/Python/201612/1035126.shtml



声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/975685
推荐阅读
相关标签
  

闽ICP备14008679号