当前位置:   article > 正文

Apache Spark基础知识

apache spark

我的spark学习笔记,基于Spark 2.4.0

目录

一、简介

Apache Spark是一个分布式计算框架,相比上一代计算框架MapReduce速度更快,且提供更多更方便的接口和函数实现。

Spark软件栈示意图

Spark包括Spark Core、Spark Sql、Spark Structured Streaming、Spark Streaming、Spark MLib、Spark GraghX。

Spark Core里包含任务调度、内存管理、错误恢复、与存储系统交互等模块。还包括RDD(Resilient Distributed Dateset 弹性分布式数据集)的定义。

Spark Sql是Spark提供的处理结构化数据的包,通过Spark Sql可以使用SQL或 hive sql来查询数据。

Spark Structured Streaming是Spark提供的以SQL的方式处理流式数据的包。

Spark Streaming是Spark 提供的处理流式数据的包。

Spark MLib是Spark封装好的机器学习包,封装了一些算法。

Spark GraghX是Spark提供的处理图数据的包。

PS:Spark编程 最好使用Scala语言,Scala Api比Java Api调用方便,Scala性能比Python更好

二、RDD编程

1 RDD介绍

Spark1.0开始就有的,弹性分布式数据集。数据的抽象模型,跨集群节点分区的元素集合,可以并行操作,且有容错恢复功能。不应该把RDD看做存放特定数据的数据集,而最好把每个RDD当做我们通过转化操作构建出来的,记录如何计算数据的指令列表。

2 RDD操作

RDD操作分为两类,一类是transformation(转换算子),转换算子会返回一个新的RDD,转换算子都是懒执行;另一类是action(行动算子),行动算子会返回汇总RDD或者输出数据,行动算子是立即执行的。
转化操作可能会多次执行,所以为了避免读取超时和重算消耗,读取时间较长或多次使用的RDD必须持久化。

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
val totoalCount = lineLengths.count()
  • 1
  • 2
  • 3
  • 4

这个例子里,因为有reduce()、count()两个行动算子,所以textFile()、map()会执行2次。
如果在第2行后插入一行如下代码

lineLengths.persist()
  • 1

这样的话每个算子都会只运行一次

2.0 读操作

RDD读操作的方法都在SparkContext里,下面是几个常用的

读操作作用
textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]从HDFS或本地文件系统或其他支持Hadoop文件接口的其他数据源读取文件,返回一个String类型的RDD
binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]读取二进制文件
objectFile[T](path: String, minPartitions: Int = defaultMinPartitions)(implicit arg0: ClassTag[T]): RDD[T]读取序列化后的Sequence文件
sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)]根据给定的key-value格式读取,返回一个String类型的RDD
wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]读取文件,一个文件会生成一个(文件名,文件内容)的k v ,适用于处理多个小文件或处理整个文件
2.1 常用Tramsformation算子

下表列出了 Spark 支持的一些常见转换。有关详细信息,请参阅 RDD API 文档(ScalaJavaPython)和配对 RDD 函数文档(ScalaJava)。

转换算子作用
map(func)返回使用传入函数转换每个元素后的RDD
filter(func)过滤操作,保留传入函数判断为true的数据
flatMap(func)类似map,不同的是,flatMap输入一个可迭代的列表,输出0到多个,这就要求func返回值必须是可迭代的数据结构
mapPartitions(func)类似map,不同的是mapPartitions是针对一个分区做一次操作,输入和输出都是可迭代的数据结构
mapPartitionsWithIndex ( func )通过对这个 RDD 的每个分区应用一个函数来返回一个新的 RDD,同时可以得到原始分区的索引。
sample(withReplacement, fraction, seed)使用给定的随机数生成器种子对数据的一小部分进行采样,无论是否有替换。
union(otherRDD)返回两个RDD合并后的RDD
intersection(otherRDD)返回两个RDD交集的RDD
distinct([numPartitions]))返回去除重复元素的RDD
groupByKey([numPartitions])在 (K, V) 对的数据集上调用时,返回 (K, Iterable) 对的数据集。如果分组是为了对每个键执行聚合(例如求和或平均值),使用reduceByKey或aggregateByKey将产生更好的性能。默认情况下,输出中的并行级别取决于父 RDD 的分区数。可以传递一个可选numPartitions参数来设置不同数量的任务。
reduceByKey(func, [numPartitions])当在 (K, V) 对的数据集上调用时,返回 (K, V) 对的数据集,其中每个键的值使用给定的 reduce 函数func聚合,该函数必须是 (V,V) => V. 与groupByKey一样,reduce 任务的数量可通过可选的第二个参数进行配置。
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])PairRDD才能调用,使用给定的聚合函数和中性的“零值”聚合每个键的值。这个函数可以返回与这个RDD V中的值类型不同的结果类型U。同样可以传入一个分区数来设置分区的数量
sortByKey([ascending], [numPartitions])PairRDD才能调用,根据key排序,特殊点在于会产生shuffle
join(otherDataset, [numPartitions])当调用类型为 (K, V) 和 (K, W) 的数据集时,返回一个 (K, (V, W)) 对的数据集,其中包含每个键的所有元素对。外连接通过leftOuterJoin,rightOuterJoin和fullOuterJoin实现。
cogroup(otherDataset, [numPartitions])当调用类型为 (K, V) 和 (K, W) 的数据集时,返回一个包含 (K, (Iterable, Iterable)) 元组的数据集。此操作和groupWith相同。
pipe(command, [envVars])通过 shell 命令(例如 Perl 或 bash 脚本)管理 RDD 的每个分区。RDD 元素被写入进程的标准输入,输出到标准输出的行作为字符串的 RDD 返回。
coalesce(numPartitions)将 RDD 中的分区数减少到 numPartitions。对过滤大型数据集后更有效地运行操作很有用。
repartition(numPartitions)随机重组 RDD 中的数据以创建更多或更少的分区并在它们之间进行平衡,会产生shuffle。repartion是coalesce的shuffle为true的调用
repartitionAndSortWithinPartitions(partitioner)根据给定的分区器对 RDD 进行重新分区,并在每个结果分区内,按键对记录进行排序。这比repartition在每个分区内调用然后排序更有效,因为它可以将排序下推到 shuffle 机器中。
2.2 常用Action算子

下表列出了 Spark 支持的一些常见操作。请参阅 RDD API 文档(ScalaJavaPython、 R)和Pair RDD 函数文档(ScalaJava )了解详细信息。

action算子作用
reduce(func)使用函数func(它接受两个参数并返回一个)聚合数据集的元素。该函数应该是可交换的和关联的,以便它可以被正确地并行计算。
collect()在驱动程序中将数据集的所有元素作为数组返回。这通常在过滤器或其他返回足够小的数据子集的操作之后很有用。
count()返回数据集中元素的数量。
first()返回数据集的第一个元素(类似于 take(1))。
take(n)返回一个包含数据集前n 个元素的数组。
takeSample(withReplacement, num, [seed])返回一个数组,其中包含数据集的num 个元素的随机样本,有或没有替换,可选择预先指定随机数生成器种子。
takeOrdered(n, [ordering])使用自然顺序或自定义比较器返回RDD的前n 个元素。
saveAsTextFile(path)将数据集的元素作为文本文件(或一组文本文件)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 以将其转换为文件中的一行文本。
saveAsSequenceFile(path)将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径中。这在实现 Hadoop 的 Writable 接口的键值对的 RDD 上可用。在 Scala 中,它也可用于隐式转换为 Writable 的类型(Spark 包括对 Int、Double、String 等基本类型的转换)。
saveAsObjectFile(path)使用 Java 序列化以简单格式编写数据集的元素,然后可以使用 SparkContext.objectFile()读取
countByKey ()仅适用于 (K, V) 类型的 RDD。返回 (K, Int) 对的HashMap,其中包含每个键的计数。
foreach(func)对数据集的每个元素运行函数func。
2.3 传递方法、对象、变量

scala传递方法k可以传递方法的引用或静态方法传递给Spark,引用类型必须实现序列化,尽量避免使用引用类型。变量尽量使用临时变量。

2.4 Shuffle操作

Spark 中的某些操作会触发一个称为 shuffle 的事件。shuffle 是 Spark 用于重新分配数据的机制,以便它跨分区进行不同的分组。这通常涉及在执行器和机器之间复制数据,从而使 shuffle 成为一项复杂且成本高昂的操作。

要了解在 shuffle 期间发生了什么,可以考虑reduceByKey操作示例 。该reduceByKey操作生成一个新的 RDD,其中单个键的所有值都组合成一个元组 - 键和针对与该键关联的所有值执行 reduce 函数的结果。挑战在于,单个键的所有值不一定位于同一分区,甚至同一台机器上,但它们必须位于同一位置以计算结果。

在 Spark 中,数据通常不会跨分区分布在特定操作的必要位置。在计算过程中,单个任务将在单个分区上运行 - 因此,为了组织单个reduceByKeyreduce 任务执行的所有数据,Spark 需要执行一个 all-to-all 操作。它必须从所有分区中读取以找到所有键的所有值,然后将跨分区的值组合在一起以计算每个键的最终结果 - 这称为shuffle。

编码中要尽量避免shuffle

2.5 RDD持久化

Spark 中最重要的功能之一是跨操作在内存中持久化(或缓存)数据集。当你持久化一个 RDD 时,每个节点都会存储它在内存中计算的任何分区,并在该数据集(或从它派生的数据集)的其他操作中重用它们。这使得未来的动作可以更快(通常超过 10 倍)。缓存是迭代算法和快速交互使用的关键工具。

可以使用persist()、cache()、checkpoint()。其中cache()调用了persist(StorageLevel.MEMORY_ONLY)。

每个持久化的 RDD 都可以使用不同的存储级别来存储,例如,允许您将数据集持久化在磁盘上,将其持久化在内存中,但作为序列化的 Java 对象(以节省空间),跨节点复制它。这些级别是通过将 StorageLevel对象(Scala、 Java、 Python)传递给persist()

在这里插入图片描述
持久化级别的选择

  • 如果硬件资源充足建议使用内存
  • 如果硬件资源不足,建议使用内存和磁盘
  • 硬件资源最不理想可以使用磁盘

Spark 还会在 shuffle 操作(例如reduceByKey)中自动保留一些中间数据,即使没有调用persist.。这样做是为了避免在 shuffle 期间节点失败时重新计算整个输入。

2.6 共享变量

通常,当传递给 Spark 操作(例如map或reduce)的函数在远程集群节点上执行时,它会处理函数中使用的所有变量的单独副本。这些变量被复制到每台机器上,并且对远程机器上的变量的更新不会传播回驱动程序。支持跨任务的通用读写共享变量将是低效的。但是,Spark 确实为两种常见的使用模式提供了两种有限类型的共享变量:广播变量和累加器。

2.6.1 广播变量

一般情况下,传递一个变量给map()等方法时,会在每个task基本保存一个变量,如果变量很大时就会导致占用很大空间。

广播变量允许我们在每台机器上缓存一个只读变量,而不是随任务一起传送它的副本。例如,它们可用于以有效的方式为每个节点提供大型输入数据集的副本。Spark 还尝试使用高效的广播算法来分发广播变量以降低通信成本。

广播变量是通过调用SparkContext.broadcast(v)从变量创建的。广播变量是围绕变量的包装器,可以通过调用value 方法访问其值。下面的代码显示了这一点:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
  • 1
  • 2
  • 3
  • 4
  • 5

需要注意的是广播变量不应该在广播后修改。

2.6.2 累加器

累加器是可以在分布式执行的一个计数器。支持一些默认的累加器,也支持自定义累加器

累加器的例子

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

scala> accum.value
res2: Long = 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3 性能优化

3.1 RDD复用

相同算子相同计算逻辑的,一定要重用,不要重复计算。

3.2 尽可以提前filter

提前将不需要的数据过滤掉,可以减少后面计算的时间。如果先计算后过滤,则浪费了部分计算时间。

3.3 读取多个小文件

读取多个小文件或处理单个文件要用wholeTextFiles读取

3.4 map和mapPartition

普通的map算子每个元素做一次操作,如果是写数据库之类的成本比较高的操作成本就太高了。此时可以用mapPartition,mapPartition算子是一个分区的数据做一次操作。
数据量大可能会OOM,所以要适当调整内存。

其他的foreach和foreachPartition也是类似的。

3.5 filter+coalesce/repartition

filter后,有一些分区的数据可能会减少很多,一些分区减少不多,此时就会造成分区间数据分布不均匀。此时重分区可以适当提升性能。

使用filter后,可以用coalesce或者repartition调整分区数。

  1. 减少分区
    使用coalesce,shuffle设置为false
  2. 增多分区
    使用repartition,或者coalesce的shuffle设置为true

3.6 并行度设置

Spark官方推荐,task数量应该设置为Spark作业总CPU core数量的2~3倍。这样可以更充分地利用硬件资源。
并行度可以通过SparkConf设置

val conf = new SparkConf().set("spark.default.parallelism", "300")
  • 1

3.7 聚合算子尽量使用reduceByKey

聚合算子尽可能少用groupByKey,尽可能多的用reduceByKey。
reduceByKey会在map端预聚合,每个key预先计算出一个值,到reduce再进行最终聚合。所以会节约IO,网络传输等时间。

groupByKey则是将所有数据都拉取到reduce端才会执行聚合,相比reduceByKey而言浪费了很多时间。

3.8 使用持久化+checkpoint

以下情况应该将RDD缓存起来

  1. RDD在后续会多次使用
  2. RDD来自数据库,可能会遇到超时问题
  3. RDD计算成本很高,遇到节点故障等问题,重算成本高

checkpoint可将数据缓存到HDFS等文件系统,如果缓存数据丢失可以读取checkpoint数据。缺点是与文件系统交互,io速度慢。

遇到这三种情况必须将RDD缓存

sc.setCheckpointDir('HDFS')
rdd.cache/persist(memory_and_disk)
rdd.checkpoint
  • 1
  • 2
  • 3

3.9 使用广播变量

默认情况下RDD中使用外部变量,会在每个task中生成一个副本,如果变量数据很大会占用很多内存。此时要使用广播变量,广播变量会在每个executor保存一个副本,这个executor的所有task都会引用这个副本,task很多的时候可以节约很多内存。

3.10 使用Kryo序列化

从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs已经默认是Kryo序列化了。

自定义的类需要实现

