当前位置:   article > 正文

spark学习之SparkStreaming_spark任务执行进展日志隔几秒更新一次?

spark任务执行进展日志隔几秒更新一次?

SparkStreaming

Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。

SparkStreaming概述

数据处理延迟方式
	实时:数据处理在毫秒级别,秒
	离线:数据处理延迟以小时,天为单位
数据处理的方式
	流式:一个一个数据进行处理
	批处理:一批一批数据进行处理
	
SparkCore:		 离线数据分析框架|批处理
SparkStreaming:	基于SparkCore来完成实时数据处理分析(执行场景在离线批处理和实时流式之间,可以称为准实				    时,微批次数据处理框架)
	
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

在这里插入图片描述

SparkStreaming架构

在这里插入图片描述

背压机制

调整数据采集能力与数据消费能力

Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。

为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。
  • 1
  • 2
  • 3
  • 4

Dstream入门

WordCount案例实操

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

添加依赖关系

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

idea代码

package com.pihao.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount02 {
  def main(args: Array[String]): Unit = {
    //TODO SparkStreaming流式数据处理

    //TODO 建立环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    //SparkStreaming环境对象的第二个参数表示数据采集周期
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //TODO 执行操作
    //创建采集器(一行一行)
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999) //监听本机的9999端口
    //拿到一个一个的单词
    val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))

    val wordToCountDS: DStream[(String, Int)] = wordDS.map((_,1)).reduceByKey(_+_)
    wordToCountDS.print()

    //启动采集器
    ssc.start()
    //等待结束
    ssc.awaitTermination()
  }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

在hadoop102启动nc服务,然后启动idea

# nc -lk 9999
# nc -lp 9999
[atguigu@hadoop102 ~]$ nc -lk 9999
pihao
hello 
world
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述

Dstream创建

RDD队列

测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。

案例实操

循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount

object RDDStream {

  def main(args: Array[String]) {

    //1.初始化Spark配置信息
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")

    //2.初始化SparkStreamingContext
    val ssc = new StreamingContext(conf, Seconds(4))

    //3.创建RDD队列
    val rddQueue = new mutable.Queue[RDD[Int]]()

    //4.创建QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)

    //5.处理队列中的RDD数据
    val mappedStream = inputStream.map((_,1))
    val reducedStream = mappedStream.reduceByKey(_ + _)

    //6.打印结果
    reducedStream.print()

    //7.启动任务
    ssc.start()

//8.循环创建并向RDD队列中放入RDD
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }

    ssc.awaitTermination()

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

自定义数据源

自定义一个数据源,获取其中的数据

package com.pihao.streaming

import java.util.concurrent.TimeUnit

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver

import scala.util.Random

object SparkStreaming_DIY {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //执行操作
    val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver()) //使用自己定义的接收器
    ds.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * 自定义数据采集器
   * 1.继承Receiver,定义泛型,并指定存储级别
   * 2.重写方法
   *    onStart
   *    onStop
   */
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){
    private var flag = true

    //开启接受
    override def onStart(): Unit = {
      new Thread(new Runnable {
        override def run(): Unit = {
          while(flag){
            val i: Int = new Random().nextInt(100)
            store(i+"")
            TimeUnit.NANOSECONDS.sleep(500)
          }
        }
      }).start()
    }
    //关闭接收
    override def onStop(): Unit = {
      flag = false
    }
  }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

Kafka数据源

版本选型
1.ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用

2.DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。(采集和计算都在一个节点,容易控制)
  • 1
  • 2
  • 3
Kafka 0-10 Direct 模式

需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做计算,最终打印到控制台。

提前创建kafka主题

启动zookeeper集群
启动kafka集群

