当前位置:   article > 正文

Spark Streaming编程实战(开发实例)_sparkstreaming案例实操广告运行结果截图

sparkstreaming案例实操广告运行结果截图

本节介绍如何编写 Spark Streaming 应用程序,由简到难讲解使用几个核心概念来解决实际应用问题。

流数据模拟器

在实例演示中模拟实际情况,需要源源不断地接入流数据,为了在演示过程中更接近真实环境,首先需要定义流数据模拟器。该模拟器的主要功能是通过 Socket 方式监听指定的端口号,当外部程序通过该端口进行连接并请求数据时,模拟器将定时将指定的文件数据进行随机获取,并发送给外部程序。

流数据模拟器的代码如下。

  1. import java.io.{PrintWriter}
  2. import java.net.ServerSocket
  3. import scala.io.Source
  4.  
  5. object StreamingSimulation {
  6.  
  7. //定义随机获取整数的方法
  8. def index(length:Int) = {
  9. import java.util.Random
  10. val rdm = new Random
  11. rdm.nextInt(length)
  12. }
  13.  
  14. def main(args: Array[String]) {
  15. //调用该模拟器需要 3 个参数,分别为文件路径、端口号和间隔时间(单位为毫秒)
  16. if (args.length != 3) {
  17. System.err.printIn(“Usage:<filename> <port><millisecond>”)
  18. System.exit(1)
  19. }
  20.  
  21. //获取指定文件总的行数
  22. val filename = args(0)
  23. val lines = Source.fromFile(filename).getLines.toList
  24. val filerow = lines.length
  25.  
  26. //指定监听某端口,当外部程序请求时建立连接
  27. val listener = new ServerSocket(args(1).toInt)
  28. while (true) {
  29. val socket = listener.accept()
  30. new Thread() {
  31. override def run = {
  32. printIn(“Got client connected from: “ + socket.getInetAddress)
  33. val out = new PrintWriter(socket.getOutputStream(), true)
  34. while (true) {
  35. Thread.sleep(args(2).toLong)
  36. //当该端口接受请求时,随机获取某行数据发送给对方
  37. val content = lines(index(filerow))
  38. printIn(content)
  39. out.write(content + ‘n‘) out.flush()
  40. }
  41. socket.close()
  42. }
  43. }.start()
  44. }
  45. }
  46. }

在 IDEA 开发环境打包配置界面中:

  • 首先需要在 ClassPath 加入 Jar 包(/app/scala-2.10.4/lib/scala—swing.jar/app/scala—2.10.4/lib/scala—library.jar/app/scala—2.10.4/lib/scala—actors.jar)。
  • 然后单击“Build”→“Build Artifacts”,选择“Build”或者“Rebuild”动作。
  • 最后使用以下命令复制打包文件到 Spark 根目录下。

cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_j

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/975670
推荐阅读
相关标签
  

闽ICP备14008679号