public class MyKryoRegistrator implements KryoRegistrator{
  @Override
  public void registerClasses(Kryo kryo){
    kryo.register(StartupReportLogs.class);
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
//创建SparkConf对象
val conf = new SparkConf().setMaster().setAppName()
//使用Kryo序列化库
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  
//在Kryo序列化库中注册自定义的类集合
conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator"); 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3.11 join前先给两个PairRDD指定相同分区器

join前给两个RDD指定相同的分区器,可以避免昂贵的shuffle操作。
如果不指定就会产生shuffle。

3.12 数据倾斜

某个分区的数据显著多于其他分区的数据导致单个task执行时间远远超出平均执行时间。

表现为

  • 绝大多数 task 执行得都非常快,但个别 task 执行极慢,整体任务卡在某个阶段不能结束。
  • 原本能够正常执行的 Spark 作业,某天突然报出 OOM(内存溢出)异常,观察异常栈,是写的业务代码造成的。
3.12.1 数据源倾斜

例如某市不同街道上的过车记录,一定是十几条主要街道过车记录远远大于其他街道。
这种数据源本身就是倾斜的。

像这种数据可以针对性的采集时就将主要街道的记录针对性的打散,比如加1~8的前缀,聚合计算后,再次汇总聚合。
缺点:需要预处理数据,计算步骤增多

3.12.2 shuffle数据倾斜

场景:大量不同的Key被分配到了相同的Task造成该Task数据量过大。

  1. 调整shuffle算子并行度

例如原本有10个task,则可以尝试repartition(20),将多个key分不到其他分区,缓解shuffle。
也可以尝试减少并行度,coalesce(5,true)减少分区,使数据更加均匀,缓解shuffle。具体减少还是增加,需要测试对比。
劣势:适用场景少,只能将分配到同一Task的不同Key分散开,但对于同一Key倾斜严重的情况该方法并不适用。并且该方法一般只能缓解数据倾斜,没有彻底消除问题。从实践经验来看,其效果一般。

  1. 换分区器

默认情况下Spark使用HashPartitioner,HashPartitioner是对key求取hash值再对partitions 取余数的方法,因此如果大部分key是相同的话将会导致,各partition之间存在数据倾斜的问题,极端情况下,RDD的所有row被分配到了同一个partition中。

RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。

因此可以尝试使用RangePartitioner来缓解数据倾斜。

  1. 自定义分区器

将多个key分布到不同分区

完全随机分区

class CustomerPartition(partitions: Int) extends Partitioner {

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {

    (key.toString.charAt(0) + scala.util.Random.nextInt(10)) % numPartitions
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

只针对部分key随机分区

package org.apache.spark.examples
import org.apache.spark.{HashPartitioner, Partitioner}

class CustomerPartition(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => nonNegativeMod(key, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions

  def nonNegativeMod(x: Any, mod: Int): Int = {
    val skewKeys = Set("key1","key2","key3","key4","key5")

    val keyHashCode = x.hashCode
    val rawMod = keyHashCode % mod
    if(skewKeys.contains(x.toString)){
      val sourcePartitionNum = rawMod + (if (rawMod < 0) mod else 0)
      sourcePartitionNum + (scala.util.Random.nextInt(mod + skewKeys.size) % (mod + skewKeys.size))
    }else{
      rawMod + (if (rawMod < 0) mod else 0)
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
3.12.3 join数据倾斜
  1. 使用自定义分区器
    join前必须两个RDD设置相同的分区器(可以使用3.12.2 shuffle数据倾斜里的自定义分区器或其他自定义分区器)重分区

  2. 倾斜RDD加n个随机前缀,不倾斜RDD膨胀n倍
    将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者Join并去掉前缀。再做需要的聚合操作。

案例

package org.apache.spark.examples

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.sql.SparkSession
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf}
import scala.util.Random

class DataSkew {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("ResolveDataSkewWithNAndRandom")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = sparkSession.sparkContext
    val leftRdd = sc.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/").map(row => (row.split(",")(0),row.split(",")(1)))
    val rightRdd = sc.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/").map(row => (row.split(",")(0),row.split(",")(1)))
    var addList: List[String] = List()
    var a:Int = 0
    for(a <- 1 to 20){
      addList = addList :+ a.toString
    }
    val addListKeys = sc.broadcast(addList)
    val newRandomPrefixLeftRDD = leftRdd.map(t2 => (Random.nextInt(20)+","+t2._1,t2._2)).partitionBy(new CustomerPartition(20))
    val rightPrefixRdd = rightRdd.flatMap(t2 => (addListKeys.value.toStream.map((s => (s + "," +t2._1,t2._2))).toList)).partitionBy(new CustomerPartition(20))
    val joinRDD = leftRdd.join(rightRdd).map(t2 => (t2._1.split(",")(1),t2._2._2))
    joinRDD.foreachPartition(iterable => {
      val atomicInteger = new AtomicInteger()
      iterable.toStream.foreach(t2 => atomicInteger.incrementAndGet())
    })
    sc.stop()
    sparkSession.stop()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

三、Spark SQL、DataFrame、Dataset

DataFrame是Dataset的一个子集,从源码来看DataFrame就是Row类型的Dataset。Dataset类似关系数据库的一张表。Dataset提供了编译时类型检查。

1 入门

1.1 SparkSession

sparksql的功能入口是SparkSession,SparkSession创建案例

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// RDD到DataFrame的隐式转换
import spark.implicits._
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

1.2 创建DataFrame

使用SparkSession通过已有RDD创建,也可以通过hive、读取其他数据源创建
读取操作都是通过DataFrameReader类读取的

读取Json案例

val df = spark.read.json("examples/src/main/resources/people.json")

df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

1.3 DataFrame操作

1.3.1 DSL操作

DataFrame提供一种DSL(特定领域语言)来操作数据,支持ScalaJavaPython API

由于2.0之后,DataFrame实际上就是Row类型的Dataset,所以这些操作也可适用于Dataset

案例

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

示例代码可以在spark源码仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala"中找到完整的示例代码。

全部操作可以点上面蓝字部分的API文档

1.3.2 以编程方式运行SQL

SparkSession的sql允许适用SQL操作DataFrame和Dataset

案例

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

1.4 全局临时视图

临时视图,根据DataFrame或Dataset可以创建临时视图,相当于一张表,临时视图有不同的作用域;session级别的临时视图只能被当前SparkSession看到,且会随当前SparkSession的消失而消失;如果希望创建一个所有会话共享的临时视图可以创建全局临时视图,全局级的的临时视图可以被所有SparkSession看到。

全局临时视图存在系统数据库global_temp中,对全局临时视图查询必须加数据库名,否则会报一个找不到临时视图的错。例如SELECT * FROM global_temp.view1

// 将 DataFrame 注册为全局临时视图
df . createGlobalTempView ( "people" )

// 全局临时视图绑定到系统保留的数据库 `global_temp` 
spark 。sql ( "SELECT * FROM global_temp.people" )。show () 
// +----+-------+ 
// | 年龄| 姓名| 
// +----+-------+ 
// |null|迈克尔| 
// | 30| 安迪| 
// | 19| 贾斯汀| 
// +----+-----+

// 全局临时视图是跨会话
spark . 新会话()。sql ( "SELECT * FROM global_temp.people" )。show () 
// +----+-------+ 
// | 年龄| 姓名| 
// +----+-------+ 
// |null|迈克尔| 
// | 30| 安迪| 
// | 19| 贾斯汀| 
// +----+-----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

在 Spark源码库中的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”中可以找到完整的示例代码。

1.5 创建DataSet

Dataset类似RDD,但Dataset不使用Java序列化或Kryo而是使用专门的编码器。 这种编码器动态生成代码的方式允许Spark执行许多操作,比如filter、sort、hash等,而无需反序列化成对象。

case class Person(name: String, age: Long)

// 编码器通过样例类创建
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// 对大部分类型编码器可以通过导入隐式转换实现即 importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrame转化为Dataset只需要提供一个类名即可
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

1.6 与RDD互操作

Spark SQL提供两种方式将现有RDD转化为DataFrame。第一种方法使用反射来推断包含特定类型对象的 RDD 的模式。在编写 Spark 应用程序时已经知道schema时,这种基于反射的方法会产生更简洁的代码并且效果很好。

创建数据集的第二种方法是通过编程接口,该接口允许构建schema,然后将其应用于现有 RDD。虽然此方法更加冗长,但它允许在列及其类型直到运行时才知道时构造数据集。

1.6.1 反射

// 用于从RDD到DataFrame的隐式转换
import spark.implicits._

// 从一个文本文件创建一个Person对象的RDD,将其转化为DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// 通过Spark提供的sql方法可以执行sql语句
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// 可以通过索引访问列
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// 或者通过列名访问
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// 没有预先定义Dataset[Map[K,V]]的编码器, 使用kryo编码器显示定义
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// 原始类型和样例类也可以定义成下面的编码器
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// 这个方法 row.getValuesMap[T] 把所有的列和值放到一个 Map[String, T]里
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

全部示例代码位于Spark代码仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”

1.6.2 创建schema

如果预先无法得到schema,可以通过如下步骤将RDD转化为DataFrame

  1. 将RDD转化为Row类型RDD
  2. 创建和上面Row类型RDD相对应的StructType
  3. 使用Spark提供的createDataFrame方法根据schema和RowRDD创建DataFrame
import org.apache.spark.sql.types._

// 创建一个RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// schema定义
val schemaString = "name age"

// 根据上面string类型的schema创建StructType
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// 将People类型的RDD转化为Row类型的RDD
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// 根据schema和RDD创建DataFrame
val peopleDF = spark.createDataFrame(rowRDD, schema)

// 根据DataFrame创建临时视图
peopleDF.createOrReplaceTempView("people")

// 在临时视图上执行sql
val results = spark.sql("SELECT name FROM people")

// SQL查询返回一个DataFrame,支持所有RDD支持的操作
// 一行数据可通过索引和列名访问
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

在 Spark 源码仓库“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”中可以找到完整的示例代码。

1.7 聚合方法

内置聚合方法提供了包括count(), countDistinct(), avg(), max(), min()等等。这些方法在Spark Sql中也有对应的类型安全的ScalaJava实现。

如果预定义的无法满足需求可以自定义聚合函数。

1.7.1 弱类型自定义聚合函数

弱类型的自定义聚合函数适用于DataFrame。无类型的自定义聚合函数要继承UserDefinedAggregateFunction抽象类,实现其中的方法。一个求平均数弱类型用户自定义聚合函数案例

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

object MyAverage extends UserDefinedAggregateFunction {
  // 聚合函数输入参数数据类型
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // 聚合buffer的数据类型
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // 返回值的类型
  def dataType: DataType = DoubleType
  // 输入相同时,函数是否总输出相同的结果
  def deterministic: Boolean = true
  // 初始化buffer,buffer本是是row对象。
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // 根据新值修改buffer的值
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // 聚合buffer的值,聚合后的值还存到buffer里
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // 计算最终结果
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// 注册聚合函数来调用它
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

在 Spark 源码库"examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala"中可以找到完整的示例代码

1.7.2 类型安全的自定义聚合函数

强类型的自定义聚合函数要继承Aggregator

一个求平均数的类型安全的自定义聚合函数案例

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // 0值,要满足一个值加0还等于它本身
  def zero: Average = Average(0L, 0L)
  // 聚合两个值,返回聚合后的buffer对象
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // 合并两个中间值
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // 转换为输出结果
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // 指定中间值类型编码器
  def bufferEncoder: Encoder[Average] = Encoders.product
  // 指定最终输出值类型的编码器
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// 将函数应用到Dataset,并重命名为一个列
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

2 数据源读写

API案例

3 性能优化

3.1 缓存表

多次使用的表可以缓存起来
通过spark.catalog.cacheTable("tableName") 或 dataFrame.cache()
然后 Spark SQL 将只扫描需要的列并自动调整压缩以最小化内存使用和 GC 压力。可以调用spark.catalog.uncacheTable("tableName")从内存中删除该表。

可以使用SparkSession的setConf方法或通过SET key=value使用 SQL运行 命令来完成内存缓存的配置。

属性名默认值意义
spark.sql.inMemoryColumnarStorage.compressedtrue当设置为 true 时,Spark SQL 将根据数据统计自动为每列选择一个压缩编解码器。
spark.sql.inMemoryColumnarStorage.batchSize10000控制列缓存的批次大小。更大的批处理大小可以提高内存利用率和压缩率,但在缓存数据时会面临 OOM 的风险。

3.2 参数调整

以后的版本里这些参数可以会被干掉,未来将会自动优化

属性名默认值意义
spark.sql.files.maxPartitionBytes134217728 (128 MB)读取文件时打包到单个分区的最大字节数。
spark.sql.files.openCostInBytes4194304 (4 MB)打开一个文件的估计成本,以同时扫描的字节数来衡量。这在将多个文件放入一个分区时使用。最好是高估,那么小文件的分区会比大文件的分区(排在最前面)快。
spark.sql.broadcastTimeout300广播加入中广播等待时间的超时(以秒为单位)
spark.sql.autoBroadcastJoinThreshold10485760 (10 MB)配置表的最大大小(以字节为单位),该表将在执行连接时广播到所有工作节点。通过将此值设置为 -1 可以禁用广播。请注意,当前仅支持ANALYZE TABLE COMPUTE STATISTICS noscan已运行该命令的 Hive Metastore 表的统计信息 。
spark.sql.shuffle.partitions200配置shulffle数据以进行连接或聚合时要使用的分区数。

3.3 BoradCast join

broadcast join模式,会将小于spark.sql.autoBroadcastJoinThreshold值(默认为10M)的表广播到其他计算节点,不走shuffle过程,所以会更加高效。

scala案例

import org.apache.spark.sql.functions.broadcast
broadcast(spark.table("src")).join(spark.table("records"), "key").show()
  • 1
  • 2

sql案例

-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
  • 1
  • 2

3.4 repartition/coalesce调节并行度

如果默认的SparkSQL生成的分区太少或太多,不能充分利用资源,可以读取数据后,调用repartition或者coalesce调整分区数。

四、Spark Streaming

Spark Streaming是基于Spark Core的扩展,用来处理流式数据的API。可以读取Kafka、Flume或者TCP套接字等多种数据源。 并且可以使用高级别功能表达复杂的算法map,reduce,join和window来处理数据。处理后可以将数据写入文件系统、数据库和实时仪表盘。

内部工作原理是接收实时数据,把数据切成一批一批的数据,一批数据作为一个RDD处理。
Spark Streaming 提供了一种称为离散流或DStream的高级抽象,它表示连续的数据流。DStream 可以从来自 Kafka、Flume 和 Kinesis 等来源的输入数据流创建,也可以通过对其他 DStream 应用高级操作来创建。在内部,DStream 表示为一系列 RDD。StreamingContext是所有流功能的主要入口点。

一个简单案例

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// 创建一个具有两个执行线程的本地 StreamingContext,批处理间隔为 1 秒。
// 必须至少是2个线程,一个接收数据,一个处理数据
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// 创建一个DStream表示来自Socket的数据
val lines = ssc.socketTextStream("localhost", 9999)

// 将每一行用空格作为分隔符切割成一个个单词
val words = lines.flatMap(_.split(" "))

// 计算每个批次里的单词个数
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// 打印前10个元素到控制台
wordCounts.print()

// 启动计算
ssc.start()
// 等待计算终止        
ssc.awaitTermination()  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

完整案例https://github.com/apache/spark/blob/v2.4.0/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala

1 初始化 StreamingContext

Spark Streaming API doc

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
  • 1
  • 2
  • 3
  • 4
  • 5

appName是应用程序在集群UI或本地UI上显示的应用名称。master是一个Spark、Mesos 或 YARN 集群 URL,或者是一个表示本地运行的字符串local[*]。上面硬编码只是作为案例,还可以通过运行Spark程序时指定

创建StreamingContext时,会默认创建一个SparkContext(所有Spark功能的起点),可以通过这样得到ssc.sparkContext

时间间隔必须根据程序的延迟要求和集群的可用资源设置,详情可看后面的性能调优。

定义上下文后,您必须执行以下操作。

  1. 通过创建输入 DStream 来定义输入源。
  2. 通过对 DStreams 应用转换和输出操作来定义流计算。
  3. 开始接收数据并使用streamingContext.start().
  4. 使用 等待处理停止(手动或由于任何错误)streamingContext.awaitTermination()。
  5. 可以使用 手动停止处理streamingContext.stop()。

重要的点:

  1. 一旦上下文启动,就不能设置或添加新的流计算。
  2. 上下文一旦停止,就无法重新启动。
  3. 一个 JVM 中只能同时激活一个 StreamingContext。
  4. StreamingContext 上的 stop() 也会停止 SparkContext。要仅停止 StreamingContext,请将stop()调用的可选参数设置stopSparkContext为 false。
  5. 只要在创建下一个 StreamingContext 之前停止前一个 StreamingContext(不停止 SparkContext),就可以重新使用 SparkContext 来创建多个 StreamingContext。

2 DStream

Discretized Stream或DStream是 Spark Streaming 提供的基本抽象。表示一个连续的数据流,可以是从源接收到的输入数据流,也可以是通过转换输入流生成的处理后的数据流。在内部,DStream 由一系列连续的 RDD 表示。DStream 中的每个 RDD 都包含来自某个区间的数据,如下图所示。
在这里插入图片描述
对 DStream 应用的任何操作都会转换为对底层 RDD 的操作。例如,在前面将行转换为单词的示例中,该flatMap操作应用于linesDStream 中的每个 RDD,以生成 DStream 的 wordsRDD。这如下图所示。
在这里插入图片描述
这些底层 RDD 转换由 Spark 引擎计算。DStream 操作隐藏了大部分这些细节,并为开发人员提供了更高级别的 API 以方便使用。

3 输入DStream和接收器

输入 DStreams 是表示从流源接收的输入数据流的 DStreams。在前面的简单案例中lines是一个输入 DStream,因为它表示从 netcat 服务器接收到的数据流。每个输入 DStream(除了文件流,本节稍后讨论)都与一个Receiver (Scala docJava doc)对象相关联,该对象从源接收数据并将其存储在 Spark 的内存中以供处理。

Spark Streaming提供了两种流式数据源连接方式

  1. 基础数据源。直接在 StreamingContext API 中可用的来源。例如文件系统和socket连接。
  2. 高级数据源。Kafka、Flume、Kinesis 等来源可通过额外的实用程序类获得。这些需要链接额外的依赖项,如 链接部分所述。

Spark Streaming支持读取多个流式数据源,这样会创建多个DStream和多个接收器。要集群资源核数是否够用。调试的话,建议用local[*]。集群运行的话,设置的核数必须大于程序中接收器的个数。

3.1 基本数据源

socket案例在1 里已经有了,这里不再赘述

3.1.1 文件流

文件流不需要接收器,所以不需要分配额外的核


streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

streamingContext.textFileStream(dataDirectory)
  • 1
  • 2
  • 3
  • 4
object HdfsWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

完整示例可以在spark源码库的examples\src\main\scala\org\apache\spark\examples\streaming\HdfsWordCount.scala中找到完整案例

Spark Streaming如何监控目录:

  • Spark Streaming 将监视给定目录并处理在该目录中创建的任何文件
  • 可以监控一个简单的目录,例如"hdfs://namenode:8040/logs/". 直接在此类路径下的所有文件将在发现时进行处理。
  • 支持通通配符,例如 “hdfs://namenode:8040/logs/2017/*”。在这里,DStream 将包含与模式匹配的目录中的所有文件。也就是说:它是一种目录模式,而不是目录中的文件。
  • 所有文件必须采用相同的数据格式。
  • 文件被视为基于其修改时间而非创建时间的时间段的一部分。
  • 处理后,对当前窗口中的文件所做的更改不会导致重新读取该文件。即:更新被忽略。
  • 录下的文件越多,扫描更改所需的时间就越长——即使没有文件被修改。
  • 如果使用通配符来标识目录,例如"hdfs://namenode:8040/logs/2016-*",重命名整个目录以匹配路径会将目录添加到受监视目录列表中。只有修改时间在当前窗口内的目录中的文件才会包含在流中。
  • 调用FileSystem.setTimes() 修复时间戳是一种在以后的窗口中提取文件的方法,即使它的内容没有改变。
3.1.2 使用对象存储数据源

“完整”文件系统(例如 HDFS)倾向于在创建输出流后立即对其文件设置修改时间。当一个文件被打开时,甚至在数据被完全写入之前,它可能被包含在DStream- 之后在同一窗口内对文件的更新将被忽略。为确保在窗口中获取更改,请将文件写入不受监视的目录,然后在关闭输出流后立即将其重命名为目标目录。

如果重命名的文件在其创建窗口期间出现在扫描的目标目录中,则将选取新数据。相比之下,Amazon S3 和 Azure Storage 等对象存储通常具有较慢的重命名操作,因为数据实际上是复制的。此外,重命名的对象可能会将rename()操作时间作为其修改时间,因此可能不会被视为原始创建时间所暗示的窗口的一部分。需要针对目标对象存储进行仔细测试,以验证存储的时间戳行为是否与Spark Streaming 预期的一致。直接写入目标目录可能是通过所选对象存储流式传输数据的适当策略。

3.1.3 基于自定义接收器的流

可以使用通过自定义接收器接收的数据流创建 DStream。有关更多详细信息,请参阅自定义接收器指南

3.1.4 根据RDD 队列创建流

为了使用测试数据测试 Spark Streaming 应用程序,还可以创建基于 RDD 队列的 DStream,使用streamingContext.queueStream(queueOfRDDs). 每个推入队列的 RDD 都会在 DStream 中被视为一批数据,并像流一样进行处理。

3.1.5 接收器的可靠性

根据其可靠性,可以有两种数据源。来源(如 Kafka 和 Flume)允许确认传输的数据。如果从这些可靠来源接收数据的系统正确确认接收到的数据,就可以确保不会因任何类型的故障而丢失数据。这导致了两种接收器:

  • 可靠接收器。当数据被接收并通过复制存储在 Spark 中时,可靠的接收器会正确地向可靠源发送确认。
  • 不可靠接收器。不会向源发送确认。这可以用于不支持确认的来源,甚至可以用于当人们不想或不需要进入确认的复杂性时的可靠来源。
    自定义接收器指南讨论了如何设计可靠接收器。

3.2 高级数据源

3.2.1 以Kafka为数据源

Spark Streaming 2.4.0 与 Kafka 代理版本 0.8.2.1 或更高版本兼容。

Spark Streaming提供了2种连接模式

  1. Direct直连模式
  2. Receiver模式
3.2.1.1 Direct直连模式

SparkStreaming会启动和Kafka分区数对应的Task数去读取Kafka,每个task读取对应的分区。读取到内存中后,Kafka的的一个分区数据会成为RDD的一个分区的数据。

  1. 手动维护偏移量

直连模式Spark任务的偏移量存储在内存中,一旦任务挂掉就要从头开始消费,所以需要用户自己维护偏移量。读取完毕将偏移量保存到Zookeeper或Redis或HDFS,这样当任务挂掉,可以从保存位置直接读取。SparkStreaming处理间隔在1秒以上可以使用Zookeeper,小于1秒建议使用Redis。

  1. 优势

处理流程简单,无需启动Receiver
读取数据自由,任务失败可以重新去Kafka读取

  1. 直连模式的问题

在这里插入图片描述

At-most-once就是最多一次。这种就是先提交偏移量后,任务挂掉。这种情况下有些数据未处理就丢失掉了,下次任务周期只会从已更新的偏移量开始消费。这就是最多一次,也就是最多只会处理一次。不在乎丢数据的情况下可以使用这种方案。

At-least-once就是最少一次。这种就是提交偏移量之前任务挂掉。这种情况一些已经处理过的数据已经消费过,但偏移量没来得及提交,所以下次任务周期就还是从上次任务的偏移量开始读,这样就出现了重复消费数据的问题。要求不能丢数据可以使用这种方案。重复消费可以通过幂等性设计来避免。

Direct模式读取数据本身就符合幂等性,Spark Streaming内部处理也复合幂等性。所以要保证幂等性的就是输出端,比如写到HDFS(HDFS写本身就具有幂等性)、Mysql、Hbase、Redis等。

通用思路:

  1. 查询已处理过且处理成功的数据
  2. 读取数据后,根据已处理且成功的数据集过滤数据,保留未处理过的。
    所有输出端都可以使用这种方式,不过随着数据越多,查询所需要的成本也越来越高。

Mysql幂等,前提要有主键和唯一索引:

  1. 保存时使用replace into或者insert into … on duplicate key update …
  2. 保存时使用事务,失败则回滚

接口幂等,可以通过token:

  1. 客户端请求获取token,服务端根据token生成全局唯一ID,将ID保存到Redis,再将ID返回给客户端
  2. 客户端调用请求必须携带token也就是上面获取到的ID,一般放到请求头
  3. 服务端校验token,如果Redis存在token则执行业务。 如果不存在token则说明是重复操作,返回自定义结果
3.2.1.2 Receiver模式

在这里插入图片描述
流程

  1. Spark Streaming启动时会开启一个单独的线程接收数据
  2. 接收到的数据会被拆分然后分配到其他节点上
  3. 分发完后,会把偏移量更新到Zookeeper
  4. 更新Zookeeper后,向Driver报告数据位置
  5. Driver会分发任务到数据结点上,数据结点执行task

问题
更新完偏移量后,如果Driver突然挂掉,数据未成功处理。下次再次执行时,读取的数据已经是新一批的数据了。这种情况就造成数据丢失了。

Kafka 项目在 0.8 和 0.10 版本之间引入了一个新的消费者 API,因此有 2 个单独的对应 Spark Streaming 包可用。0.8与0.9或0.10兼容,但0.10版的集成和之前版本不兼容。如果考虑兼容以前项目建议使用0.8,如果是新项目建议使用0.10。需要注意的是0.10版本只支持Direct直连模式,而不再支持Receivrer模式

有关更多详细信息,参考Kafka集成指南

3.2.2 以Flume为数据源

Spark Streaming 2.4.0 与 Flume 1.6.0 兼容。有关更多详细信息,参考Flume集成指南

4 DStream的转换操作

与 RDD 类似,转换允许修改来自输入 DStream 的数据。DStreams 支持许多普通 Spark RDD 上可用的转换。一些常见的如下

转换操作作用
map(func)通过将源 DStream 的每个元素传递给函数func 来返回一个新的 DStream 。
flatMap(func)与 map 类似,但每个输入项可以映射到 0 个或多个输出项。
filter(func)通过仅选择func返回 true的源 DStream 的记录来返回新的 DStream 。
repartition(numPartitions)通过创建更多或更少的分区来更改此 DStream 中的并行级别。
union(otherStream)返回一个新的 DStream,它包含源 DStream 和otherDStream 中元素的联合 。
count()通过计算源 DStream 的每个 RDD 中的元素数量,返回一个新的单元素 RDD 的 DStream。
reduce(func)通过使用函数func(它接受两个参数并返回一个)聚合源 DStream 的每个 RDD 中的元素,返回一个新的单元素 RDD 的 DStream 。该函数应该是关联的和可交换的,以便它可以并行计算。
countByValue()当在类型为 K 的元素的 DStream 上调用时,返回一个 (K, Long) 对的新 DStream,其中每个键的值是它在源 DStream 的每个 RDD 中的频率。
reduceByKey(func, [numTasks])当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, V) 对 DStream,其中使用给定的 reduce 函数聚合每个键的值。注意:默认情况下,这使用 Spark 的默认并行任务数(本地模式为 2,在集群模式下,数量由 config 属性决定spark.default.parallelism)进行分组。您可以传递一个可选numTasks参数来设置不同数量的任务。
join(otherStream, [numTasks])当在 (K, V) 和 (K, W) 对的两个 DStream 上调用时,返回一个新的 (K, (V, W)) 对的 DStream,其中包含每个键的所有元素对。
cogroup(otherStream, [numTasks])当在 (K, V) 和 (K, W) 对的 DStream 上调用时,返回 (K, Seq[V], Seq[W]) 元组的新 DStream。
transform(func)通过将 RDD-to-RDD 函数应用于源 DStream 的每个 RDD,返回一个新的 DStream。这可用于在 DStream 上执行任意 RDD 操作。
updateStateByKey(func)返回一个新的“状态”DStream,其中每个键的状态通过将给定的函数应用于键的先前状态和键的新值来更新。这可用于维护每个键的任意状态数据。

4.1 UpdateStateByKey 操作

updateStateByKey操作允许保持任意状态,同时使用新信息不断更新它。要使用它,必须执行两个步骤。

  1. 定义状态 - 状态可以是任意数据类型
  2. 定义状态更新函数 - 使用函数指定如何使用先前状态和输入流中的新值更新状态。

在每个批次中,Spark会对所有的Key-Value都会应用状态更新函数,无论该批次中是否有新数据。如果针对某对Key-value状态更新函数返回None,该K-V将会被淘汰。

让我们用一个例子来说明这一点。假设要维护在文本数据流中看到的每个单词的运行计数。这里运行计数是状态,它是一个整数。我们将更新函数定义为:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}
  • 1
  • 2
  • 3
  • 4

这适用于包含单词的 DStream(例如,在前面的示例中pairs包含(word, 1)对的DStream )。

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
  • 1

将为每个单词调用更新函数,其中newValues包含一系列 1(来自(word, 1)对)和runningCount具有先前计数的 1。

请注意,使用updateStateByKey需要配置检查点目录,这将在检查点部分详细讨论。

4.2 Transform 操作

transform操作(及其类似的变体transformWith)允许将任意 RDD-to-RDD 函数应用于 DStream。它可用于应用任何未在 DStream API 中公开的 RDD 操作。例如,将数据流中的每个批次与另一个数据集连接的功能并未直接在 DStream API 中公开。但是,可以轻松地使用它transform来执行此操作。这实现了非常强大的可能性。例如,可以通过将输入数据流与预先计算的垃圾邮件信息(也可能由 Spark 生成)连接,然后根据它进行过滤来进行实时数据清理。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

每个批次间隔都会调用该函数。这里允许做一些操作,比如在批处理中,操作RDD、修改分区数、使用广播变量等。

4.3 Window 操作

Spark Streaming 还提供窗口计算,允许在数据的滑动窗口上应用转换。下图展示了滑动窗口。
在这里插入图片描述
如图所示,每次窗口滑过一个源 DStream 时,落入窗口内的源 RDD 被组合并操作以产生窗口化 DStream 的 RDD。在这种特定情况下,该操作应用于最后 3 个时间单位的数据,并滑动 2 个时间单位。这说明任何窗口操作都需要指定两个参数。

  • windowLength 窗口长度,窗口的持续时间(途中为3)
  • slideInterval 滑动间隔,执行窗口操作的间隔(图中为 2)

这两个参数必须是源DStream的batch间隔的倍数(图中为1)。

用一个例子来说明窗口操作。假设想通过每 10 秒生成过去 30 秒数据的字数来扩展 前面的示例。为此,必须在过去 30 秒的数据reduceByKey对的pairsDStream 上应用操作(word, 1)。使用reduceByKeyAndWindow操作实现。

// 每10秒聚合过去30秒的数据
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
  • 1
  • 2

一些常见的窗口操作如下。所有这些操作都采用上述两个参数 windowLength和slideInterval。

转换算子作用
window(windowLength, slideInterval)返回一个新的 DStream,它是根据源 DStream 的窗口批次计算的。
countByWindow(windowLength, slideInterval)每隔一段时间(滑动间隔),返回指定窗口间隔(窗口长度)中元素的个数
reduceByWindow(func, windowLength, slideInterval)返回一个新的单元素流,它是通过使用func在滑动间隔内聚合流中的元素而创建的。该函数应该是关联的和可交换的,以便它可以被正确地并行计算。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, V) 对 DStream,其中每个键的值使用给定的 reduce 函数func 在滑动窗口中按批次聚合。注意:默认情况下,这使用 Spark 的默认并行任务数(本地模式为 2,在集群模式下,数量由 config 属性决定spark.default.parallelism)进行分组。可以传递一个可选 numTasks参数来设置不同数量的任务。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])是上面reduceByKeyAndWindow的更高效的版本。不会每次重新计算每个窗口的数据。而是使用上一个窗口的数据减去离开窗口的数据,加上新进入窗口的数据计算的。但是,它仅适用于“可逆减函数(invertible reduce functions)”,即具有相应“反减”功能的减函数(作为参数invFunc)。 像reduceByKeyAndWindow一样,通过可选参数可以配置reduce任务的数量。请注意,必须启用检查点才能使用此操作。
countByValueAndWindow(windowLength, slideInterval, [numTasks])当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, Long) 对 DStream,其中每个键的值是其在滑动窗口内的频率。与reduceByKeyAndWindow一样,reduce 任务的数量可通过可选参数进行配置。

4.4 join操作

4.4.1 流和流join

流和流的join很简单

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
  • 1
  • 2
  • 3

在这里,在每个批次间隔中,由stream1生成的 RDD将与由stream2生成的 RDD 连接。也可以做leftOuterJoin, rightOuterJoin, fullOuterJoin。此外,在流的窗口上进行连接通常非常有用。这也很容易。

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
  • 1
  • 2
  • 3
4.4.2 流和数据集join

前面transform操作已经有过案例,这里是另一个案例

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
  • 1
  • 2
  • 3

事实上,还可以动态更改要加入的数据集。提供给的函数在transform每个批次间隔进行评估,因此将使用dataset引用指向的当前数据集。

API 文档中提供了 DStream 转换的完整列表。对于 Scala API,参考DStreamPairDStreamFunctions。有关 Java API,参考JavaDStreamJavaPairDStream。有关 Python API,请参阅DStream

5 DStream的输出操作

输出操作允许将 DStream 的数据推送到外部系统,如数据库或文件系统。由于输出操作实际上允许外部系统使用转换后的数据,因此它们会触发所有 DStream 转换的实际执行(类似于 RDD 的操作)。目前,定义了以下输出操作:

输出操作作用
print()在运行流应用程序的驱动程序节点上的 DStream 中打印每批数据的前十个元素。这对于开发和调试很有用。
saveAsTextFiles(prefix, [suffix])将此 DStream 的内容保存为文本文件。每个批处理间隔的文件名是根据前缀和后缀生成的:“prefix-TIME_IN_MS[.suffix]”。
saveAsObjectFiles ( prefix , [ suffix ])将此 DStream 的内容保存为SequenceFiles序列化的 Java 对象。每个批处理间隔的文件名是根据前缀和 后缀生成的:“prefix-TIME_IN_MS[.suffix]”。
saveAsHadoopFiles ( prefix , [ suffix ])将此 DStream 的内容保存为 Hadoop 文件。每个批处理间隔的文件名是根据前缀和后缀生成的:“prefix-TIME_IN_MS[.suffix]”。
foreachRDD(func)将函数func应用于从流生成的每个 RDD的最通用的输出运算符。这个函数应该将每个 RDD 中的数据推送到外部系统,例如将 RDD 保存到文件,或者通过网络将其写入数据库。请注意,函数func在运行流应用程序的驱动程序进程中执行,并且通常会在其中包含 RDD 操作,这些操作将强制计算流 RDD。
#### 5.1 使用 foreachRDD 的设计模式
为避免为每条记录创建和销毁连接对象,建议使用foreachPartition来创建连接。使得每个分区的数据使用一个连接。可以类似这样
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

可以通过跨多个 RDD/batch 重用连接对象来进一步优化。可以维护一个静态的连接对象池,当多个批次的 RDD 被推送到外部系统时可以重用,从而进一步减少开销。类似这样

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

请注意,池中的连接应该按需延迟创建,如果一段时间不使用则超时。这样可以最有效地将数据发送到外部系统。

其他要点

  1. DStreams 被输出操作延迟执行,就像 RDDs 被 RDD 操作延迟执行一样。具体来说,DStream 输出操作中的 RDD 操作会强制处理接收到的数据。因此,如果您的应用程序没有任何输出操作,或者其中dstream.foreachRDD()没有任何 RDD操作等输出操作,那么将不会执行任何操作。系统将简单地接收数据并丢弃它。
  2. 默认情况下,输出操作一次执行一个。它们按照在应用程序中定义的顺序执行。
5.2 DataFrame和SQL操作

可以轻松地对流数据使用DataFrames 和 SQL操作。必须使用 StreamingContext 正在使用的 SparkContext 创建一个 SparkSession。此外,必须这样做才能在驱动程序故障时重新启动。这是通过创建一个延迟实例化的 SparkSession 单例实例来完成的。这在以下示例中显示。它修改了前面的wordcount示例以使用 DataFrames 和 SQL 生成字数。每个 RDD 被转换为一个 DataFrame,注册为临时表,然后使用 SQL 进行查询。

/** 在流式处理中使用DataFrame和SQL */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // 获取单例的SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // 将String类型RDD转换为DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // 创建一个临时视图 
  wordsDataFrame.createOrReplaceTempView("words")

  // 使用SQL做wordcount操作,然后打印结果
  val wordCountsDataFrame = 
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

完整源码

还可以对在来自不同线程的流数据上定义的表运行 SQL 查询(即与正在运行的 StreamingContext 异步)。只需确保将 StreamingContext 设置为记住足够数量的流数据,以便查询可以运行。否则,不知道任何异步 SQL 查询的 StreamingContext 将在查询完成之前删除旧的流数据。例如,如果您想查询最后一批,但您的查询可能需要 5 分钟才能运行,然后调用streamingContext.remember(Minutes(5))(在 Scala 中,或其他语言中的等效项)。

可以参考三、Spark SQL、DataFrame、Dataset

6 缓存/持久化

与 RDD 类似,DStreams 也允许开发人员将流的数据保存在内存中。也就是说,persist()在 DStream 上使用该方法将自动将该 DStream 的每个 RDD 持久化在内存中。如果 DStream 中的数据将被多次计算(例如,对同一数据进行多次操作),这将非常有用。对于像reduceByWindow和这样的基于窗口的操作和 像 那样reduceByKeyAndWindow的基于状态的操作updateStateByKey,这是隐含的。因此,由基于窗口的操作生成的 DStream 会自动保存在内存中,无需开发人员调用persist().

对于通过网络接收数据的输入流(如 Kafka、Flume、sockets 等),默认的持久化级别设置为将数据复制到两个节点以实现容错。

与 RDD 不同,DStreams 的默认持久化级别将数据序列化在内存中。

7 检查点

流式应用程序必须 24/7 全天候运行,因此必须能够应对与应用程序逻辑无关的故障(例如,系统故障、JVM 崩溃等)。为了使这成为可能,Spark Streaming 需要将足够的信息检查点到容错存储系统,以便它可以从故障中恢复。检查点有两种类型的数据。

  • 元数据检查点- 将定义流计算的信息保存到容错存储,如 HDFS。这用于从运行流应用程序驱动程序的节点故障中恢复(稍后详细讨论)。元数据包括:
    • 配置- 用于创建流应用程序的配置。
    • DStream 操作- 定义流应用程序的一组 DStream 操作。
    • 不完整的批次- 作业已排队但尚未完成的批次。、
  • 数据检查点- 将生成的 RDD 保存到可靠的存储中。这在一些跨多个批次组合数据的有状态转换中是必要的。在这样的转换中,生成的 RDD 依赖于之前批次的 RDD,这导致依赖链的长度随时间不断增加。为了避免恢复时间的这种无限增加(与依赖链成比例),有状态转换的中间 RDD 会定期通过检查点持久化到可靠存储(例如 HDFS)以切断依赖链。

总而言之,元数据检查点主要用于从驱动程序故障中恢复,而如果使用有状态转换,即使对于基本功能,数据或 RDD 检查点也是必要的。

7.1 什么时候使用checkpoint

必须为具有以下任何要求的应用程序启用检查点:

  • 有状态转换的使用- 如果应用程序中使用了updateStateByKey或reduceByKeyAndWindow(具有反函数),则必须提供检查点目录以允许定期 RDD 检查点。
  • 从运行应用程序的驱动程序的故障中恢复- 元数据检查点用于恢复进度信息。

请注意,没有上述状态转换的简单流应用程序可以在不启用检查点的情况下运行。在这种情况下,驱动程序故障的恢复也将是部分的(一些已接收但未处理的数据可能会丢失)。这通常是可以接受的,许多人以这种方式运行 Spark Streaming 应用程序。对非 Hadoop 环境的支持有望在未来得到改善。

7.2 如何配置checkpoint

可以通过在容错、可靠的文件系统(例如 HDFS、S3 等)中设置一个目录来启用检查点,检查点信息将保存到该目录中。这是通过使用streamingContext.checkpoint(checkpointDirectory). 这将允许您使用上述有状态转换。此外,如果想让应用程序从驱动程序故障中恢复,应该重写流应用程序以具有以下行为。

  • 当程序第一次启动时,它会创建一个新的 StreamingContext,设置所有的流,然后调用 start()。
  • 当程序在失败后重新启动时,它会从检查点目录中的检查点数据重新创建一个 StreamingContext。

使用StreamingContext.getOrCreate可以更简单一些

// 创建新StreamingContext实例的方法
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// 从checkpoint获取StreamingContext或者创建一个新的StreamingContext
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// 配置需要配置的项
// 不论第一次启动还是重启
context. ...

// 启动context
context.start()
context.awaitTermination()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

如果checkpointDirectory存在,则将从检查点数据重新创建上下文。如果该目录不存在(即第一次运行),则将functionToCreateContext调用该函数以创建新的上下文并设置 DStream。参考 Scala 示例 RecoverableNetworkWordCount。此示例将网络数据的字数附加到文件中。

除了使用getOrCreate一个还需要确保驱动程序进程在失败时自动重新启动。这只能由运行应用程序的部署基础结构来完成。具体参考部署部分。

请注意,RDD 的检查点会产生保存到可靠存储的成本。这可能会导致 有检查点的RDD 的那些批次的处理时间增加。因此,需要仔细设置检查点的间隔。在小批量(比如 1 秒)下,每批检查点可能会显着降低操作吞吐量。相反,检查点太少会导致谱系和任务大小增加,这可能会产生不利影响。对于需要 RDD 检查点的有状态转换,默认间隔是批处理间隔的倍数,至少为 10 秒。可以使用dstream.checkpoint(checkpointInterval)进行设置 。通常,DStream 的 5 - 10 个滑动间隔的检查点间隔是一个很好的尝试设置。

8 累加器、广播变量和检查点

无法从 Spark Streaming 中的检查点恢复累加器和广播变量。如果启用检查点并使用 累加器或广播变量 ,则必须为累加器和广播变量创建延迟实例化的单例实例, 以便在驱动程序出现故障后重新启动后重新实例化它们。以下示例中展示了这一点。

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // 获取或注册一个黑名单广播变量
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // 获取或注册一个移除单词个数的累加器
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = "Counts at time " + time + " " + counts
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

查看完整源码

9 部署Spark Streaming程序

部署Spark Streaming程序的步骤

9.1 部署应用程序的要求

要运行 Spark Streaming 应用程序,需要具备以下条件。

  • 必须要有集群管理器的集群。这是Spark应用程序的一般要求,不论批还是流。
  • 打包应用程序 JAR - 必须将流式应用程序编译为 JAR。如果您使用spark-submit启动应用程序,那么您将不需要在 JAR 中提供 Spark 和 Spark Streaming。但是,如果应用程序使用高级源(例如 Kafka、Flume),那么必须将它们链接到的额外工件及其依赖项打包在用于部署应用程序的 JAR 中。例如,使用的应用程序KafkaUtils 必须spark-streaming-kafka-0-10_2.11在应用程序 JAR 中包含其所有可传递的依赖项。
  • 为执行器配置足够的内存- 由于接收到的数据必须存储在内存中,执行器必须配置有足够的内存来保存接收到的数据。请注意,如果您正在进行 10 分钟的窗口操作,则系统必须在内存中保留至少最近 10 分钟的数据。因此,应用程序的内存要求取决于其中使用的操作。
  • 配置应用程序驱动程序的自动重启——为了从驱动程序故障中自动恢复,用于运行流应用程序的部署基础设施必须监控驱动程序进程并在它失败时重新启动驱动程序。大部分使用YARN。少部分使用K8S
  • 预写日志。预写日志实现强大的容错保证。如果启用,从接收器接收的所有数据都会写入配置检查点目录中的预写日志。这可以防止驱动程序恢复时丢失数据,从而确保零数据丢失(在容错语义部分详细讨论 )。这可以通过设置参数来启用配置spark.streaming.receiver.writeAheadLog.enable=true。然而,这些更强的语义可能以单个接收器的接收吞吐量为代价。这可以通过并行运行更多接收器来纠正 以提高总吞吐量。此外,建议在启用预写日志时禁用 Spark 中接收数据的复制,因为日志已存储在复制的存储系统中。这可以通过将输入流的存储级别设置为StorageLevel.MEMORY_AND_DISK_SER来完成。在使用 S3(或任何不支持刷新的文件系统)进行预写日志时,请记住启用 spark.streaming.driver.writeAheadLog.closeFileAfterWrite和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。有关更多详细信息,请参阅 Spark 流配置。请注意,当启用 I/O 加密时,Spark 不会加密写入预写日志的数据。如果需要对预写日志数据进行加密,则应将其存储在本机支持加密的文件系统中。
  • 设置最大接收速率- 如果集群资源不足以让流应用程序以接收数据的速度处理数据,则可以通过设置以记录/秒为单位的最大速率限制来限制接收器的速率。请参阅接收器和 直接 Kafka 方法的配置参数 。在 Spark 1.5 中,引入了一个称为背压的功能,无需设置此速率限制,因为 Spark Streaming 会自动计算出速率限制并在处理条件发生变化时动态调整它们。这个背压可以通过设置来启用配置参数spark.streaming.backpressure.enabled=true来开启。

9.2 升级应用程序代码

如果需要使用新的应用程序代码升级正在运行的 Spark Streaming 应用程序,则有两种可能的机制。

  • 升级后的 Spark Streaming 应用程序启动并与现有应用程序并行运行。一旦新的(接收到与旧的相同的数据)预热并准备好迎接黄金时段,就可以关闭旧的。请注意,这可以用于支持将数据发送到两个目的地的数据源(即,较早的和升级的应用程序)。
  • 现有应用程序正常关闭(请参阅 StreamingContext.stop(…) 或JavaStreamingContext.stop(…) 用于正常关闭选项),确保在关闭之前完全处理已接收的数据。然后可以启动升级的应用程序,它将从较早的应用程序停止的同一点开始处理。请注意,这只能使用支持源端缓冲的输入源(如 Kafka 和 Flume)来完成,因为数据需要在前一个应用程序关闭且升级的应用程序尚未启动时进行缓冲。并且无法从升级前代码的较早检查点信息重新启动。检查点信息本质上包含序列化的 Scala/Java/Python 对象,尝试使用新的、修改过的类反序列化对象可能会导致错误。在这种情况下,要么使用不同的检查点目录启动升级后的应用程序,要么删除之前的检查点目录。

9.3 监控应用程序

除了 Spark 的监控功能之外,还有其他特定于 Spark Streaming 的功能。使用 StreamingContext 时, Spark Web UI 会显示一个附加Streaming选项卡,其中显示有关正在运行的接收器(接收器是否处于活动状态、接收到的记录数、接收器错误等)和已完成批次(批处理时间、排队延迟等)的统计信息。 )。这可用于监视流应用程序的进度。

Web UI 中的以下两个指标特别重要:

  • 处理时间- 处理每批数据的时间。
  • 调度延迟- 一个批次在队列中等待前一批处理完成的时间。

如果批处理时间始终大于批处理间隔和/或排队延迟不断增加,则表明系统无法像生成批处理一样快地处理批处理,并且正在落后。在这种情况下,请考虑 减少批处理时间。

也可以使用StreamingListener接口监控 Spark Streaming 程序的进度,该接口允许获取接收器状态和处理时间。请注意,这是一个开发人员 API,将来可能会对其进行改进(即报告更多信息)。

10 性能调优

从集群上的 Spark Streaming 应用程序中获得最佳性能需要进行一些调整。本节介绍了许多可以调整以提高应用程序性能的参数和配置。在高层次上,需要考虑两件事:

  1. 有效利用集群资源,减少每批数据的处理时间。
  2. 设置正确的批次大小,以便数据批次可以在接收时尽快处理(即数据处理跟上数据摄取)。

10.1 减少批处理时间

可以在 Spark 中进行许多优化以最小化每个批次的处理时间。这里可以参考通用-性能调优。

10.1.1 数据接收的并行度

通过网络接收数据(如 Kafka、Flume、socket 等)需要将数据反序列化并存储在 Spark 中。如果数据接收成为系统的瓶颈,则考虑并行化数据接收。请注意,每个输入 DStream 都会创建一个接收器(在工作机器上运行),用于接收单个数据流。因此,可以通过创建多个输入 DStream 并将它们配置为从源接收数据流的不同分区来实现接收多个数据流。例如,接收两个主题数据的单个 Kafka 输入 DStream 可以拆分为两个 Kafka 输入流,每个输入流仅接收一个主题。这将运行两个接收器,允许并行接收数据,从而提高整体吞吐量。这些多个 DStream 可以结合在一起以创建单个 DStream。然后可以将应用于单个输入 DStream 的转换应用于统一流。案例:

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
  • 1
  • 2
  • 3
  • 4

另一个应该考虑的参数是接收方的块间隔,由配置参数 spark.streaming.blockInterval决定. 对于大多数接收器,接收到的数据在存储在 Spark 的内存中之前被合并成数据块。每批中的块数决定了将用于在类似地图的转换中处理接收到的数据的任务数。每批每个接收器的任务数约为(批间隔/块间隔)。例如,200 ms 的块间隔将每 2 秒创建 10 个任务批次。如果任务数量太少(即小于每台机器的核心数量),那么效率会很低,因为所有可用的核心都不会用于处理数据。要增加给定批处理间隔的任务数,请减少块间隔。但是推荐的block interval最小值为50ms左右,低于这个值可能会导致任务启动开销问题。

10.1.2 数据处理中的并行级别

如果在计算的任何阶段使用的并行任务数量不够多,则集群资源可能未得到充分利用。对于像reduceByKey和reduceByKeyAndWindow这样的分布式reduce操作,并行任务的默认数量由spark.default.parallelism 配置属性控制。以将并行级别作为参数传递(请参阅 PairDStreamFunctions 文档),或设置spark.default.parallelism 配置属性以更改默认值。

10.4 数据序列化

通过调整序列化格式可以减少数据序列化的开销。在流的情况下,有两种类型的数据正在被序列化。

  • 输入数据:默认情况下,通过接收器接收的输入数据存储在执行器的内存中,存储级别为 StorageLevel.MEMORY_AND_DISK_SER_2。也就是说,数据被序列化为字节以减少 GC 开销,并复制以容忍执行程序故障。此外,数据首先保存在内存中,只有在内存不足以容纳流式计算所需的所有输入数据时才会溢出到磁盘。这种序列化显然有开销——接收方必须反序列化接收到的数据并使用 Spark 的序列化格式重新序列化它。
  • 流操作生成的持久化 RDD:流计算生成的 RDD 可以持久化在内存中。例如,窗口操作将数据保存在内存中,因为它们将被多次处理。但是,与 Spark Core 默认的StorageLevel.MEMORY_ONLY 不同,由流计算生成的持久化 RDD默认使用StorageLevel.MEMORY_ONLY_SER(即序列化)持久化,以最小化 GC 开销。

在这两种情况下,使用 Kryo 序列化都可以减少 CPU 和内存开销。有关更多详细信息,可以参考Spark 调优指南。对于 Kryo,应该注册自定义类,并禁用对象引用跟踪(请参阅配置指南中的 Kryo 相关配置)。

在流应用程序需要保留的数据量不大的特定情况下,将数据(两种类型)作为反序列化对象持久化而不产生过多的 GC 开销可能是可行的。例如,如果您使用几秒钟的批处理间隔并且没有窗口操作,那么您可以尝试通过相应地显式设置存储级别来禁用持久数据中的序列化。这将减少由于序列化而导致的 CPU 开销,从而潜在地提高性能而不会产生过多的 GC 开销。

10.5 任务启动开销

如果每秒启动的任务数量很高(例如,每秒 50 个或更多),那么向从站发送任务的开销可能很大,并且很难实现亚秒级延迟。此时可以减少任务数量,也就是减少并行度。
这样可能会将处理时间减少一些,从而使亚秒级的批处理大小可行。

10.6 设置正确的批处理间隔

了使在集群上运行的 Spark Streaming 应用程序稳定,系统应该能够在接收数据时尽快处理数据。换句话说,批量数据的处理速度应与生成它们的速度一样快。可以通过在流式 Web UI 中监视处理时间来确定应用程序是否如此 ,其中批处理时间应小于批处理间隔。

根据流计算的性质,所使用的批处理间隔可能会对应用程序在一组固定集群资源上维持的数据速率产生重大影响。例如,让我们考虑较早的 WordCountNetwork 示例。对于特定的数据速率,系统可能能够每 2 秒(即 2 秒的批处理间隔)跟上报告字数,但不是每 500 毫秒。因此,需要设置批处理间隔,以便能够维持生产中的预期数据速率。

确定应用程序正确批处理大小的一个好方法是使用保守的批处理间隔(例如 5-10 秒)和低数据速率对其进行测试。要验证系统是否能够跟上数据速率,您可以检查每个处理批次所经历的端到端延迟的值(在 Spark 驱动程序 log4j 日志中查找“总延迟”,或使用 流媒体监听器 界面)。如果延迟保持与批量大小相当,则系统是稳定的。否则,如果延迟不断增加,则表示系统跟不上,因此不稳定。一旦您有了稳定配置的想法,您就可以尝试增加数据速率和/或减少批量大小。请注意,由于临时数据速率增加而导致的瞬时延迟增加可能没问题,只要延迟减少回较低的值(即小于批量大小)。

10.7 内存调优

Spark Streaming 应用程序所需的集群内存量在很大程度上取决于所使用的转换类型。例如,如果您想对最近 10 分钟的数据使用窗口操作,那么您的集群应该有足够的内存来在内存中保存 10 分钟的数据。或者如果要使用updateStateByKey大量键,那么必要的内存会很高。反之,如果要做简单的map-filter-store操作,那么所需的内存就会很低。

一般情况下,由于通过接收器接收到的数据存储在 StorageLevel.MEMORY_AND_DISK_SER_2 中,因此无法放入内存的数据会溢出到磁盘。这可能会降低流式应用程序的性能,因此建议您根据流式应用程序的需要提供足够的内存。最好尝试在小范围内查看内存使用情况并进行相应估计。

内存调优的另一个方面是垃圾收集。对于需要低延迟的流式应用来说,JVM 垃圾回收导致的??大停顿是不可取的。

有几个参数可以帮助调整内存使用和 GC 开销:

  • DStreams 的持久化级别:如前面数据序列化部分所述,默认情况下输入数据和 RDD 作为序列化字节持久化。与反序列化持久化相比,这减少了内存使用和 GC 开销。启用 Kryo 序列化可进一步减少序列化大小和内存使用量。可以通过压缩(请参阅 Spark 配置spark.rdd.compress)以 CPU 时间为代价进一步减少内存使用。

  • 清除旧数据:默认情况下,所有输入数据和 DStream 转换生成的持久化 RDD 都会自动清除。Spark Streaming 根据使用的转换决定何时清除数据。例如,如果您使用 10 分钟的窗口操作,那么 Spark Streaming 将保留最近 10 分钟的数据,并主动丢弃旧数据。通过设置 ,可以将数据保留更长的时间(例如交互式查询旧数据)streamingContext.remember。

  • CMS 垃圾收集器:强烈建议使用并发标记和清除 GC 以保持与 GC 相关的暂停始终较低。尽管已知并发 GC 会降低系统的整体处理吞吐量,但仍建议使用它来实现更一致的批处理时间。确保在驱动程序(使用–driver-java-optionsin spark-submit)和执行程序(使用Spark 配置 spark.executor.extraJavaOptions)上都设置了 CMS GC 。

  • 其他提示:为了进一步减少 GC 开销,这里有一些更多的提示可以尝试。

    • 使用OFF_HEAP存储级别持久化 RDD 。在Spark 编程指南 中查看更多详细信息。
    • 使用更多具有较小堆大小的执行程序。这将降低每个 JVM 堆内的 GC 压力。

内存相关的要点:

  • DStream 与单个接收器相关联。为了实现读取并行性,需要创建多个接收器,即多个 DStream。接收器在执行器中运行。它占据一个核心。确保在预订接收器插槽后有足够的内核进行处理,即spark.cores.max应考虑接收器插槽。接收者以循环方式分配给执行者。
  • 当从流源接收数据时,接收器创建数据块。每 blockInterval 毫秒生成一个新的数据块。在batchInterval 期间创建了N 个数据块,其中N = batchInterval/blockInterval。这些块由当前执行器的 BlockManager 分发给其他执行器的块管理器。之后,在驱动程序上运行的网络输入跟踪器会被告知块位置以供进一步处理。
  • 在驱动程序上为 batchInterval 期间创建的块创建一个 RDD。batchInterval 期间生成的块是 RDD 的分区。每个分区都是 spark 中的一个任务。blockInterval== batchinterval 意味着创建了一个分区,并且可能在本地进行处理。
  • 块上的映射任务在具有块的执行器(一个接收块,另一个块被复制)中处理,无论块间隔如何,除非非本地调度开始。具有更大的块间隔意味着更大的块。较高的值会spark.locality.wait增加在本地节点上处理块的机会。需要在这两个参数之间找到平衡,以确保在本地处理较大的块。
  • 可以通过调用inputDstream.repartition(n)来定义分区数,而不是依赖于 batchInterval 和 blockInterval 。这会随机重新排列 RDD 中的数据以创建 n 个分区。是的,为了更大的并行性。虽然是以洗牌为代价的。RDD 的处理由驱动程序的作业调度程序作为作业进行调度。在给定的时间点,只有一项作业处于活动状态。因此,如果一个作业正在执行,其他作业将排队。
  • 如果您有两个 dstream,则会形成两个 RDD,并且会创建两个作业,这些作业将一个接一个地安排。为避免这种情况,您可以合并两个 dstream。这将确保为 dstream 的两个 RDD 形成单个 unionRDD。这个 unionRDD 然后被认为是一个单一的工作。但是,RDD 的分区不受影响。
  • 如果批处理时间超过batchinterval,那么显然接收器的内存将开始填满并最终抛出异常(很可能是BlockNotFoundException)。目前,没有办法暂停接收器。使用 SparkConf 配置spark.streaming.receiver.maxRate,可以限制接收器的速率。

11 容错语义

Spark Streaming 应用程序在发生故障时的行为

11.1 背景

为了理解 Spark Streaming 提供的语义,让我们记住 Spark 的 RDD 的基本容错语义。

  1. RDD 是不可变的、确定性可重新计算的分布式数据集。每个 RDD 都会记住用于创建容错输入数据集的确定性操作的沿袭。
  2. 如果 RDD 的任何分区由于工作节点故障而丢失,则可以使用操作沿袭从原始容错数据集重新计算该分区。
  3. 假设所有 RDD 转换都是确定性的,最终转换后的 RDD 中的数据将始终相同,而不管 Spark 集群中的故障如何。

Spark 对 HDFS 或 S3 等容错文件系统中的数据进行操作。因此,从容错数据生成的所有 RDD 也是容错的。但是,Spark Streaming 的情况并非如此,因为在大多数情况下,数据是通过网络接收的(使用时除外 fileStream)。为了为所有生成的 RDD 实现相同的容错特性,接收到的数据在集群中工作节点的多个 Spark 执行器之间复制(默认复制因子为 2)。这导致系统中有两种数据需要在发生故障时恢复:

  1. 接收和复制的数据 - 该数据在单个工作节点发生故障后仍然存在,因为它的副本存在于其他节点之一上。
  2. 已接收为了复制而缓存的数据- 由于这还未复制成功,因此恢复此数据的唯一方法是从源再次获取它。

此外,还有两种故障值得我们关注:

  1. 工作节点故障- 任何运行执行程序的工作节点都可能发生故障,并且这些节点上的所有内存数据都将丢失。如果任何接收器在故障节点上运行,则它们的缓冲数据将丢失。
  2. 驱动程序节点故障- 如果运行 Spark Streaming 应用程序的驱动程序节点出现故障,则显然 SparkContext 丢失,所有执行器及其内存数据都将丢失。

有了这些基础知识,让我们来了解一下 Spark Streaming 的容错语义。

11.2 定义

流系统的语义通常根据系统可以处理每条记录的次数来捕获。系统可以在所有可能的操作条件下(尽管出现故障等)提供三种类型的保证

  1. 最多一次:每条记录将被处理一次或根本不处理。
  2. 至少一次:每条记录将被处理一次或多次。这比最多一次强,因为它确保不会丢失任何数据。但可能有重复。
  3. Exactly once:每条记录只会被处理一次 - 不会丢失任何数据,也不会多次处理数据。这显然是三者中最强的保证。

11.3 基本语义

在任何流处理系统中,从广义上讲,处理数据都分为三个步骤。

  1. 接收数据:使用接收器或其他方式从源接收数据。
  2. 转换数据:接收到的数据使用 DStream 和 RDD 转换进行转换。
  3. 推送数据:最终转换后的数据被推送到外部系统,如文件系统、数据库、仪表板等。

如果流应用程序必须实现端到端的恰好一次保证,那么每个步骤都必须提供一个恰好一次的保证。也就是说,每条记录必须只接收一次,只转换一次,并且只推送到下游系统一次。让我们在 Spark Streaming 的上下文中理解这些步骤的语义。

  1. 接收数据:不同的输入源提供不同的保证。这将在下一小节详细讨论。
  2. 转换数据:由于 RDD 提供的保证,所有接收到的数据都将被处理一次。即使出现故障,只要接收到的输入数据是可访问的,最终转换后的 RDD 将始终具有相同的内容。
  3. 推送数据:输出操作默认确保至少一次语义,因为它取决于输出操作的类型(幂等)和下游系统的语义(是否支持事务)。但是用户可以实现自己的事务机制来实现一次性语义。这将在本节后面更详细地讨论。

11.4 接收数据的语义

不同的输入源提供不同的保证,范围从至少一次到恰好一次。阅读更多详情。

11.4.1 文件

如果所有输入数据都已经存在于像 HDFS 这样的容错文件系统中,Spark Streaming 总是可以从任何故障中恢复并处理所有数据。这给出 了一次性语义,这意味着无论什么失败,所有数据都将被处理一次。

11.4.2 基于Receiver的数据源

对于基于接收器的输入源,容错语义取决于故障场景和接收器的类型。正如我们之前讨论的,有两种类型的接收器:

  1. 可靠接收器- 这些接收器仅在确保接收到的数据已被复制后才确认可靠来源。如果这样的接收器失败,源将不会收到对缓冲(未复制)数据的确认。因此,如果接收器重新启动,源将重新发送数据,不会因为失败而丢失数据。
  2. 不可靠的接收器- 此类接收器不发送确认,因此在由于工作程序或驱动程序故障而失败时可能会丢失数据。

根据使用的接收器类型,我们实现了以下语义。如果工作节点发生故障,那么可靠的接收器不会丢失数据。对于不可靠的接收器,接收到但未复制的数据可能会丢失。如果驱动节点发生故障,那么除了这些损失之外,所有过去在内存中接收和复制的数据都将丢失。这将影响有状态转换的结果。

为了避免丢失过去接收到的数据,Spark 1.2 引入了预写日志,将接收到的数据保存到容错存储中。通过启用预写日志和可靠的接收器,数据丢失为零。在语义方面,它提供了至少一次的保证。

带有预写日志的 Spark 1.2 或更高版本,Worker节点可以支持可靠接收器的零数据丢失
实现至少一次语义,Driver支持可靠的接收器和文件实现零数据丢失的至少一次语义

11.4.3 Kafka Direct API

在 Spark 1.3 中,我们引入了一个新的 Kafka Direct API,它可以确保 Spark Streaming 只接收一次所有 Kafka 数据。除此之外,如果您实现恰好一次输出操作,则可以实现端到端的恰好一次保证。有关更多详细信息,参考Kafka集成指南

11.5 输出操作的语义

输出操作(如foreachRDD)具有至少一次语义,也就是说,如果发生工作故障,转换后的数据可能会多次写入外部实体。虽然这对于使用saveAs***Files操作保存到文件系统是可以接受的 (因为文件只会被相同的数据覆盖),但可能需要额外的努力来实现精确一次语义。有两种方法。

  • 幂等更新:多次尝试总是写入相同的数据。例如,saveAs***Files始终将相同的数据写入生成的文件。
  • 事务性更新:所有更新都是以事务性方式进行的,因此更新只以原子方式进行一次。执行此操作的一种方法如下。
    • 使用批处理时间(在 中可用foreachRDD)和 RDD 的分区索引来创建标识符。此标识符唯一标识流应用程序中的 blob 数据。
    • 使用标识符以事务方式(即,以原子方式)使用此 blob 更新外部系统。也就是说,如果标识符尚未提交,则以原子方式提交分区数据和标识符。否则,如果这已经提交,请跳过更新。
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // 使用这个uniqueId唯一id事务性的提交整个分区的数据
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

五、Structured Streaming

1 概述

Structured Streaming 是一种基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎。您可以像在静态数据上表达批处理计算一样表达流式计算。Spark SQL 引擎将负责以增量方式连续运行它,并随着流数据的不断到达更新最终结果。您可以使用Scala、Java、Python 或 R 中的Dataset/DataFrame API来表达流聚合、事件时间窗口、流到批处理连接等。计算在同一个优化的 Spark SQL 引擎上执行。最后,系统通过检查点和预写日志确保端到端的一次性容错保证。简而言之,Structured Streaming 提供快速、可扩展、容错、端到端的一次性流处理,用户无需对流进行推理。

在内部,默认情况下,Structured Streaming 查询使用微批处理引擎进行处理,该引擎将数据流处理为一系列小批量作业,从而实现低至 100 毫秒的端到端延迟和仅一次容错保证. 但是,从 Spark 2.3 开始,Spark引入了一种新的低延迟处理模式,称为连续处理,它可以在至少一次保证的情况下实现低至 1 毫秒的端到端延迟。在不更改查询中的 Dataset/DataFrame 操作的情况下,可以根据的应用程序要求选择模式。

这部分主要介绍编程模型和 API,主要使用默认的微批处理模型来解释这些概念。然后再讨论连续处理模型。首先,让我们从结构化流查询的简单示例WordCount开始。

2 快速示例

假设要维护从侦听 TCP 套接字的数据服务器接收的文本数据的运行字数。让我们看看如何使用结构化流来表达这一点。可以在Scala / Java / Python 中查看完整代码 。如果下载 Spark,则可以直接运行该示例。无论如何,让我们一步一步地浏览示例并了解它是如何工作的。首先,我们必须导入必要的类并创建一个本地 SparkSession,这是与 Spark 相关的所有功能的起点。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()
  
import spark.implicits._
//创建一个流式 DataFrame,它表示从侦听 localhost:9999 的服务器接收的文本数据
//转换 DataFrame 以计算字数。
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// 将一行切割成单词
val words = lines.as[String].flatMap(_.split(" "))

// 统计word个数
val wordCounts = words.groupBy("value").count()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

这个lines DataFrame 表示一个包含流文本数据的无界表。该表包含一列名为“value”的字符串,流文本数据中的每一行都成为表中的一行。请注意,这当前没有接收任何数据,因为我们只是在设置转换,还没有启动它。接下来,我们使用 将 DataFrame 转换为 String 的 Dataset .as[String],以便我们可以应用flatMap操作将每一行拆分为多个单词。结果words数据集包含所有单词。最后,我们wordCounts通过按 Dataset 中的唯一值进行分组并对其进行计数来定义DataFrame。请注意,这是一个流数据帧,它表示流的运行字数。

我们现在已经设置了对流数据的查询。剩下的就是实际开始接收数据并计算计数。为此,我们将其设置outputMode(“complete”)为在每次更新时将完整的计数集(由 指定)打印到控制台。然后使用start().

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

执行此代码后,流式计算将在后台启动。该query对象是该活动流查询的句柄,使用 awaitTermination() 等待查询终止,以防止进程在查询处于活动状态时退出。

要实际执行此示例代码,可以在自己的Spark 应用程序中编译代码,也可以在 下载 Spark 后直接 运行该示例。这里展示的是后者。首先需要使用 Netcat(在大多数类 Unix 系统中的一个小实用程序,windows也有类似工具)作为数据服务器运行

$ nc -lk 9999
  • 1
# TERMINAL 1:
# Running Netcat

$ nc -lk 9999
apache spark
apache hadoop
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

使用以下命令启动示例

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
  • 1
# TERMINAL 2: RUNNING StructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

3 编程模型

3.1 基础概念

Structured Streaming 的关键思想是将实时数据流视为一个不断追加的表。这导致了一种与批处理模型非常相似的新流处理模型。将流计算表示为标准的类似批处理的查询,就像在静态表上一样,Spark在无界输入表上将其作为增量查询运行。让我们更详细地了解这个模型。

将输入数据流视为“输入表”。到达流的每个数据项就像是添加到输入表的新行。

在这里插入图片描述
基于输入数据的查询将生成“结果表”。每个触发间隔(例如,每 1 秒),新行都会附加到输入表,最终更新结果表。每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。
在这里插入图片描述
“输出”定义为写入外部存储的内容。输出可以定义为不同的模式:

  1. 完整模式- 整个更新的结果表将写入外部存储。由存储连接器决定如何处理整个表的写入。
  2. 追加模式- 只有自上次触发后追加到结果表中的新行才会写入外部存储。这仅适用于预期结果表中的现有行不会更改的查询。
  3. 更新模式- 只有自上次触发后结果表中更新的行才会写入外部存储(自 Spark 2.1.1 起可用)。请注意,这与完整模式的不同之处在于,此模式仅输出自上次触发以来发生更改的行(既包括数据更新的行也包括新增的行)。如果查询不包含聚合,则相当于追加模式。这种适合写数据库

请注意,每种模式都适用于某些类型的查询。这将在后面详细讨论。

为了说明该模型的使用,让我们在上面的快速示例的上下文中理解该模型。第一个lines DataFrame是输入表,最后一个wordCounts DataFrame是结果表。需要注意的是在流媒体的查询lines数据帧生成wordCounts是完全一样的,因为它是一个静态的数据帧。然而,当这个查询开始时,Spark 会不断检查来自套接字连接的新数据。如果有新数据,Spark 将运行一个“增量”查询,将之前的运行计数与新数据结合起来计算更新的计数,如下所示。
在这里插入图片描述

请注意,Structured Streaming 不会具体化整个表。它从流数据源读取最新的可用数据,增量处理以更新结果,然后丢弃源数据。它只保留更新结果所需的最小中间状态数据(例如,前面示例中的中间计数)。

该模型与许多其他流处理引擎明显不同。很多要求用户自己维护正在运行的聚合,因此必须考虑容错和数据一致性(至少一次,或最多一次,或恰好一次)。在这个模型中,Spark 负责在有新数据时更新 Result Table,从而免除用户的推理。

3.2 处理时间事件和延迟数据

事件时间是嵌入数据本身的时间。对于许多应用程序,可能希望在此事件时间上进行操作。比如你想获取IoT设备每分钟产生的事件数,那么你可能想使用数据产生的时间(即数据中的event-time),而不是Spark接收他们的时间。这个事件时间在这个模型中非常自然地表达——来自设备的每个事件都是表中的一行,而事件时间是该行中的一个列值。这允许基于窗口的聚合(例如每分钟的事件数)只是事件时间列上的一种特殊类型的分组和聚合——每个时间窗口都是一个组,每一行都可以属于多个窗口/组。

此外,该模型自然会根据其事件时间处理比预期晚到达的数据。由于 Spark 正在更新结果表,因此它可以完全控制在有延迟数据时更新旧聚合,以及清理旧聚合以限制中间状态数据的大小。从 Spark 2.1 开始,我们支持水印,它允许用户指定延迟数据的阈值,并允许引擎相应地清理旧状态。这些将在稍后的窗口操作部分中进行更详细的解释。

3.3 容错语义

提供端到端的恰好一次语义是结构化流设计背后的关键目标之一。为了实现这一点,我们设计了结构化流源、接收器和执行引擎来可靠地跟踪处理的确切进度,以便它可以通过重新启动和/或重新处理来处理任何类型的故障。假设每个流源都有偏移量(类似于 Kafka 偏移量或 Kinesis 序列号)来跟踪流中的读取位置。引擎使用检查点和预写日志来记录每个触发器中正在处理的数据的偏移范围。流接收器被设计为幂等处理重复处理。结合使用可重放源和幂等接收器,在任何故障下结构化流可以确保端到端的恰好一次语义 。

4 使用DataFrame和DataSet的API

从 Spark 2.0 开始,DataFrames 和 Datasets 可以表示静态的、有界的数据,以及流式的、无界的数据。与静态数据集/数据帧类似,可以使用通用入口点SparkSession (Scala / Java / Python 文档)从流源创建流数据帧/数据集,并对它们应用与静态数据帧/数据集相同的操作。

4.1 创建Streaming DataFrame 和 Streaming DataSet

Streaming DataFrame可以通过SparkSession.readStream()返回的 DataStreamReader接口(Scala / Java / Python docs)创建

4.1.1 输入源

有一些内置源。

  1. File source,文件源。读取写入目录中的文件作为数据流。支持的文件格式为 text、csv、json、orc、parquet。请参阅 DataStreamReader 接口的文档以获取更新的列表以及每种文件格式的支持选项。请注意,文件必须原子地放置在给定的目录中,这在大多数文件系统中可以通过文件移动操作来实现。
    支持容错,支持 glob 路径,但不支持多个逗号分隔的路径/glob。
  2. Kafka source,Kafka 源。从 Kafka 读取数据。它与 Kafka 代理版本 0.10.0 或更高版本兼容。
    支持容错
  3. Socket source,套接字源(用于测试) - 从套接字连接读取 UTF8 文本数据。侦听服务器套接字位于驱动程序中。请注意,这应该仅用于测试,因为这不提供端到端的容错保证。
    不支持容错
  4. Rate source,以每秒指定的行数生成数据,每个输出行包含一个timestamp和value。其中timestamp是Timestamp包含消息发送时间的类型,value是Long包含消息计数的类型,从0开始作为第一行。此源用于性能测试和压力测试。
    支持容错

某些源不具有容错能力,因为它们不能保证在发生故障后可以使用检查点偏移量重放数据。请参阅前面关于容错语义的部分。以下是 Spark 中所有源的详细信息。

4.1.1.1 文件源

参数

  • path: 输入目录的路径,所有文件格式通用。
  • maxFilesPerTrigger: 每次触发时要考虑的新文件的最大数量 (默认: 无最大值)
  • latestFirst: 是否先处理最新的新文件, 当文件积压较大时很有用 (默认: false)
  • fileNameOnly: 是否根据新文件检查新文件只有文件名而不是完整路径(默认值:false)。将此设置为 true,以下文件将被视为同一文件,因为它们的文件名“dataset.txt”是相同的:
    “file:///dataset.txt”
    “s3://a/ dataset.txt"
    “s3n://a/b/dataset.txt”
    “s3a://a/b/c/dataset.txt”

对于特定于文件格式的选项,DataStreamReader接口(Scala / Java / Python docs)。例如,对于parquet格式选项,请参阅DataStreamReader.parquet()。

此外,还有会影响某些文件格式的会话配置。有关更多详细信息,请参阅Spark SQL部分。例如,对于“parquet”,请参阅Parquet 配置部分。

读取文件源的案例

val spark: SparkSession = ...

// Read text from socket
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.isStreaming    // 如果有流式数据返回true

socketDF.printSchema

// 从文件夹中自动读取所有csv文件
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // 指定csv文件的schema
  .csv("/path/to/directory")    // 等价于
  format("csv").load("/path/to/directory")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
4.1.1.2 Socket源
  • host: 要连接的主机,必须指定
  • port: 要连接的端口,必须指定
4.1.1.3 Kafka源

Structured Streaming集成Kafka

4.1.1.3.1 导入依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version> 2.4.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
4.1.1.3.2 从Kafka读数据

流式读取

// 从一个topic读取
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 从多个topic读取
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 使用模式匹配读取
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

批量读取

如果使用场景适合批量读取,也可以根据偏移量范围创建DataFrame和DataSet

// 从一个topic读取,默认是从开始偏移量到结尾偏移量
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 从多个topic读取,指定偏移量范围
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 根据模式匹配读取topic, 偏移量从开始到结束
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

源中的每一行都具有以下schema

列名类型
keybinary
valuebinary
topicstring
partitionint
offsetlong
timestamplong
timestampTypeint

必须为 Kafka 源为批处理和流查询设置以下选项。

选项意义
assignjson string {“topicA”:[0,1],“topicB”:[2,4]}要使用的特定主题分区。只能为 Kafka 源指定“assign”、“subscribe”或“subscribePattern”选项之一。
subscribe以逗号分隔的主题列表要订阅的主题列表。只能为 Kafka 源指定“assign”、“subscribe”或“subscribePattern”选项之一。
subscribePatternJava 正则表达式字符串用于订阅主题的模式。只能为 Kafka 源指定“assign、“subscribe”或“subscribePattern”选项之一。
kafka.bootstrap.servers逗号分隔的主机列表:端口Kafka“bootstrap.servers”配置。

可选配置

选项默认查询类型意义
startingOffsets“earliest”、“latest”(仅限流媒体)或 json 字符串 “”" {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-2} } “”"latest用于流式传输,earliest用于批处理流和批处理查询开始时的起始点,可以是来自最早偏移量的“earliest”、仅来自最新偏移量的“latest”,或者是指定每个 TopicPartition 的起始偏移量的 json 字符串。在json中,-2作为偏移量可以用来指最早的,-1指的是最新的。注意:对于批量查询,latest(隐式或在 json 中使用 -1)是不允许的。对于流式查询,这仅适用于启动新查询时,并且恢复将始终从查询停止的位置开始。查询期间新发现的分区将最早开始。
endingOffsets最新或 json 字符串 {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1}}latest批量查询批处理查询结束时的结束点,可以是“最新”,它只是指最新的,或者是指定每个 TopicPartition 的结束偏移量的 json 字符串。在json中,-1作为偏移量可以用来引用latest,-2(earlyest)作为偏移量是不允许的。
failOnDataLosstrue 或者 falsetrue流查询当数据可能丢失(例如,主题被删除,或偏移量超出范围)时是否使查询失败。这可能是误报。当它不能按预期工作时,您可以禁用它。如果由于丢失数据而无法从提供的偏移量中读取任何数据,则批处理查询将始终失败。
kafkaConsumer.pollTimeoutMslong512流和批处理在执行程序中从 Kafka 轮询数据的超时时间(以毫秒为单位)。
fetchOffset.numRetriesint3流和批处理在放弃获取 Kafka 偏移量之前重试的次数。
fetchOffset.retryIntervalMslong10流和批处理在重试获取 Kafka 偏移量之前等待的毫秒数
maxOffsetsPerTriggerlongnone流和批处理每个触发间隔处理的最大偏移量的速率限制。指定的总偏移量将在不同卷的主题分区之间按比例分配。
4.1.1.3.3 向Kafka写入数据

请注意,Apache Kafka 仅支持至少一次写入语义。因此,在向 Kafka 写入(流式查询或批量查询)时,某些记录可能会重复;例如,如果 Kafka 需要重试一条未被 Broker 确认的消息,即使该 Broker 接收并写入了消息记录,也会发生这种情况。由于这些 Kafka 写语义,结构化流无法防止此类重复的发生。但是,如果写入查询成功,那么您可以假设查询输出至少写入了一次。在读取写入的数据时删除重复项的可能解决方案可能是引入一个主(唯一)键,该键可用于在读取时执行重复数据删除。

写入 Kafka 的数据帧应该在架构中具有以下列:

类型
key (可选的)string or binary
value 必须的)string or binary
topic (*可选的)string
  • 如果未指定“主题”配置选项,则主题列是必需的。

