赞
踩
我的spark学习笔记,基于Spark 2.4.0
Apache Spark是一个分布式计算框架,相比上一代计算框架MapReduce速度更快,且提供更多更方便的接口和函数实现。
Spark软件栈示意图
Spark包括Spark Core、Spark Sql、Spark Structured Streaming、Spark Streaming、Spark MLib、Spark GraghX。
Spark Core里包含任务调度、内存管理、错误恢复、与存储系统交互等模块。还包括RDD(Resilient Distributed Dateset 弹性分布式数据集)的定义。
Spark Sql是Spark提供的处理结构化数据的包,通过Spark Sql可以使用SQL或 hive sql来查询数据。
Spark Structured Streaming是Spark提供的以SQL的方式处理流式数据的包。
Spark Streaming是Spark 提供的处理流式数据的包。
Spark MLib是Spark封装好的机器学习包,封装了一些算法。
Spark GraghX是Spark提供的处理图数据的包。
PS:Spark编程 最好使用Scala语言,Scala Api比Java Api调用方便,Scala性能比Python更好
Spark1.0开始就有的,弹性分布式数据集。数据的抽象模型,跨集群节点分区的元素集合,可以并行操作,且有容错恢复功能。不应该把RDD看做存放特定数据的数据集,而最好把每个RDD当做我们通过转化操作构建出来的,记录如何计算数据的指令列表。
RDD操作分为两类,一类是transformation(转换算子),转换算子会返回一个新的RDD,转换算子都是懒执行;另一类是action(行动算子),行动算子会返回汇总RDD或者输出数据,行动算子是立即执行的。
转化操作可能会多次执行,所以为了避免读取超时和重算消耗,读取时间较长或多次使用的RDD必须持久化。
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
val totoalCount = lineLengths.count()
这个例子里,因为有reduce()、count()两个行动算子,所以textFile()、map()会执行2次。
如果在第2行后插入一行如下代码
lineLengths.persist()
这样的话每个算子都会只运行一次
RDD读操作的方法都在SparkContext里,下面是几个常用的
读操作 | 作用 |
---|---|
textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] | 从HDFS或本地文件系统或其他支持Hadoop文件接口的其他数据源读取文件,返回一个String类型的RDD |
binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] | 读取二进制文件 |
objectFile[T](path: String, minPartitions: Int = defaultMinPartitions)(implicit arg0: ClassTag[T]): RDD[T] | 读取序列化后的Sequence文件 |
sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] | 根据给定的key-value格式读取,返回一个String类型的RDD |
wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)] | 读取文件,一个文件会生成一个(文件名,文件内容)的k v ,适用于处理多个小文件或处理整个文件 |
下表列出了 Spark 支持的一些常见转换。有关详细信息,请参阅 RDD API 文档(Scala、 Java、 Python)和配对 RDD 函数文档(Scala、 Java)。
转换算子 | 作用 |
---|---|
map(func) | 返回使用传入函数转换每个元素后的RDD |
filter(func) | 过滤操作,保留传入函数判断为true的数据 |
flatMap(func) | 类似map,不同的是,flatMap输入一个可迭代的列表,输出0到多个,这就要求func返回值必须是可迭代的数据结构 |
mapPartitions(func) | 类似map,不同的是mapPartitions是针对一个分区做一次操作,输入和输出都是可迭代的数据结构 |
mapPartitionsWithIndex ( func ) | 通过对这个 RDD 的每个分区应用一个函数来返回一个新的 RDD,同时可以得到原始分区的索引。 |
sample(withReplacement, fraction, seed) | 使用给定的随机数生成器种子对数据的一小部分进行采样,无论是否有替换。 |
union(otherRDD) | 返回两个RDD合并后的RDD |
intersection(otherRDD) | 返回两个RDD交集的RDD |
distinct([numPartitions])) | 返回去除重复元素的RDD |
groupByKey([numPartitions]) | 在 (K, V) 对的数据集上调用时,返回 (K, Iterable) 对的数据集。如果分组是为了对每个键执行聚合(例如求和或平均值),使用reduceByKey或aggregateByKey将产生更好的性能。默认情况下,输出中的并行级别取决于父 RDD 的分区数。可以传递一个可选numPartitions参数来设置不同数量的任务。 |
reduceByKey(func, [numPartitions]) | 当在 (K, V) 对的数据集上调用时,返回 (K, V) 对的数据集,其中每个键的值使用给定的 reduce 函数func聚合,该函数必须是 (V,V) => V. 与groupByKey一样,reduce 任务的数量可通过可选的第二个参数进行配置。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) | PairRDD才能调用,使用给定的聚合函数和中性的“零值”聚合每个键的值。这个函数可以返回与这个RDD V中的值类型不同的结果类型U。同样可以传入一个分区数来设置分区的数量 |
sortByKey([ascending], [numPartitions]) | PairRDD才能调用,根据key排序,特殊点在于会产生shuffle |
join(otherDataset, [numPartitions]) | 当调用类型为 (K, V) 和 (K, W) 的数据集时,返回一个 (K, (V, W)) 对的数据集,其中包含每个键的所有元素对。外连接通过leftOuterJoin,rightOuterJoin和fullOuterJoin实现。 |
cogroup(otherDataset, [numPartitions]) | 当调用类型为 (K, V) 和 (K, W) 的数据集时,返回一个包含 (K, (Iterable, Iterable)) 元组的数据集。此操作和groupWith相同。 |
pipe(command, [envVars]) | 通过 shell 命令(例如 Perl 或 bash 脚本)管理 RDD 的每个分区。RDD 元素被写入进程的标准输入,输出到标准输出的行作为字符串的 RDD 返回。 |
coalesce(numPartitions) | 将 RDD 中的分区数减少到 numPartitions。对过滤大型数据集后更有效地运行操作很有用。 |
repartition(numPartitions) | 随机重组 RDD 中的数据以创建更多或更少的分区并在它们之间进行平衡,会产生shuffle。repartion是coalesce的shuffle为true的调用 |
repartitionAndSortWithinPartitions(partitioner) | 根据给定的分区器对 RDD 进行重新分区,并在每个结果分区内,按键对记录进行排序。这比repartition在每个分区内调用然后排序更有效,因为它可以将排序下推到 shuffle 机器中。 |
下表列出了 Spark 支持的一些常见操作。请参阅 RDD API 文档(Scala、 Java、 Python、 R)和Pair RDD 函数文档(Scala,Java )了解详细信息。
action算子 | 作用 |
---|---|
reduce(func) | 使用函数func(它接受两个参数并返回一个)聚合数据集的元素。该函数应该是可交换的和关联的,以便它可以被正确地并行计算。 |
collect() | 在驱动程序中将数据集的所有元素作为数组返回。这通常在过滤器或其他返回足够小的数据子集的操作之后很有用。 |
count() | 返回数据集中元素的数量。 |
first() | 返回数据集的第一个元素(类似于 take(1))。 |
take(n) | 返回一个包含数据集前n 个元素的数组。 |
takeSample(withReplacement, num, [seed]) | 返回一个数组,其中包含数据集的num 个元素的随机样本,有或没有替换,可选择预先指定随机数生成器种子。 |
takeOrdered(n, [ordering]) | 使用自然顺序或自定义比较器返回RDD的前n 个元素。 |
saveAsTextFile(path) | 将数据集的元素作为文本文件(或一组文本文件)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 以将其转换为文件中的一行文本。 |
saveAsSequenceFile(path) | 将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径中。这在实现 Hadoop 的 Writable 接口的键值对的 RDD 上可用。在 Scala 中,它也可用于隐式转换为 Writable 的类型(Spark 包括对 Int、Double、String 等基本类型的转换)。 |
saveAsObjectFile(path) | 使用 Java 序列化以简单格式编写数据集的元素,然后可以使用 SparkContext.objectFile()读取 |
countByKey () | 仅适用于 (K, V) 类型的 RDD。返回 (K, Int) 对的HashMap,其中包含每个键的计数。 |
foreach(func) | 对数据集的每个元素运行函数func。 |
scala传递方法k可以传递方法的引用或静态方法传递给Spark,引用类型必须实现序列化,尽量避免使用引用类型。变量尽量使用临时变量。
Spark 中的某些操作会触发一个称为 shuffle 的事件。shuffle 是 Spark 用于重新分配数据的机制,以便它跨分区进行不同的分组。这通常涉及在执行器和机器之间复制数据,从而使 shuffle 成为一项复杂且成本高昂的操作。
要了解在 shuffle 期间发生了什么,可以考虑reduceByKey操作示例 。该reduceByKey操作生成一个新的 RDD,其中单个键的所有值都组合成一个元组 - 键和针对与该键关联的所有值执行 reduce 函数的结果。挑战在于,单个键的所有值不一定位于同一分区,甚至同一台机器上,但它们必须位于同一位置以计算结果。
在 Spark 中,数据通常不会跨分区分布在特定操作的必要位置。在计算过程中,单个任务将在单个分区上运行 - 因此,为了组织单个reduceByKeyreduce 任务执行的所有数据,Spark 需要执行一个 all-to-all 操作。它必须从所有分区中读取以找到所有键的所有值,然后将跨分区的值组合在一起以计算每个键的最终结果 - 这称为shuffle。
编码中要尽量避免shuffle
Spark 中最重要的功能之一是跨操作在内存中持久化(或缓存)数据集。当你持久化一个 RDD 时,每个节点都会存储它在内存中计算的任何分区,并在该数据集(或从它派生的数据集)的其他操作中重用它们。这使得未来的动作可以更快(通常超过 10 倍)。缓存是迭代算法和快速交互使用的关键工具。
可以使用persist()、cache()、checkpoint()。其中cache()调用了persist(StorageLevel.MEMORY_ONLY)。
每个持久化的 RDD 都可以使用不同的存储级别来存储,例如,允许您将数据集持久化在磁盘上,将其持久化在内存中,但作为序列化的 Java 对象(以节省空间),跨节点复制它。这些级别是通过将 StorageLevel对象(Scala、 Java、 Python)传递给persist()
持久化级别的选择
Spark 还会在 shuffle 操作(例如reduceByKey)中自动保留一些中间数据,即使没有调用persist.。这样做是为了避免在 shuffle 期间节点失败时重新计算整个输入。
通常,当传递给 Spark 操作(例如map或reduce)的函数在远程集群节点上执行时,它会处理函数中使用的所有变量的单独副本。这些变量被复制到每台机器上,并且对远程机器上的变量的更新不会传播回驱动程序。支持跨任务的通用读写共享变量将是低效的。但是,Spark 确实为两种常见的使用模式提供了两种有限类型的共享变量:广播变量和累加器。
一般情况下,传递一个变量给map()等方法时,会在每个task基本保存一个变量,如果变量很大时就会导致占用很大空间。
广播变量允许我们在每台机器上缓存一个只读变量,而不是随任务一起传送它的副本。例如,它们可用于以有效的方式为每个节点提供大型输入数据集的副本。Spark 还尝试使用高效的广播算法来分发广播变量以降低通信成本。
广播变量是通过调用SparkContext.broadcast(v)从变量创建的。广播变量是围绕变量的包装器,可以通过调用value 方法访问其值。下面的代码显示了这一点:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
需要注意的是广播变量不应该在广播后修改。
累加器是可以在分布式执行的一个计数器。支持一些默认的累加器,也支持自定义累加器
累加器的例子
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
scala> accum.value
res2: Long = 10
相同算子相同计算逻辑的,一定要重用,不要重复计算。
提前将不需要的数据过滤掉,可以减少后面计算的时间。如果先计算后过滤,则浪费了部分计算时间。
读取多个小文件或处理单个文件要用wholeTextFiles读取
普通的map算子每个元素做一次操作,如果是写数据库之类的成本比较高的操作成本就太高了。此时可以用mapPartition,mapPartition算子是一个分区的数据做一次操作。
数据量大可能会OOM,所以要适当调整内存。
其他的foreach和foreachPartition也是类似的。
filter后,有一些分区的数据可能会减少很多,一些分区减少不多,此时就会造成分区间数据分布不均匀。此时重分区可以适当提升性能。
使用filter后,可以用coalesce或者repartition调整分区数。
Spark官方推荐,task数量应该设置为Spark作业总CPU core数量的2~3倍。这样可以更充分地利用硬件资源。
并行度可以通过SparkConf设置
val conf = new SparkConf().set("spark.default.parallelism", "300")
聚合算子尽可能少用groupByKey,尽可能多的用reduceByKey。
reduceByKey会在map端预聚合,每个key预先计算出一个值,到reduce再进行最终聚合。所以会节约IO,网络传输等时间。
groupByKey则是将所有数据都拉取到reduce端才会执行聚合,相比reduceByKey而言浪费了很多时间。
以下情况应该将RDD缓存起来
checkpoint可将数据缓存到HDFS等文件系统,如果缓存数据丢失可以读取checkpoint数据。缺点是与文件系统交互,io速度慢。
遇到这三种情况必须将RDD缓存
sc.setCheckpointDir('HDFS')
rdd.cache/persist(memory_and_disk)
rdd.checkpoint
默认情况下RDD中使用外部变量,会在每个task中生成一个副本,如果变量数据很大会占用很多内存。此时要使用广播变量,广播变量会在每个executor保存一个副本,这个executor的所有task都会引用这个副本,task很多的时候可以节约很多内存。
从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs已经默认是Kryo序列化了。
自定义的类需要实现
public class MyKryoRegistrator implements KryoRegistrator{
@Override
public void registerClasses(Kryo kryo){
kryo.register(StartupReportLogs.class);
}
}
//创建SparkConf对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化库
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在Kryo序列化库中注册自定义的类集合
conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator");
join前给两个RDD指定相同的分区器,可以避免昂贵的shuffle操作。
如果不指定就会产生shuffle。
某个分区的数据显著多于其他分区的数据导致单个task执行时间远远超出平均执行时间。
表现为
例如某市不同街道上的过车记录,一定是十几条主要街道过车记录远远大于其他街道。
这种数据源本身就是倾斜的。
像这种数据可以针对性的采集时就将主要街道的记录针对性的打散,比如加1~8的前缀,聚合计算后,再次汇总聚合。
缺点:需要预处理数据,计算步骤增多
场景:大量不同的Key被分配到了相同的Task造成该Task数据量过大。
例如原本有10个task,则可以尝试repartition(20),将多个key分不到其他分区,缓解shuffle。
也可以尝试减少并行度,coalesce(5,true)减少分区,使数据更加均匀,缓解shuffle。具体减少还是增加,需要测试对比。
劣势:适用场景少,只能将分配到同一Task的不同Key分散开,但对于同一Key倾斜严重的情况该方法并不适用。并且该方法一般只能缓解数据倾斜,没有彻底消除问题。从实践经验来看,其效果一般。
默认情况下Spark使用HashPartitioner,HashPartitioner是对key求取hash值再对partitions 取余数的方法,因此如果大部分key是相同的话将会导致,各partition之间存在数据倾斜的问题,极端情况下,RDD的所有row被分配到了同一个partition中。
RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。
因此可以尝试使用RangePartitioner来缓解数据倾斜。
将多个key分布到不同分区
完全随机分区
class CustomerPartition(partitions: Int) extends Partitioner {
def numPartitions: Int = partitions
def getPartition(key: Any): Int = {
(key.toString.charAt(0) + scala.util.Random.nextInt(10)) % numPartitions
}
}
只针对部分key随机分区
package org.apache.spark.examples
import org.apache.spark.{HashPartitioner, Partitioner}
class CustomerPartition(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => nonNegativeMod(key, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
def nonNegativeMod(x: Any, mod: Int): Int = {
val skewKeys = Set("key1","key2","key3","key4","key5")
val keyHashCode = x.hashCode
val rawMod = keyHashCode % mod
if(skewKeys.contains(x.toString)){
val sourcePartitionNum = rawMod + (if (rawMod < 0) mod else 0)
sourcePartitionNum + (scala.util.Random.nextInt(mod + skewKeys.size) % (mod + skewKeys.size))
}else{
rawMod + (if (rawMod < 0) mod else 0)
}
}
}
使用自定义分区器
join前必须两个RDD设置相同的分区器(可以使用3.12.2 shuffle数据倾斜里的自定义分区器或其他自定义分区器)重分区
倾斜RDD加n个随机前缀,不倾斜RDD膨胀n倍
将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者Join并去掉前缀。再做需要的聚合操作。
案例
package org.apache.spark.examples
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.sql.SparkSession
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf}
import scala.util.Random
class DataSkew {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setAppName("ResolveDataSkewWithNAndRandom")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = sparkSession.sparkContext
val leftRdd = sc.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/").map(row => (row.split(",")(0),row.split(",")(1)))
val rightRdd = sc.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/").map(row => (row.split(",")(0),row.split(",")(1)))
var addList: List[String] = List()
var a:Int = 0
for(a <- 1 to 20){
addList = addList :+ a.toString
}
val addListKeys = sc.broadcast(addList)
val newRandomPrefixLeftRDD = leftRdd.map(t2 => (Random.nextInt(20)+","+t2._1,t2._2)).partitionBy(new CustomerPartition(20))
val rightPrefixRdd = rightRdd.flatMap(t2 => (addListKeys.value.toStream.map((s => (s + "," +t2._1,t2._2))).toList)).partitionBy(new CustomerPartition(20))
val joinRDD = leftRdd.join(rightRdd).map(t2 => (t2._1.split(",")(1),t2._2._2))
joinRDD.foreachPartition(iterable => {
val atomicInteger = new AtomicInteger()
iterable.toStream.foreach(t2 => atomicInteger.incrementAndGet())
})
sc.stop()
sparkSession.stop()
}
}
DataFrame是Dataset的一个子集,从源码来看DataFrame就是Row类型的Dataset。Dataset类似关系数据库的一张表。Dataset提供了编译时类型检查。
sparksql的功能入口是SparkSession,SparkSession创建案例
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// RDD到DataFrame的隐式转换
import spark.implicits._
使用SparkSession通过已有RDD创建,也可以通过hive、读取其他数据源创建
读取操作都是通过DataFrameReader类读取的
读取Json案例
val df = spark.read.json("examples/src/main/resources/people.json")
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
DataFrame提供一种DSL(特定领域语言)来操作数据,支持Scala、Java、Python API
由于2.0之后,DataFrame实际上就是Row类型的Dataset,所以这些操作也可适用于Dataset
案例
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
示例代码可以在spark源码仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala"中找到完整的示例代码。
全部操作可以点上面蓝字部分的API文档
SparkSession的sql允许适用SQL操作DataFrame和Dataset
案例
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
临时视图,根据DataFrame或Dataset可以创建临时视图,相当于一张表,临时视图有不同的作用域;session级别的临时视图只能被当前SparkSession看到,且会随当前SparkSession的消失而消失;如果希望创建一个所有会话共享的临时视图可以创建全局临时视图,全局级的的临时视图可以被所有SparkSession看到。
全局临时视图存在系统数据库global_temp
中,对全局临时视图查询必须加数据库名,否则会报一个找不到临时视图的错。例如SELECT * FROM global_temp.view1
// 将 DataFrame 注册为全局临时视图
df . createGlobalTempView ( "people" )
// 全局临时视图绑定到系统保留的数据库 `global_temp`
spark 。sql ( "SELECT * FROM global_temp.people" )。show ()
// +----+-------+
// | 年龄| 姓名|
// +----+-------+
// |null|迈克尔|
// | 30| 安迪|
// | 19| 贾斯汀|
// +----+-----+
// 全局临时视图是跨会话
spark . 新会话()。sql ( "SELECT * FROM global_temp.people" )。show ()
// +----+-------+
// | 年龄| 姓名|
// +----+-------+
// |null|迈克尔|
// | 30| 安迪|
// | 19| 贾斯汀|
// +----+-----+
在 Spark源码库中的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”中可以找到完整的示例代码。
Dataset类似RDD,但Dataset不使用Java序列化或Kryo而是使用专门的编码器。 这种编码器动态生成代码的方式允许Spark执行许多操作,比如filter、sort、hash等,而无需反序列化成对象。
case class Person(name: String, age: Long)
// 编码器通过样例类创建
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// 对大部分类型编码器可以通过导入隐式转换实现即 importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrame转化为Dataset只需要提供一个类名即可
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
Spark SQL提供两种方式将现有RDD转化为DataFrame。第一种方法使用反射来推断包含特定类型对象的 RDD 的模式。在编写 Spark 应用程序时已经知道schema时,这种基于反射的方法会产生更简洁的代码并且效果很好。
创建数据集的第二种方法是通过编程接口,该接口允许构建schema,然后将其应用于现有 RDD。虽然此方法更加冗长,但它允许在列及其类型直到运行时才知道时构造数据集。
// 用于从RDD到DataFrame的隐式转换
import spark.implicits._
// 从一个文本文件创建一个Person对象的RDD,将其转化为DataFrame
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
// 通过Spark提供的sql方法可以执行sql语句
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// 可以通过索引访问列
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// 或者通过列名访问
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// 没有预先定义Dataset[Map[K,V]]的编码器, 使用kryo编码器显示定义
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// 原始类型和样例类也可以定义成下面的编码器
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
// 这个方法 row.getValuesMap[T] 把所有的列和值放到一个 Map[String, T]里
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
全部示例代码位于Spark代码仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”
如果预先无法得到schema,可以通过如下步骤将RDD转化为DataFrame
import org.apache.spark.sql.types._
// 创建一个RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// schema定义
val schemaString = "name age"
// 根据上面string类型的schema创建StructType
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// 将People类型的RDD转化为Row类型的RDD
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// 根据schema和RDD创建DataFrame
val peopleDF = spark.createDataFrame(rowRDD, schema)
// 根据DataFrame创建临时视图
peopleDF.createOrReplaceTempView("people")
// 在临时视图上执行sql
val results = spark.sql("SELECT name FROM people")
// SQL查询返回一个DataFrame,支持所有RDD支持的操作
// 一行数据可通过索引和列名访问
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
在 Spark 源码仓库“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”中可以找到完整的示例代码。
内置聚合方法提供了包括count(), countDistinct(), avg(), max(), min()等等。这些方法在Spark Sql中也有对应的类型安全的Scala和Java实现。
如果预定义的无法满足需求可以自定义聚合函数。
弱类型的自定义聚合函数适用于DataFrame。无类型的自定义聚合函数要继承UserDefinedAggregateFunction抽象类,实现其中的方法。一个求平均数弱类型用户自定义聚合函数案例
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
object MyAverage extends UserDefinedAggregateFunction {
// 聚合函数输入参数数据类型
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
// 聚合buffer的数据类型
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// 返回值的类型
def dataType: DataType = DoubleType
// 输入相同时,函数是否总输出相同的结果
def deterministic: Boolean = true
// 初始化buffer,buffer本是是row对象。
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
// 根据新值修改buffer的值
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
// 聚合buffer的值,聚合后的值还存到buffer里
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// 计算最终结果
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
// 注册聚合函数来调用它
spark.udf.register("myAverage", MyAverage)
val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
在 Spark 源码库"examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala"中可以找到完整的示例代码
强类型的自定义聚合函数要继承Aggregator。
一个求平均数的类型安全的自定义聚合函数案例
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
// 0值,要满足一个值加0还等于它本身
def zero: Average = Average(0L, 0L)
// 聚合两个值,返回聚合后的buffer对象
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// 合并两个中间值
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// 转换为输出结果
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// 指定中间值类型编码器
def bufferEncoder: Encoder[Average] = Encoders.product
// 指定最终输出值类型的编码器
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
// 将函数应用到Dataset,并重命名为一个列
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
多次使用的表可以缓存起来
通过spark.catalog.cacheTable("tableName") 或 dataFrame.cache()
然后 Spark SQL 将只扫描需要的列并自动调整压缩以最小化内存使用和 GC 压力。可以调用spark.catalog.uncacheTable("tableName")
从内存中删除该表。
可以使用SparkSession的setConf方法或通过SET key=value使用 SQL运行 命令来完成内存缓存的配置。
属性名 | 默认值 | 意义 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | true | 当设置为 true 时,Spark SQL 将根据数据统计自动为每列选择一个压缩编解码器。 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 控制列缓存的批次大小。更大的批处理大小可以提高内存利用率和压缩率,但在缓存数据时会面临 OOM 的风险。 |
以后的版本里这些参数可以会被干掉,未来将会自动优化
属性名 | 默认值 | 意义 |
---|---|---|
spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | 读取文件时打包到单个分区的最大字节数。 |
spark.sql.files.openCostInBytes | 4194304 (4 MB) | 打开一个文件的估计成本,以同时扫描的字节数来衡量。这在将多个文件放入一个分区时使用。最好是高估,那么小文件的分区会比大文件的分区(排在最前面)快。 |
spark.sql.broadcastTimeout | 300 | 广播加入中广播等待时间的超时(以秒为单位) |
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | 配置表的最大大小(以字节为单位),该表将在执行连接时广播到所有工作节点。通过将此值设置为 -1 可以禁用广播。请注意,当前仅支持ANALYZE TABLE COMPUTE STATISTICS noscan已运行该命令的 Hive Metastore 表的统计信息 。 |
spark.sql.shuffle.partitions | 200 | 配置shulffle数据以进行连接或聚合时要使用的分区数。 |
broadcast join模式,会将小于spark.sql.autoBroadcastJoinThreshold
值(默认为10M)的表广播到其他计算节点,不走shuffle过程,所以会更加高效。
scala案例
import org.apache.spark.sql.functions.broadcast
broadcast(spark.table("src")).join(spark.table("records"), "key").show()
sql案例
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
如果默认的SparkSQL生成的分区太少或太多,不能充分利用资源,可以读取数据后,调用repartition或者coalesce调整分区数。
Spark Streaming是基于Spark Core的扩展,用来处理流式数据的API。可以读取Kafka、Flume或者TCP套接字等多种数据源。 并且可以使用高级别功能表达复杂的算法map,reduce,join和window来处理数据。处理后可以将数据写入文件系统、数据库和实时仪表盘。
内部工作原理是接收实时数据,把数据切成一批一批的数据,一批数据作为一个RDD处理。
Spark Streaming 提供了一种称为离散流或DStream的高级抽象,它表示连续的数据流。DStream 可以从来自 Kafka、Flume 和 Kinesis 等来源的输入数据流创建,也可以通过对其他 DStream 应用高级操作来创建。在内部,DStream 表示为一系列 RDD。StreamingContext是所有流功能的主要入口点。
一个简单案例
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// 创建一个具有两个执行线程的本地 StreamingContext,批处理间隔为 1 秒。
// 必须至少是2个线程,一个接收数据,一个处理数据
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// 创建一个DStream表示来自Socket的数据
val lines = ssc.socketTextStream("localhost", 9999)
// 将每一行用空格作为分隔符切割成一个个单词
val words = lines.flatMap(_.split(" "))
// 计算每个批次里的单词个数
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// 打印前10个元素到控制台
wordCounts.print()
// 启动计算
ssc.start()
// 等待计算终止
ssc.awaitTermination()
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
appName是应用程序在集群UI或本地UI上显示的应用名称。master是一个Spark、Mesos 或 YARN 集群 URL,或者是一个表示本地运行的字符串local[*]
。上面硬编码只是作为案例,还可以通过运行Spark程序时指定。
创建StreamingContext时,会默认创建一个SparkContext(所有Spark功能的起点),可以通过这样得到ssc.sparkContext
时间间隔必须根据程序的延迟要求和集群的可用资源设置,详情可看后面的性能调优。
定义上下文后,您必须执行以下操作。
重要的点:
Discretized Stream或DStream是 Spark Streaming 提供的基本抽象。表示一个连续的数据流,可以是从源接收到的输入数据流,也可以是通过转换输入流生成的处理后的数据流。在内部,DStream 由一系列连续的 RDD 表示。DStream 中的每个 RDD 都包含来自某个区间的数据,如下图所示。
对 DStream 应用的任何操作都会转换为对底层 RDD 的操作。例如,在前面将行转换为单词的示例中,该flatMap操作应用于linesDStream 中的每个 RDD,以生成 DStream 的 wordsRDD。这如下图所示。
这些底层 RDD 转换由 Spark 引擎计算。DStream 操作隐藏了大部分这些细节,并为开发人员提供了更高级别的 API 以方便使用。
输入 DStreams 是表示从流源接收的输入数据流的 DStreams。在前面的简单案例中lines是一个输入 DStream,因为它表示从 netcat 服务器接收到的数据流。每个输入 DStream(除了文件流,本节稍后讨论)都与一个Receiver (Scala doc、 Java doc)对象相关联,该对象从源接收数据并将其存储在 Spark 的内存中以供处理。
Spark Streaming提供了两种流式数据源连接方式
Spark Streaming支持读取多个流式数据源,这样会创建多个DStream和多个接收器。要集群资源核数是否够用。调试的话,建议用local[*]
。集群运行的话,设置的核数必须大于程序中接收器的个数。
socket案例在1 里已经有了,这里不再赘述
文件流不需要接收器,所以不需要分配额外的核
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
streamingContext.textFileStream(dataDirectory)
object HdfsWordCount {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: HdfsWordCount <directory>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("HdfsWordCount")
// Create the context
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
完整示例可以在spark源码库的examples\src\main\scala\org\apache\spark\examples\streaming\HdfsWordCount.scala中找到完整案例
Spark Streaming如何监控目录:
“完整”文件系统(例如 HDFS)倾向于在创建输出流后立即对其文件设置修改时间。当一个文件被打开时,甚至在数据被完全写入之前,它可能被包含在DStream- 之后在同一窗口内对文件的更新将被忽略。为确保在窗口中获取更改,请将文件写入不受监视的目录,然后在关闭输出流后立即将其重命名为目标目录。
如果重命名的文件在其创建窗口期间出现在扫描的目标目录中,则将选取新数据。相比之下,Amazon S3 和 Azure Storage 等对象存储通常具有较慢的重命名操作,因为数据实际上是复制的。此外,重命名的对象可能会将rename()操作时间作为其修改时间,因此可能不会被视为原始创建时间所暗示的窗口的一部分。需要针对目标对象存储进行仔细测试,以验证存储的时间戳行为是否与Spark Streaming 预期的一致。直接写入目标目录可能是通过所选对象存储流式传输数据的适当策略。
可以使用通过自定义接收器接收的数据流创建 DStream。有关更多详细信息,请参阅自定义接收器指南。
为了使用测试数据测试 Spark Streaming 应用程序,还可以创建基于 RDD 队列的 DStream,使用streamingContext.queueStream(queueOfRDDs). 每个推入队列的 RDD 都会在 DStream 中被视为一批数据,并像流一样进行处理。
根据其可靠性,可以有两种数据源。来源(如 Kafka 和 Flume)允许确认传输的数据。如果从这些可靠来源接收数据的系统正确确认接收到的数据,就可以确保不会因任何类型的故障而丢失数据。这导致了两种接收器:
Spark Streaming 2.4.0 与 Kafka 代理版本 0.8.2.1 或更高版本兼容。
Spark Streaming提供了2种连接模式
SparkStreaming会启动和Kafka分区数对应的Task数去读取Kafka,每个task读取对应的分区。读取到内存中后,Kafka的的一个分区数据会成为RDD的一个分区的数据。
直连模式Spark任务的偏移量存储在内存中,一旦任务挂掉就要从头开始消费,所以需要用户自己维护偏移量。读取完毕将偏移量保存到Zookeeper或Redis或HDFS,这样当任务挂掉,可以从保存位置直接读取。SparkStreaming处理间隔在1秒以上可以使用Zookeeper,小于1秒建议使用Redis。
处理流程简单,无需启动Receiver
读取数据自由,任务失败可以重新去Kafka读取
At-most-once就是最多一次。这种就是先提交偏移量后,任务挂掉。这种情况下有些数据未处理就丢失掉了,下次任务周期只会从已更新的偏移量开始消费。这就是最多一次,也就是最多只会处理一次。不在乎丢数据的情况下可以使用这种方案。
At-least-once就是最少一次。这种就是提交偏移量之前任务挂掉。这种情况一些已经处理过的数据已经消费过,但偏移量没来得及提交,所以下次任务周期就还是从上次任务的偏移量开始读,这样就出现了重复消费数据的问题。要求不能丢数据可以使用这种方案。重复消费可以通过幂等性设计来避免。
Direct模式读取数据本身就符合幂等性,Spark Streaming内部处理也复合幂等性。所以要保证幂等性的就是输出端,比如写到HDFS(HDFS写本身就具有幂等性)、Mysql、Hbase、Redis等。
通用思路:
Mysql幂等,前提要有主键和唯一索引:
接口幂等,可以通过token:
流程
问题
更新完偏移量后,如果Driver突然挂掉,数据未成功处理。下次再次执行时,读取的数据已经是新一批的数据了。这种情况就造成数据丢失了。
Kafka 项目在 0.8 和 0.10 版本之间引入了一个新的消费者 API,因此有 2 个单独的对应 Spark Streaming 包可用。0.8与0.9或0.10兼容,但0.10版的集成和之前版本不兼容。如果考虑兼容以前项目建议使用0.8,如果是新项目建议使用0.10。需要注意的是0.10版本只支持Direct直连模式,而不再支持Receivrer模式。
有关更多详细信息,参考Kafka集成指南
Spark Streaming 2.4.0 与 Flume 1.6.0 兼容。有关更多详细信息,参考Flume集成指南
与 RDD 类似,转换允许修改来自输入 DStream 的数据。DStreams 支持许多普通 Spark RDD 上可用的转换。一些常见的如下
转换操作 | 作用 |
---|---|
map(func) | 通过将源 DStream 的每个元素传递给函数func 来返回一个新的 DStream 。 |
flatMap(func) | 与 map 类似,但每个输入项可以映射到 0 个或多个输出项。 |
filter(func) | 通过仅选择func返回 true的源 DStream 的记录来返回新的 DStream 。 |
repartition(numPartitions) | 通过创建更多或更少的分区来更改此 DStream 中的并行级别。 |
union(otherStream) | 返回一个新的 DStream,它包含源 DStream 和otherDStream 中元素的联合 。 |
count() | 通过计算源 DStream 的每个 RDD 中的元素数量,返回一个新的单元素 RDD 的 DStream。 |
reduce(func) | 通过使用函数func(它接受两个参数并返回一个)聚合源 DStream 的每个 RDD 中的元素,返回一个新的单元素 RDD 的 DStream 。该函数应该是关联的和可交换的,以便它可以并行计算。 |
countByValue() | 当在类型为 K 的元素的 DStream 上调用时,返回一个 (K, Long) 对的新 DStream,其中每个键的值是它在源 DStream 的每个 RDD 中的频率。 |
reduceByKey(func, [numTasks]) | 当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, V) 对 DStream,其中使用给定的 reduce 函数聚合每个键的值。注意:默认情况下,这使用 Spark 的默认并行任务数(本地模式为 2,在集群模式下,数量由 config 属性决定spark.default.parallelism)进行分组。您可以传递一个可选numTasks参数来设置不同数量的任务。 |
join(otherStream, [numTasks]) | 当在 (K, V) 和 (K, W) 对的两个 DStream 上调用时,返回一个新的 (K, (V, W)) 对的 DStream,其中包含每个键的所有元素对。 |
cogroup(otherStream, [numTasks]) | 当在 (K, V) 和 (K, W) 对的 DStream 上调用时,返回 (K, Seq[V], Seq[W]) 元组的新 DStream。 |
transform(func) | 通过将 RDD-to-RDD 函数应用于源 DStream 的每个 RDD,返回一个新的 DStream。这可用于在 DStream 上执行任意 RDD 操作。 |
updateStateByKey(func) | 返回一个新的“状态”DStream,其中每个键的状态通过将给定的函数应用于键的先前状态和键的新值来更新。这可用于维护每个键的任意状态数据。 |
updateStateByKey操作允许保持任意状态,同时使用新信息不断更新它。要使用它,必须执行两个步骤。
在每个批次中,Spark会对所有的Key-Value都会应用状态更新函数,无论该批次中是否有新数据。如果针对某对Key-value状态更新函数返回None,该K-V将会被淘汰。
让我们用一个例子来说明这一点。假设要维护在文本数据流中看到的每个单词的运行计数。这里运行计数是状态,它是一个整数。我们将更新函数定义为:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
这适用于包含单词的 DStream(例如,在前面的示例中pairs包含(word, 1)对的DStream )。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
将为每个单词调用更新函数,其中newValues包含一系列 1(来自(word, 1)对)和runningCount具有先前计数的 1。
请注意,使用updateStateByKey需要配置检查点目录,这将在检查点部分详细讨论。
transform操作(及其类似的变体transformWith)允许将任意 RDD-to-RDD 函数应用于 DStream。它可用于应用任何未在 DStream API 中公开的 RDD 操作。例如,将数据流中的每个批次与另一个数据集连接的功能并未直接在 DStream API 中公开。但是,可以轻松地使用它transform来执行此操作。这实现了非常强大的可能性。例如,可以通过将输入数据流与预先计算的垃圾邮件信息(也可能由 Spark 生成)连接,然后根据它进行过滤来进行实时数据清理。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
}
每个批次间隔都会调用该函数。这里允许做一些操作,比如在批处理中,操作RDD、修改分区数、使用广播变量等。
Spark Streaming 还提供窗口计算,允许在数据的滑动窗口上应用转换。下图展示了滑动窗口。
如图所示,每次窗口滑过一个源 DStream 时,落入窗口内的源 RDD 被组合并操作以产生窗口化 DStream 的 RDD。在这种特定情况下,该操作应用于最后 3 个时间单位的数据,并滑动 2 个时间单位。这说明任何窗口操作都需要指定两个参数。
这两个参数必须是源DStream的batch间隔的倍数(图中为1)。
用一个例子来说明窗口操作。假设想通过每 10 秒生成过去 30 秒数据的字数来扩展 前面的示例。为此,必须在过去 30 秒的数据reduceByKey对的pairsDStream 上应用操作(word, 1)。使用reduceByKeyAndWindow操作实现。
// 每10秒聚合过去30秒的数据
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
一些常见的窗口操作如下。所有这些操作都采用上述两个参数 windowLength和slideInterval。
转换算子 | 作用 |
---|---|
window(windowLength, slideInterval) | 返回一个新的 DStream,它是根据源 DStream 的窗口批次计算的。 |
countByWindow(windowLength, slideInterval) | 每隔一段时间(滑动间隔),返回指定窗口间隔(窗口长度)中元素的个数 |
reduceByWindow(func, windowLength, slideInterval) | 返回一个新的单元素流,它是通过使用func在滑动间隔内聚合流中的元素而创建的。该函数应该是关联的和可交换的,以便它可以被正确地并行计算。 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, V) 对 DStream,其中每个键的值使用给定的 reduce 函数func 在滑动窗口中按批次聚合。注意:默认情况下,这使用 Spark 的默认并行任务数(本地模式为 2,在集群模式下,数量由 config 属性决定spark.default.parallelism)进行分组。可以传递一个可选 numTasks参数来设置不同数量的任务。 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | 是上面reduceByKeyAndWindow的更高效的版本。不会每次重新计算每个窗口的数据。而是使用上一个窗口的数据减去离开窗口的数据,加上新进入窗口的数据计算的。但是,它仅适用于“可逆减函数(invertible reduce functions)”,即具有相应“反减”功能的减函数(作为参数invFunc)。 像reduceByKeyAndWindow一样,通过可选参数可以配置reduce任务的数量。请注意,必须启用检查点才能使用此操作。 |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, Long) 对 DStream,其中每个键的值是其在滑动窗口内的频率。与reduceByKeyAndWindow一样,reduce 任务的数量可通过可选参数进行配置。 |
流和流的join很简单
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
在这里,在每个批次间隔中,由stream1生成的 RDD将与由stream2生成的 RDD 连接。也可以做leftOuterJoin, rightOuterJoin, fullOuterJoin。此外,在流的窗口上进行连接通常非常有用。这也很容易。
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
前面transform操作已经有过案例,这里是另一个案例
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
事实上,还可以动态更改要加入的数据集。提供给的函数在transform每个批次间隔进行评估,因此将使用dataset引用指向的当前数据集。
API 文档中提供了 DStream 转换的完整列表。对于 Scala API,参考DStream 和PairDStreamFunctions。有关 Java API,参考JavaDStream 和JavaPairDStream。有关 Python API,请参阅DStream。
输出操作允许将 DStream 的数据推送到外部系统,如数据库或文件系统。由于输出操作实际上允许外部系统使用转换后的数据,因此它们会触发所有 DStream 转换的实际执行(类似于 RDD 的操作)。目前,定义了以下输出操作:
输出操作 | 作用 |
---|---|
print() | 在运行流应用程序的驱动程序节点上的 DStream 中打印每批数据的前十个元素。这对于开发和调试很有用。 |
saveAsTextFiles(prefix, [suffix]) | 将此 DStream 的内容保存为文本文件。每个批处理间隔的文件名是根据前缀和后缀生成的:“prefix-TIME_IN_MS[.suffix]”。 |
saveAsObjectFiles ( prefix , [ suffix ]) | 将此 DStream 的内容保存为SequenceFiles序列化的 Java 对象。每个批处理间隔的文件名是根据前缀和 后缀生成的:“prefix-TIME_IN_MS[.suffix]”。 |
saveAsHadoopFiles ( prefix , [ suffix ]) | 将此 DStream 的内容保存为 Hadoop 文件。每个批处理间隔的文件名是根据前缀和后缀生成的:“prefix-TIME_IN_MS[.suffix]”。 |
foreachRDD(func) | 将函数func应用于从流生成的每个 RDD的最通用的输出运算符。这个函数应该将每个 RDD 中的数据推送到外部系统,例如将 RDD 保存到文件,或者通过网络将其写入数据库。请注意,函数func在运行流应用程序的驱动程序进程中执行,并且通常会在其中包含 RDD 操作,这些操作将强制计算流 RDD。 |
#### 5.1 使用 foreachRDD 的设计模式 | |
为避免为每条记录创建和销毁连接对象,建议使用foreachPartition来创建连接。使得每个分区的数据使用一个连接。可以类似这样 |
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
可以通过跨多个 RDD/batch 重用连接对象来进一步优化。可以维护一个静态的连接对象池,当多个批次的 RDD 被推送到外部系统时可以重用,从而进一步减少开销。类似这样
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
请注意,池中的连接应该按需延迟创建,如果一段时间不使用则超时。这样可以最有效地将数据发送到外部系统。
其他要点:
可以轻松地对流数据使用DataFrames 和 SQL操作。必须使用 StreamingContext 正在使用的 SparkContext 创建一个 SparkSession。此外,必须这样做才能在驱动程序故障时重新启动。这是通过创建一个延迟实例化的 SparkSession 单例实例来完成的。这在以下示例中显示。它修改了前面的wordcount示例以使用 DataFrames 和 SQL 生成字数。每个 RDD 被转换为一个 DataFrame,注册为临时表,然后使用 SQL 进行查询。
/** 在流式处理中使用DataFrame和SQL */
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// 获取单例的SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// 将String类型RDD转换为DataFrame
val wordsDataFrame = rdd.toDF("word")
// 创建一个临时视图
wordsDataFrame.createOrReplaceTempView("words")
// 使用SQL做wordcount操作,然后打印结果
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
完整源码
还可以对在来自不同线程的流数据上定义的表运行 SQL 查询(即与正在运行的 StreamingContext 异步)。只需确保将 StreamingContext 设置为记住足够数量的流数据,以便查询可以运行。否则,不知道任何异步 SQL 查询的 StreamingContext 将在查询完成之前删除旧的流数据。例如,如果您想查询最后一批,但您的查询可能需要 5 分钟才能运行,然后调用streamingContext.remember(Minutes(5))(在 Scala 中,或其他语言中的等效项)。
可以参考三、Spark SQL、DataFrame、Dataset
与 RDD 类似,DStreams 也允许开发人员将流的数据保存在内存中。也就是说,persist()在 DStream 上使用该方法将自动将该 DStream 的每个 RDD 持久化在内存中。如果 DStream 中的数据将被多次计算(例如,对同一数据进行多次操作),这将非常有用。对于像reduceByWindow和这样的基于窗口的操作和 像 那样reduceByKeyAndWindow的基于状态的操作updateStateByKey,这是隐含的。因此,由基于窗口的操作生成的 DStream 会自动保存在内存中,无需开发人员调用persist().
对于通过网络接收数据的输入流(如 Kafka、Flume、sockets 等),默认的持久化级别设置为将数据复制到两个节点以实现容错。
与 RDD 不同,DStreams 的默认持久化级别将数据序列化在内存中。
流式应用程序必须 24/7 全天候运行,因此必须能够应对与应用程序逻辑无关的故障(例如,系统故障、JVM 崩溃等)。为了使这成为可能,Spark Streaming 需要将足够的信息检查点到容错存储系统,以便它可以从故障中恢复。检查点有两种类型的数据。
总而言之,元数据检查点主要用于从驱动程序故障中恢复,而如果使用有状态转换,即使对于基本功能,数据或 RDD 检查点也是必要的。
必须为具有以下任何要求的应用程序启用检查点:
请注意,没有上述状态转换的简单流应用程序可以在不启用检查点的情况下运行。在这种情况下,驱动程序故障的恢复也将是部分的(一些已接收但未处理的数据可能会丢失)。这通常是可以接受的,许多人以这种方式运行 Spark Streaming 应用程序。对非 Hadoop 环境的支持有望在未来得到改善。
可以通过在容错、可靠的文件系统(例如 HDFS、S3 等)中设置一个目录来启用检查点,检查点信息将保存到该目录中。这是通过使用streamingContext.checkpoint(checkpointDirectory). 这将允许您使用上述有状态转换。此外,如果想让应用程序从驱动程序故障中恢复,应该重写流应用程序以具有以下行为。
使用StreamingContext.getOrCreate可以更简单一些
// 创建新StreamingContext实例的方法
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// 从checkpoint获取StreamingContext或者创建一个新的StreamingContext
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// 配置需要配置的项
// 不论第一次启动还是重启
context. ...
// 启动context
context.start()
context.awaitTermination()
如果checkpointDirectory存在,则将从检查点数据重新创建上下文。如果该目录不存在(即第一次运行),则将functionToCreateContext调用该函数以创建新的上下文并设置 DStream。参考 Scala 示例 RecoverableNetworkWordCount。此示例将网络数据的字数附加到文件中。
除了使用getOrCreate一个还需要确保驱动程序进程在失败时自动重新启动。这只能由运行应用程序的部署基础结构来完成。具体参考部署部分。
请注意,RDD 的检查点会产生保存到可靠存储的成本。这可能会导致 有检查点的RDD 的那些批次的处理时间增加。因此,需要仔细设置检查点的间隔。在小批量(比如 1 秒)下,每批检查点可能会显着降低操作吞吐量。相反,检查点太少会导致谱系和任务大小增加,这可能会产生不利影响。对于需要 RDD 检查点的有状态转换,默认间隔是批处理间隔的倍数,至少为 10 秒。可以使用dstream.checkpoint(checkpointInterval)进行设置 。通常,DStream 的 5 - 10 个滑动间隔的检查点间隔是一个很好的尝试设置。
无法从 Spark Streaming 中的检查点恢复累加器和广播变量。如果启用检查点并使用 累加器或广播变量 ,则必须为累加器和广播变量创建延迟实例化的单例实例, 以便在驱动程序出现故障后重新启动后重新实例化它们。以下示例中展示了这一点。
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
}
object DroppedWordsCounter {
@volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
instance
}
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// 获取或注册一个黑名单广播变量
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// 获取或注册一个移除单词个数的累加器
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter.add(count)
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})
查看完整源码
部署Spark Streaming程序的步骤
要运行 Spark Streaming 应用程序,需要具备以下条件。
如果需要使用新的应用程序代码升级正在运行的 Spark Streaming 应用程序,则有两种可能的机制。
除了 Spark 的监控功能之外,还有其他特定于 Spark Streaming 的功能。使用 StreamingContext 时, Spark Web UI 会显示一个附加Streaming选项卡,其中显示有关正在运行的接收器(接收器是否处于活动状态、接收到的记录数、接收器错误等)和已完成批次(批处理时间、排队延迟等)的统计信息。 )。这可用于监视流应用程序的进度。
Web UI 中的以下两个指标特别重要:
如果批处理时间始终大于批处理间隔和/或排队延迟不断增加,则表明系统无法像生成批处理一样快地处理批处理,并且正在落后。在这种情况下,请考虑 减少批处理时间。
也可以使用StreamingListener接口监控 Spark Streaming 程序的进度,该接口允许获取接收器状态和处理时间。请注意,这是一个开发人员 API,将来可能会对其进行改进(即报告更多信息)。
从集群上的 Spark Streaming 应用程序中获得最佳性能需要进行一些调整。本节介绍了许多可以调整以提高应用程序性能的参数和配置。在高层次上,需要考虑两件事:
可以在 Spark 中进行许多优化以最小化每个批次的处理时间。这里可以参考通用-性能调优。
通过网络接收数据(如 Kafka、Flume、socket 等)需要将数据反序列化并存储在 Spark 中。如果数据接收成为系统的瓶颈,则考虑并行化数据接收。请注意,每个输入 DStream 都会创建一个接收器(在工作机器上运行),用于接收单个数据流。因此,可以通过创建多个输入 DStream 并将它们配置为从源接收数据流的不同分区来实现接收多个数据流。例如,接收两个主题数据的单个 Kafka 输入 DStream 可以拆分为两个 Kafka 输入流,每个输入流仅接收一个主题。这将运行两个接收器,允许并行接收数据,从而提高整体吞吐量。这些多个 DStream 可以结合在一起以创建单个 DStream。然后可以将应用于单个输入 DStream 的转换应用于统一流。案例:
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
另一个应该考虑的参数是接收方的块间隔,由配置参数 spark.streaming.blockInterval决定. 对于大多数接收器,接收到的数据在存储在 Spark 的内存中之前被合并成数据块。每批中的块数决定了将用于在类似地图的转换中处理接收到的数据的任务数。每批每个接收器的任务数约为(批间隔/块间隔)。例如,200 ms 的块间隔将每 2 秒创建 10 个任务批次。如果任务数量太少(即小于每台机器的核心数量),那么效率会很低,因为所有可用的核心都不会用于处理数据。要增加给定批处理间隔的任务数,请减少块间隔。但是推荐的block interval最小值为50ms左右,低于这个值可能会导致任务启动开销问题。
如果在计算的任何阶段使用的并行任务数量不够多,则集群资源可能未得到充分利用。对于像reduceByKey和reduceByKeyAndWindow这样的分布式reduce操作,并行任务的默认数量由spark.default.parallelism 配置属性控制。以将并行级别作为参数传递(请参阅 PairDStreamFunctions 文档),或设置spark.default.parallelism 配置属性以更改默认值。
通过调整序列化格式可以减少数据序列化的开销。在流的情况下,有两种类型的数据正在被序列化。
在这两种情况下,使用 Kryo 序列化都可以减少 CPU 和内存开销。有关更多详细信息,可以参考Spark 调优指南。对于 Kryo,应该注册自定义类,并禁用对象引用跟踪(请参阅配置指南中的 Kryo 相关配置)。
在流应用程序需要保留的数据量不大的特定情况下,将数据(两种类型)作为反序列化对象持久化而不产生过多的 GC 开销可能是可行的。例如,如果您使用几秒钟的批处理间隔并且没有窗口操作,那么您可以尝试通过相应地显式设置存储级别来禁用持久数据中的序列化。这将减少由于序列化而导致的 CPU 开销,从而潜在地提高性能而不会产生过多的 GC 开销。
如果每秒启动的任务数量很高(例如,每秒 50 个或更多),那么向从站发送任务的开销可能很大,并且很难实现亚秒级延迟。此时可以减少任务数量,也就是减少并行度。
这样可能会将处理时间减少一些,从而使亚秒级的批处理大小可行。
了使在集群上运行的 Spark Streaming 应用程序稳定,系统应该能够在接收数据时尽快处理数据。换句话说,批量数据的处理速度应与生成它们的速度一样快。可以通过在流式 Web UI 中监视处理时间来确定应用程序是否如此 ,其中批处理时间应小于批处理间隔。
根据流计算的性质,所使用的批处理间隔可能会对应用程序在一组固定集群资源上维持的数据速率产生重大影响。例如,让我们考虑较早的 WordCountNetwork 示例。对于特定的数据速率,系统可能能够每 2 秒(即 2 秒的批处理间隔)跟上报告字数,但不是每 500 毫秒。因此,需要设置批处理间隔,以便能够维持生产中的预期数据速率。
确定应用程序正确批处理大小的一个好方法是使用保守的批处理间隔(例如 5-10 秒)和低数据速率对其进行测试。要验证系统是否能够跟上数据速率,您可以检查每个处理批次所经历的端到端延迟的值(在 Spark 驱动程序 log4j 日志中查找“总延迟”,或使用 流媒体监听器 界面)。如果延迟保持与批量大小相当,则系统是稳定的。否则,如果延迟不断增加,则表示系统跟不上,因此不稳定。一旦您有了稳定配置的想法,您就可以尝试增加数据速率和/或减少批量大小。请注意,由于临时数据速率增加而导致的瞬时延迟增加可能没问题,只要延迟减少回较低的值(即小于批量大小)。
Spark Streaming 应用程序所需的集群内存量在很大程度上取决于所使用的转换类型。例如,如果您想对最近 10 分钟的数据使用窗口操作,那么您的集群应该有足够的内存来在内存中保存 10 分钟的数据。或者如果要使用updateStateByKey大量键,那么必要的内存会很高。反之,如果要做简单的map-filter-store操作,那么所需的内存就会很低。
一般情况下,由于通过接收器接收到的数据存储在 StorageLevel.MEMORY_AND_DISK_SER_2 中,因此无法放入内存的数据会溢出到磁盘。这可能会降低流式应用程序的性能,因此建议您根据流式应用程序的需要提供足够的内存。最好尝试在小范围内查看内存使用情况并进行相应估计。
内存调优的另一个方面是垃圾收集。对于需要低延迟的流式应用来说,JVM 垃圾回收导致的??大停顿是不可取的。
有几个参数可以帮助调整内存使用和 GC 开销:
DStreams 的持久化级别:如前面数据序列化部分所述,默认情况下输入数据和 RDD 作为序列化字节持久化。与反序列化持久化相比,这减少了内存使用和 GC 开销。启用 Kryo 序列化可进一步减少序列化大小和内存使用量。可以通过压缩(请参阅 Spark 配置spark.rdd.compress)以 CPU 时间为代价进一步减少内存使用。
清除旧数据:默认情况下,所有输入数据和 DStream 转换生成的持久化 RDD 都会自动清除。Spark Streaming 根据使用的转换决定何时清除数据。例如,如果您使用 10 分钟的窗口操作,那么 Spark Streaming 将保留最近 10 分钟的数据,并主动丢弃旧数据。通过设置 ,可以将数据保留更长的时间(例如交互式查询旧数据)streamingContext.remember。
CMS 垃圾收集器:强烈建议使用并发标记和清除 GC 以保持与 GC 相关的暂停始终较低。尽管已知并发 GC 会降低系统的整体处理吞吐量,但仍建议使用它来实现更一致的批处理时间。确保在驱动程序(使用–driver-java-optionsin spark-submit)和执行程序(使用Spark 配置 spark.executor.extraJavaOptions)上都设置了 CMS GC 。
其他提示:为了进一步减少 GC 开销,这里有一些更多的提示可以尝试。
内存相关的要点:
Spark Streaming 应用程序在发生故障时的行为
为了理解 Spark Streaming 提供的语义,让我们记住 Spark 的 RDD 的基本容错语义。
Spark 对 HDFS 或 S3 等容错文件系统中的数据进行操作。因此,从容错数据生成的所有 RDD 也是容错的。但是,Spark Streaming 的情况并非如此,因为在大多数情况下,数据是通过网络接收的(使用时除外 fileStream)。为了为所有生成的 RDD 实现相同的容错特性,接收到的数据在集群中工作节点的多个 Spark 执行器之间复制(默认复制因子为 2)。这导致系统中有两种数据需要在发生故障时恢复:
此外,还有两种故障值得我们关注:
有了这些基础知识,让我们来了解一下 Spark Streaming 的容错语义。
流系统的语义通常根据系统可以处理每条记录的次数来捕获。系统可以在所有可能的操作条件下(尽管出现故障等)提供三种类型的保证
在任何流处理系统中,从广义上讲,处理数据都分为三个步骤。
如果流应用程序必须实现端到端的恰好一次保证,那么每个步骤都必须提供一个恰好一次的保证。也就是说,每条记录必须只接收一次,只转换一次,并且只推送到下游系统一次。让我们在 Spark Streaming 的上下文中理解这些步骤的语义。
不同的输入源提供不同的保证,范围从至少一次到恰好一次。阅读更多详情。
如果所有输入数据都已经存在于像 HDFS 这样的容错文件系统中,Spark Streaming 总是可以从任何故障中恢复并处理所有数据。这给出 了一次性语义,这意味着无论什么失败,所有数据都将被处理一次。
对于基于接收器的输入源,容错语义取决于故障场景和接收器的类型。正如我们之前讨论的,有两种类型的接收器:
根据使用的接收器类型,我们实现了以下语义。如果工作节点发生故障,那么可靠的接收器不会丢失数据。对于不可靠的接收器,接收到但未复制的数据可能会丢失。如果驱动节点发生故障,那么除了这些损失之外,所有过去在内存中接收和复制的数据都将丢失。这将影响有状态转换的结果。
为了避免丢失过去接收到的数据,Spark 1.2 引入了预写日志,将接收到的数据保存到容错存储中。通过启用预写日志和可靠的接收器,数据丢失为零。在语义方面,它提供了至少一次的保证。
带有预写日志的 Spark 1.2 或更高版本,Worker节点可以支持可靠接收器的零数据丢失
实现至少一次语义,Driver支持可靠的接收器和文件实现零数据丢失的至少一次语义
在 Spark 1.3 中,我们引入了一个新的 Kafka Direct API,它可以确保 Spark Streaming 只接收一次所有 Kafka 数据。除此之外,如果您实现恰好一次输出操作,则可以实现端到端的恰好一次保证。有关更多详细信息,参考Kafka集成指南
输出操作(如foreachRDD)具有至少一次语义,也就是说,如果发生工作故障,转换后的数据可能会多次写入外部实体。虽然这对于使用saveAs***Files操作保存到文件系统是可以接受的 (因为文件只会被相同的数据覆盖),但可能需要额外的努力来实现精确一次语义。有两种方法。
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// 使用这个uniqueId唯一id事务性的提交整个分区的数据
}
}
Structured Streaming 是一种基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎。您可以像在静态数据上表达批处理计算一样表达流式计算。Spark SQL 引擎将负责以增量方式连续运行它,并随着流数据的不断到达更新最终结果。您可以使用Scala、Java、Python 或 R 中的Dataset/DataFrame API来表达流聚合、事件时间窗口、流到批处理连接等。计算在同一个优化的 Spark SQL 引擎上执行。最后,系统通过检查点和预写日志确保端到端的一次性容错保证。简而言之,Structured Streaming 提供快速、可扩展、容错、端到端的一次性流处理,用户无需对流进行推理。
在内部,默认情况下,Structured Streaming 查询使用微批处理引擎进行处理,该引擎将数据流处理为一系列小批量作业,从而实现低至 100 毫秒的端到端延迟和仅一次容错保证. 但是,从 Spark 2.3 开始,Spark引入了一种新的低延迟处理模式,称为连续处理,它可以在至少一次保证的情况下实现低至 1 毫秒的端到端延迟。在不更改查询中的 Dataset/DataFrame 操作的情况下,可以根据的应用程序要求选择模式。
这部分主要介绍编程模型和 API,主要使用默认的微批处理模型来解释这些概念。然后再讨论连续处理模型。首先,让我们从结构化流查询的简单示例WordCount开始。
假设要维护从侦听 TCP 套接字的数据服务器接收的文本数据的运行字数。让我们看看如何使用结构化流来表达这一点。可以在Scala / Java / Python 中查看完整代码 。如果下载 Spark,则可以直接运行该示例。无论如何,让我们一步一步地浏览示例并了解它是如何工作的。首先,我们必须导入必要的类并创建一个本地 SparkSession,这是与 Spark 相关的所有功能的起点。
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
//创建一个流式 DataFrame,它表示从侦听 localhost:9999 的服务器接收的文本数据
//转换 DataFrame 以计算字数。
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 将一行切割成单词
val words = lines.as[String].flatMap(_.split(" "))
// 统计word个数
val wordCounts = words.groupBy("value").count()
这个lines DataFrame 表示一个包含流文本数据的无界表。该表包含一列名为“value”的字符串,流文本数据中的每一行都成为表中的一行。请注意,这当前没有接收任何数据,因为我们只是在设置转换,还没有启动它。接下来,我们使用 将 DataFrame 转换为 String 的 Dataset .as[String],以便我们可以应用flatMap操作将每一行拆分为多个单词。结果words数据集包含所有单词。最后,我们wordCounts通过按 Dataset 中的唯一值进行分组并对其进行计数来定义DataFrame。请注意,这是一个流数据帧,它表示流的运行字数。
我们现在已经设置了对流数据的查询。剩下的就是实际开始接收数据并计算计数。为此,我们将其设置outputMode(“complete”)为在每次更新时将完整的计数集(由 指定)打印到控制台。然后使用start().
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
执行此代码后,流式计算将在后台启动。该query对象是该活动流查询的句柄,使用 awaitTermination() 等待查询终止,以防止进程在查询处于活动状态时退出。
要实际执行此示例代码,可以在自己的Spark 应用程序中编译代码,也可以在 下载 Spark 后直接 运行该示例。这里展示的是后者。首先需要使用 Netcat(在大多数类 Unix 系统中的一个小实用程序,windows也有类似工具)作为数据服务器运行
$ nc -lk 9999
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
apache spark
apache hadoop
...
使用以下命令启动示例
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
# TERMINAL 2: RUNNING StructuredNetworkWordCount
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 1|
| spark| 1|
+------+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 2|
| spark| 1|
|hadoop| 1|
+------+-----+
...
Structured Streaming 的关键思想是将实时数据流视为一个不断追加的表。这导致了一种与批处理模型非常相似的新流处理模型。将流计算表示为标准的类似批处理的查询,就像在静态表上一样,Spark在无界输入表上将其作为增量查询运行。让我们更详细地了解这个模型。
将输入数据流视为“输入表”。到达流的每个数据项就像是添加到输入表的新行。
基于输入数据的查询将生成“结果表”。每个触发间隔(例如,每 1 秒),新行都会附加到输入表,最终更新结果表。每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。
“输出”定义为写入外部存储的内容。输出可以定义为不同的模式:
请注意,每种模式都适用于某些类型的查询。这将在后面详细讨论。
为了说明该模型的使用,让我们在上面的快速示例的上下文中理解该模型。第一个lines DataFrame是输入表,最后一个wordCounts DataFrame是结果表。需要注意的是在流媒体的查询lines数据帧生成wordCounts是完全一样的,因为它是一个静态的数据帧。然而,当这个查询开始时,Spark 会不断检查来自套接字连接的新数据。如果有新数据,Spark 将运行一个“增量”查询,将之前的运行计数与新数据结合起来计算更新的计数,如下所示。
请注意,Structured Streaming 不会具体化整个表。它从流数据源读取最新的可用数据,增量处理以更新结果,然后丢弃源数据。它只保留更新结果所需的最小中间状态数据(例如,前面示例中的中间计数)。
该模型与许多其他流处理引擎明显不同。很多要求用户自己维护正在运行的聚合,因此必须考虑容错和数据一致性(至少一次,或最多一次,或恰好一次)。在这个模型中,Spark 负责在有新数据时更新 Result Table,从而免除用户的推理。
事件时间是嵌入数据本身的时间。对于许多应用程序,可能希望在此事件时间上进行操作。比如你想获取IoT设备每分钟产生的事件数,那么你可能想使用数据产生的时间(即数据中的event-time),而不是Spark接收他们的时间。这个事件时间在这个模型中非常自然地表达——来自设备的每个事件都是表中的一行,而事件时间是该行中的一个列值。这允许基于窗口的聚合(例如每分钟的事件数)只是事件时间列上的一种特殊类型的分组和聚合——每个时间窗口都是一个组,每一行都可以属于多个窗口/组。
此外,该模型自然会根据其事件时间处理比预期晚到达的数据。由于 Spark 正在更新结果表,因此它可以完全控制在有延迟数据时更新旧聚合,以及清理旧聚合以限制中间状态数据的大小。从 Spark 2.1 开始,我们支持水印,它允许用户指定延迟数据的阈值,并允许引擎相应地清理旧状态。这些将在稍后的窗口操作部分中进行更详细的解释。
提供端到端的恰好一次语义是结构化流设计背后的关键目标之一。为了实现这一点,我们设计了结构化流源、接收器和执行引擎来可靠地跟踪处理的确切进度,以便它可以通过重新启动和/或重新处理来处理任何类型的故障。假设每个流源都有偏移量(类似于 Kafka 偏移量或 Kinesis 序列号)来跟踪流中的读取位置。引擎使用检查点和预写日志来记录每个触发器中正在处理的数据的偏移范围。流接收器被设计为幂等处理重复处理。结合使用可重放源和幂等接收器,在任何故障下结构化流可以确保端到端的恰好一次语义 。
从 Spark 2.0 开始,DataFrames 和 Datasets 可以表示静态的、有界的数据,以及流式的、无界的数据。与静态数据集/数据帧类似,可以使用通用入口点SparkSession (Scala / Java / Python 文档)从流源创建流数据帧/数据集,并对它们应用与静态数据帧/数据集相同的操作。
Streaming DataFrame可以通过SparkSession.readStream()返回的 DataStreamReader接口(Scala / Java / Python docs)创建
有一些内置源。
某些源不具有容错能力,因为它们不能保证在发生故障后可以使用检查点偏移量重放数据。请参阅前面关于容错语义的部分。以下是 Spark 中所有源的详细信息。
参数
true
,以下文件将被视为同一文件,因为它们的文件名“dataset.txt”是相同的:对于特定于文件格式的选项,DataStreamReader接口(Scala / Java / Python docs)。例如,对于parquet格式选项,请参阅DataStreamReader.parquet()。
此外,还有会影响某些文件格式的会话配置。有关更多详细信息,请参阅Spark SQL部分。例如,对于“parquet”,请参阅Parquet 配置部分。
读取文件源的案例
val spark: SparkSession = ...
// Read text from socket
val socketDF = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
socketDF.isStreaming // 如果有流式数据返回true
socketDF.printSchema
// 从文件夹中自动读取所有csv文件
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // 指定csv文件的schema
.csv("/path/to/directory") // 等价于
format("csv").load("/path/to/directory")
Structured Streaming集成Kafka
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version> 2.4.0</version>
</dependency>
流式读取
// 从一个topic读取
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 从多个topic读取
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 使用模式匹配读取
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
批量读取
如果使用场景适合批量读取,也可以根据偏移量范围创建DataFrame和DataSet
// 从一个topic读取,默认是从开始偏移量到结尾偏移量
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 从多个topic读取,指定偏移量范围
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 根据模式匹配读取topic, 偏移量从开始到结束
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
源中的每一行都具有以下schema
列名 | 类型 |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | long |
timestampType | int |
必须为 Kafka 源为批处理和流查询设置以下选项。
选项 | 值 | 意义 |
---|---|---|
assign | json string {“topicA”:[0,1],“topicB”:[2,4]} | 要使用的特定主题分区。只能为 Kafka 源指定“assign”、“subscribe”或“subscribePattern”选项之一。 |
subscribe | 以逗号分隔的主题列表 | 要订阅的主题列表。只能为 Kafka 源指定“assign”、“subscribe”或“subscribePattern”选项之一。 |
subscribePattern | Java 正则表达式字符串 | 用于订阅主题的模式。只能为 Kafka 源指定“assign、“subscribe”或“subscribePattern”选项之一。 |
kafka.bootstrap.servers | 逗号分隔的主机列表:端口 | Kafka“bootstrap.servers”配置。 |
可选配置
选项 | 值 | 默认 | 查询类型 | 意义 |
---|---|---|---|---|
startingOffsets | “earliest”、“latest”(仅限流媒体)或 json 字符串 “”" {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-2} } “”" | latest用于流式传输,earliest用于批处理 | 流和批处理 | 查询开始时的起始点,可以是来自最早偏移量的“earliest”、仅来自最新偏移量的“latest”,或者是指定每个 TopicPartition 的起始偏移量的 json 字符串。在json中,-2作为偏移量可以用来指最早的,-1指的是最新的。注意:对于批量查询,latest(隐式或在 json 中使用 -1)是不允许的。对于流式查询,这仅适用于启动新查询时,并且恢复将始终从查询停止的位置开始。查询期间新发现的分区将最早开始。 |
endingOffsets | 最新或 json 字符串 {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1}} | latest | 批量查询 | 批处理查询结束时的结束点,可以是“最新”,它只是指最新的,或者是指定每个 TopicPartition 的结束偏移量的 json 字符串。在json中,-1作为偏移量可以用来引用latest,-2(earlyest)作为偏移量是不允许的。 |
failOnDataLoss | true 或者 false | true | 流查询 | 当数据可能丢失(例如,主题被删除,或偏移量超出范围)时是否使查询失败。这可能是误报。当它不能按预期工作时,您可以禁用它。如果由于丢失数据而无法从提供的偏移量中读取任何数据,则批处理查询将始终失败。 |
kafkaConsumer.pollTimeoutMs | long | 512 | 流和批处理 | 在执行程序中从 Kafka 轮询数据的超时时间(以毫秒为单位)。 |
fetchOffset.numRetries | int | 3 | 流和批处理 | 在放弃获取 Kafka 偏移量之前重试的次数。 |
fetchOffset.retryIntervalMs | long | 10 | 流和批处理 | 在重试获取 Kafka 偏移量之前等待的毫秒数 |
maxOffsetsPerTrigger | long | none | 流和批处理 | 每个触发间隔处理的最大偏移量的速率限制。指定的总偏移量将在不同卷的主题分区之间按比例分配。 |
请注意,Apache Kafka 仅支持至少一次写入语义。因此,在向 Kafka 写入(流式查询或批量查询)时,某些记录可能会重复;例如,如果 Kafka 需要重试一条未被 Broker 确认的消息,即使该 Broker 接收并写入了消息记录,也会发生这种情况。由于这些 Kafka 写语义,结构化流无法防止此类重复的发生。但是,如果写入查询成功,那么您可以假设查询输出至少写入了一次。在读取写入的数据时删除重复项的可能解决方案可能是引入一个主(唯一)键,该键可用于在读取时执行重复数据删除。
写入 Kafka 的数据帧应该在架构中具有以下列:
列 | 类型 |
---|---|
key (可选的) | string or binary |
value 必须的) | string or binary |
topic (*可选的) | string |
值列是唯一必需的选项。如果未指定null键列,则将自动添加值键列(请参阅有关如何null处理值键值的Kafka 语义)。如果存在主题列,则在将给定行写入 Kafka 时将其值用作主题,除非设置了“主题”配置选项,即“主题”配置选项覆盖主题列。
必须为 批处理和流查询的 Kafka 接收器设置以下选项。
Option | value | meaning |
---|---|---|
kafka.bootstrap.servers | 逗号分隔的主机列表:端口 | Kafka“bootstrap.servers”配置。 |
以下配置是可选的: |
Option | value | default | query type | meaning |
---|---|---|---|---|
topic | string | none | streaming and batch | 设置所有行将写入 Kafka 的主题。此选项会覆盖数据中可能存在的任何主题列。 |
流式数据写入Kafka
// 从DataFrame写key-value数据到Kafka,通过option指定topic
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// 从DataFrame写key-value数据到Kafka,数据中指定topic
val ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
将批量查询的输出写入 Kafka
// 从DataFrame写key-value数据到Kafka,通过option指定topic
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
// 从DataFrame写key-value数据到Kafka,数据中指定topic
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
卡夫卡自己的配置可以通过设置DataStreamReader.option与kafka.前缀,例如 stream.option(“kafka.bootstrap.servers”, “host:port”)。对于可能的 kafka 参数,有关读取数据的参数参考Kafka 消费者配置文档, 有关写入数据的参数参考 Kafka生产者配置文档。
请注意,无法设置以下 Kafka 参数,并且 Kafka 源或接收器将抛出异常:
部署时要注意,必须把spark-sql-kafka-0-10_2.1打到依赖里,否则需要显示指定。
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 ...
rowsPerSecond(例如 100,默认值:1):每秒应生成多少行。
rampUpTime(例如 5s,默认值:0s):在生成速度变为 之前需要多长时间斜坡上升rowsPerSecond。使用比秒更细的粒度将被截断为整数秒。
numPartitions(eg 10, default: Spark’s default parallelism): 生成行的分区号。
源将尽力达到rowsPerSecond,但查询可能受到资源限制,并且numPartitions可以进行调整以帮助达到所需的速度。
默认情况下,来自基于文件的源的结构化流需要您指定架构,而不是依赖 Spark 自动推断它。此限制确保一致的架构将用于流式查询,即使在失败的情况下也是如此。对于临时用例,您可以通过设置spark.sql.streaming.schemaInference
为来重新启用模式推断true
。
当指定的子目录/key=value/
存在并且列表将自动递归到这些目录中时,确实会发生分区发现。如果这些列出现在用户提供的模式中,Spark 将根据正在读取的文件的路径填充它们。组成分区方案的目录在查询开始时必须存在并且必须保持静态。例如,存在/data/year=2016/
时添加是可以的/data/year=2015/
,但更改分区列(即通过创建目录/data/date=2016-04-17/
)是无效的。
可以在流式数据帧/数据集上应用各种操作——从无类型、类似 SQL 的操作(例如select
、where
、groupBy
)到类型化的 RDD 类操作(例如map
、filter
、flatMap
)。有关更多详细信息,请参阅Spark SQL部分。让我们看一下可以使用的一些示例操作。
DataFrame/Dataset 上的大多数常见操作都支持流式传输。本节稍后将讨论少数不受支持的操作。
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
val df: DataFrame = ... // IOT设备的流式数据,schema为 { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData] // IOT 设备的流式数据
// 查询信号数大于10的设备
df.select("device").where("signal > 10") // 使用弱类型 API
ds.filter(_.signal > 10).map(_.device) // 使用强类型 API
// 获得各种设备类型的个数
df.groupBy("deviceType").count() // 使用弱类型 API
// 获得每种设备的平均数
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // 使用强类型 API
还可以将流式数据帧/数据集注册为临时视图,然后对其应用 SQL 命令。
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates") // 返回sql查询结果的DataFrame
请注意,可以使用df.isStreaming
来判断是否是流。
df.isStreaming
滑动事件时间窗口上的聚合对于结构化流很简单,并且与分组聚合非常相似。在分组聚合中,为用户指定的分组列中的每个唯一值维护聚合值(例如计数)。在基于窗口的聚合的情况下,为行的事件时间所属的每个窗口维护聚合值。让我们通过一个插图来理解这一点。
想象一下我们的快速示例被修改,流现在包含行以及生成行的时间。我们不想计算字数,而是要计算 10 分钟窗口内的字数,每 5 分钟更新一次。也就是说,在 10 分钟窗口 12:00 - 12:10、12:05 - 12:15、12:10 - 12:20 等之间接收到的字数中的字数。请注意,12:00 - 12:10 表示数据在 12:00 之后但在 12:10 之前到达。现在,考虑在 12:07 收到的一个词。这个字应该增加对应于两个窗口 12:00 - 12:10 和 12:05 - 12:15 的计数。因此计数将由分组键(即单词)和窗口(可以从事件时间计算)两者进行索引。
结果表将类似于以下内容。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WbQ6q4LX-1631754842072)(http://spark.apache.org/docs/2.4.0/img/structured-streaming-window.png)]
由于这种加窗类似于分组,在代码中,可以使用groupBy()
和window()
操作来表达加窗聚合。您可以在Scala / Java / Python 中查看以下示例的完整代码 。
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// 根据窗口和word聚合 计算每组的word个数
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
现在考虑如果其中一个事件迟到应用程序会发生什么。例如,假设在 12:04(即事件时间)生成的单词可以在 12:11 被应用程序接收。应用程序应该使用时间 12:04 而不是 12:11 来更新窗口12:00 - 12:10
的旧计数。这在我们基于窗口的分组中很自然地发生——结构化流可以在很长一段时间内保持部分聚合的中间状态,以便后期数据可以正确更新旧窗口的聚合,如下图所示。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x1TYbuIa-1631754842075)(http://spark.apache.org/docs/2.4.0/img/structured-streaming-late-data.png)]
但是,要运行此查询数天,系统必须限制其累积的中间内存状态量。这意味着系统需要知道何时可以从内存状态中删除旧聚合,因为应用程序将不再接收该聚合的迟到数据。为了实现这一点,在 Spark 2.1 中,引入了 水印,它可以让引擎自动跟踪数据中的当前事件时间并尝试相应地清理旧状态。您可以通过指定事件时间列和数据在事件时间方面的预期延迟阈值来定义查询的水印。对于从T
时间开始的特定窗口,引擎将保持状态并允许延迟数据更新状态直到(max event time seen by the engine - late threshold > T)
. 换句话说,阈值内的迟到数据将被聚合,但迟于阈值的数据将开始被丢弃( 有关确切保证,请参阅本节后面的部分)。让我们通过一个例子来理解这一点。我们可以使用withWatermark()
如下所示轻松地在前面的示例中定义水印。
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// 根据窗口和word聚合 计算每组的word个数
val windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word")
.count()
在这个例子中,我们在timestamp列的值上定义查询的水印,并定义“10分钟”作为允许数据延迟多长时间的阈值。如果此查询在更新输出模式下运行(稍后在输出模式部分讨论),引擎将不断更新结果表中窗口的计数,直到窗口比水印更旧,这滞后于列“中的当前事件时间”时间戳”前 10 分钟。这是一个插图。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-u28z65MD-1631754842076)(http://spark.apache.org/docs/2.4.0/img/structured-streaming-watermark-update-mode.png)]
如图,引擎跟踪的最大事件时间为 蓝色虚线,(max event time - '10 mins')
每次触发开始时设置的水印为红线。例如,当引擎观察到数据时 (12:14, dog)
,它将下一个触发器的水印设置为12:04
。此水印让引擎保持中间状态额外 10 分钟,以允许对延迟数据进行计数。比如数据(12:09, cat)
乱序、迟到,落在windows12:00 - 12:10
和12:05 - 12:15
. 由于它仍然12:04
在触发器中的水印之前,引擎仍然将中间计数保持为状态并正确更新相关窗口的计数。但是,当水印更新为12:11
,窗口的中间状态(12:00 - 12:10)
被清除,所有后续数据(例如(12:04, donkey)
)都被认为“太晚了”,因此被忽略。请注意,在每次触发后,更新的计数(即紫色行)被写入接收器作为触发输出,如更新模式所指示的那样。
某些接收器(例如文件)可能不支持更新模式所需的细粒度更新。为了与它们一起工作,我们还支持追加模式,其中仅将最终计数写入接收器。这如下图所示。
请注意,withWatermark
在非流式数据集上使用是无操作的。由于水印不应该以任何方式影响任何批量查询,我们将直接忽略它。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U98X8Khb-1631754842079)(http://spark.apache.org/docs/2.4.0/img/structured-streaming-watermark-append-mode.png)]
与之前的更新模式类似,引擎维护每个窗口的中间计数。但是,部分计数不会更新到结果表中,也不会写入接收器。引擎等待“10 分钟”以计算延迟日期,然后丢弃窗口 < 水印的中间状态,并将最终计数附加到结果表/接收器。例如,窗口的最终计数12:00 - 12:10
仅在水印更新为 后才附加到结果表中12:11
。
需要注意的是,水印必须满足以下条件才能清除聚合查询中的状态*(从 Spark 2.1.1 开始,将来可能会发生变化)*。
window
事件时间列上的 a。withWatermark
必须在与聚合中使用的时间戳列相同的列上调用。例如, df.withWatermark("time", "1 min").groupBy("time2").count()
在追加输出模式下无效,因为水印是在与聚合列不同的列上定义的。withWatermark
必须在要使用的水印详细信息的聚合之前调用。例如,df.groupBy("time").count().withWatermark("time", "1 min")
在追加输出模式下无效。withWatermark
“2 小时”的水印延迟(设置为)可确保引擎永远不会丢弃任何延迟小于 2 小时的数据。换句话说,任何比到那时处理的最新数据晚不到 2 小时(就事件时间而言)的数据都可以保证聚合。结构化流支持将流数据集/数据帧与静态数据集/数据帧以及另一个流数据集/数据帧连接起来。流式连接的结果是增量生成的,类似于上一节中流式聚合的结果。在本节中,我们将探讨在上述情况下支持哪种类型的连接(即内部、外部等)。请注意,在所有支持的连接类型中,与流数据集/数据帧的连接结果将与在流中包含相同数据的静态数据集/数据帧完全相同。
自从在 Spark 2.0 中引入以来,Structured Streaming 已经支持流和静态 DataFrame/Dataset 之间的连接(内连接和某种类型的外连接)。这是一个简单的例子。
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") // 和静态DF内连接
streamingDf.join(staticDf, "type", "right_join") // 和静态DF右外连接
请注意,流静态连接不是有状态的,因此不需要状态管理。但是,尚不支持几种类型的流静态外部联接。这些都列在此join部分的末尾。
在 Spark 2.3 中,增加了对流-流连接的支持,即可以连接两个流数据集/数据帧。在两个数据流之间生成连接结果的挑战在于,在任何时间点,连接两侧的数据集视图都是不完整的,这使得在输入之间找到匹配变得更加困难。从一个输入流接收的任何行都可以与来自另一个输入流的任何未来的、尚未接收的行匹配。因此,对于两个输入流,我们将过去的输入缓冲为流状态,以便我们可以将每个未来的输入与过去的输入相匹配,并相应地生成连接结果。此外,类似于流聚合,我们自动处理迟到的乱序数据,并可以使用水印限制状态。让我们讨论支持的流-流连接的不同类型以及如何使用它们。
支持任何类型的列上的内部联接以及任何类型的联接条件。但是,随着流的运行,流状态的大小将无限增长,因为 必须保存所有过去的输入,因为任何新输入都可以与过去的任何输入匹配。为了避免无界状态,您必须定义额外的连接条件,以便无限期的旧输入无法与未来输入匹配,因此可以从状态中清除。换句话说,您必须在联接中执行以下附加步骤。
...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR
),...JOIN ON leftTimeWindow = rightTimeWindow
)。让我们通过一个例子来理解这一点。
假设我们想要将一个广告展示流(当广告展示时)与另一个用户点击广告流连接起来,以关联展示何时导致可获利的点击。要在此流-流连接中进行状态清理,您必须指定水印延迟和时间限制,如下所示。
代码看起来像这样。
import org.apache.spark.sql.functions.expr
val impressions = spark.readStream. ...
val clicks = spark.readStream. ...
// 在事件时间上使用水印
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
// 使用事件时间约束join
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
""")
)
带水印的流-流内部连接的语义保证
这类似于在聚合上加水印提供的保证。“2 小时”的水印延迟保证引擎永远不会丢弃任何延迟小于 2 小时的数据。但延迟超过 2 小时的数据可能会也可能不会得到处理。
虽然水印 + 事件时间约束对于内连接是可选的,但对于左外连接和右外连接,它们必须被指定。这是因为为了在外连接中生成 NULL 结果,引擎必须知道输入行将来何时不会与任何内容匹配。因此,必须指定水印 + 事件时间约束才能生成正确的结果。因此,带有外连接的查询看起来与之前的广告货币化示例非常相似,只是会有一个附加参数将其指定为外连接。
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // 可以是 "inner", "leftOuter", "rightOuter"
)
带水印的流-流外部连接的语义保证
关于水印延迟以及数据是否会被丢弃,外连接与内连接具有相同的保证。
注意事项
关于如何生成外部结果,有几个重要的特征需要注意。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0iF8BKBg-1631754842082)(C:\Users\my\AppData\Roaming\Typora\typora-user-images\image-20210915221221093.png)]
有关支持的连接的其他详细信息:
df1.join(df2, ...).join(df3, ...).join(df4, ....)
.使用事件中的唯一标识符对数据流中的记录进行重复数据删除。这与使用唯一标识符列的静态重复数据删除完全相同。该查询将存储来自先前记录的必要数据量,以便它可以过滤重复记录。与聚合类似,可以使用带有或不带有水印的重复数据删除。
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// Without watermark using guid column
streamingDf.dropDuplicates("guid")
// With watermark using guid and eventTime columns
streamingDf
.withWatermark("eventTime", "10 seconds")
.dropDuplicates("guid", "eventTime")
流查询可以有多个联合或连接在一起的输入流。每个输入流都可以具有不同的延迟数据阈值,有状态操作需要容忍这些延迟数据。您可以withWatermarks("eventTime", delay)
在每个输入流上使用这些阈值 。例如,考虑在inputStream1
和之间使用流-流连接的查询inputStream2
。
inputStream1.withWatermark(“eventTime1”, “1 hours”) .join( inputStream2.withWatermark(“eventTime2”, “2 hours”), joinCondition)
在执行查询时,Structured Streaming 单独跟踪在每个输入流中看到的最大事件时间,根据相应的延迟计算水印,并选择单个全局水印与它们一起用于有状态操作。默认情况下,选择最小值作为全局水印,因为它可以确保如果其中一个流落后于其他流(例如,其中一个流由于上游故障而停止接收数据),则不会因为太晚而意外丢弃数据。换句话说,全局水印将安全地以最慢流的速度移动,查询输出将相应延迟。
但是,在某些情况下,您可能希望获得更快的结果,即使这意味着从最慢的流中删除数据。从 Spark 2.4 开始,您可以通过将 SQL 配置设置spark.sql.streaming.multipleWatermarkPolicy
为max
(默认为min
)来设置多水印策略以选择最大值作为全局水印 。这让全局水印以最快的流速度移动。然而,作为副作用,来自较慢流的数据将被积极丢弃。因此,请谨慎使用此配置。
许多用例需要比聚合更高级的有状态操作。例如,在许多用例中,必须从事件数据流中跟踪会话。为了进行这种会话化,必须将任意类型的数据保存为状态,并使用每个触发器中的数据流事件对状态执行任意操作。从 Spark 2.2 开始,这可以使用操作mapGroupsWithState
和更强大的操作来完成flatMapGroupsWithState
。这两种操作都允许在分组数据集上应用用户定义的代码以更新用户定义的状态。有关更具体的详细信息,请查看 API 文档 ( Scala / Java ) 和示例 ( Scala / Java )。
流式数据帧/数据集不支持一些数据帧/数据集操作。其中一些如下。
此外,还有一些 Dataset 方法不适用于流式数据集。它们是将立即运行查询并返回结果的操作,这在流式数据集上没有意义。相反,这些功能可以通过显式启动流式查询来完成(请参阅下一节关于此的内容)。
count()
- 无法从流式数据集返回单个计数。相反,使用ds.groupBy().count()
它返回一个包含运行计数的流数据集。foreach()
- 改为使用ds.writeStream.foreach(...)
(见下一节)。show()
- 而是使用控制台接收器(请参阅下一节)。如果您尝试这些操作中的任何一个,您将看到AnalysisException
类似“流式数据帧/数据集不支持操作 XYZ”。虽然其中一些可能在 Spark 的未来版本中得到支持,但还有一些从根本上很难有效地在流数据上实现。例如,不支持对输入流进行排序,因为它需要跟踪流中接收到的所有数据。因此,这从根本上很难有效执行。
一旦定义了最终结果 DataFrame/Dataset,剩下的就是开始流式计算。要做到这一点,你必须使用通过Dataset.writeStream()
返回的DataStreamWriter
(Scala/ Java的/ Python文档)。必须在此界面中指定以下一项或多项。
有几种类型的输出模式。
select
, where
,map
,flatMap
,filter
,join
,等会支持追加模式。不同类型的流查询支持不同的输出模式。这是兼容性矩阵。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HP2GN6kl-1631754842083)(C:\Users\my\AppData\Roaming\Typora\typora-user-images\image-20210915223027459.png)]
有几种类型的内置输出接收器。
文件接收器- 将输出存储到目录中
writeStream
.format("parquet") // can be "orc", "json", "csv", etc.
.option("path", "path/to/destination/dir")
.start()
Kafka sink - 将输出存储到 Kafka 中的一个或多个主题。
writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "updates")
.start()
Foreach sink - 对输出中的记录运行任意计算。有关更多详细信息,请参阅本节后面部分。
writeStream
.foreach(...)
.start()
控制台接收器(用于调试) - 每次触发时将输出打印到控制台/标准输出。支持追加和完成输出模式。这应该用于低数据量的调试目的,因为每次触发后都会收集整个输出并存储在驱动程序的内存中。
writeStream
.format("console")
.start()
内存接收器(用于调试) - 输出作为内存表存储在内存中。支持追加和完成输出模式。这应该用于低数据量的调试目的,因为整个输出都被收集并存储在驱动程序的内存中。因此,请谨慎使用。
writeStream
.format("memory")
.queryName("tableName")
.start()
一些接收器不是容错的,因为它们不保证输出的持久性并且仅用于调试目的。请参阅前面关于 容错语义的部分。以下是 Spark 中所有接收器的详细信息。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-54gwFYpC-1631754842085)(C:\Users\my\AppData\Roaming\Typora\typora-user-images\image-20210915223422407.png)]
请注意,必须调用start()
才能真正开始执行查询。这将返回一个 StreamingQuery 对象,该对象是持续运行的执行的句柄。可以使用此对象来管理查询,我们将在下一小节中讨论。现在,让我们通过几个例子来理解这一切。
// ========== 无聚合的DataFrame ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")
// 打印新数据到控制台
noAggDF
.writeStream
.format("console")
.start()
// 将新数据以Parquet格式写到文件
noAggDF
.writeStream
.format("parquet")
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start()
// ========== 有聚合的DataFrame ==========
val aggDF = df.groupBy("device").count()
// 打印新数据到控制台
aggDF
.writeStream
.outputMode("complete")
.format("console")
.start()
// 所有聚合结果保存在一张内存表
aggDF
.writeStream
.queryName("aggregates") // 这里指定的查询名将会是表名
.outputMode("complete")
.format("memory")
.start()
spark.sql("select * from aggregates").show() // 交互式查询内存表
foreach
与foreachBatch
操作让应用在流媒体查询的输出任意操作和写的逻辑。它们的用例略有不同——虽然foreach
允许在每一行上自定义写入逻辑,但foreachBatch
允许对每个微批次的输出进行任意操作和自定义逻辑。让我们更详细地了解它们的用法。
foreachBatch(...)
允许指定一个函数,该函数对流式查询的每个微批次的输出数据执行。从 Spark 2.4 开始,Scala、Java 和 Python 都支持此功能。它需要两个参数:具有微批次输出数据的 DataFrame 或 Dataset 和微批次的唯一 ID。
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// Transform and write batchDF
}.start()
使用foreachBatch
,可以执行以下操作。
重用现有的批处理数据源- 对于许多存储系统,可能还没有可用的流接收器,但可能已经存在用于批处理查询的数据编写器。使用foreachBatch
,您可以在每个微批次的输出上使用批次数据编写器。
写入多个位置- 如果要将流式查询的输出写入多个位置,则只需多次写入输出 DataFrame/Dataset。但是,每次写入尝试都会导致重新计算输出数据(包括可能重新读取输入数据)。为避免重新计算,您应该缓存输出 DataFrame/Dataset,将其写入多个位置,然后取消缓存。这是一个大纲。
streamDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => batchDF.persist() batchDF.write.format(…).save(…) // location 1 batchDF.write.format(…).save( …) // 位置 2 batchDF.unpersist() }
应用额外的 DataFrame 操作- 流式 DataFrame 不支持许多 DataFrame 和 Dataset 操作,因为 Spark 不支持在这些情况下生成增量计划。使用foreachBatch
,您可以对每个微批次输出应用其中一些操作。但是,您必须自己推理执行该操作的端到端语义。
注意:
foreachBatch
仅提供至少一次写入保证。但是,您可以使用提供给函数的 batchId 作为对输出进行重复数据删除并获得精确一次保证的方法。foreachBatch
不适用于连续处理模式,因为它从根本上依赖于流式查询的微批处理执行。如果以连续模式写入数据,请foreach
改用。Foreach
如果foreachBatch
不可用(例如,对应的批处理数据写入器不存在,或连续处理模式),那么可以使用foreach
. 具体来说,可以将数据的写入逻辑分为三种方法:open
、process
、close
。从 Spark 2.4 开始,foreach
可以在 Scala、Java 和 Python 中使用。
在 Scala 中,必须扩展类ForeachWriter
( docs )。
streamingDatasetOfString.writeStream.foreach(
new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
// 开启连接
}
def process(record: String): Unit = {
// 写数据到连接
}
def close(errorOrNull: Throwable): Unit = {
// 关闭连接
}
}
).start()
执行语义 当流式查询开始时,Spark 以如下方式调用函数或对象的方法:
此对象的单个副本负责查询中单个任务生成的所有数据。换句话说,一个实例负责处理分布式生成的数据的一个分区。
该对象必须是可序列化的,因为每个任务都将获得所提供对象的新的序列化-反序列化副本。因此,强烈建议在调用 open() 方法之后完成任何写入数据的初始化(例如,打开连接或启动事务),这表示任务已准备好生成数据。
方法的生命周期如下:
如果 open() 方法存在并成功返回(与返回值无关),则调用 close() 方法(如果存在),除非 JVM 或 Python 进程在中间崩溃。
**注意:**当失败导致重新处理某些输入数据时,open() 方法中的 partitionId 和 epochId 可用于对生成的数据进行重复数据删除。这取决于查询的执行模式。如果流式查询是以微批处理模式执行的,那么由唯一元组(partition_id,epoch_id)表示的每个分区都保证具有相同的数据。因此,(partition_id, epoch_id) 可用于重复数据删除和/或事务性提交数据并实现仅一次保证。但是,如果流查询以连续模式执行,则此保证不成立,因此不应用于重复数据删除。
流查询的触发器设置定义了流数据处理的时间,也定义了查询是作为具有固定批处理间隔的微批处理查询还是作为连续处理查询执行。以下是支持的不同类型的触发器
Trigger Type | Description |
---|---|
unspecified (default) | 如果没有显式指定触发器设置,那么默认情况下,查询将以微批处理模式执行,只要前一个微批处理完成处理,就会生成微批处理。 |
Fixed interval micro-batches | 查询将以微批次模式执行,其中微批次将以用户指定的时间间隔启动。 如果前一个微批次在间隔内完成,则引擎将等待间隔结束,然后再启动下一个微批次。 如果前一个 micro-batch 花费的时间比完成间隔的时间长(即如果错过了一个间隔边界),那么下一个 micro-batch将在前一个完成后立即开始(即,它不会等待下一个间隔边界) )。 如果没有新数据可用,则不会启动微批次。 |
One-time micro-batch | 查询将执行一个微批处理来处理所有可用数据,然后自行停止。这在您希望定期启动集群、处理自上一个周期以来可用的所有内容,然后关闭集群的情况下非常有用。在某些情况下,这可能会显着节省成本。 |
Continuous with fixed checkpoint interval (experimental) | 查询将在新的低延迟、连续处理模式下执行。在下面的连续处理部分阅读更多相关信息。 |
下面是一些代码示例。
import org.apache.spark.sql.streaming.Trigger
// 默认触发器为微批
df.writeStream
.format("console")
.start()
// 固定2秒间隔微批次
df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
// 一次性微批次
df.writeStream
.format("console")
.trigger(Trigger.Once())
.start()
// 以固定检查点间隔连续
df.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.start()
StreamingQuery
启动查询时创建的对象可用于监控和管理查询。
val query = df.writeStream.format("console").start() // get the query object
query.id // 从检查点数据获取重新启动后持续存在或正在运行的查询的唯一标识符
query.runId // 获取此查询运行的唯一 ID,它将在每次启动/重启时生成
query.name // 获取自动生成或用户指定名称的名称
query.explain() // 打印查询的详细说明
query.stop() // 停止查询
query.awaitTermination() // 阻塞直到查询终止,由于 stop() 或错误
query.exception // 查询因错误而终止时的异常
query.recentProgress // 此查询的最新进度更新数组
query.lastProgress // 此流式查询的最新进度更新
可以在单个 SparkSession 中启动任意数量的查询。它们都将同时运行,共享集群资源。可以使用sparkSession.streams()
获取可用于管理当前活动查询的StreamingQueryManager
(Scala / Java / Python文档)。
val spark: SparkSession = ...
spark.streams.active // 获取当前存活的流式查询列表
spark.streams.get(id) // 根据唯一id获取查询对象
spark.streams.awaitAnyTermination() // 锁定直到其中任何一个终止
有多种方法可以监控活动流查询。可以使用 Spark 的 Dropwizard Metrics 将指标推送到外部系统,或者以编程方式访问它们。
可以使用streamingQuery.lastProgress()
和 streamingQuery.status()
直接获取活动查询的当前状态和指标。 在Scala 和Java 中lastProgress()
返回一个StreamingQueryProgress
对象, 在 Python 中返回一个具有相同字段的字典。它包含有关流的最后一个触发器中取得的进度的所有信息 - 处理了哪些数据,处理速率是多少,延迟等。还有streamingQuery.recentProgress
返回最后几个进度的数组。
此外,在Scala 和Java 中streamingQuery.status()
返回一个StreamingQueryStatus
对象, 在 Python 中返回一个具有相同字段的字典。它提供有关查询立即执行的操作的信息 - 触发器是否处于活动状态,是否正在处理数据等。
这里有一些例子。
val query: StreamingQuery = ...
println(query.lastProgress)
/* 将会打印一些像下面的信息.
{
"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
"runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
"name" : "MyQuery",
"timestamp" : "2016-12-14T18:45:24.873Z",
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0,
"durationMs" : {
"triggerExecution" : 3,
"getOffset" : 2
},
"eventTime" : {
"watermark" : "2016-12-14T18:45:24.873Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-0]]",
"startOffset" : {
"topic-0" : {
"2" : 0,
"4" : 1,
"1" : 1,
"3" : 1,
"0" : 1
}
},
"endOffset" : {
"topic-0" : {
"2" : 0,
"4" : 115,
"1" : 134,
"3" : 21,
"0" : 534
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0
} ],
"sink" : {
"description" : "MemorySink"
}
}
*/
println(query.status)
/* 将会打印一些像下面的信息.
{
"message" : "Waiting for data to arrive",
"isDataAvailable" : false,
"isTriggerActive" : false
}
*/
还可以SparkSession
通过附加 a StreamingQueryListener
(Scala / Java文档)来异步监视与 a 关联的所有查询 。使用 附加自定义StreamingQueryListener
对象在 sparkSession.streams.attachListener()
,您将在查询开始和停止以及活动查询取得进展时收到回调。这是一个例子,
val spark: SparkSession = ...
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started: " + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("Query terminated: " + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("Query made progress: " + queryProgress.progress)
}
})
Spark 支持使用Dropwizard 库的报告指标。要同时报告结构化流查询的指标,必须显式启用SparkSession 中的配置spark.sql.streaming.metricsEnabled
。
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// 或者
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
启用此配置后,在 SparkSession 中启动的所有查询都将通过 Dropwizard 向已配置的任何接收器(例如 Ganglia、Graphite、JMX 等)报告指标。
如果出现故障或故意关闭,可以恢复先前查询的先前进度和状态,并从中断处继续。这是使用检查点和预写日志完成的。您可以使用检查点位置配置查询,该查询会将所有进度信息(即在每个触发器中处理的偏移量范围)和正在运行的聚合(例如,快速示例中的字数)保存到检查点位置。此检查点位置必须是 HDFS 兼容文件系统中的路径,并且可以在启动查询时设置为 DataStreamWriter 中的选项。
aggDF
.writeStream
.outputMode("complete")
.option("checkpointLocation", "path/to/HDFS/dir")
.format("memory")
.start()
从同一检查点位置重新启动之间允许流式查询中的哪些更改存在限制。以下是一些不允许的更改,或者更改的效果没有明确定义。对于所有:
sdf
表示使用 sparkSession.readStream 生成的流式数据帧/数据集。变更类型
spark.readStream.format("kafka").option("subscribe", "topic")
至spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
spark.readStream.format("kafka").option("subscribe", "topic")
对spark.readStream.format("kafka").option("subscribe", "newTopic")
sdf.writeStream.format("parquet").option("path", "/somePath")
对sdf.writeStream.format("parquet").option("path", "/anotherPath")
sdf.writeStream.format("kafka").option("topic", "someTopic")
到sdf.writeStream.format("kafka").option("topic", "anotherTopic")
ForeachWriter
代码)进行更改,但更改的语义取决于代码。sdf.selectExpr("a")
to sdf.where(...).selectExpr("a").filter(...)
。sdf.selectExpr("stringColumn AS json").writeStream
以sdf.selectExpr("anotherStringColumn AS json").writeStream
sdf.selectExpr("a").writeStream
以sdf.selectExpr("b").writeStream
允许仅当输出信宿允许从模式更改"a"
到"b"
。sdf.groupBy("a").agg(...)
. 不允许对分组键或聚合的数量或类型进行任何更改。sdf.dropDuplicates("a")
. 不允许对分组键或聚合的数量或类型进行任何更改。sdf1.join(sdf2, ...)
(即两个输入都是用 生成的sparkSession.readStream
)。不允许更改架构或等效连接列。不允许更改连接类型(外部或内部)。连接条件中的其他更改是不明确的。sdf.groupByKey(...).mapGroupsWithState(...)
或sdf.groupByKey(...).flatMapGroupsWithState(...)
。不允许对用户定义状态的架构和超时类型进行任何更改。允许在用户定义的状态映射函数内进行任何更改,但更改的语义效果取决于用户定义的逻辑。如果您真的想支持状态模式更改,那么您可以使用支持模式迁移的编码/解码方案将复杂的状态数据结构显式编码/解码为字节。例如,如果您将状态保存为 Avro 编码字节,那么您可以在查询重新启动之间自由更改 Avro-state-schema,因为二进制状态将始终成功恢复。连续处理是 Spark 2.3 中引入的一种新的、实验性的流执行模式,它支持低(约 1 毫秒)的端到端延迟,并具有至少一次容错保证。将此与默认的微批处理引擎进行比较,后者可以实现恰好一次保证,但最多可实现约 100 毫秒的延迟。对于某些类型的查询(下面讨论),您可以选择在不修改应用程序逻辑的情况下执行它们的模式(即不更改 DataFrame/Dataset 操作)。
要在连续处理模式下运行受支持的查询,需要做的就是指定一个具有所需检查点间隔作为参数的连续触发器。例如,
import org.apache.spark.sql.streaming.Trigger
spark
.readStream
.format("rate")
.option("rowsPerSecond", "10")
.option("")
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.trigger(Trigger.Continuous("1 second")) // only change in query
.start()
1 秒的检查点间隔意味着连续处理引擎将每秒记录查询的进度。结果检查点的格式与微批处理引擎兼容,因此可以使用任何触发器重新启动任何查询。例如,以微批处理模式启动的受支持查询可以在连续模式下重新启动,反之亦然。请注意,每次切换到连续模式时,您都将获得至少一次容错保证。
从 Spark 2.3 开始,连续处理模式只支持以下类型的查询。
select
,map
,flatMap
,mapPartitions
,等)和选择(where
,filter
等)。
current_timestamp()
和current_date()
(使用时间的确定性计算具有挑战性)。numPartitions
和rowsPerSecond
。有关它们的更多详细信息,请参阅输入源和输出接收器部分。虽然控制台接收器非常适合测试,但使用 Kafka 作为源和接收器可以最好地观察端到端的低延迟处理,因为这允许引擎处理数据并使结果在输出主题中可用输入主题中可用的输入数据的毫秒数。
//TODO
//TODO
Spark1.5及之前的内存管理机制。在静态内存管理器机制下,存储内存,执行内存和其他内存的大小在Spark应用程序的操作期间是固定的,用户可以在应用程序启动之前对其进行配置。
缺点:
统一内存管理是Spark 1.6之后引入的默认管理方式。统一内存管理器和静态内存管理器之间的区别在于,在统一内存管理器机制下,存储内存和执行内存共享一个内存区域,两者都可以占用彼此的空闲区域,存储内存和执行内存都可以在堆外分配了。
运行在Executor的Task同时可使用JVM(OnHeap+Off-heap)和Off-heap两种模式的内存。默认是JVM内存,Off-heap需要修改配置
如上图所示,Yarn集群管理模式中,Spark 以Executor Container的形式在NodeManager中运行,其可使用的内存上限由“yarn.scheduler.maximum-allocation-mb” 指定, —我们可以称其为MonitorMemory。
Executor的内存由Heap内存和设定的Off-heap内存组成:
Yarn集群中必须满足这样的关系 : ExecutorMemory + MemoryOverhead <= MonitorMemory
若应用提交之时,指定的 ExecutorMemory与MemoryOverhead 之和大于 MonitorMemory,则会导致Executor申请失败;若运行过程中,实际使用内存超过上限阈值,Executor进程会被Yarn终止掉(kill)。
–executor-memory 也就是 “spark.executor.memory"指定的内存为Executor JVM最大分配的堆内存(”-xmx"),Spark为了更高效的使用这部分内存,对这部分内存进行了细分,下图(备注:此图源于互联网)对基于spark2(1.6+)对堆内存分配比例进行了描述:
Spark2.4中内存参数
若有一方内存不足,另一方空余,Storage和Execution可以动态相互占用对方内存。如有必要,Execution可能会驱逐Storage占用的内存,Execution内存可以强制收回Storage占用的内存。Execution是Unified Memory中永远不会驱逐缓存块的子区域。
序列化在任何分布式应用程序的性能中都扮演着重要的角色。将对象序列化为缓慢或消耗大量字节的格式将大大减慢计算速度。通常,这将是应该调整以优化 Spark 应用程序的第一件事。Spark 旨在在便利性(允许您在操作中使用任何 Java 类型)和性能之间取得平衡。它提供了两个序列化库:
ObjectOutputStream
框架序列化对象,并且可以与您创建的任何实现 java.io.Serializable
. 您还可以通过扩展 java.io.Externalizable
. Java 序列化很灵活,但通常很慢,并且导致许多类的序列化格式很大。Serializable
类型,并且需要提前注册将在程序中使用的类以获得最佳性能。可以通过使用SparkConf初始化Job,并调用conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
. 此设置配置的序列化程序不仅用于在工作节点之间混洗数据,还用于将 RDD 序列化到磁盘。Kryo 不是默认设置的唯一原因是自定义注册要求,但我们建议在任何网络密集型应用程序中尝试它。从 Spark 2.0.0 开始,我们在内部使用 Kryo 序列化程序来对具有简单类型、简单类型数组或字符串类型的 RDD 进行Shuffle。
Spark自动包含了常用核心 Scala 类Kryo序列化,由Twitter chill](https://github.com/twitter/chill)提供
要向 Kryo 注册自己的自定义类,使用registerKryoClasses
方法。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
KRYO文档描述了更先进的注册选项,如添加自定义序列的代码。
如果您的对象很大,您可能还需要增加spark.kryoserializer.buffer
config。该值需要足够大以容纳您将序列化的最大对象。
最后,如果不注册自定义类,Kryo 仍然可以工作,但它必须为每个对象存储完整的类名,这很浪费。
默认情况下,Java 对象可以快速访问,但很容易消耗比其字段中的“原始”数据多 2-5 倍的空间。这是由于以下几个原因:
参考4 内存管理
确定数据集所需内存消耗量的最佳方法是创建一个 RDD,将其放入缓存,然后查看 Web UI 中的“存储”页面。该页面会告诉我们 RDD 占用了多少内存。
要估计特定对象的内存消耗,请使用SizeEstimator
的estimate
方法。这对于尝试不同的数据布局以减少内存使用以及确定广播变量将在每个执行程序堆上占用的空间量非常有用。
减少内存消耗的第一种方法是避免增加开销的 Java 特性,例如基于指针的数据结构和包装对象。做这件事有很多种方法:
HashMap
)。fastutil 库为与 Java 标准库兼容的原始类型提供了方便的集合类。-XX:+UseCompressedOops
以使指针为 4 个字节而不是 8 个字节。可以在 spark-env.sh
设置.尽管进行了这种调整,但当对象仍然太大而无法有效存储时,减少内存使用的一种更简单的方法是以序列化形式存储它们,使用RDD 持久性 API 中的序列化 StorageLevels ,例如MEMORY_ONLY_SER
. 然后 Spark 会将每个 RDD 分区存储为一个大字节数组。以序列化形式存储数据的唯一缺点是访问时间较慢,因为必须动态反序列化每个对象。如果您想以序列化形式缓存数据,我们强烈建议使用 Kryo,因为它比 Java 序列化(当然也比原始 Java 对象)小得多。
当程序存储的 RDD 有大量“流失”时,JVM 垃圾收集可能会成为一个问题。(对于只读取一次 RDD 然后对其运行许多操作的程序,这通常不是问题。)当 Java 需要驱逐旧对象为新对象腾出空间时,它需要跟踪所有 Java 对象并找到未使用的。这里要记住的要点是垃圾收集的成本与 Java 对象的数量成正比,因此使用具有较少对象的数据结构(例如Int
s 而不是 a的数组LinkedList
)可以大大降低此成本。一种更好的方法是以序列化形式持久化对象,如上所述:现在将只有一个每个 RDD 分区的对象(一个字节数组)。在尝试其他技术之前,如果 GC 有问题,首先要尝试使用序列化缓存。
由于任务的工作内存(运行任务所需的空间量)和节点上缓存的 RDD 之间的干扰,GC 也可能是一个问题。我们将讨论如何控制分配给 RDD 缓存的空间来缓解这种情况。
测量 GC 的影响
GC 调优的第一步是收集有关垃圾收集发生频率和 GC 花费时间的统计信息。这可以通过添加-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
到 Java 选项来完成。(有关将 Java 选项传递给 Spark 作业的信息,请参阅配置指南。)下次运行 Spark 作业时,每次发生垃圾收集时,您都会看到在工作程序日志中打印的消息。请注意,这些日志将在集群的Executor节点上(stdout
在其工作目录中的文件中),而不是在您的Driver程序上。
高级 GC 调优
为了进一步调优垃圾回收,我们首先需要了解一些关于JVM内存管理的基本信息:
Spark 中 GC 调优的目标是确保只有长寿命的 RDD 存储在老年代,并且年轻代的大小足以存储短寿命的对象。这将有助于避免完整的 GC 来收集在任务执行期间创建的临时对象。一些可能有用的步骤是:
E
,则可以使用该选项设置 Young 代的大小-Xmn=4/3*E
。(放大 4/3 也是为了考虑幸存者区域使用的空间。)spark.memory.fraction
来减少用于缓存的内存量;缓存更少的对象比减慢任务执行速度更好。或者,考虑减少年轻代的大小。这意味着-Xmn
如果您已将其设置为如上,则降低。如果没有,请尝试更改 JVMNewRatio
参数的值。许多 JVM 将其默认为 2,这意味着 Old 代占用堆的 2/3。它应该足够大,使得这个分数超过spark.memory.fraction
。-XX:+UseParNewGC -XX:+UseConcMarkSweepGC
-XX:+UseG1GC
。在垃圾收集成为瓶颈的某些情况下,它可以提高性能。需要注意的是大Executor堆大小,设置-XX:G1HeapRegionSize
增加G1区域大小 很重要4*3*128MB
.我们的经验表明,GC 调优的效果取决于您的应用程序和可用内存量。有更多的微调选项描述联机,但需要较高的水平管理。管理 full GC 发生的频率有助于减少开销。
可以通过spark.executor.extraJavaOptions
在作业的配置中设置来指定执行程序的 GC 配置。
除非您将每个操作的并行级别设置得足够高,否则集群不会得到充分利用。Spark 会根据每个文件的大小自动设置要在每个文件上运行的“map”任务的数量(尽管您可以通过可选参数 toSparkContext.textFile
等来控制它),并且对于分布式“reduce”操作,例如groupByKey
and reduceByKey
,它使用最大的父级RDD 的分区数。您可以将并行级别作为第二个参数传递(请参阅spark.PairRDDFunctions
文档),或设置 config 属性spark.default.parallelism
以更改默认值。通常,我们建议集群中的每个 CPU 核心 2-3 个任务。
有时,您会得到 OutOfMemoryError 不是因为您的 RDD 不适合内存,而是因为您的一项任务的工作集(例如 中的一个 reduce 任务groupByKey
)太大。Spark 的 shuffle 操作(sortByKey
、groupByKey
、reduceByKey
、join
等)在每个任务中构建一个哈希表来执行分组,这通常可能很大。这里最简单的解决方法是 提高并行度,使每个任务的输入集更小。Spark 可以有效地支持短至 200 毫秒的任务,因为它在多个任务中重用一个 executor JVM,并且任务启动成本低,因此您可以安全地将并行级别提高到超过集群中的内核数量。
使用 SparkContext
中 可用的广播功能可以大大减少每个序列化任务的大小,以及在集群上启动作业的成本。如果您的任务使用其中的驱动程序中的任何大对象(例如静态查找表),请考虑将其转换为广播变量。Spark 会在 master 上打印每个任务的序列化大小,因此您可以查看它以确定您的任务是否太大;一般来说,大于 20 KB 的任务可能值得优化。
数据本地化会对 Spark 作业的性能产生重大影响。如果数据和对其进行操作的代码在一起,那么计算往往会很快。但是如果代码和数据是分开的,就必须移到另一个。通常,将序列化代码从一个地方传送到另一个地方比一大块数据更快,因为代码大小比数据小得多。Spark 围绕数据本地化的一般原则构建其调度。
数据本地化是数据与处理它的代码的接近程度。根据数据的当前位置,存在多个位置级别。按照从近到远的顺序:
PROCESS_LOCAL
数据与运行代码在同一个 JVM 中。这是最好的地方NODE_LOCAL
数据在同一个节点上。示例可能在同一节点上的 HDFS 中,或在同一节点上的另一个执行程序中。这比PROCESS_LOCAL
因为数据必须在进程之间传输要慢一点NO_PREF
从任何地方同样快速地访问数据,并且没有位置偏好RACK_LOCAL
数据在同一个服务器机架上。数据位于同一机架上的不同服务器上,因此需要通过网络发送,通常是通过单个交换机ANY
数据在网络上的其他地方,而不是在同一个机架中Spark 更喜欢在最佳位置级别安排所有任务,但这并不总是可行的。在任何空闲执行器上没有未处理数据的情况下,Spark 会切换到较低的位置级别。有两种选择:a) 等到繁忙的 CPU 腾出时间来启动同一服务器上的数据任务,或者 b) 立即在需要移动数据的较远地方启动新任务。
Spark 通常做的是稍等片刻,希望繁忙的 CPU 腾出时间。一旦超时到期,它就会开始将数据从远处移动到空闲 CPU。每个级别之间回退的等待超时可以单独配置,也可以在一个参数中一起配置;详见 配置页面spark.locality
参数。如果您的任务很长并且局部性较差,您应该增加这些设置,但默认设置通常效果很好。
总结
这是一个简短的指南,用于指出您在调优 Spark 应用程序时应该了解的主要问题——最重要的是,数据序列化和内存调优。对于大多数程序,切换到 Kryo 序列化并以序列化形式持久化数据将解决最常见的性能问题。请随时在 Spark 邮件列表中询问其他调优最佳实践。
基于Yarn
spark官方文档
http://spark.apache.org/docs/2.4.0/
Spark案例
https://sparkbyexamples.com/spark/
持久化
https://cloud.tencent.com/developer/article/1482150
《Spark快速大数据分析》
《High Performance Spark》
性能调优
https://zhuanlan.zhihu.com/p/354998357
数据倾斜调优
https://www.cnblogs.com/cssdongl/p/6594298.html
内存模型
https://www.iteblog.com/archives/2342.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。