当前位置:   article > 正文

头歌:Spark Streaming_第1关:套接字流实现黑名单过滤

第1关:套接字流实现黑名单过滤

第1关:套接字流实现黑名单过滤
 

简介

套接字流是通过监听Socket端口接收的数据,相当于Socket之间的通信,任何用户在用Socket(套接字)通信之前,首先要先申请一个Socket号,Socket号相当于该用户的电话号码。同时要知道对方的Socket,相当于对方也有一个电话号码。然后向对方拨号呼叫,相当于发出连接请求。对方假如在场并空闲,相当于通信的另一主机开机且可以接受连接请求,拿起电话话筒,双方就可以正式通话,相当于连接成功。Spark Streaming通过监听套接字端口获取流数据信息并处理。
黑名单过滤是获取套接字流发送的名单信息,通过指定的黑名单信息对其进行过滤,输出结果为没有黑名单的名单信息。

任务描述


本关任务:本关是利用套接字流监听方法,监听名单信息并过滤黑名单信息。首先需要模拟名单的生成,首先需要建一个文档,每行为一个姓名。然后编写代码,当有指定套接字连接产生时,从文件中依次选取所有名单,发送给套接字端口。另外在编写代码,通过连接套接字端口,监听端口的数据,获取发送的名单,并过滤黑名单。需用提供的文档文件内的名单来完成此实验。名单文档内容如下:
Jim
Mary
Tom
Jack
Abby
Bee
Belle
Babs
Carla
Dale
Dan
Gary
Ken
Jane
Paige

相关知识


1.名单生成器的创建


代码依次获取指定文件中的一个名单。内部是首先获取指定名单文件,然后指定监听端口,然后等待连接建立,连接建立后依次获取一个名单总个数内的整数,并索引到名单,将其发送至套接字端口。

  1. import java.io.{PrintWriter}
  2. import java.net.ServerSocket
  3. import scala.io.Source
  4. object NameProducer {
  5.   def main(args: Array[String]): Unit = {
  6.     //获取指定文件
  7.     val filename = "/root/data/Namelist"
  8.     //指定文件按行切分为list
  9.     val lines = Source.fromFile(filename).getLines.toList
  10.     //获取list长度
  11.     val filerow = lines.length
  12.     //指定监听窗口,当外部程序请求时建立连接
  13.     val listener = new ServerSocket(5566)
  14.       //等待socket连接成功
  15.       println("bengin11111111111!!!")
  16.       val socket = listener.accept()
  17.       new Thread() {
  18.         override def run = {
  19.           println("Got client connected from: " + socket.getInetAddress)
  20.           //创建写入socket对象
  21.           val out = new PrintWriter(socket.getOutputStream(), true)
  22.           println("bengin222222222222222222!!!")
  23.             for (i <- 0 to filerow-1) {
  24.               //逐行取一个名字
  25.               val content = lines(i)
  26.               println(content)
  27.               //写入socket
  28.               out.write(content + '\n')
  29.               //清空缓冲区数据
  30.               out.flush()
  31.             }
  32.           socket.close()
  33.         }
  34.       }.start()
  35.   }
  36. }

2.监听套接字端口过滤黑名单


首先要设置需要过滤的黑名单:

val BlackList = Array("name1", "name2")

接着把名单转换成RDD,然后连接套接字端口,连接端口代码如下:

ssc.socketTextStream("localhost", 5566)

编程要求


根据提示,补充监听套接字并过滤黑名单代码文TransformBlackList。

测试说明


平台会对你编写的代码进行测试:
测试输入:自动获取指定的Namelist文件。
预期输出:

Jim
Tom
Jack
Bee
Belle
Babs
Dale
Dan
Gary
Jane

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.StreamingContext
  3. import org.apache.spark.streaming.Seconds
  4. object TransformBlackList {
  5. def main(args: Array[String]): Unit = {
  6. /********** Begin **********/
  7. // 初始化
  8. val sparkConf = new SparkConf().setAppName("TransformBlackList").setMaster("local[2]")
  9. // 创建StreamingContext,设置每五秒刷新一次
  10. val ssc = new StreamingContext(sparkConf, Seconds(5))
  11. ssc.sparkContext.setLogLevel("ERROR")
  12. // 设置需要过滤的黑名单(Abby、 Paige、 Carla、 Mary、 Ken)
  13. val BlackList = Array("Abby", "Paige", "Carla", "Mary", "Ken")
  14. // 把黑名单数组转换成rdd
  15. val BlackListRdd = ssc.sparkContext.parallelize(BlackList)
  16. // 设置主机名localhost,端口号5566
  17. val NameList = ssc.socketTextStream("localhost", 5566)
  18. // 过滤黑名单算法
  19. val ValidName = NameList.transform(rdd => {
  20. rdd.filter(name => !BlackList.contains(name))
  21. })
  22. ValidName.print()
  23. ssc.start()
  24. // 等待足够长的时间以触发至少一次计算
  25. Thread.sleep(5000)
  26. ssc.stop(false, false)
  27. /********** End **********/
  28. }
  29. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/674506
推荐阅读
相关标签
  

闽ICP备14008679号