值列是唯一必需的选项。如果未指定null键列,则将自动添加值键列(请参阅有关如何null处理值键值的Kafka 语义)。如果存在主题列,则在将给定行写入 Kafka 时将其值用作主题,除非设置了“主题”配置选项,即“主题”配置选项覆盖主题列。

必须为 批处理和流查询的 Kafka 接收器设置以下选项。

Optionvaluemeaning
kafka.bootstrap.servers逗号分隔的主机列表:端口Kafka“bootstrap.servers”配置。
以下配置是可选的:
Optionvaluedefaultquery typemeaning
topicstringnonestreaming and batch设置所有行将写入 Kafka 的主题。此选项会覆盖数据中可能存在的任何主题列。

流式数据写入Kafka

// 从DataFrame写key-value数据到Kafka,通过option指定topic
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// 从DataFrame写key-value数据到Kafka,数据中指定topic
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

将批量查询的输出写入 Kafka

// 从DataFrame写key-value数据到Kafka,通过option指定topic
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// 从DataFrame写key-value数据到Kafka,数据中指定topic
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
4.1.1.3.4 Kafka 特定配置

卡夫卡自己的配置可以通过设置DataStreamReader.option与kafka.前缀,例如 stream.option(“kafka.bootstrap.servers”, “host:port”)。对于可能的 kafka 参数,有关读取数据的参数参考Kafka 消费者配置文档, 有关写入数据的参数参考 Kafka生产者配置文档

