当前位置:   article > 正文

SparkStreaming的实战案例

sparkstreaming 案例

废话不多说,直接上干货!!!
相关依赖

  1. <properties>
  2. <project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
  3. <maven.compiler.source>1.8</maven.compiler.source>
  4. <maven.compiler.target>1.8</maven.compiler.target>
  5. <encoding>UTF-8</encoding>
  6. <scala.version>2.11.8</scala.version>
  7. <spark.version>2.3.2</spark.version>
  8. <hadoop.version>2.7.6</hadoop.version>
  9. <scala.compat.version>2.11</scala.compat.version>
  10. </properties>
  11. <dependencies>
  12. <dependency>
  13. <groupId>org.scala-lang</groupId>
  14. <artifactId>scala-library</artifactId>
  15. <version>${scala.version}</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.spark</groupId>
  19. <artifactId>spark-core_2.11</artifactId>
  20. <version>${spark.version}</version>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.apache.spark</groupId>
  24. <artifactId>spark-sql_2.11</artifactId>
  25. <version>${spark.version}</version>
  26. </dependency>
  27. <!-- sparkStreaming -->
  28. <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
  29. <dependency>
  30. <groupId>org.apache.spark</groupId>
  31. <artifactId>spark-streaming_2.11</artifactId>
  32. <version>${spark.version}</version>
  33. </dependency>
  34. <dependency>
  35. <groupId>org.apache.spark</groupId>
  36. <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  37. <version>${spark.version}</version>
  38. </dependency>
  39. <!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc -->
  40. <dependency>
  41. <groupId>org.scalikejdbc</groupId>
  42. <artifactId>scalikejdbc_2.11</artifactId>
  43. <version>3.2.0</version>
  44. </dependency>
  45. <dependency>
  46. <groupId>org.apache.curator</groupId>
  47. <artifactId>curator-recipes</artifactId>
  48. <version>2.8.0</version>
  49. </dependency>
  50. <dependency>
  51. <groupId>junit</groupId>
  52. <artifactId>junit</artifactId>
  53. <version>4.12</version>
  54. <scope>compile</scope>
  55. </dependency>
  56. </dependencies>

(1)spark streaminging无状态计算的WordCount

编程架构
SparkStreaming的实战案例
在某个节点上中启动nc -lk 9999,然后用作数据源。编写程序实现网络的wordcount。
代码实现

  1. object NetWordCount {
  2. /**
  3. * 编程套路:
  4. * 1.获取编程入口,StreamingContext
  5. * 2.通过StreamingContext构建第一个DStream
  6. * 3.对DStream进行各种的transformation操作
  7. * 4.对于数据结果进行output操作
  8. * 5.提交sparkStreaming应用程序
  9. */
  10. def main(args: Array[String]): Unit = {
  11. //屏蔽日志
  12. Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
  13. Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
  14. Logger.getLogger("org.apache.hive").setLevel(Level.WARN)
  15. //1.获取编程入口,StreamingContext
  16. val conf= new SparkConf().setMaster("local[2]")
  17. .setAppName("NetWordCount")
  18. //第二个参数,表示批处理时长
  19. val ssc=new StreamingContext(conf,Seconds(2))
  20. /**
  21. * 2.通过StreamingContext构建第一个DStream(通过网络去读数据)
  22. * 第一个参数:主机名
  23. * 第二个参数:端口号
  24. */
  25. val ReceiverInputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test",9999)
  26. //3.对DStream进行各种的transformation操作
  27. val wordDS: DStream[String] = ReceiverInputDStream.flatMap(msg => {
  28. msg.split("\\s+")
  29. })
  30. val wordCountDS: DStream[(String, Int)] = wordDS.map(word=>(word,1)).reduceByKey(_+_)
  31. //4.对于数据结果进行output操作,这里是打印输出
  32. wordCountDS.print()
  33. //5.提交sparkStreaming应用程序
  34. ssc.start()
  35. ssc.awaitTermination()
  36. }
  37. }

