当前位置:   article > 正文

Spark Streaming实战之黑名单过滤_第1关:套接字流实现黑名单过滤

第1关:套接字流实现黑名单过滤

1.需求场景

访问日志:

  1. 201801,zs
  2. 201802,ls
  3. 201803,ww
  4. .....

黑名单:

zs,ls...

现在需要把黑名单中的人从访问日志中给过滤掉,然后得到一份新的访问日志

2.思路分析

要实现上边的需求,首先要进行思路分析,即如何实现

我们可以把黑名单数据先变成一个RDD,将它变成(zs,true) (ls,true)这样的形式,然后再将访问日志变成(zs,<201801,zs>) (ls,<201802,ls>)  (ww,<201803,ww>)的形式,使用leftjoin把它们变成(zs,[<201801,zs>,true]) (ls,[<201802,ls>,true])  (ww,[<201803,ww>,true])的形式,如果是true的话就输出

3.代码实现

  1. package cn.ysjh
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
  5. import org.apache.spark.streaming.{Seconds, StreamingContext}
  6. object TranFormSpark {
  7. def main(args: Array[String]): Unit = {
  8. val cf: SparkConf = new SparkConf().setAppName("TranForm").setMaster("local[2]")
  9. val stream: StreamingContext = new StreamingContext(cf,Seconds(5))
  10. /*
  11. 构建黑名单
  12. */
  13. val block = List("zs","ls")
  14. val blocks: RDD[(String, Boolean)] = stream.sparkContext.parallelize(block).map(x => (x,true))
  15. val socket: ReceiverInputDStream[String] = stream.socketTextStream("192.168.220.134",6789)
  16. val result: DStream[String] = socket.map(x => (x.split(",")(1), x)).transform(rdd => {
  17. rdd.leftOuterJoin(blocks)
  18. .filter(x => x._2._2.getOrElse(false) != true)
  19. .map(x => x._2._1)
  20. })
  21. result.print()
  22. stream.start()
  23. stream.awaitTermination()
  24. }
  25. }

4.运行测试

在虚拟机中使用nc来输送socket数据,,然后看在IDEA中Spark Streaming程序的运行结果

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/674519
推荐阅读
相关标签
  

闽ICP备14008679号