请注意,无法设置以下 Kafka 参数,并且 Kafka 源或接收器将抛出异常:

  • group.id:Kafka 源会自动为每个查询创建一个唯一的组 ID。
  • auto.offset.reset:设置源选项startingOffsets以指定从哪里开始。Structured Streaming 管理内部消耗哪些偏移量,而不是依赖 kafka Consumer 来完成。这将确保在动态订阅新主题/分区时不会丢失任何数据。请注意,startingOffsets仅在启动新的流查询时适用,并且恢复将始终从查询停止的位置开始。
  • key.deserializer:键总是使用 ByteArrayDeserializer 反序列化为字节数组。使用 DataFrame 操作显式反序列化键。
  • value.deserializer:值总是使用 ByteArrayDeserializer 反序列化为字节数组。使用 DataFrame 操作显式反序列化值。
  • key.serializer:键始终使用 ByteArraySerializer 或 StringSerializer 进行序列化。使用DataFrame 操作将键显式序列化为字符串或字节数组。
  • value.serializer:值始终使用 ByteArraySerializer 或 StringSerializer 进行序列化。使用 DataFrame 操作将值显式序列化为字符串或字节数组。
  • enable.auto.commit:Kafka 源不提交任何偏移量。
  • interceptor.classes:Kafka 源总是将键和值作为字节数组读取。使用 ConsumerInterceptor 是不安全的,因为它可能会破坏查询。
