赞
踩
foreachRDD 设计模式的使用
dstream.foreachRDD 是一个强大的原语, 允许将数据发送到外部系统.但是, 了解如何正确有效地使用这个原语很重要. 避免一些常见的错误如下.
通常向外部系统写入数据需要创建连接对象(例如与远程服务器的 TCP 连接), 并使用它将数据发送到远程系统.
为此, 开发人员可能会无意中尝试在Spark driver 中创建连接对象, 然后尝试在Spark工作人员中使用它来在RDD中保存记录.例如(在 Scala 中):
- dstream.foreachRDD { rdd =>
- val connection = createNewConnection() // executed at the driver
- rdd.foreach { record =>
- connection.send(record) // executed at the worker
- }
- }
这是不正确的, 因为这需要将连接对象序列化并从 driver 发送到 worker. 这种连接对象很少能跨机器转移.
此错误可能会显示为序列化错误(连接对象不可序列化), 初始化错误(连接对象需要在 worker 初始化)等. 正确的解决方案是在 worker 创建连接对象.
但是, 这可能会导致另一个常见的错误 - 为每个记录创建一个新的连接. 例如:
- dstream.foreachRDD { rdd =>
- rdd.foreach { record =>
- val connection = createNewConnection()
- connection.send(record)
- connection.close()
- }
- }
通常, 创建连接对象具有时间和资源开销. 因此, 创建和销毁每个记录的连接对象可能会引起不必要的高开销, 并可显着降低系统的总体吞吐量.
一个更好的解决方案是使用 rdd.foreachPartition - 创建一个连接对象, 并使用该连接在 RDD 分区中发送所有记录.
- dstream.foreachRDD { rdd =>
- rdd.foreachPartition { partitionOfRecords =>
- val connection = createNewConnection()
- partitionOfRecords.foreach(record => connection.send(record))
- connection.close()
- }
- }
这样可以在多个记录上分摊连接创建开销.
最后, 可以通过跨多个RDD /批次重用连接对象来进一步优化. 可以维护连接对象的静态池, 而不是将多个批次的 RDD 推送到外部系统时重新使用, 从而进一步减少开销.
- dstream.foreachRDD { rdd =>
- rdd.foreachPartition { partitionOfRecords =>
- // ConnectionPool is a static, lazily initialized pool of connections
- val connection = ConnectionPool.getConnection()
- partitionOfRecords.foreach(record => connection.send(record))
- ConnectionPool.returnConnection(connection) // return to the pool for future reuse
- }
- }
请注意, 池中的连接应根据需要懒惰创建, 如果不使用一段时间, 则会超时. 这实现了最有效地将数据发送到外部系统.
其他要记住的要点:
DStreams 通过输出操作进行延迟执行, 就像 RDD 由 RDD 操作懒惰地执行.
具体来说, DStream 输出操作中的 RDD 动作强制处理接收到的数据.
因此, 如果您的应用程序没有任何输出操作, 或者具有 dstream.foreachRDD() 等输出操作, 而在其中没有任何 RDD 操作, 则不会执行任何操作.系统将简单地接收数据并将其丢弃.
默认情况下, 输出操作是 one-at-a-time 执行的. 它们按照它们在应用程序中定义的顺序执行.
DataFrame 和 SQL 操作
您可以轻松地在流数据上使用 DataFrames and SQL 和 SQL 操作. 您必须使用 StreamingContext 正在使用的 SparkContext 创建一个 SparkSession.
此外, 必须这样做, 以便可以在 driver 故障时重新启动.
这是通过创建一个简单实例化的 SparkSession 单例实例来实现的.
它使用 DataFrames 和 SQL 来修改早期的字数 示例以生成单词计数.将每个 RDD 转换为 DataFrame, 注册为临时表, 然后使用 SQL 进行查询.
- /** DataFrame operations inside your streaming program */
-
- val words: DStream[String] = ...
-
- words.foreachRDD { rdd =>
-
- // Get the singleton instance of SparkSession
- val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
- import spark.implicits._
-
- // Convert RDD[String] to DataFrame
- val wordsDataFrame = rdd.toDF("word")
-
- // Create a temporary view
- wordsDataFrame.createOrReplaceTempView("words")
-
- // Do word count on DataFrame using SQL and print it
- val wordCountsDataFrame =
- spark.sql("select word, count(*) as total from words group by word")
- wordCountsDataFrame.show()
- }
请参阅完整的 源代码.
您还可以对来自不同线程的流数据(即异步运行的 StreamingContext )上定义的表运行 SQL 查询.
只需确保您将 StreamingContext 设置为记住足够数量的流数据, 以便查询可以运行. 否则, 不知道任何异步 SQL 查询的 StreamingContext 将在查询完成之前删除旧的流数据.
例如, 如果要查询最后一个批次, 但是您的查询可能需要5分钟才能运行, 则可以调用 streamingContext.remember(Minutes(5)) (以 Scala 或其他语言的等价物).实战案例:使用SparkStreaming完成词频统计,并将结果写入MySQL
- val lines = ssc.socketTextStream("192.168.43.150", 9999)
- val result = lines.flatMap(_.split(" +")).map((_, 1)).reduceByKey(_ + _)
- //将结果写入MySQL
- result.foreachRDD(rdd=>{
- rdd.foreachPartition(partitionOfRecords=>{
- // ConnectionPool is a static, lazily initialized pool of connections
- val connection = createConnection()
- partitionOfRecords.foreach(record=>{
- val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
- connection.createStatement().execute(sql)
- })
- connection.close()
- })
- })
-
- ssc.start()
- ssc.awaitTermination()
- }
- /**
- * 获取MySQL的连接
- */
- def createConnection() = {
- Class.forName("com.mysql.jdbc.Driver")
- DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123")
- }
存在的问题:
1.对于已有的数据做更新,但是所有的数据均为insert,在MySQL表中表现为追加的形式,没有累加
改进思路:a)在插入数据之前先判断单词是否存在,如果存在就update,如果不存在就insert
b)在工作中一般使用HBase/Redis,他们有自带的API,可以累加
2.每个rdd的partition创建connection,建议改换成连接池
窗口函数的使用:
window: 定时的进行一个时间段内的数据处理
两个关键参数:window length 窗口的长度;sliding interval: 窗口的间隔
这两个参数和我们的batch size有关系:倍数
使用场景: 每隔一段时间计算某个范围内的数据 --> 每隔10秒(窗口的间隔)计算前10分钟(窗口的长度)的Wordcount
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
- 黑名单的过滤:
- import org.apache.spark.SparkConf
- import org.apache.spark.rdd.RDD
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- //黑名单过滤
- object TransForm {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf()
- .setAppName(this.getClass.getSimpleName)
- .setMaster("local[*]")
- val ssc = new StreamingContext(sparkConf,Seconds(2))
-
- /**
- * 构建黑名单
- */
- val blacks = List("zs","ls")
- val blacksRDD: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(blacks).map((_,true))
- val lines = ssc.socketTextStream("hadoop101",9999)
- val clicklog = lines.map(rdd => (rdd.split(",")(1), rdd)).transform(rdd => {
- rdd.leftOuterJoin(blacksRDD)
- .filter(x => x._2._2.getOrElse(false) != true)
- .map(x => x._2._1)
- })
- clicklog.print()
-
- ssc.start()
- ssc.awaitTermination()
- }
-
- }
Spark Streaming整合Spark SQL完成词频统计操作:
- import org.apache.spark.SparkConf
- import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
-
- //Spark Streaming整合Spark SQL完成词频统计操作
- object SqlNetworkWordCount {
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf()
- .setAppName(this.getClass.getSimpleName)
- .setMaster("local[*]")
- val ssc = new StreamingContext(sparkConf, Seconds(2))
-
- val lines = ssc.socketTextStream("hadoop101", 9999)
- val words = lines.flatMap(_.split(" +"))
- // Convert RDDs of the words DStream to DataFrame and run SQL query
- //将Dstream中的rdd通过foreachRDD拿出来,
- //再通过toDF()转换成DataFrame,
- //再通过createOrReplaceTempView将DataFrame注册成一个临时表
- //每次单词统计的时候,用SQL的方式将结果统计出来
- words.foreachRDD { (rdd: RDD[String], time: Time) =>
- // Get the singleton instance of SparkSession
- val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
- import spark.implicits._
- // Convert RDD[String] to RDD[case class] to DataFrame
- val wordsDataFrame = rdd.map(w => Record(w)).toDF()
- // Creates a temporary view using the DataFrame
- wordsDataFrame.createOrReplaceTempView("words")
- // Do word count on table using SQL and print it
- val wordCountsDataFrame =
- spark.sql("select word, count(*) as total from words group by word")
- println(s"========= $time =========")
- wordCountsDataFrame.show()
-
- }
-
-
- ssc.start()
- ssc.awaitTermination()
- }
- }
- /** Case class for converting RDD to DataFrame */
- case class Record(word: String)
-
-
- /** Lazily instantiated singleton instance of SparkSession */
- object SparkSessionSingleton {
- @transient private var instance: SparkSession = _
-
- def getInstance(sparkConf: SparkConf) = {
- if (instance == null) {
- SparkSession
- .builder()
- .config(sparkConf)
- .getOrCreate()
- }
- instance
- }
-
- }
-
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。