# 创建sparkstreaming的topic
[atguigu@hadoop102 ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic sparkstreaming --partitions 3 --replication-factor 2

[atguigu@hadoop102 ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list
__consumer_offsets
sparkstreaming
[atguigu@hadoop102 ~]$ 

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

添加依赖

<dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
     <version>3.0.0</version>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.10.1</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

编写代码

package com.pihao.streaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object KafkaStream {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //TODO 执行操作
    //定义kafka的连接配置
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "spark",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    //Kafka专门用于实时数据生成,提供了很多封装类
    //ConsumerRecord是一个KV
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent, //任务与采集器的位置关系,当前表示自动选择,也可以指定哪个broker
      ConsumerStrategies.Subscribe[String, String](Set("sparkstreaming"), kafkaPara))

    //获取ConsumerRecord中的数据
    val kafkaData: DStream[String] = kafkaDStream.map(_.value())
    kafkaData.print()

    ssc.start()
    ssc.awaitTermination()
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

在这里插入图片描述

接收成功

Dstream转换

DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

无状态化操作

无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。

在这里插入图片描述

Transform

Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次(周期性执行)。其实也就是对DStream中的RDD应用转换。

object Transform {

  def main(args: Array[String]): Unit = {

    //创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    //创建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //创建DStream
    val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("host", 9999)

    //在某些情况,DS对象不能执行所有的操作
    //如果想要进行特殊操作,那么可以直接通过底层的RDD进行
    val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => {

      val words: RDD[String] = rdd.flatMap(_.split(" "))

      val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

      val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

      value
    })

    //打印
    wordAndCountDStream.print

    //启动
    ssc.start()
    ssc.awaitTermination()

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

join

两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join,与两个RDD的join效果相同。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object JoinTest {

  def main(args: Array[String]): Unit = {

    //1.创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JoinTest")

    //2.创建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    //3.从端口获取数据创建流
    val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
    val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("linux2", 8888)

    //4.将两个流转换为KV类型
    val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))
    val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))

    //5.流的JOIN
    val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream)

    //6.打印
    joinDStream.print()

    //7.启动任务
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

有状态转化操作

有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。需要保存之前每次数据的聚合操作

UpdateStateByKey
package com.pihao.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming_State {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //检查点应该设置在hdfs中比较稳妥
    ssc.checkpoint("cp")
    //TODO 执行操作
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)
    val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))
    val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1))

    //reduceByKey方法属于无状态操作的方法
    //updateStateByKey方法属于有状态操作的方法
    //方法参数中的第一个参数,表示相同key的value序列
    //方法参数中的第二个参数,表示相同key缓冲区的数据值
    val state: DStream[(String, Int)] = wordToOneDS.updateStateByKey(
      (seq: Seq[Int], buffer: Option[Int]) => {
        val newValue: Int = seq.sum + buffer.getOrElse(0)
        Option(newValue)
      }
    )
    state.print()

    ssc.start()
    ssc.awaitTermination()

  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
WindowOperations

所谓的窗口操作,其实就是将多个采集周期的数据一次性进行处理,而不是一个采集周期处理一次。

SparkStreaming总的窗口范围不能切断采集周期,可以是采集周期的整数倍

SparkStreaming总的窗口滑动幅度也应该是采集周期的整数倍

package com.pihao.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object SparkStreaming_window{
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //TODO 执行操作
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)
    val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))
    val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1))

    //window方法可以传递一个参数,表示窗口的范围(时间周期,应该为采集周期的整数倍)
    //window方法可以传递两个参数,
      //第一个表示窗口的范围(时间周期,应该为采集周期的整数倍)
      //第二个表示窗口滑动的步长(时间周期,也是采集周期的整数倍,不传默认是一个一个采集周期)
      //窗口计算的时间以窗口步长做基础的
    val windowDS: DStream[(String, Int)] = wordToOneDS.window(Seconds(6),Seconds(3))
    val resultDS: DStream[(String, Int)] = windowDS.reduceByKey(_+_)
    resultDS.print()


    ssc.start()
    ssc.awaitTermination()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
增量计算

增量计算 = 当前窗口的值+ 新进入到窗口的数据-排除掉窗口的数据

使用场合:适合在窗口范围大,滑动浮动小的场合,这样就会有大量的重复数据

package com.pihao.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object Sparkstream_window2 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    ssc.checkpoint("cp")
    //TODO 执行操作
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)
    val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))
    val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1))

    //增量计算
    //需要设定检查点路径
    val windowDS: DStream[(String, Int)] = wordToOneDS.reduceByKeyAndWindow(
      (x: Int, y: Int) => {
        x + y
      },
      (x: Int, y: Int) => {
        x - y
      },
      Seconds(6),
      Seconds(3)
    )
    windowDS.print()


    ssc.start()
    ssc.awaitTermination()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

Dstream输出

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。

#1 print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()。

#2 saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。

#3 saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。
#4 saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。

#5 foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。

通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。 
注意:
1)连接不能写在driver层面(序列化)
2)如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;
增加foreachPartition,在分区创建(获取)。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