4.1.1.3.5 部署

部署时要注意,必须把spark-sql-kafka-0-10_2.1打到依赖里,否则需要显示指定。

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 ...
  • 1
4.1.1.4 生成数据源
  • rowsPerSecond(例如 100,默认值:1):每秒应生成多少行。

  • rampUpTime(例如 5s,默认值:0s):在生成速度变为 之前需要多长时间斜坡上升rowsPerSecond。使用比秒更细的粒度将被截断为整数秒。

  • numPartitions(eg 10, default: Spark’s default parallelism): 生成行的分区号。

源将尽力达到rowsPerSecond,但查询可能受到资源限制,并且numPartitions可以进行调整以帮助达到所需的速度。

4.1.2 模式推断和分区

默认情况下,来自基于文件的源的结构化流需要您指定架构,而不是依赖 Spark 自动推断它。此限制确保一致的架构将用于流式查询,即使在失败的情况下也是如此。对于临时用例,您可以通过设置spark.sql.streaming.schemaInference为来重新启用模式推断true

当指定的子目录/key=value/存在并且列表将自动递归到这些目录中时,确实会发生分区发现。如果这些列出现在用户提供的模式中,Spark 将根据正在读取的文件的路径填充它们。组成分区方案的目录在查询开始时必须存在并且必须保持静态。例如,存在/data/year=2016/时添加是可以的/data/year=2015/,但更改分区列(即通过创建目录/data/date=2016-04-17/)是无效的。

