当前位置:   article > 正文

使用SparkStreaming完成词频统计,并将结果写入MySQL,黑名单过滤_第1关:套接字流实现黑名单过滤

第1关:套接字流实现黑名单过滤

foreachRDD 设计模式的使用
dstream.foreachRDD 是一个强大的原语, 允许将数据发送到外部系统.但是, 了解如何正确有效地使用这个原语很重要. 避免一些常见的错误如下.

通常向外部系统写入数据需要创建连接对象(例如与远程服务器的 TCP 连接), 并使用它将数据发送到远程系统.
为此, 开发人员可能会无意中尝试在Spark driver 中创建连接对象, 然后尝试在Spark工作人员中使用它来在RDD中保存记录.例如(在 Scala 中):

  1. dstream.foreachRDD { rdd =>
  2.   val connection = createNewConnection()  // executed at the driver
  3.   rdd.foreach { record =>
  4.     connection.send(record) // executed at the worker
  5.   }
  6. }


这是不正确的, 因为这需要将连接对象序列化并从 driver 发送到 worker. 这种连接对象很少能跨机器转移. 
此错误可能会显示为序列化错误(连接对象不可序列化), 初始化错误(连接对象需要在 worker 初始化)等. 正确的解决方案是在 worker 创建连接对象.

但是, 这可能会导致另一个常见的错误 - 为每个记录创建一个新的连接. 例如:

  1. dstream.foreachRDD { rdd =>
  2.   rdd.foreach { record =>
  3.     val connection = createNewConnection()
  4.     connection.send(record)
  5.     connection.close()
  6.   }
  7. }


通常, 创建连接对象具有时间和资源开销. 因此, 创建和销毁每个记录的连接对象可能会引起不必要的高开销, 并可显着降低系统的总体吞吐量. 
一个更好的解决方案是使用 rdd.foreachPartition - 创建一个连接对象, 并使用该连接在 RDD 分区中发送所有记录.

  1. dstream.foreachRDD { rdd =>
  2.   rdd.foreachPartition { partitionOfRecords =>
  3.     val connection = createNewConnection()
  4.     partitionOfRecords.foreach(record => connection.send(record))
  5.     connection.close()
  6.   }
  7. }


这样可以在多个记录上分摊连接创建开销.

最后, 可以通过跨多个RDD /批次重用连接对象来进一步优化. 可以维护连接对象的静态池, 而不是将多个批次的 RDD 推送到外部系统时重新使用, 从而进一步减少开销.

  1. dstream.foreachRDD { rdd =>
  2.   rdd.foreachPartition { partitionOfRecords =>
  3.     // ConnectionPool is a static, lazily initialized pool of connections
  4.     val connection = ConnectionPool.getConnection()
  5.     partitionOfRecords.foreach(record => connection.send(record))
  6.     ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  7.   }
  8. }


请注意, 池中的连接应根据需要懒惰创建, 如果不使用一段时间, 则会超时. 这实现了最有效地将数据发送到外部系统.

其他要记住的要点:

DStreams 通过输出操作进行延迟执行, 就像 RDD 由 RDD 操作懒惰地执行. 
具体来说, DStream 输出操作中的 RDD 动作强制处理接收到的数据.
因此, 如果您的应用程序没有任何输出操作, 或者具有 dstream.foreachRDD() 等输出操作, 而在其中没有任何 RDD 操作, 则不会执行任何操作.系统将简单地接收数据并将其丢弃.

默认情况下, 输出操作是 one-at-a-time 执行的. 它们按照它们在应用程序中定义的顺序执行.

DataFrame 和 SQL 操作
您可以轻松地在流数据上使用 DataFrames and SQL 和 SQL 操作. 您必须使用 StreamingContext 正在使用的 SparkContext 创建一个 SparkSession.
此外, 必须这样做, 以便可以在 driver 故障时重新启动. 
这是通过创建一个简单实例化的 SparkSession 单例实例来实现的.
它使用 DataFrames 和 SQL 来修改早期的字数 示例以生成单词计数.将每个 RDD 转换为 DataFrame, 注册为临时表, 然后使用 SQL 进行查询.

  1. /** DataFrame operations inside your streaming program */
  2. val words: DStream[String] = ...
  3. words.foreachRDD { rdd =>
  4.   // Get the singleton instance of SparkSession
  5.   val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  6.   import spark.implicits._
  7.   // Convert RDD[String] to DataFrame
  8.   val wordsDataFrame = rdd.toDF("word")
  9.   // Create a temporary view
  10.   wordsDataFrame.createOrReplaceTempView("words")
  11.   // Do word count on DataFrame using SQL and print it
  12.   val wordCountsDataFrame = 
  13.     spark.sql("select word, count(*) as total from words group by word")
  14.   wordCountsDataFrame.show()
  15. }


请参阅完整的 源代码.