优雅的关闭

流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。

使用外部文件系统来控制内部程序关闭。

package com.pihao.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
import org.apache.spark.streaming.dstream.{ ReceiverInputDStream}

object SparkStreaming_Close {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //TODO 执行操作
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)
    socketDS.print()
    //stop()方法主要用于停止数据采集和Driver的调度
    //不可能启动采集后马上停止,所以一般在业务更新或者逻辑更新的场合停止
    //在main线程中停止是不现实的。应该启动一个新的线程来执行停止操作

    new Thread(new Runnable {
      override def run(): Unit = {
        while(true){
          Thread.sleep(10000) //时间
          //这个时间应该是判断数据处理是否可以继续的标记
          //这个标记应该多线程都可以访问
          //这个标记一般情况放在第三方的系统中(mysql,redis)
          val state: StreamingContextState = ssc.getState() //装填
          if (state == StreamingContextState.ACTIVE){
            ssc.stop(true,true) //优雅的关闭
            System.exit(0) //关闭线程
          }
        }
      }
    }).start()


    ssc.start()
    ssc.awaitTermination()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

案例

模拟实时生成数据,然后统计分析

package com.pihao.main

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.collection.mutable.ListBuffer
import scala.util.Random

object SparkStreaming_MockData {
  def main(args: Array[String]): Unit = {
    // 创建配置对象
    val prop = new Properties()

    // 添加配置
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String,String](prop)
    val topic = "sparkstreaming"
    val areas = ListBuffer("华北","华东","华南")
    val citys = ListBuffer("北京","上海","深圳")


    while(true){
      Thread.sleep(2000)
      //生成数据
      for (i <- 1 to new Random().nextInt(20)){
        val area: String = areas(new Random().nextInt(3))
        val city: String = citys(new Random().nextInt(3))
        val userId = new Random().nextInt(10)
        val adId = new Random().nextInt(10)

        val message = s"${System.currentTimeMillis()} ${area} ${city} ${userId} ${adId}"
        val record = new ProducerRecord[String,String](topic,message)
        producer.send(record)

      }

    }
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
package com.pihao.main


import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object KafkaStream {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //TODO 执行操作
    //定义kafka的连接配置
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "spark",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    //Kafka专门用于实时数据生成,提供了很多封装类
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent, //任务与采集器的位置关系,当前表示自动选择,也可以指定哪个broker
        ConsumerStrategies.Subscribe[String, String](Set("sparkstreaming"), kafkaPara))

    //获取广告点击的数据
    val kafkaData: DStream[AdvClickData] = kafkaDStream.map(
      data => {
        val kafkaVal: String = data.value()
        val datas: Array[String] = kafkaVal.split(" ")
        AdvClickData(datas(0),datas(1),datas(2),datas(3),datas(4))
      }
    )

    // todo 1.周期性获取数据库中最新的黑名单数据
    val reduceDS: DStream[((String, String, String), Int)] = kafkaData.transform(
      rdd => {
        // todo 2.判断当前周期内的数据是否已经在黑名单中
        // 查询数据库操作select
        val blackList = List[String]()
        val filterRDD: RDD[AdvClickData] = rdd.filter(
          data => !blackList.contains(data.userId)
        )

        //todo 3.将一个周期内的数据进行统计
        filterRDD.map(
          data => {
            //将数据组装返回 timestamp这里要换成日期格式
            ((data.timestamp, data.userId, data.adId), 1)
          }
        ).reduceByKey(_ + _)
      }
    )

    //todo 4.判断统计的结果是否超过阈值
    reduceDS.foreachRDD(
      rdd => {
        rdd.foreach {
          case ((timestamp, userId, adId), sum) => {
              //如果当前采集周期内的sum都大于100,那么将userId就直接拉入黑名单
              if (sum>=100){
                //insert into
              }else{
                val oldStateVal = 0 //select 根据userId查询mysql中旧的值
                val newStateVal = 0 + sum
                if (newStateVal >= 100){
                  // insert into
                }else{
                  // update 根据数据库newStateVal
                }
              }
          }
        }
      }
    )


    ssc.start()
    ssc.awaitTermination()
  }


  case class AdvClickData(timestamp: String, area: String, city: String, userId: String, adId: String)
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/925726
推荐阅读
相关标签
  

闽ICP备14008679号