使用nc -lk 9999在相应的节点上发出消息(每隔一个批处理时间发送一次),查看控制台打印:
batch1
SparkStreaming的实战案例
batch2
SparkStreaming的实战案例
结果发现:由于现在的操作时无状态的,所以每隔2s处理一次,但是每次的单词数不会统计,也就是说,只会统计当前批处理的单词,之前输入的则不会统计。


(2)spark streaminging有状态计算的WordCount

同样是wordCounte,这次要实现的效果是:到现在为止,统计过去时间段内的所有单词的个数。
代码

  1. object UpdateStateBykeyWordCount {
  2. /**
  3. * 编程套路:
  4. * 1.获取编程入口,StreamingContext
  5. * 2.通过StreamingContext构建第一个DStream
  6. * 3.对DStream进行各种的transformation操作
  7. * 4.对于数据结果进行output操作
  8. * 5.提交sparkStreaming应用程序
  9. */
  10. def main(args: Array[String]): Unit = {
  11. //屏蔽日志
  12. Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
  13. Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
  14. Logger.getLogger("org.apache.hive").setLevel(Level.WARN)
  15. //1.获取编程入口,StreamingContext
  16. val conf = new SparkConf().setMaster("local[2]")
  17. .setAppName("NetWordCount")
  18. //第二个参数,表示批处理时长
  19. val ssc = new StreamingContext(conf, Seconds(2))
  20. ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_1")
  21. /**
  22. * 2.通过StreamingContext构建第一个DStream(通过网络去读数据)
  23. * 第一个参数:主机名
  24. * 第二个参数:端口号
  25. */
  26. val ReceiverInputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test", 9999)
  27. //3.对DStream进行各种的transformation操作
  28. val wordDS: DStream[(String,Int)] = ReceiverInputDStream.flatMap(msg => {
  29. msg.split("\\s+")
  30. }).map(word=>(word,1))
  31. /**
  32. * updateStateByKey是状态更新函数,
  33. * updateFunc: (Seq[V], Option[S]) => Option[S]
  34. * (U,C)=>C
  35. * values:Seq[Int],state:Option[Int]==>Option[Int]
  36. *
  37. * @param values :新值
  38. * @param state :状态值
  39. * @return
  40. */
  41. val updateDS: DStream[(String, Int)] = wordDS.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
  42. Option(values.sum + state.getOrElse(0))
  43. })
  44. //4.对于数据结果进行output操作,这里是打印输出
  45. updateDS.print()
  46. //5.提交sparkStreaming应用程序
  47. ssc.start()
  48. ssc.awaitTermination()
  49. }
  50. }

使用 nc -kl 9999
SparkStreaming的实战案例
观察控制台:
batch1
SparkStreaming的实战案例
batch2
SparkStreaming的实战案例
发现:两次批处理的结果,进行了聚合,也就是所谓的有状态的计算。
注意
ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_1")
上面这句代码一定要加,他会将上一次的批处理计算的结果保存起来,如果不加:
错误:requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().


(2)spark streaminging的HA

   在上述的updateStateByKey代码中如果当前程序运行异常时,会丢失数据(重启之后,找不回原来计算的数据),因为编程入口StreamingContext在代码重新运行的时候,是重新生成的,为了使程序在异常退出的时候,在下次启动的时候,依然可以获得上一次的StreamingContext对象,保证计算数据不丢失,此时就需要将StreamingContext对象存储在持久化的系统中。也就是说需要制作StreamingContext对象的HA。
