
Big Data Series: Spark Basics, a Detailed Walkthrough of Your First Spark Application: WordCount


Task requirements

Write a Spark application that counts the frequency of each word in a file.

Note: in this article the Spark root directory is named spark-1.6.3-bin-hadoop2.6.

  # Preparation
  cd /usr/local/spark-1.6.3-bin-hadoop2.6
  mkdir mycode
  cd mycode
  mkdir wordcount
  cd wordcount
  # Create a text file word.txt containing a few lines of text
  vi word.txt

Running the word count in spark-shell

【1】Start spark-shell

  cd /usr/local/spark-1.6.3-bin-hadoop2.6
  ./bin/spark-shell
  ....
  scala>
  # Tip: configuring the system environment variables makes this much more convenient.
  • Loading a local file

Before writing the actual word-count code, we need to answer one question: how do we load the file?

Note that the file may live in the local file system or in the distributed file system HDFS, so below we cover both cases: loading a local file and loading a file from HDFS.
First, work in the second terminal window. Use the commands below to change to the "/usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount" directory and inspect the contents of the word.txt created above:

  cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount
  cat word.txt   # cat prints the entire contents of word.txt to the screen

Output:

  [root@master ~]# cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount
  [root@master wordcount]# cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount
  [root@master wordcount]# cat word.txt
  Hello,Spark
  Hello,master
  Hello,slave1
  Hello,slave2

Switch back to the first terminal.

scala> val textFile = sc.textFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt")

The textFile after val is a variable, while the textFile inside sc.textFile() is the name of a method on sc that loads file data. They are two different things, so do not confuse them. The identifier after val is just a variable name, and you can use a completely different one.

For example, val lines = sc.textFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt"). Using the same name here deliberately highlights the distinction between the two. Note that to load a local file you must use a path that starts with "file:///". After running the command above, no result is shown right away: Spark uses lazy evaluation, and only when it encounters an "action" operation does it execute the whole chain from the beginning. So next we run an "action" statement to see a result.

scala> textFile.first()

first() is an "action" operation: it starts the real computation, loads the data from the file into the textFile variable, and returns the first line of text. The screen prints a lot of log output, omitted here; among it you can find the first line of word.txt.
Precisely because Spark is lazy, entering an incorrect transformation does not make spark-shell report an error immediately. The real computation only starts when an "action" statement is executed, and that is when errors in the "transformation" statements surface.

  val textFile = sc.textFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word123.txt")
  # Note: word123.txt does not exist at all, yet spark-shell reports no error when the line above runs,
  # because the load is not actually executed until an "action" such as first() is encountered.
  # Now run an "action" operation, first():
  scala> textFile.first()
  # Note: this time an error message is returned (in this environment it prominently contains the
  # Chinese words "拒绝连接", i.e. "connection refused"), because word123.txt simply does not exist.
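Incidentally, first() is not the only action. The minimal sketch below, run in the same spark-shell session (sc is already defined), uses count(), another action, which likewise forces the lazy load to actually run:

  // Minimal sketch (spark-shell session; sc is predefined). count() is another
  // "action": like first(), it triggers the real computation.
  val textFile = sc.textFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt")
  textFile.count()   // number of lines in word.txt; 4 for the file created above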

Next, let's practice writing the contents of the textFile variable back out to text files under a writeback directory:

  val textFile = sc.textFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt")
  textFile.saveAsTextFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/writeback")

The argument inside saveAsTextFile() above is the path of a directory to save into, not a file name. saveAsTextFile() is an "action" operation, so the real computation runs immediately: the data is loaded from word.txt into textFile and then written out.
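Because the argument is a directory, the output may consist of several part files, one per partition. The minimal sketch below (same word.txt path as above; the output directory name writeback_single is only a hypothetical example) coalesces to a single partition first, so exactly one part file is produced:

  // Minimal sketch (spark-shell session; sc is predefined). coalesce(1) merges the data
  // into a single partition, so saveAsTextFile writes exactly one part file.
  // The output directory "writeback_single" is an illustrative name, not from the tutorial.
  val textFile = sc.textFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt")
  textFile.coalesce(1).saveAsTextFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/writeback_single")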

  # In the other terminal, inspect what was written back
  cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/writeback/
  ls
  cat part-00000   # view the written contents

Output:

  [root@master wordcount]# cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/writeback/
  [root@master writeback]# ls
  part-00000 _SUCCESS
  [root@master writeback]# cat part-00000
  Hello,Spark
  Hello,master
  Hello,slave1
  Hello,slave2
  • Loading a file from HDFS

To read files from HDFS, first start Hadoop's HDFS component. Installing Hadoop and Spark was covered earlier in the "Spark installation" chapter, so here we can start HDFS directly with the commands below (MapReduce is not used, so there is no need to start MapReduce or YARN). Go to the second terminal window, at the Linux shell prompt, and enter:

  cd /usr/local/hadoop
  ./sbin/start-dfs.sh
  # I added the environment variables, which makes this more convenient


Once startup completes, HDFS is available. If you have not yet created a directory in HDFS for the current Linux login user (here the login user is root), create it with:

./bin/hdfs dfs -mkdir -p /user/root

In other words, the default directory HDFS sets aside for a Linux login user is "/user/username" (note: user, not usr). The original tutorial logs in as user hadoop, but here the login user is root, so the command above creates "/user/root". To stress it once more: this directory lives in the HDFS file system, not in the local file system. Once it exists, list the directories and files in HDFS with:

./bin/hdfs dfs -ls .

In the command above, the trailing dot "." refers to the HDFS directory that corresponds to the current Linux login user, that is, the files under "/user/root/" in HDFS, so the following two commands are equivalent:

  ./bin/hdfs dfs -ls .
  ./bin/hdfs dfs -ls /user/root

To list the contents of the HDFS root directory, use:

./bin/hdfs dfs -ls /

Next, upload "/usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt" from the local file system to the distributed file system HDFS (into the root user's home directory):

./bin/hdfs dfs -put /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt .

Then check whether word.txt now appears in the root user's HDFS home directory by listing its contents:

./bin/hdfs dfs -ls .

You can see that word.txt is indeed there. Use cat to view the contents of the word.txt stored in HDFS:

./bin/hdfs dfs -cat ./word.txt

After this command runs, the contents of word.txt in HDFS are displayed.

 

Now switch back to the spark-shell window and write statements that load word.txt from HDFS and print its first line of text:

  scala> val textFile = sc.textFile("hdfs://localhost:9000/user/root/word.txt")
  scala> textFile.first()

After running these statements you will see the first line of the word.txt stored in HDFS (not the one in the local file system).

Note: in my environment, connecting through master:9000 did not work (HDFS refused the connection), so I used the equivalent forms below in place of the expression that requires port 9000.

Note that in sc.textFile("hdfs://localhost:9000/user/root/word.txt"), "hdfs://localhost:9000/" uses the port address 9000 that was settled when Hadoop was installed, as described earlier. It can in fact be omitted; the following three statements are equivalent:

  val textFile = sc.textFile("hdfs://localhost:9000/user/root/word.txt")
  val textFile = sc.textFile("/user/root/word.txt")
  val textFile = sc.textFile("word.txt")
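The reason the three forms are interchangeable is that a path without a scheme is resolved against the default filesystem (fs.defaultFS, here assumed to be hdfs://localhost:9000), and a relative path is further resolved against the user's HDFS home directory /user/root. A minimal sketch that checks this in the same spark-shell session (sc is predefined):

  // Minimal sketch: all three paths resolve to the same HDFS file,
  // so all three print the same first line of word.txt.
  val byFullUri  = sc.textFile("hdfs://localhost:9000/user/root/word.txt")
  val byAbsolute = sc.textFile("/user/root/word.txt")
  val byRelative = sc.textFile("word.txt")
  println(byFullUri.first())
  println(byAbsolute.first())
  println(byRelative.first())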

Next, let's write the contents of textFile back into the HDFS file system (under the root user's home directory):

  scala> val textFile = sc.textFile("word.txt")
  scala> textFile.saveAsTextFile("writeback")

After running these commands, the text is written to the "/user/root/writeback" directory in the HDFS file system. Switch to the Linux shell prompt in the other terminal window to check:

  ./bin/hdfs dfs -ls .
  # In the output of the command above you should see a writeback directory; now list its contents:
  ./bin/hdfs dfs -ls ./writeback

The listing shows two files: part-00000 and _SUCCESS. Use the command below to print the contents of part-00000 (note: there are five zeros in part-00000):

./bin/hdfs dfs -cat ./writeback/part-00000

In the output you will see the same text as in word.txt, as follows:

  [root@master ~]# hdfs dfs -ls .
  Found 2 items
  -rw-r--r-- 2 root supergroup 51 2018-11-03 14:13 word.txt
  drwxr-xr-x - root supergroup 0 2018-11-03 14:30 writeback
  [root@master ~]# hdfs dfs -ls ./writeback
  Found 2 items
  -rw-r--r-- 2 root supergroup 0 2018-11-03 14:30 writeback/_SUCCESS
  -rw-r--r-- 2 root supergroup 51 2018-11-03 14:30 writeback/part-00000
  [root@master ~]# hdfs dfs -cat ./writeback/part-00000
  Hello,Spark
  Hello,master
  Hello,slave1
  Hello,slave2
  • 【2】Word count

With that groundwork in place, we can now start on our first Spark application: WordCount.
Switch to the spark-shell window:

  scala> val textFile = sc.textFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt")
  scala> val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
  scala> wordCount.collect()

Only the code is shown above; the messages returned during execution are omitted because there are a lot of them.
A brief explanation of these statements:

textFile contains multiple lines of text. textFile.flatMap(line => line.split(" ")) iterates over every line in textFile; for each line, the content is bound to the variable line and the lambda expression line => line.split(" ") is applied. In a lambda, the left side is the input parameter and the right side is the processing logic; here line.split(" ") splits the line into words using a space as the separator, producing a collection of words for that line. Applied to every line of textFile, this yields one word collection per line, so multiple lines give multiple word collections. flatMap() then "flattens" these collections into one large collection of words.

Then map(word => (word, 1)) is applied to that large collection: for each word, the word is bound to the variable word and the lambda word => (word, 1) is evaluated. It takes word as input and builds a tuple of the form (word, 1), whose key is the word and whose value is 1 (meaning the word occurred once).

At this point we have an RDD whose elements are (key, value) tuples. Finally, reduceByKey((a, b) => a + b) groups the RDD elements by key and reduces the values that share a key with the given function (the lambda (a, b) => a + b), returning the reduced (key, value). For example, ("hadoop", 1) and ("hadoop", 1) share the same key and reduce to ("hadoop", 2), which is exactly that word's frequency.
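To make each stage easier to inspect, the same pipeline can be written with the intermediate RDDs given their own names. This is only a sketch for the spark-shell session above (sc is predefined); it computes exactly the same result as the one-liner:

  // Minimal sketch: the word-count pipeline broken into named steps.
  val textFile = sc.textFile("file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt")
  val words    = textFile.flatMap(line => line.split(" "))   // one flat collection of words
  val pairs    = words.map(word => (word, 1))                // (word, 1) tuples
  val counts   = pairs.reduceByKey((a, b) => a + b)          // sum the 1s for each word
  counts.collect().foreach(println)                          // print each (word, count) pair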

  • 【3】Writing a standalone application for the word count

Next we write a Scala application that performs the word count.
Log in to the Linux system (here as user root), open a shell prompt, and run the following commands:

  cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/
  mkdir -p src/main/scala   # the -p option creates src and all of its subdirectories at once

In the "/usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/src/main/scala" directory, create a new file named test.scala containing the following code:

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.SparkConf

  object WordCount {
    def main(args: Array[String]) {
      val inputFile = "file:///usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/word.txt"
      val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
      val sc = new SparkContext(conf)
      val textFile = sc.textFile(inputFile)
      val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
      wordCount.foreach(println)
    }
  }

Note that in the statement new SparkConf().setAppName("WordCount").setMaster("local[2]"), you may also delete .setMaster("local[2]") and keep only val conf = new SparkConf().setAppName("WordCount").
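If you do remove .setMaster("local[2]"), the master has to come from somewhere else, typically the --master option of spark-submit. A minimal sketch of that variant (the object name WordCountNoMaster and the submit flag shown in the comment are illustrative assumptions, not part of the original tutorial):

  // Minimal sketch: no setMaster in code; the master is expected to be supplied
  // externally, e.g. spark-submit --master local[2] ... (assumed invocation).
  import org.apache.spark.{SparkConf, SparkContext}

  object WordCountNoMaster {
    def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("WordCount")   // no .setMaster(...) here
      val sc = new SparkContext(conf)
      // ... same word-count logic as in test.scala above ...
      sc.stop()
    }
  }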
If test.scala did not call the Spark API, compiling it with scalac and running the result would be enough. Since this test.scala program depends on the Spark API, however, we need to compile and package it with sbt (how to do this was covered in "Testing spark-shell and writing and packaging a standalone Scala application with sbt"). Let's build it again below.

Run the following commands:

  cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/
  vi simple.sbt

The commands above create a new simple.sbt file; enter the following into it:

The versions below match my Spark and Scala installations:

  name := "Simple Project"
  version := "1.0"
  scalaVersion := "2.10.5"
  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.3"

Note: there are two percent signs after "org.apache.spark". Be careful not to drop one of them; if you do, the build will report an error.
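For reference, the double %% simply appends the Scala binary version to the artifact name. A minimal sketch of the equivalent single-% form, for the Scala 2.10 build used here:

  // Equivalent dependency line with a single %, spelling out the Scala suffix explicitly:
  libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.3"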
Next we use sbt to package the Scala program. To make sure sbt can run properly, first check the file structure of the whole application with the following commands:

  cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/
  find .

The file structure should look roughly like this:

  .
  ./src
  ./src/main
  ./src/main/scala
  ./src/main/scala/test.scala
  ./simple.sbt
  ./word.txt

Now we can package the entire application into a JAR with the commands below (the first run will also download the dependency packages):

  cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/   # make sure this directory is the current directory
  /usr/local/sbt/sbt package

This step takes a few minutes; the screen will print messages like the following:

  [root@master wordcount]# cd /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/
  [root@master wordcount]# /usr/local/sbt/sbt package
  Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256M; support was removed in 8.0
  [info] Set current project to Simple Project (in build file:/usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/)
  [info] Updating {file:/usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/}wordcount...
  [info] Resolving org.fusesource.jansi#jansi;1.4 ...
  [info] downloading https://repo1.maven.org/maven2/org/apache/avro/avro/1.7.7/avro-1.7.7.jar ...
  [info] [SUCCESSFUL ] org.apache.avro#avro;1.7.7!avro.jar (32854ms)
  [info] Done updating.
  [info] Compiling 1 Scala source to /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/target/scala-2.10/classes...
  [info] Packaging /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/target/scala-2.10/simple-project_2.10-1.0.jar ...
  [info] Done packaging.
  [success] Total time: 145 s, completed 2018-11-3 14:52:13
  # Output like the above means the packaging succeeded

The generated jar is located at /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/target/scala-2.10/simple-project_2.10-1.0.jar.
Finally, run the program with spark-submit: submit the generated jar to Spark with the following command:

/usr/local/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class "WordCount" /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/target/scala-2.10/simple-project_2.10-1.0.jar

Here is what happened when I ran the word count on my word.txt:

Problem:

 Input path does not exist: file:/usr/local/spark/mycode/wordcount/word.txt

Analysis: the input file cannot be found. Once you have modified test.scala, you cannot simply re-run the old jar; you must recompile and repackage with sbt to produce a new jar and run that instead. Otherwise you are still running the jar built from the earlier, incorrect code, and the program will keep failing even though the test.scala source has been fixed.

Fix: rebuild the jar with sbt and run it again.

  [root@master wordcount]# /usr/local/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class "WordCount" /usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/target/scala-2.10/simple-project_2.10-1.0.jar
  SLF4J: Class path contains multiple SLF4J bindings.
  SLF4J: Found binding in [jar:file:/usr/local/spark-1.6.3-bin-hadoop2.6/lib/spark-assembly-1.6.3-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: Found binding in [jar:file:/usr/local/hadoop-2.8.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
  18/11/03 15:01:36 INFO spark.SparkContext: Running Spark version 1.6.3
  18/11/03 15:01:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  18/11/03 15:01:36 INFO spark.SecurityManager: Changing view acls to: root
  18/11/03 15:01:36 INFO spark.SecurityManager: Changing modify acls to: root
  18/11/03 15:01:36 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
  18/11/03 15:01:37 INFO util.Utils: Successfully started service 'sparkDriver' on port 34369.
  18/11/03 15:01:37 INFO slf4j.Slf4jLogger: Slf4jLogger started
  18/11/03 15:01:38 INFO Remoting: Starting remoting
  18/11/03 15:01:38 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 42135.
  18/11/03 15:01:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.10.251:42135]
  18/11/03 15:01:38 INFO spark.SparkEnv: Registering MapOutputTracker
  18/11/03 15:01:38 INFO spark.SparkEnv: Registering BlockManagerMaster
  18/11/03 15:01:38 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-a907e9a7-0ec9-4a2c-84ca-4e97b450043c
  18/11/03 15:01:38 INFO storage.MemoryStore: MemoryStore started with capacity 517.4 MB
  18/11/03 15:01:38 INFO spark.SparkEnv: Registering OutputCommitCoordinator
  18/11/03 15:01:38 INFO server.Server: jetty-8.y.z-SNAPSHOT
  18/11/03 15:01:38 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
  18/11/03 15:01:38 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
  18/11/03 15:01:38 INFO ui.SparkUI: Started SparkUI at http://192.168.10.251:4040
  18/11/03 15:01:38 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-8dfe652a-0642-4289-9687-0ff0af307dea/httpd-bde01377-d78e-4cdc-9f6b-d95543c17cd0
  18/11/03 15:01:38 INFO spark.HttpServer: Starting HTTP Server
  18/11/03 15:01:38 INFO server.Server: jetty-8.y.z-SNAPSHOT
  18/11/03 15:01:38 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:33810
  18/11/03 15:01:38 INFO util.Utils: Successfully started service 'HTTP file server' on port 33810.
  18/11/03 15:01:38 INFO spark.SparkContext: Added JAR file:/usr/local/spark-1.6.3-bin-hadoop2.6/mycode/wordcount/target/scala-2.10/simple-project_2.10-1.0.jar at http://192.168.10.251:33810/jars/simple-project_2.10-1.0.jar with timestamp 1541228498915
  18/11/03 15:01:38 INFO executor.Executor: Starting executor ID driver on host localhost
  18/11/03 15:01:39 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45968.
  18/11/03 15:01:39 INFO netty.NettyBlockTransferService: Server created on 45968
  18/11/03 15:01:39 INFO storage.BlockManagerMaster: Trying to register BlockManager
  18/11/03 15:01:39 INFO storage.BlockManagerMasterEndpoint: Registering block manager localhost:45968 with 517.4 MB RAM, BlockManagerId(driver, localhost, 45968)
  18/11/03 15:01:39 INFO storage.BlockManagerMaster: Registered BlockManager
  18/11/03 15:01:39 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.5 KB, free 517.3 MB)
  18/11/03 15:01:39 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 14.2 KB, free 517.3 MB)
  18/11/03 15:01:39 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:45968 (size: 14.2 KB, free: 517.4 MB)
  18/11/03 15:01:39 INFO spark.SparkContext: Created broadcast 0 from textFile at test.scala:10
  Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/usr/local/spark/mycode/wordcount/word.txt
  at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
  at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
  at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:331)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:331)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
  at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:330)
  at WordCount$.main(test.scala:11)
  at WordCount.main(test.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  18/11/03 15:01:40 INFO spark.SparkContext: Invoking stop() from shutdown hook
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
  18/11/03 15:01:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
  18/11/03 15:01:40 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.10.251:4040
  18/11/03 15:01:40 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
  18/11/03 15:01:40 INFO storage.MemoryStore: MemoryStore cleared
  18/11/03 15:01:40 INFO storage.BlockManager: BlockManager stopped
  18/11/03 15:01:40 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
  18/11/03 15:01:40 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
  18/11/03 15:01:40 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
  18/11/03 15:01:40 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
  18/11/03 15:01:41 INFO spark.SparkContext: Successfully stopped SparkContext
  18/11/03 15:01:41 INFO util.ShutdownHookManager: Shutdown hook called
  18/11/03 15:01:41 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-8dfe652a-0642-4289-9687-0ff0af307dea/httpd-bde01377-d78e-4cdc-9f6b-d95543c17cd0
  18/11/03 15:01:41 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-8dfe652a-0642-4289-9687-0ff0af307dea

 

 