您还可以对来自不同线程的流数据(即异步运行的 StreamingContext )上定义的表运行 SQL 查询. 
只需确保您将 StreamingContext 设置为记住足够数量的流数据, 以便查询可以运行. 否则, 不知道任何异步 SQL 查询的 StreamingContext 将在查询完成之前删除旧的流数据.
例如, 如果要查询最后一个批次, 但是您的查询可能需要5分钟才能运行, 则可以调用 streamingContext.remember(Minutes(5)) (以 Scala 或其他语言的等价物).实战案例:使用SparkStreaming完成词频统计,并将结果写入MySQL

  1. val lines = ssc.socketTextStream("192.168.43.150", 9999)
  2.     val result = lines.flatMap(_.split(" +")).map((_, 1)).reduceByKey(_ + _)
  3.     //将结果写入MySQL
  4.     result.foreachRDD(rdd=>{
  5.     rdd.foreachPartition(partitionOfRecords=>{
  6.       // ConnectionPool is a static, lazily initialized pool of connections
  7.       val connection = createConnection()
  8.       partitionOfRecords.foreach(record=>{
  9.         val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
  10.         connection.createStatement().execute(sql)
  11.       })
  12.       connection.close()
  13.     })
  14.     })
  15.     ssc.start()
  16.     ssc.awaitTermination()
  17.   }

 

  1. /**
  2.     * 获取MySQL的连接
  3.     */
  4.   def createConnection() = {
  5.     Class.forName("com.mysql.jdbc.Driver")
  6.     DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123")
  7.   }

存在的问题:
    1.对于已有的数据做更新,但是所有的数据均为insert,在MySQL表中表现为追加的形式,没有累加
        改进思路:a)在插入数据之前先判断单词是否存在,如果存在就update,如果不存在就insert
                 b)在工作中一般使用HBase/Redis,他们有自带的API,可以累加
                 
    2.每个rdd的partition创建connection,建议改换成连接池
    
    
窗口函数的使用:
    window:    定时的进行一个时间段内的数据处理
        两个关键参数:window length 窗口的长度;sliding interval: 窗口的间隔
    这两个参数和我们的batch size有关系:倍数
使用场景:    每隔一段时间计算某个范围内的数据    -->    每隔10秒(窗口的间隔)计算前10分钟(窗口的长度)的Wordcount
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

  1. 黑名单的过滤:
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.streaming.{Seconds, StreamingContext}
  5. //黑名单过滤
  6. object TransForm {
  7.   def main(args: Array[String]): Unit = {
  8.     val sparkConf = new SparkConf()
  9.       .setAppName(this.getClass.getSimpleName)
  10.       .setMaster("local[*]")
  11.     val ssc = new StreamingContext(sparkConf,Seconds(2))
  12.     /**
  13.       * 构建黑名单
  14.       */
  15.     val blacks = List("zs","ls")
  16.     val blacksRDD: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(blacks).map((_,true))
  17.     val lines = ssc.socketTextStream("hadoop101",9999)
  18.     val clicklog = lines.map(rdd => (rdd.split(",")(1), rdd)).transform(rdd => {
  19.       rdd.leftOuterJoin(blacksRDD)
  20.         .filter(x => x._2._2.getOrElse(false) != true)
  21.         .map(x => x._2._1)
  22.     })
  23.     clicklog.print()
  24.     ssc.start()
  25.     ssc.awaitTermination()
  26.   }
  27. }


Spark Streaming整合Spark SQL完成词频统计操作:

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.sql.SparkSession
  4. import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
  5. //Spark Streaming整合Spark SQL完成词频统计操作
  6. object SqlNetworkWordCount {
  7.   def main(args: Array[String]): Unit = {
  8.     val sparkConf = new SparkConf()
  9.       .setAppName(this.getClass.getSimpleName)
  10.       .setMaster("local[*]")
  11.     val ssc = new StreamingContext(sparkConf, Seconds(2))
  12.     val lines = ssc.socketTextStream("hadoop101", 9999)
  13.     val words = lines.flatMap(_.split(" +"))
  14.     // Convert RDDs of the words DStream to DataFrame and run SQL query
  15.     //将Dstream中的rdd通过foreachRDD拿出来,
  16.     //再通过toDF()转换成DataFrame,
  17.     //再通过createOrReplaceTempView将DataFrame注册成一个临时表
  18.     //每次单词统计的时候,用SQL的方式将结果统计出来
  19.     words.foreachRDD { (rdd: RDD[String], time: Time) =>
  20.       // Get the singleton instance of SparkSession
  21.       val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  22.       import spark.implicits._
  23.       // Convert RDD[String] to RDD[case class] to DataFrame
  24.       val wordsDataFrame = rdd.map(w => Record(w)).toDF()
  25.       // Creates a temporary view using the DataFrame
  26.       wordsDataFrame.createOrReplaceTempView("words")
  27.       // Do word count on table using SQL and print it
  28.       val wordCountsDataFrame =
  29.         spark.sql("select word, count(*) as total from words group by word")
  30.       println(s"========= $time =========")
  31.       wordCountsDataFrame.show()
  32.     }
  33.     ssc.start()
  34.     ssc.awaitTermination()
  35.   }
  36. }


 

  1. /** Case class for converting RDD to DataFrame */
  2. case class Record(word: String)
  3. /** Lazily instantiated singleton instance of SparkSession */
  4. object SparkSessionSingleton {
  5.   @transient private var instance: SparkSession = _
  6.   def getInstance(sparkConf: SparkConf) = {
  7.     if (instance == null) {
  8.       SparkSession
  9.         .builder()
  10.         .config(sparkConf)
  11.         .getOrCreate()
  12.     }
  13.     instance
  14.   }
  15. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/674507
推荐阅读
相关标签
  

闽ICP备14008679号