代码

  1. object WC_DriverHA {
  2. def main(args: Array[String]): Unit = {
  3. //屏蔽日志
  4. Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
  5. Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
  6. Logger.getLogger("org.apache.hive").setLevel(Level.WARN)
  7. /**
  8. * StreamingContext.getOrCreate()
  9. * 第一个参数:checkpointPath,和下面方法中的checkpointPath目录一致
  10. * 第二个参数:creatingFunc: () => StreamingContext:用于创建StreamingContext对象
  11. * 最终使用StreamingContext.getOrCreate()可以实现StreamingContext对象的HA,保证在程序重新运行的时候,之前状态仍然可以恢复
  12. */
  13. val ssc= StreamingContext.getActiveOrCreate("C:\\z_data\\checkPoint\\checkPoint_HA",functionToCreateContext)
  14. ssc.start()
  15. ssc.awaitTermination()
  16. }
  17. def functionToCreateContext():StreamingContext={
  18. //1.获取编程入口,StreamingContext
  19. val conf = new SparkConf().setMaster("local[2]")
  20. .setAppName("NetWordCount")
  21. //第二个参数,表示批处理时长
  22. val ssc = new StreamingContext(conf, Seconds(2))
  23. ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_HA")
  24. /**
  25. * 2.通过StreamingContext构建第一个DStream(通过网络去读数据)
  26. * 第一个参数:主机名
  27. * 第二个参数:端口号
  28. */
  29. val ReceiverInputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test", 9999)
  30. //3.对DStream进行各种的transformation操作
  31. val wordDS: DStream[(String,Int)] = ReceiverInputDStream.flatMap(msg => {
  32. msg.split("\\s+")
  33. }).map(word=>(word,1))
  34. val updateDS: DStream[(String, Int)] = wordDS.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
  35. Option(values.sum + state.getOrElse(0))
  36. })
  37. //4.对于数据结果进行output操作,这里是打印输出
  38. updateDS.print()
  39. //5.提交sparkStreaming应用程序
  40. ssc.start()
  41. ssc.awaitTermination()
  42. ssc
  43. }
  44. }

测试
   - 先正常运行一段时间,计算出结果
   - 停止程序
   - 再次启动
   - 验证再次启动的程序,是否能够拿回停止前计算得到的结果
原理
  如果是第一次执行,那么在在这个checkpointDriectory目录中是不存在streamingContext对象的,所以要创建,第二次运行的时候,就不会在创建,则是从checkpointDriectory目录中读取进行恢复。
注意
  正常情况下,使用这种方式的HA,只能持久状态数据到持久化的文件中,默认情况是不会持久化StreamingContext对象到CheckPointDriectory中的。


(3)对checkpoint的总结:

 1)checkpoint的介绍:

  从故障中恢复checkpoint中有两种类型
   - Metadata checkpointing:driver节点中的元数据信息
     - Configuration:用于创建流式应用程序的配置
     - DStream:定义streaming程序的DStream操作
     - Incomplete batches:批量的job排队但尚未完成。(程序上次运行到的位置)
   - Data checkpointing:将生成的RDD保存到可靠的存储
     - 计算之后生成的RDD
     - 在receiver接收到数据,转化的RDD

 2)checkpoint的启动时机:

   - 从运行应用程序的driver的故障中恢复-元数据,(driver的HA)
   - 使用有状态计算的时候启动checkPoint:updateStateByKey或者reduceByKeyAndWindow…

 3)checkpoint的配置:

   - 有状态计算的时候:
    ssc.checkpoint("C:\\z_data\\checkpoint")
   - driver的HA的时候:

  1. ssc.checkpoint("C:\\z_data\\checkpoint")
  2. ssc =StreamingContext.getOrCreate("C:\\z_data\\checkpoint"
  3. ,functionToCreateContext)

(4)Spark Streaming 的 transform 操作

  在使用transform操作的时候介绍两个重要的概念:
  黑名单:如果允许的操作比不允许的操作多,那么将不允许的操作加入黑名单
  白名单:如果允许的操作比不允许的操作少,那么将允许的操作加入白名单