5 对流式DataFrame/DataSet的操作

可以在流式数据帧/数据集上应用各种操作——从无类型、类似 SQL 的操作(例如selectwheregroupBy)到类型化的 RDD 类操作(例如mapfilterflatMap)。有关更多详细信息,请参阅Spark SQL部分。让我们看一下可以使用的一些示例操作。

5.1 基本操作 - 选择、投影、聚合

DataFrame/Dataset 上的大多数常见操作都支持流式传输。本节稍后将讨论少数不受支持的操作。

case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // IOT设备的流式数据,schema为 { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // IOT 设备的流式数据

// 查询信号数大于10的设备
df.select("device").where("signal > 10")      // 使用弱类型 API   
ds.filter(_.signal > 10).map(_.device)         // 使用强类型 API

// 获得各种设备类型的个数
df.groupBy("deviceType").count()                          // 使用弱类型 API

// 获得每种设备的平均数
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // 使用强类型 API
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

还可以将流式数据帧/数据集注册为临时视图,然后对其应用 SQL 命令。

df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  // 返回sql查询结果的DataFrame
  • 1
  • 2

请注意,可以使用df.isStreaming来判断是否是流。

df.isStreaming
  • 1

5.2 事件时间的窗口操作

滑动事件时间窗口上的聚合对于结构化流很简单,并且与分组聚合非常相似。在分组聚合中,为用户指定的分组列中的每个唯一值维护聚合值(例如计数)。在基于窗口的聚合的情况下,为行的事件时间所属的每个窗口维护聚合值。让我们通过一个插图来理解这一点。

想象一下我们的快速示例被修改,流现在包含行以及生成行的时间。我们不想计算字数,而是要计算 10 分钟窗口内的字数,每 5 分钟更新一次。也就是说,在 10 分钟窗口 12:00 - 12:10、12:05 - 12:15、12:10 - 12:20 等之间接收到的字数中的字数。请注意,12:00 - 12:10 表示数据在 12:00 之后但在 12:10 之前到达。现在,考虑在 12:07 收到的一个词。这个字应该增加对应于两个窗口 12:00 - 12:10 和 12:05 - 12:15 的计数。因此计数将由分组键(即单词)和窗口(可以从事件时间计算)两者进行索引。

结果表将类似于以下内容。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WbQ6q4LX-1631754842072)(http://spark.apache.org/docs/2.4.0/img/structured-streaming-window.png)]

由于这种加窗类似于分组,在代码中,可以使用groupBy()window()操作来表达加窗聚合。您可以在Scala / Java / Python 中查看以下示例的完整代码 。

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// 根据窗口和word聚合 计算每组的word个数
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

5.3 处理延迟数据和水印

现在考虑如果其中一个事件迟到应用程序会发生什么。例如,假设在 12:04(即事件时间)生成的单词可以在 12:11 被应用程序接收。应用程序应该使用时间 12:04 而不是 12:11 来更新窗口12:00 - 12:10的旧计数。这在我们基于窗口的分组中很自然地发生——结构化流可以在很长一段时间内保持部分聚合的中间状态,以便后期数据可以正确更新旧窗口的聚合,如下图所示。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x1TYbuIa-1631754842075)(http://spark.apache.org/docs/2.4.0/img/structured-streaming-late-data.png)]

但是,要运行此查询数天,系统必须限制其累积的中间内存状态量。这意味着系统需要知道何时可以从内存状态中删除旧聚合,因为应用程序将不再接收该聚合的迟到数据。为了实现这一点,在 Spark 2.1 中,引入了 水印,它可以让引擎自动跟踪数据中的当前事件时间并尝试相应地清理旧状态。您可以通过指定事件时间列和数据在事件时间方面的预期延迟阈值来定义查询的水印。对于从T时间开始的特定窗口,引擎将保持状态并允许延迟数据更新状态直到(max event time seen by the engine - late threshold > T). 换句话说,阈值内的迟到数据将被聚合,但迟于阈值的数据将开始被丢弃( 有关确切保证,请参阅本节后面的部分)。让我们通过一个例子来理解这一点。我们可以使用withWatermark()如下所示轻松地在前面的示例中定义水印。

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// 根据窗口和word聚合 计算每组的word个数
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

在这个例子中,我们在timestamp列的值上定义查询的水印,并定义“10分钟”作为允许数据延迟多长时间的阈值。如果此查询在更新输出模式下运行(稍后在输出模式部分讨论),引擎将不断更新结果表中窗口的计数,直到窗口比水印更旧,这滞后于列“中的当前事件时间”时间戳”前 10 分钟。这是一个插图。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-u28z65MD-1631754842076)(http://spark.apache.org/docs/2.4.0/img/structured-streaming-watermark-update-mode.png)]

如图,引擎跟踪的最大事件时间为 蓝色虚线(max event time - '10 mins') 每次触发开始时设置的水印为红线。例如,当引擎观察到数据时 (12:14, dog),它将下一个触发器的水印设置为12:04。此水印让引擎保持中间状态额外 10 分钟,以允许对延迟数据进行计数。比如数据(12:09, cat)乱序、迟到,落在windows12:00 - 12:1012:05 - 12:15. 由于它仍然12:04在触发器中的水印之前,引擎仍然将中间计数保持为状态并正确更新相关窗口的计数。但是,当水印更新为12:11,窗口的中间状态(12:00 - 12:10)被清除,所有后续数据(例如(12:04, donkey))都被认为“太晚了”,因此被忽略。请注意,在每次触发后,更新的计数(即紫色行)被写入接收器作为触发输出,如更新模式所指示的那样。

某些接收器(例如文件)可能不支持更新模式所需的细粒度更新。为了与它们一起工作,我们还支持追加模式,其中仅将最终计数写入接收器。这如下图所示。

请注意,withWatermark在非流式数据集上使用是无操作的。由于水印不应该以任何方式影响任何批量查询,我们将直接忽略它。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U98X8Khb-1631754842079)(http://spark.apache.org/docs/2.4.0/img/structured-streaming-watermark-append-mode.png)]

与之前的更新模式类似,引擎维护每个窗口的中间计数。但是,部分计数不会更新到结果表中,也不会写入接收器。引擎等待“10 分钟”以计算延迟日期,然后丢弃窗口 < 水印的中间状态,并将最终计数附加到结果表/接收器。例如,窗口的最终计数12:00 - 12:10仅在水印更新为 后才附加到结果表中12:11

5.3.1 水印清洁聚合状态的条件

需要注意的是,水印必须满足以下条件才能清除聚合查询中的状态*(从 Spark 2.1.1 开始,将来可能会发生变化)*。

  • **输出模式必须是追加或更新。**完整模式要求保留所有聚合数据,因此不能使用水印来删除中间状态。有关 每种输出模式语义的详细说明,请参阅输出模式部分。
  • 聚合必须具有事件时间列或window事件时间列上的 a。
  • withWatermark必须在与聚合中使用的时间戳列相同的列上调用。例如, df.withWatermark("time", "1 min").groupBy("time2").count()在追加输出模式下无效,因为水印是在与聚合列不同的列上定义的。
  • withWatermark必须在要使用的水印详细信息的聚合之前调用。例如,df.groupBy("time").count().withWatermark("time", "1 min")在追加输出模式下无效。
5.3.2 带水印聚合的语义保证
  • withWatermark“2 小时”的水印延迟(设置为)可确保引擎永远不会丢弃任何延迟小于 2 小时的数据。换句话说,任何比到那时处理的最新数据晚不到 2 小时(就事件时间而言)的数据都可以保证聚合。
  • 但是,保证只在一个方向上是严格的。延迟超过2小时的数据不保证掉线;它可能会或可能不会聚合。数据延迟越多,引擎处理它的可能性就越小。

5.4 join操作

结构化流支持将流数据集/数据帧与静态数据集/数据帧以及另一个流数据集/数据帧连接起来。流式连接的结果是增量生成的,类似于上一节中流式聚合的结果。在本节中,我们将探讨在上述情况下支持哪种类型的连接(即内部、外部等)。请注意,在所有支持的连接类型中,与流数据集/数据帧的连接结果将与在流中包含相同数据的静态数据集/数据帧完全相同。

5.4.1 流静态连接

自从在 Spark 2.0 中引入以来,Structured Streaming 已经支持流和静态 DataFrame/Dataset 之间的连接(内连接和某种类型的外连接)。这是一个简单的例子。

val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")          // 和静态DF内连接
streamingDf.join(staticDf, "type", "right_join")  // 和静态DF右外连接  
  • 1
  • 2
  • 3
  • 4
  • 5

请注意,流静态连接不是有状态的,因此不需要状态管理。但是,尚不支持几种类型的流静态外部联接。这些都列在此join部分的末尾。

5.4.2 流-流连接

在 Spark 2.3 中,增加了对流-流连接的支持,即可以连接两个流数据集/数据帧。在两个数据流之间生成连接结果的挑战在于,在任何时间点,连接两侧的数据集视图都是不完整的,这使得在输入之间找到匹配变得更加困难。从一个输入流接收的任何行都可以与来自另一个输入流的任何未来的、尚未接收的行匹配。因此,对于两个输入流,我们将过去的输入缓冲为流状态,以便我们可以将每个未来的输入与过去的输入相匹配,并相应地生成连接结果。此外,类似于流聚合,我们自动处理迟到的乱序数据,并可以使用水印限制状态。让我们讨论支持的流-流连接的不同类型以及如何使用它们。

5.4.2.1 带有可选水印的内部连接

支持任何类型的列上的内部联接以及任何类型的联接条件。但是,随着流的运行,流状态的大小将无限增长,因为 必须保存所有过去的输入,因为任何新输入都可以与过去的任何输入匹配。为了避免无界状态,您必须定义额外的连接条件,以便无限期的旧输入无法与未来输入匹配,因此可以从状态中清除。换句话说,您必须在联接中执行以下附加步骤。

  1. 时间范围连接条件(例如...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),
  2. 加入事件时间窗口(例如...JOIN ON leftTimeWindow = rightTimeWindow)。

让我们通过一个例子来理解这一点。

假设我们想要将一个广告展示流(当广告展示时)与另一个用户点击广告流连接起来,以关联展示何时导致可获利的点击。要在此流-流连接中进行状态清理,您必须指定水印延迟和时间限制,如下所示。

  1. 水印延迟:比如说,展示次数和相应的点击次数在事件时间中最多可能分别延迟/失序最多 2 小时和 3 小时。
  2. 事件时间范围条件:比如说,点击可以在相应展示后的 0 秒到 1 小时的时间范围内发生。

代码看起来像这样。

import org.apache.spark.sql.functions.expr

val impressions = spark.readStream. ...
val clicks = spark.readStream. ...

// 在事件时间上使用水印
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// 使用事件时间约束join
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

带水印的流-流内部连接的语义保证

这类似于在聚合上加水印提供保证。“2 小时”的水印延迟保证引擎永远不会丢弃任何延迟小于 2 小时的数据。但延迟超过 2 小时的数据可能会也可能不会得到处理。

5.4.2.2 带水印的外部连接

虽然水印 + 事件时间约束对于内连接是可选的,但对于左外连接和右外连接,它们必须被指定。这是因为为了在外连接中生成 NULL 结果,引擎必须知道输入行将来何时不会与任何内容匹配。因此,必须指定水印 + 事件时间约束才能生成正确的结果。因此,带有外连接的查询看起来与之前的广告货币化示例非常相似,只是会有一个附加参数将其指定为外连接。

impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  joinType = "leftOuter"      // 可以是 "inner", "leftOuter", "rightOuter"
 )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

带水印的流-流外部连接的语义保证

关于水印延迟以及数据是否会被丢弃,外连接与内连接具有相同的保证。

注意事项

关于如何生成外部结果,有几个重要的特征需要注意。

  • *外部 NULL 结果的生成延迟取决于指定的水印延迟和时间范围条件。*这是因为引擎必须等待那么长时间才能确保没有匹配项,并且将来不会再有匹配项。
  • 在目前微批处理引擎的实现中,水印在一个微批处理结束时提前,下一个微批处理使用更新后的水印清理状态并输出外部结果。由于我们仅在有新数据要处理时触发微批处理,如果流中没有接收到新数据,则外部结果的生成可能会延迟。 简而言之,如果加入的两个输入流中的任何一个在一段时间内没有接收到数据,则外部(左或右)输出可能会延迟。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0iF8BKBg-1631754842082)(C:\Users\my\AppData\Roaming\Typora\typora-user-images\image-20210915221221093.png)]

有关支持的连接的其他详细信息:

  • 联接可以级联,也就是说,可以执行df1.join(df2, ...).join(df3, ...).join(df4, ....).
  • 从 Spark 2.3 开始,只能在查询处于追加输出模式时使用连接。尚不支持其他输出模式。
  • 从 Spark 2.3 开始,不能在连接之前使用其他非类似映射的操作。以下是一些不能使用的示例。
    • 在加入之前不能使用流聚合。
    • 连接前不能在更新模式下使用 mapGroupsWithState 和 flatMapGroupsWithState。

5.5 流重复数据删除

使用事件中的唯一标识符对数据流中的记录进行重复数据删除。这与使用唯一标识符列的静态重复数据删除完全相同。该查询将存储来自先前记录的必要数据量,以便它可以过滤重复记录。与聚合类似,可以使用带有或不带有水印的重复数据删除。

  • 带水印- 如果重复记录到达的时间有上限,那么可以在事件时间列上定义水印,并使用 guid 和事件时间列进行重复数据删除。查询将使用水印从过去的记录中删除旧的状态数据,这些数据预计不会再有任何重复。这限制了查询必须维护的状态量。
  • 无水印- 由于重复记录可能到达的时间没有限制,因此查询将所有过去记录中的数据存储为状态。
val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

5.6 处理多个水印的策略

流查询可以有多个联合或连接在一起的输入流。每个输入流都可以具有不同的延迟数据阈值,有状态操作需要容忍这些延迟数据。您可以withWatermarks("eventTime", delay)在每个输入流上使用这些阈值 。例如,考虑在inputStream1和之间使用流-流连接的查询inputStream2

inputStream1.withWatermark(“eventTime1”, “1 hours”) .join( inputStream2.withWatermark(“eventTime2”, “2 hours”), joinCondition)

在执行查询时,Structured Streaming 单独跟踪在每个输入流中看到的最大事件时间,根据相应的延迟计算水印,并选择单个全局水印与它们一起用于有状态操作。默认情况下,选择最小值作为全局水印,因为它可以确保如果其中一个流落后于其他流(例如,其中一个流由于上游故障而停止接收数据),则不会因为太晚而意外丢弃数据。换句话说,全局水印将安全地以最慢流的速度移动,查询输出将相应延迟。

但是,在某些情况下,您可能希望获得更快的结果,即使这意味着从最慢的流中删除数据。从 Spark 2.4 开始,您可以通过将 SQL 配置设置spark.sql.streaming.multipleWatermarkPolicymax(默认为min)来设置多水印策略以选择最大值作为全局水印 。这让全局水印以最快的流速度移动。然而,作为副作用,来自较慢流的数据将被积极丢弃。因此,请谨慎使用此配置。

5.7 任意状态操作

许多用例需要比聚合更高级的有状态操作。例如,在许多用例中,必须从事件数据流中跟踪会话。为了进行这种会话化,必须将任意类型的数据保存为状态,并使用每个触发器中的数据流事件对状态执行任意操作。从 Spark 2.2 开始,这可以使用操作mapGroupsWithState和更强大的操作来完成flatMapGroupsWithState。这两种操作都允许在分组数据集上应用用户定义的代码以更新用户定义的状态。有关更具体的详细信息,请查看 API 文档 ( Scala / Java ) 和示例 ( Scala / Java )。

5.8 不支持的操作

流式数据帧/数据集不支持一些数据帧/数据集操作。其中一些如下。

  • 流式数据集尚不支持多个流式聚合(即流式 DF 上的聚合链)。
  • 流式数据集不支持限制和取前 N 行。
  • 不支持对流式数据集的不同操作。
  • 仅在聚合后且在完整输出模式下,流式数据集才支持排序操作。
  • 不支持流数据集上的几种类型的外连接。有关 更多详细信息,请参阅连接操作部分中的 支持矩阵。

此外,还有一些 Dataset 方法不适用于流式数据集。它们是将立即运行查询并返回结果的操作,这在流式数据集上没有意义。相反,这些功能可以通过显式启动流式查询来完成(请参阅下一节关于此的内容)。

  • count()- 无法从流式数据集返回单个计数。相反,使用ds.groupBy().count()它返回一个包含运行计数的流数据集。
  • foreach()- 改为使用ds.writeStream.foreach(...)(见下一节)。
  • show() - 而是使用控制台接收器(请参阅下一节)。