代码

  1. object _1Streaming_tranform {
  2. def main(args: Array[String]): Unit = {
  3. //定义黑名单
  4. val black_list=List("@","#","$","%")
  5. Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
  6. Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
  7. Logger.getLogger("org.apache.hive").setLevel(Level.WARN)
  8. //1.获取编程入口,StreamingContext
  9. val conf = new SparkConf().setMaster("local[2]").setAppName("_1Streaming_tranform")
  10. val ssc=new StreamingContext(conf,Seconds(2))
  11. //2.从对应的网络端口读取数据
  12. val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test",9999)
  13. //2.1将黑名单广播出去
  14. val bc = ssc.sparkContext.broadcast(black_list)
  15. //2.2设置checkpoint
  16. ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_1")
  17. //3业务处理
  18. val wordDStream: DStream[String] = inputDStream.flatMap(_.split(" "))
  19. //transform方法表示:从DStream拿出来一个RDD,经过transformsFunc计算之后,返回一个新的RDD
  20. val fileterdDStream: DStream[String] = wordDStream.transform(rdd=>{
  21. //过滤掉黑名单中的数据
  22. val blackList: List[String] = bc.value
  23. rdd.filter(word=>{
  24. !blackList.contains(word)
  25. })
  26. })
  27. //3.2统计相应的单词数
  28. val resultDStream = fileterdDStream.map(msg => (msg, 1))
  29. .updateStateByKey((values: Seq[Int], stats: Option[Int]) => {
  30. Option(values.sum + stats.getOrElse(0))
  31. })
  32. //4打印output
  33. resultDStream.print()
  34. //5.开启streaming流
  35. ssc.start()
  36. ssc.awaitTermination()
  37. }
  38. }

黑名单中的数据会被过滤:
SparkStreaming的实战案例


(5)Spark Streaming 的 window 操作

SparkStreaming的实战案例
注意
在做window操作时:
  - 窗口覆盖的数据流的时间长度,必须是批处理时间间隔的倍数
  - 前一个窗口到后一个窗口所经过的时间长度,必须是批处理时间间隔的倍数。
伪代码

  1. //1.获取编程入口,StreamingContext
  2. val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount_Window")
  3. val ssc=new StreamingContext(conf,Seconds(batchInvail.toLong))
  4. //2.从对应的网络端口读取数据
  5. val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(hostname,port.toInt)
  6. val lineDStream: DStream[String] = inputDStream.flatMap(_.split(" "))
  7. val wordDStream: DStream[(String, Int)] = lineDStream.map((_,1))
  8. /**
  9. * 每隔4秒,算过去6秒的数据
  10. * reduceFunc:数据合并的函数
  11. * windowDuration:窗口的大小(过去6秒的数据)
  12. * slideDuration:窗口滑动的时间(每隔4秒)
  13. */
  14. val resultDStream: DStream[(String, Int)] = wordDStream.reduceByKeyAndWindow((kv1:Int, kv2:Int)=>kv1+kv2,
  15. Seconds(batchInvail.toLong * 3),
  16. Seconds(batchInvail.toLong * 2))
  17. resultDStream.print()
  18. ssc.start()
  19. ssc.awaitTermination()

(6)Spark Streaming 的ForeachRDD 操作

概念

  • foreach: 遍历一个分布式集合(rdd)中的每一个元素
  • foreachPartition:遍历一个分布式集合(rdd)中的每一个分区
  • foreachRDD:遍历一个分布式集合(DStream)中的每一个RDD
    这个算子用的好,通常程序的性能会提升很多。
    伪代码
    1. //这个方法表示遍历DStream中的每一个rdd
    2. windowDS.foreachRDD(rdd=>{
    3. if(!rdd.isEmpty()){
    4. rdd.mapPartitions(ptn=>{
    5. if(!ptn.isEmpty){
    6. ptn.foreach(msg=>{
    7. //在这里做相应的操作
    8. })
    9. }
    10. })
    11. }
    12. })

转载于:https://blog.51cto.com/14048416/2339589

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/975643
推荐阅读
相关标签
  

闽ICP备14008679号