如果您尝试这些操作中的任何一个,您将看到AnalysisException类似“流式数据帧/数据集不支持操作 XYZ”。虽然其中一些可能在 Spark 的未来版本中得到支持,但还有一些从根本上很难有效地在流数据上实现。例如,不支持对输入流进行排序,因为它需要跟踪流中接收到的所有数据。因此,这从根本上很难有效执行。

6 启动流式查询

一旦定义了最终结果 DataFrame/Dataset,剩下的就是开始流式计算。要做到这一点,你必须使用通过Dataset.writeStream()返回的DataStreamWriterScala/ Java的/ Python文档)。必须在此界面中指定以下一项或多项。

  • *输出接收器的详细信息:*数据格式、位置等。
  • *输出模式:*指定写入输出接收器的内容。
  • *查询名称:(*可选)指定查询的唯一名称以进行标识。
  • *触发间隔:*可选地,指定触发间隔。如果未指定,系统将在前一处理完成后立即检查新数据的可用性。如果因为之前的处理没有完成而错过了一个触发时间,那么系统会立即触发处理。
  • *检查点位置:*对于一些可以保证端到端容错的输出接收器,指定系统将写入所有检查点信息的位置。这应该是兼容 HDFS 的容错文件系统中的目录。检查点的语义将在下一节中更详细地讨论。
6.1 输出模式

有几种类型的输出模式。

  • 附加模式(默认) - 这是默认模式,只有自上次触发后添加到结果表中的新行才会输出到接收器。这仅支持添加到结果表中的行永远不会改变的那些查询。因此,这种模式保证每一行只输出一次(假设容错接收器)。例如,查询只selectwheremapflatMapfilterjoin,等会支持追加模式。
  • 完全模式- 每次触发后,整个结果表将输出到接收器。这支持聚合查询。
  • 更新模式-(自 Spark 2.1.1 起可用)只有自上次触发后更新的结果表中的行才会输出到接收器。更多信息将在未来版本中添加。

不同类型的流查询支持不同的输出模式。这是兼容性矩阵。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HP2GN6kl-1631754842083)(C:\Users\my\AppData\Roaming\Typora\typora-user-images\image-20210915223027459.png)]

6.2 输出接收器

有几种类型的内置输出接收器。

  • 文件接收器- 将输出存储到目录中

    writeStream
        .format("parquet")        // can be "orc", "json", "csv", etc.
        .option("path", "path/to/destination/dir")
        .start()
    
    • 1
    • 2
    • 3
    • 4
  • Kafka sink - 将输出存储到 Kafka 中的一个或多个主题。

    writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("topic", "updates")
        .start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • Foreach sink - 对输出中的记录运行任意计算。有关更多详细信息,请参阅本节后面部分。

    writeStream
        .foreach(...)
        .start()
    
    • 1
    • 2
    • 3
  • 控制台接收器(用于调试) - 每次触发时将输出打印到控制台/标准输出。支持追加和完成输出模式。这应该用于低数据量的调试目的,因为每次触发后都会收集整个输出并存储在驱动程序的内存中。

    writeStream
        .format("console")
        .start()
    
    • 1
    • 2
    • 3
  • 内存接收器(用于调试) - 输出作为内存表存储在内存中。支持追加和完成输出模式。这应该用于低数据量的调试目的,因为整个输出都被收集并存储在驱动程序的内存中。因此,请谨慎使用。

    writeStream
        .format("memory")
        .queryName("tableName")
        .start()
    
    • 1
    • 2
    • 3
    • 4

一些接收器不是容错的,因为它们不保证输出的持久性并且仅用于调试目的。请参阅前面关于 容错语义的部分。以下是 Spark 中所有接收器的详细信息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-54gwFYpC-1631754842085)(C:\Users\my\AppData\Roaming\Typora\typora-user-images\image-20210915223422407.png)]

请注意,必须调用start()才能真正开始执行查询。这将返回一个 StreamingQuery 对象,该对象是持续运行的执行的句柄。可以使用此对象来管理查询,我们将在下一小节中讨论。现在,让我们通过几个例子来理解这一切。

// ========== 无聚合的DataFrame ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")   

// 打印新数据到控制台
noAggDF
  .writeStream
  .format("console")
  .start()

// 将新数据以Parquet格式写到文件
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// ========== 有聚合的DataFrame ==========
val aggDF = df.groupBy("device").count()

// 打印新数据到控制台
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

// 所有聚合结果保存在一张内存表
aggDF
  .writeStream
  .queryName("aggregates")    // 这里指定的查询名将会是表名
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // 交互式查询内存表
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
6.2.1 使用 Foreach 和 ForeachBatch

foreachforeachBatch操作让应用在流媒体查询的输出任意操作和写的逻辑。它们的用例略有不同——虽然foreach 允许在每一行上自定义写入逻辑,但foreachBatch允许对每个微批次的输出进行任意操作和自定义逻辑。让我们更详细地了解它们的用法。

ForeachBatch

foreachBatch(...)允许指定一个函数,该函数对流式查询的每个微批次的输出数据执行。从 Spark 2.4 开始,Scala、Java 和 Python 都支持此功能。它需要两个参数:具有微批次输出数据的 DataFrame 或 Dataset 和微批次的唯一 ID。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // Transform and write batchDF 
}.start()
  • 1
  • 2
  • 3

使用foreachBatch,可以执行以下操作。

  • 重用现有的批处理数据源- 对于许多存储系统,可能还没有可用的流接收器,但可能已经存在用于批处理查询的数据编写器。使用foreachBatch,您可以在每个微批次的输出上使用批次数据编写器。

  • 写入多个位置- 如果要将流式查询的输出写入多个位置,则只需多次写入输出 DataFrame/Dataset。但是,每次写入尝试都会导致重新计算输出数据(包括可能重新读取输入数据)。为避免重新计算,您应该缓存输出 DataFrame/Dataset,将其写入多个位置,然后取消缓存。这是一个大纲。

    streamDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => batchDF.persist() batchDF.write.format(…).save(…) // location 1 batchDF.write.format(…).save( …) // 位置 2 batchDF.unpersist() }

  • 应用额外的 DataFrame 操作- 流式 DataFrame 不支持许多 DataFrame 和 Dataset 操作,因为 Spark 不支持在这些情况下生成增量计划。使用foreachBatch,您可以对每个微批次输出应用其中一些操作。但是,您必须自己推理执行该操作的端到端语义。

注意:

  • 默认情况下,foreachBatch仅提供至少一次写入保证。但是,您可以使用提供给函数的 batchId 作为对输出进行重复数据删除并获得精确一次保证的方法。
  • foreachBatch不适用于连续处理模式,因为它从根本上依赖于流式查询的微批处理执行。如果以连续模式写入数据,请foreach改用。

Foreach

如果foreachBatch不可用(例如,对应的批处理数据写入器不存在,或连续处理模式),那么可以使用foreach. 具体来说,可以将数据的写入逻辑分为三种方法:openprocessclose。从 Spark 2.4 开始,foreach可以在 Scala、Java 和 Python 中使用。

在 Scala 中,必须扩展类ForeachWriter( docs )。

streamingDatasetOfString.writeStream.foreach(
  new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // 开启连接
    }

    def process(record: String): Unit = {
      // 写数据到连接
    }

    def close(errorOrNull: Throwable): Unit = {
      // 关闭连接
    }
  }
).start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

执行语义 当流式查询开始时,Spark 以如下方式调用函数或对象的方法:

  • 此对象的单个副本负责查询中单个任务生成的所有数据。换句话说,一个实例负责处理分布式生成的数据的一个分区。

  • 该对象必须是可序列化的,因为每个任务都将获得所提供对象的新的序列化-反序列化副本。因此,强烈建议在调用 open() 方法之后完成任何写入数据的初始化(例如,打开连接或启动事务),这表示任务已准备好生成数据。

  • 方法的生命周期如下:

    • 对于每个带有 partition_id 的分区:
      • 对于具有 epoch_id 的每个批次/时期的流数据:
        • 方法 open(partitionId, epochId) 被调用。
        • 如果 open(…) 返回 true,则对于分区中的每一行和批处理/纪元,都会调用 process(row) 方法。
        • 在处理行时看到错误(如果有)调用方法 close(error) 。
  • 如果 open() 方法存在并成功返回(与返回值无关),则调用 close() 方法(如果存在),除非 JVM 或 Python 进程在中间崩溃。

  • **注意:**当失败导致重新处理某些输入数据时,open() 方法中的 partitionId 和 epochId 可用于对生成的数据进行重复数据删除。这取决于查询的执行模式。如果流式查询是以微批处理模式执行的,那么由唯一元组(partition_id,epoch_id)表示的每个分区都保证具有相同的数据。因此,(partition_id, epoch_id) 可用于重复数据删除和/或事务性提交数据并实现仅一次保证。但是,如果流查询以连续模式执行,则此保证不成立,因此不应用于重复数据删除。

6.3 触发器

流查询的触发器设置定义了流数据处理的时间,也定义了查询是作为具有固定批处理间隔的微批处理查询还是作为连续处理查询执行。以下是支持的不同类型的触发器

Trigger TypeDescription
unspecified (default)如果没有显式指定触发器设置,那么默认情况下,查询将以微批处理模式执行,只要前一个微批处理完成处理,就会生成微批处理。
Fixed interval micro-batches查询将以微批次模式执行,其中微批次将以用户指定的时间间隔启动。
如果前一个微批次在间隔内完成,则引擎将等待间隔结束,然后再启动下一个微批次。
如果前一个 micro-batch 花费的时间比完成间隔的时间长(即如果错过了一个间隔边界),那么下一个 micro-batch将在前一个完成后立即开始(即,它不会等待下一个间隔边界) )。
如果没有新数据可用,则不会启动微批次。
One-time micro-batch查询将执行一个微批处理来处理所有可用数据,然后自行停止。这在您希望定期启动集群、处理自上一个周期以来可用的所有内容,然后关闭集群的情况下非常有用。在某些情况下,这可能会显着节省成本。
Continuous with fixed checkpoint interval (experimental)查询将在新的低延迟、连续处理模式下执行。在下面的连续处理部分阅读更多相关信息。

下面是一些代码示例。

import org.apache.spark.sql.streaming.Trigger

// 默认触发器为微批
df.writeStream
  .format("console")
  .start()

// 固定2秒间隔微批次
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// 一次性微批次
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()

// 以固定检查点间隔连续
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

7 管理流查询

StreamingQuery启动查询时创建的对象可用于监控和管理查询。

val query = df.writeStream.format("console").start()   // get the query object

query.id          // 从检查点数据获取重新启动后持续存在或正在运行的查询的唯一标识符

query.runId       // 获取此查询运行的唯一 ID,它将在每次启动/重启时生成

query.name        // 获取自动生成或用户指定名称的名称

query.explain()   // 打印查询的详细说明

query.stop()      // 停止查询

query.awaitTermination()   // 阻塞直到查询终止,由于 stop() 或错误

query.exception       // 查询因错误而终止时的异常

query.recentProgress  // 此查询的最新进度更新数组

query.lastProgress    // 此流式查询的最新进度更新
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

可以在单个 SparkSession 中启动任意数量的查询。它们都将同时运行,共享集群资源。可以使用sparkSession.streams()获取可用于管理当前活动查询的StreamingQueryManagerScala / Java / Python文档)。

val spark: SparkSession = ...

spark.streams.active    // 获取当前存活的流式查询列表

spark.streams.get(id)   // 根据唯一id获取查询对象

spark.streams.awaitAnyTermination()   // 锁定直到其中任何一个终止
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

8 监控流查询

有多种方法可以监控活动流查询。可以使用 Spark 的 Dropwizard Metrics 将指标推送到外部系统,或者以编程方式访问它们。

8.1交互式阅读指标

可以使用streamingQuery.lastProgress()streamingQuery.status()直接获取活动查询的当前状态和指标。 在ScalaJava 中lastProgress()返回一个StreamingQueryProgress对象, 在 Python 中返回一个具有相同字段的字典。它包含有关流的最后一个触发器中取得的进度的所有信息 - 处理了哪些数据,处理速率是多少,延迟等。还有streamingQuery.recentProgress返回最后几个进度的数组。

此外,在ScalaJava 中streamingQuery.status()返回一个StreamingQueryStatus对象, 在 Python 中返回一个具有相同字段的字典。它提供有关查询立即执行的操作的信息 - 触发器是否处于活动状态,是否正在处理数据等。

这里有一些例子。

val query: StreamingQuery = ...

println(query.lastProgress)

/* 将会打印一些像下面的信息.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


println(query.status)

/*  将会打印一些像下面的信息.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

8.2 使用异步 API 以编程方式报告指标

还可以SparkSession通过附加 a StreamingQueryListenerScala / Java文档)来异步监视与 a 关联的所有查询 。使用 附加自定义StreamingQueryListener对象在 sparkSession.streams.attachListener(),您将在查询开始和停止以及活动查询取得进展时收到回调。这是一个例子,

val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

8.3 使用 Dropwizard 报告指标

Spark 支持使用Dropwizard 库的报告指标。要同时报告结构化流查询的指标,必须显式启用SparkSession 中的配置spark.sql.streaming.metricsEnabled

spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// 或者
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
  • 1
  • 2
  • 3

启用此配置后,在 SparkSession 中启动的所有查询都将通过 Dropwizard 向已配置的任何接收器(例如 Ganglia、Graphite、JMX 等)报告指标。

9 使用检查点从故障中恢复

如果出现故障或故意关闭,可以恢复先前查询的先前进度和状态,并从中断处继续。这是使用检查点和预写日志完成的。您可以使用检查点位置配置查询,该查询会将所有进度信息(即在每个触发器中处理的偏移量范围)和正在运行的聚合(例如,快速示例中的字数)保存到检查点位置。此检查点位置必须是 HDFS 兼容文件系统中的路径,并且可以在启动查询时设置为 DataStreamWriter 中的选项。

aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

10 流查询更改后的恢复语义

从同一检查点位置重新启动之间允许流式查询中的哪些更改存在限制。以下是一些不允许的更改,或者更改的效果没有明确定义。对于所有:

  • 术语允许意味着您可以进行指定的更改,但其效果的语义是否明确定义取决于查询和更改。
  • 术语不允许意味着您不应进行指定的更改,因为重新启动的查询可能会因不可预测的错误而失败。sdf表示使用 sparkSession.readStream 生成的流式数据帧/数据集。

变更类型

  • 输入源的数量或类型(即不同的源)的更改:这是不允许的。
  • 输入源参数的更改:是否允许以及更改的语义是否明确定义取决于源和查询。这里有一些例子。
    • 的速率限制增加/删除/修改是允许:spark.readStream.format("kafka").option("subscribe", "topic")spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
    • 一般不允许更改订阅主题/文件,结果是不可预知的:spark.readStream.format("kafka").option("subscribe", "topic")spark.readStream.format("kafka").option("subscribe", "newTopic")
  • 输出接收器类型的更改:允许在几个特定接收器组合之间进行更改。这需要根据具体情况进行验证。这里有一些例子。
    • 文件接收器到 Kafka 接收器是允许的。Kafka 只会看到新数据。
    • 不允许 Kafka 接收器到文件接收器。
    • 允许将 Kafka sink 更改为 foreach,反之亦然。
  • 输出接收器参数的更改:是否允许以及更改的语义是否明确定义取决于接收器和查询。这里有一些例子。
    • 对文件的修改水槽的输出目录是不允许的:sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • 允许更改输出主题:sdf.writeStream.format("kafka").option("topic", "someTopic")sdf.writeStream.format("kafka").option("topic", "anotherTopic")
    • 允许对用户定义的 foreach sink(即ForeachWriter代码)进行更改,但更改的语义取决于代码。
  • 投影/过滤器/类地图操作的更改*:允许某些情况。例如:
    • 允许添加/删除过滤器:sdf.selectExpr("a")to sdf.where(...).selectExpr("a").filter(...)
    • 在相同的输出模式预测的变化是允许:sdf.selectExpr("stringColumn AS json").writeStreamsdf.selectExpr("anotherStringColumn AS json").writeStream
    • 在与不同的输出模式突起变化是有条件地允许:sdf.selectExpr("a").writeStreamsdf.selectExpr("b").writeStream允许仅当输出信宿允许从模式更改"a""b"
  • 有状态操作的变化:流式查询中的一些操作需要维护状态数据才能不断更新结果。Structured Streaming 会自动将状态数据检查点到容错存储(例如 HDFS、AWS S3、Azure Blob 存储)并在重启后恢复。但是,这假设状态数据的模式在重新启动时保持不变。这意味着 在重新启动之间不允许对流查询的有状态操作进行任何更改(即添加、删除或模式修改)。以下是有状态操作的列表,其架构不应在重新启动之间更改以确保状态恢复:
    • 流聚合:例如,sdf.groupBy("a").agg(...). 不允许对分组键或聚合的数量或类型进行任何更改。
    • 流式重复数据删除:例如,sdf.dropDuplicates("a"). 不允许对分组键或聚合的数量或类型进行任何更改。
    • 流-流连接:例如,sdf1.join(sdf2, ...)(即两个输入都是用 生成的sparkSession.readStream)。不允许更改架构或等效连接列。不允许更改连接类型(外部或内部)。连接条件中的其他更改是不明确的。
    • 任意状态操作:例如,sdf.groupByKey(...).mapGroupsWithState(...)sdf.groupByKey(...).flatMapGroupsWithState(...)。不允许对用户定义状态的架构和超时类型进行任何更改。允许在用户定义的状态映射函数内进行任何更改,但更改的语义效果取决于用户定义的逻辑。如果您真的想支持状态模式更改,那么您可以使用支持模式迁移的编码/解码方案将复杂的状态数据结构显式编码/解码为字节。例如,如果您将状态保存为 Avro 编码字节,那么您可以在查询重新启动之间自由更改 Avro-state-schema,因为二进制状态将始终成功恢复。

11 连续加工

连续处理是 Spark 2.3 中引入的一种新的、实验性的流执行模式,它支持低(约 1 毫秒)的端到端延迟,并具有至少一次容错保证。将此与默认的微批处理引擎进行比较,后者可以实现恰好一次保证,但最多可实现约 100 毫秒的延迟。对于某些类型的查询(下面讨论),您可以选择在不修改应用程序逻辑的情况下执行它们的模式(即不更改 DataFrame/Dataset 操作)。

要在连续处理模式下运行受支持的查询,需要做的就是指定一个具有所需检查点间隔作为参数的连续触发器。例如,

import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .option("")

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

1 秒的检查点间隔意味着连续处理引擎将每秒记录查询的进度。结果检查点的格式与微批处理引擎兼容,因此可以使用任何触发器重新启动任何查询。例如,以微批处理模式启动的受支持查询可以在连续模式下重新启动,反之亦然。请注意,每次切换到连续模式时,您都将获得至少一次容错保证。

11.1 支持的查询

从 Spark 2.3 开始,连续处理模式只支持以下类型的查询。

  • 操作:只有地图状数据集/数据帧操作是在连续模式下,即,只有突出部(支撑selectmapflatMapmapPartitions,等)和选择(wherefilter等)。
    • 支持除聚合函数之外的所有 SQL 函数(因为尚不支持聚合)current_timestamp()current_date()(使用时间的确定性计算具有挑战性)。
  • 源:
    • Kafka 源:支持所有选项。
    • 费率来源:适合测试。只有在连续模式下支持的选项是numPartitionsrowsPerSecond
  • 输出
    • Kafka 接收器:支持所有选项。
    • 内存接收器:适合调试。
    • 控制台接收器:适合调试。支持所有选项。请注意,控制台将打印您在连续触发器中指定的每个检查点间隔

有关它们的更多详细信息,请参阅输入源和输出接收器部分。虽然控制台接收器非常适合测试,但使用 Kafka 作为源和接收器可以最好地观察端到端的低延迟处理,因为这允许引擎处理数据并使结果在输出主题中可用输入主题中可用的输入数据的毫秒数。

11.2注意事项

  • 连续处理引擎启动多个长时间运行的任务,这些任务连续从源读取数据、处理数据并连续写入接收器。查询所需的任务数量取决于查询可以从源并行读取多少个分区。因此,在开始连续处理查询之前,您必须确保集群中有足够的内核来并行执行所有任务。例如,如果您正在读取具有 10 个分区的 Kafka 主题,那么集群必须至少具有 10 个核心才能使查询取得进展。
  • 停止连续处理流可能会产生虚假的任务终止警告。这些可以安全地忽略。
  • 目前没有失败任务的自动重试。任何失败都会导致查询停止,需要从检查点手动重新启动。

六、Spark ML

//TODO

七、Spark GraphX

//TODO

八、通用

1 配置

配置文档

2 部署

提交任务

3 监控

监控文档

4 内存管理

4.1 静态内存管理

Spark1.5及之前的内存管理机制。在静态内存管理器机制下,存储内存,执行内存和其他内存的大小在Spark应用程序的操作期间是固定的,用户可以在应用程序启动之前对其进行配置。

缺点:

  1. 开发人员需要针对不同的应用配置不同的参数,根据任务的不同执行逻辑,调整shuffle和storage的内存占比
    2. 需要用户熟悉Spark的存储机制
    3. 很容易造成资源的浪费,例如spark程序中存储只占用一小部分可用内存,而默认的内存配置中storage为50%,造成了很大的浪费

4.2 统一内存管理

统一内存管理是Spark 1.6之后引入的默认管理方式。统一内存管理器和静态内存管理器之间的区别在于,在统一内存管理器机制下,存储内存和执行内存共享一个内存区域,两者都可以占用彼此的空闲区域,存储内存和执行内存都可以在堆外分配了。

运行在Executor的Task同时可使用JVM(OnHeap+Off-heap)和Off-heap两种模式的内存。默认是JVM内存,Off-heap需要修改配置

img

如上图所示,Yarn集群管理模式中,Spark 以Executor Container的形式在NodeManager中运行,其可使用的内存上限由“yarn.scheduler.maximum-allocation-mb” 指定, —我们可以称其为MonitorMemory。

Executor的内存由Heap内存和设定的Off-heap内存组成:

  • Heap: 由“spark.executor.memory” 指定, 以下称为ExecutorMemory
  • Off-heap: 由 “spark.yarn.executor.memoryOverhead” 指定, 以下称为MemoryOverhead

Yarn集群中必须满足这样的关系 : ExecutorMemory + MemoryOverhead <= MonitorMemory

若应用提交之时,指定的 ExecutorMemory与MemoryOverhead 之和大于 MonitorMemory,则会导致Executor申请失败;若运行过程中,实际使用内存超过上限阈值,Executor进程会被Yarn终止掉(kill)。

4.2.1 堆内存

–executor-memory 也就是 “spark.executor.memory"指定的内存为Executor JVM最大分配的堆内存(”-xmx"),Spark为了更高效的使用这部分内存,对这部分内存进行了细分,下图(备注:此图源于互联网)对基于spark2(1.6+)对堆内存分配比例进行了描述:

img

Spark2.4中内存参数

  • Unsable Memory可用内存,等于堆内存减去预留内存,spark.executor.memory - 300M
  • Reserved Memory预留内存,默认值为300M
  • Unified Memory,由参数spark.memory.fraction控制,默认值为Usable * 0.6
  • Storage,用于缓存和跨集群传播内部数据的内存,由参数spark.memory.storageFraction控制,默认值为 spark.memory.fraction * 0.5
  • Execution,用于在shuffles, joins, sorts and aggregations中进行计算的内存,默认值为 spark.memory.fraction*0.5
  • Other,用户使用的内存,例如读取一个文件或数据库表的数据就保存在这里,,默认值为 Usable - Unified

若有一方内存不足,另一方空余,Storage和Execution可以动态相互占用对方内存。如有必要,Execution可能会驱逐Storage占用的内存,Execution内存可以强制收回Storage占用的内存。Execution是Unified Memory中永远不会驱逐缓存块的子区域。

5 调优

5.1 数据序列化

序列化在任何分布式应用程序的性能中都扮演着重要的角色。将对象序列化为缓慢或消耗大量字节的格式将大大减慢计算速度。通常,这将是应该调整以优化 Spark 应用程序的第一件事。Spark 旨在在便利性(允许您在操作中使用任何 Java 类型)和性能之间取得平衡。它提供了两个序列化库:

  • Java 序列化:默认情况下,Spark 使用 JavaObjectOutputStream框架序列化对象,并且可以与您创建的任何实现 java.io.Serializable. 您还可以通过扩展 java.io.Externalizable. Java 序列化很灵活,但通常很慢,并且导致许多类的序列化格式很大。
  • Kryo 序列化:Spark 还可以使用 Kryo 库(版本 4)更快地序列化对象。Kryo 比 Java 序列化更快、更紧凑(通常高达 10 倍),但不支持所有 Serializable类型,并且需要提前注册将在程序中使用的类以获得最佳性能。

可以通过使用SparkConf初始化Job,并调用conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"). 此设置配置的序列化程序不仅用于在工作节点之间混洗数据,还用于将 RDD 序列化到磁盘。Kryo 不是默认设置的唯一原因是自定义注册要求,但我们建议在任何网络密集型应用程序中尝试它。从 Spark 2.0.0 开始,我们在内部使用 Kryo 序列化程序来对具有简单类型、简单类型数组或字符串类型的 RDD 进行Shuffle。

Spark自动包含了常用核心 Scala 类Kryo序列化,由Twitter chill](https://github.com/twitter/chill)提供

要向 Kryo 注册自己的自定义类,使用registerKryoClasses方法。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
  • 1
  • 2
  • 3

KRYO文档描述了更先进的注册选项,如添加自定义序列的代码。

如果您的对象很大,您可能还需要增加spark.kryoserializer.buffer config。该值需要足够大以容纳您将序列化的最大对象。

最后,如果不注册自定义类,Kryo 仍然可以工作,但它必须为每个对象存储完整的类名,这很浪费。

5.2 内存调优

默认情况下,Java 对象可以快速访问,但很容易消耗比其字段中的“原始”数据多 2-5 倍的空间。这是由于以下几个原因:

  • 每个不同的 Java 对象都有一个“对象头”,它大约 16 个字节,包含诸如指向其类的指针等信息。对于其中数据很少的对象(比如一个Int字段),这可能比数据大。
  • Java Strings 对原始字符串数据有大约 40 字节的开销(因为它们将它存储在一个Chars数组中并保留额外的数据,例如长度),并且由于UTF-16 的内部使用而将每个字符存储为两个字节String编码。因此,一个 10 个字符的字符串很容易消耗 60 个字节。
  • 常见的集合类,例如HashMap和LinkedList,使用链接数据结构,其中每个条目(例如Map.Entry)都有一个“包装器”对象。这个对象不仅有一个标头,而且还有指向列表中下一个对象的指针(通常每个 8 个字节)。
  • 原始类型的集合通常将它们存储为“装箱”对象,例如java.lang.Integer.
5.2.1 内存管理

参考4 内存管理

5.2.2 确定内存消耗

确定数据集所需内存消耗量的最佳方法是创建一个 RDD,将其放入缓存,然后查看 Web UI 中的“存储”页面。该页面会告诉我们 RDD 占用了多少内存。

要估计特定对象的内存消耗,请使用SizeEstimatorestimate方法。这对于尝试不同的数据布局以减少内存使用以及确定广播变量将在每个执行程序堆上占用的空间量非常有用。

5.2.3 调整数据结构

减少内存消耗的第一种方法是避免增加开销的 Java 特性,例如基于指针的数据结构和包装对象。做这件事有很多种方法:

  1. 设计您的数据结构,更应该使用对象数组和原始类型,而不是标准的 Java 或 Scala 集合类(例如HashMap)。fastutil 库为与 Java 标准库兼容的原始类型提供了方便的集合类。
  2. 尽可能避免使用包含大量小对象和指针的嵌套结构。
  3. 考虑使用数字 ID 或枚举对象而不是字符串作为键。
  4. 如果 RAM 少于 32 GB,请设置 JVM 标志-XX:+UseCompressedOops以使指针为 4 个字节而不是 8 个字节。可以在 spark-env.sh设置.
5.2.4 序列化RDD存储

尽管进行了这种调整,但当对象仍然太大而无法有效存储时,减少内存使用的一种更简单的方法是以序列化形式存储它们,使用RDD 持久性 API 中的序列化 StorageLevels ,例如MEMORY_ONLY_SER. 然后 Spark 会将每个 RDD 分区存储为一个大字节数组。以序列化形式存储数据的唯一缺点是访问时间较慢,因为必须动态反序列化每个对象。如果您想以序列化形式缓存数据,我们强烈建议使用 Kryo,因为它比 Java 序列化(当然也比原始 Java 对象)小得多。

5.2.5 垃圾收集调优

当程序存储的 RDD 有大量“流失”时,JVM 垃圾收集可能会成为一个问题。(对于只读取一次 RDD 然后对其运行许多操作的程序,这通常不是问题。)当 Java 需要驱逐旧对象为新对象腾出空间时,它需要跟踪所有 Java 对象并找到未使用的。这里要记住的要点是垃圾收集的成本与 Java 对象的数量成正比,因此使用具有较少对象的数据结构(例如Ints 而不是 a的数组LinkedList)可以大大降低此成本。一种更好的方法是以序列化形式持久化对象,如上所述:现在将只有一个每个 RDD 分区的对象(一个字节数组)。在尝试其他技术之前,如果 GC 有问题,首先要尝试使用序列化缓存

由于任务的工作内存(运行任务所需的空间量)和节点上缓存的 RDD 之间的干扰,GC 也可能是一个问题。我们将讨论如何控制分配给 RDD 缓存的空间来缓解这种情况。

测量 GC 的影响

GC 调优的第一步是收集有关垃圾收集发生频率和 GC 花费时间的统计信息。这可以通过添加-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps到 Java 选项来完成。(有关将 Java 选项传递给 Spark 作业的信息,请参阅配置指南。)下次运行 Spark 作业时,每次发生垃圾收集时,您都会看到在工作程序日志中打印的消息。请注意,这些日志将在集群的Executor节点上(stdout在其工作目录中的文件中),而不是在您的Driver程序上。

高级 GC 调优

为了进一步调优垃圾回收,我们首先需要了解一些关于JVM内存管理的基本信息:

  • Java 堆空间分为两个区域 Young 和 Old。年轻代用于保存生命周期较短的对象,而老年代用于保存生命周期较长的对象。
  • 年轻代进一步分为三个区域 [Eden, Survivor1, Survivor2]。
  • 垃圾回收过程的简单描述:当 Eden 已满时,在 Eden 上运行minor GC ,并将来自 Eden 和 Survivor1 的存活对象复制到 Survivor2。幸存者区域被交换。如果对象足够旧或 Survivor2 已满,则将其移至 Old。最后,当 Old 接近 full 时,会调用 full GC。

Spark 中 GC 调优的目标是确保只有长寿命的 RDD 存储在老年代,并且年轻代的大小足以存储短寿命的对象。这将有助于避免完整的 GC 来收集在任务执行期间创建的临时对象。一些可能有用的步骤是:

  • 通过收集 GC 统计信息来检查是否有太多垃圾收集。如果在任务完成之前多次调用 full GC,则意味着没有足够的内存可用于执行任务。
  • 如果有太minor GC但没有太多 full GC,为 Eden 分配更多内存会有所帮助。您可以将 Eden 的大小设置为高估每个任务需要多少内存。如果确定了 Eden 的大小E,则可以使用该选项设置 Young 代的大小-Xmn=4/3*E。(放大 4/3 也是为了考虑幸存者区域使用的空间。)
  • 在打印的 GC 统计信息中,如果 OldGen 接近满,则通过降低spark.memory.fraction来减少用于缓存的内存量;缓存更少的对象比减慢任务执行速度更好。或者,考虑减少年轻代的大小。这意味着-Xmn如果您已将其设置为如上,则降低。如果没有,请尝试更改 JVMNewRatio参数的值。许多 JVM 将其默认为 2,这意味着 Old 代占用堆的 2/3。它应该足够大,使得这个分数超过spark.memory.fraction
  • 小于32G的堆,新生代用ParallelGC,老年代用CMS-XX:+UseParNewGC -XX:+UseConcMarkSweepGC
  • 大于32G堆,建议使用 G1GC 垃圾收集器-XX:+UseG1GC。在垃圾收集成为瓶颈的某些情况下,它可以提高性能。需要注意的是大Executor堆大小,设置-XX:G1HeapRegionSize增加G1区域大小 很重要
  • 例如,如果您的任务是从 HDFS 读取数据,则可以使用从 HDFS 读取的数据块大小来估算任务使用的内存量。请注意,解压缩块的大小通常是块大小的 2 或 3 倍。所以如果我们希望有 3 或 4 个Task的工作空间,并且 HDFS 块大小为 128 MB,我们可以估计 Eden 的大小为4*3*128MB.
  • 监视垃圾收集的频率和时间如何随新设置发生变化。

我们的经验表明,GC 调优的效果取决于您的应用程序和可用内存量。有更多的微调选项描述联机,但需要较高的水平管理。管理 full GC 发生的频率有助于减少开销。

可以通过spark.executor.extraJavaOptions在作业的配置中设置来指定执行程序的 GC 配置。

5.3 其他注意事项

5.3.1 并行度

除非您将每个操作的并行级别设置得足够高,否则集群不会得到充分利用。Spark 会根据每个文件的大小自动设置要在每个文件上运行的“map”任务的数量(尽管您可以通过可选参数 toSparkContext.textFile等来控制它),并且对于分布式“reduce”操作,例如groupByKeyand reduceByKey,它使用最大的父级RDD 的分区数。您可以将并行级别作为第二个参数传递(请参阅spark.PairRDDFunctions文档),或设置 config 属性spark.default.parallelism以更改默认值。通常,我们建议集群中的每个 CPU 核心 2-3 个任务。

5.3.2 Reduce 任务的内存使用情况

有时,您会得到 OutOfMemoryError 不是因为您的 RDD 不适合内存,而是因为您的一项任务的工作集(例如 中的一个 reduce 任务groupByKey)太大。Spark 的 shuffle 操作(sortByKeygroupByKeyreduceByKeyjoin等)在每个任务中构建一个哈希表来执行分组,这通常可能很大。这里最简单的解决方法是 提高并行度,使每个任务的输入集更小。Spark 可以有效地支持短至 200 毫秒的任务,因为它在多个任务中重用一个 executor JVM,并且任务启动成本低,因此您可以安全地将并行级别提高到超过集群中的内核数量。

5.3.3广播大变量

使用 SparkContext中 可用的广播功能可以大大减少每个序列化任务的大小,以及在集群上启动作业的成本。如果您的任务使用其中的驱动程序中的任何大对象(例如静态查找表),请考虑将其转换为广播变量。Spark 会在 master 上打印每个任务的序列化大小,因此您可以查看它以确定您的任务是否太大;一般来说,大于 20 KB 的任务可能值得优化。

5.3.4数据本地化

数据本地化会对 Spark 作业的性能产生重大影响。如果数据和对其进行操作的代码在一起,那么计算往往会很快。但是如果代码和数据是分开的,就必须移到另一个。通常,将序列化代码从一个地方传送到另一个地方比一大块数据更快,因为代码大小比数据小得多。Spark 围绕数据本地化的一般原则构建其调度。

数据本地化是数据与处理它的代码的接近程度。根据数据的当前位置,存在多个位置级别。按照从近到远的顺序:

  • PROCESS_LOCAL数据与运行代码在同一个 JVM 中。这是最好的地方
  • NODE_LOCAL数据在同一个节点上。示例可能在同一节点上的 HDFS 中,或在同一节点上的另一个执行程序中。这比PROCESS_LOCAL因为数据必须在进程之间传输要慢一点
  • NO_PREF 从任何地方同样快速地访问数据,并且没有位置偏好
  • RACK_LOCAL数据在同一个服务器机架上。数据位于同一机架上的不同服务器上,因此需要通过网络发送,通常是通过单个交换机
  • ANY 数据在网络上的其他地方,而不是在同一个机架中

Spark 更喜欢在最佳位置级别安排所有任务,但这并不总是可行的。在任何空闲执行器上没有未处理数据的情况下,Spark 会切换到较低的位置级别。有两种选择:a) 等到繁忙的 CPU 腾出时间来启动同一服务器上的数据任务,或者 b) 立即在需要移动数据的较远地方启动新任务。

Spark 通常做的是稍等片刻,希望繁忙的 CPU 腾出时间。一旦超时到期,它就会开始将数据从远处移动到空闲 CPU。每个级别之间回退的等待超时可以单独配置,也可以在一个参数中一起配置;详见 配置页面spark.locality参数。如果您的任务很长并且局部性较差,您应该增加这些设置,但默认设置通常效果很好。

总结

这是一个简短的指南,用于指出您在调优 Spark 应用程序时应该了解的主要问题——最重要的是,数据序列化和内存调优。对于大多数程序,切换到 Kryo 序列化并以序列化形式持久化数据将解决最常见的性能问题。请随时在 Spark 邮件列表中询问其他调优最佳实践。

6 任务调度和资源调度原理

基于Yarn

  1. 集群启动时NodeManager就不断向ResourceManager汇报资源
  2. 根据配置new SparkContext(),配置优先级SparkConf>spark-submit>spark-defaults.conf
  3. 客户端向ResourceManager申请资源启动ApplicationMaster(Driver)
  4. ResourceManager收到请求会随机找一台NodeManager启动
  5. Driver启动后,会向ResourceManager申请资源启动Executor
  6. ResourceManager返回NodeManager列表
  7. Driver在NodeManager上启动Executor
  8. Executor反向注册给Driver
  9. Driver里根据Action算子划分job,一个Action算子就是一个job。job里根据算子的依赖关系形成一个有向无环图。根据宽窄依赖(前RDD的一个分区数据到后RDD多个分区为宽依赖,其他都是窄依赖)切割job,划分为多个stage,每个stage又包括多个并行的task。TaskSchedule遍历stage,又遍历stage中的task将task发送到Executor中执行
  10. 监控Task执行
  11. 回收执行结果

参考资料

spark官方文档
http://spark.apache.org/docs/2.4.0/

Spark案例
https://sparkbyexamples.com/spark/

持久化
https://cloud.tencent.com/developer/article/1482150

《Spark快速大数据分析》

《High Performance Spark》

性能调优
https://zhuanlan.zhihu.com/p/354998357

数据倾斜调优
https://www.cnblogs.com/cssdongl/p/6594298.html

内存模型

https://www.iteblog.com/archives/2342.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/666456
推荐阅读
相关标签
  

闽ICP备14008679号