In the previous chapter we covered installing Spark in Standalone mode. In this chapter we walk through the basics of the Spark shell.

In the ./bin directory, run spark-shell to start it. Without pointing it at a cluster it simply runs in local mode (master = local[*]). The Spark web UI is then available at http://localhost:4040.

```
localhost:bin Sean$ spark-shell
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/29 17:15:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/29 17:15:01 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.31.80 instead (on interface en0)
19/03/29 17:15:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context Web UI available at http://192.168.31.80:4040
Spark context available as 'sc' (master = local[*], app id = local-1553850902335).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
```
To attach the shell to the Standalone cluster from the previous chapter, pass the master URL along with resource limits:

```
localhost:bin Sean$ spark-shell --master spark://localhost:7077 --total-executor-cores 1 --executor-memory 1g
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/30 15:25:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://127.0.0.1:4040
Spark context available as 'sc' (master = spark://localhost:7077, app id = app-20190330152508-0001).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@66d2885c

scala>
```
Once it starts successfully, the spark-shell application shows up on the master's web UI at http://localhost:8080. From there we can use Scala inside spark-shell to do real work, as the quick example below shows.
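As a quick sanity check (a minimal sketch; the words here are made up and nothing is read from HDFS yet), a word count over an in-memory collection can be run directly at the prompt:

```scala
// Parallelize a small local collection into an RDD, then count occurrences.
val words = sc.parallelize(Seq("hello", "spark", "hello"))
val counts = words.map((_, 1)).reduceByKey(_ + _)
counts.collect()   // e.g. Array((spark,1), (hello,2)) -- ordering may vary
```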
For production deployment and release, jobs are normally submitted with the spark-submit script (also found under ./bin). The usual workflow is to package the application with Maven and then hand the resulting jar to spark-submit.

(Note: building a full jar with Maven is not covered here; see the separate article on Maven packaging in practice.)
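As a rough illustration only (the main class, jar path, and resource values below are placeholders, not taken from this article), a submission to the Standalone master might look like this:

```bash
# Hypothetical example: class name and jar path are placeholders.
./bin/spark-submit \
  --master spark://localhost:7077 \
  --class com.example.WordCount \
  --total-executor-cores 1 \
  --executor-memory 1g \
  /path/to/your-app-1.0.jar
```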
Connecting the shell to the master through the LAN address, however, failed with a connection error:

```
localhost:bin Sean$ spark-shell --master spark://192.168.31.80:7077
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/29 18:11:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/29 18:11:38 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.31.80 instead (on interface en0)
19/03/29 18:11:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/03/29 18:11:39 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.31.80:7077
org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to /192.168.31.80:7077
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
	... 4 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /192.168.31.80:7077
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	... 1 more
19/03/29 18:11:59 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.31.80:7077
org.apache.spark.SparkException: Exception thrown in awaitResult:
	... (same stack trace as above)
19/03/29 18:12:19 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.31.80:7077
org.apache.spark.SparkException: Exception thrown in awaitResult:
	... (same stack trace as above)
19/03/29 18:12:39 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
19/03/29 18:12:39 WARN StandaloneSchedulerBackend: Application ID is not initialized yet.
19/03/29 18:12:39 WARN StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master
19/03/29 18:12:40 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:918)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:910)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
	at org.apache.spark.repl.Main$.createSparkSession(Main.scala:101)
	... 47 elided
<console>:14: error: not found: value spark
       import spark.implicits._
              ^
<console>:14: error: not found: value spark
       import spark.sql
              ^
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
```
Troubleshooting: the main causes of this kind of failure are the following:

- Check the firewall and confirm that the Spark master's address and port are actually reachable.
- Check that the local Scala version matches the Scala version used by the remote Spark master (compare conf/spark-env.sh, /etc/profile, and the version printed when spark-shell starts).
- A wrong entry in the local /etc/hosts file.
- When starting a pseudo-cluster locally, spark-env.sh and slaves need to be configured as follows:

```bash
# spark-env.sh
export SPARK_MASTER_IP=127.0.0.1
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_PORT=7077
```
```bash
# slaves
localhost
```
[1]. Fixing the Spark error "Failed to connect to master master_hostname:7077"
[2]. Unable to connect to Spark master
[3]. Spark error: AnnotatedConnectException Connection refused
[4]. Binding the port for single-machine Spark
Another failure seen while connecting to the local master:

```
localhost:bin Sean$ spark-shell --master spark://127.0.0.1:7077
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/30 15:23:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/30 15:23:55 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: FAILED
19/03/30 15:23:55 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: FAILED
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:509)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:146)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:254)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:168)
	at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
	at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:216)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
19/03/30 15:23:55 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:918)
	at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:910)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
	at org.apache.spark.repl.Main$.createSparkSession(Main.scala:101)
	... 47 elided
<console>:14: error: not found: value spark
       import spark.implicits._
              ^
<console>:14: error: not found: value spark
       import spark.sql
              ^
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
localhost:bin Sean$
```
- Insufficient local disk, memory, or CPU cores can also cause the failure above: spark-shell itself starts, but the application is then killed because the cluster cannot satisfy its resource request. The cores and memory requested can be reduced with --total-executor-cores 1 and --executor-memory 1g.
Given the limits of the local machine, we will simply use spark-shell in local mode for the operator exercises below.

```
localhost:bin Sean$ ./spark-shell
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/05/25 16:26:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://127.0.0.1:4040
Spark context available as 'sc' (master = local[*], app id = local-1558772763953).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
```
```bash
# Upload to the local (default) HDFS
hadoop fs -put hello2019.sh /
# Upload to a remote HDFS by giving the full URI
hadoop fs -put hello2019.sh hdfs://localhost:9000/
```
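Note that the word-count examples below read from hdfs://localhost:9000/wordcount/input, so that directory needs to contain a small text file. Assuming a local file like the one above (the exact file name does not matter), it could be prepared with:

```bash
# Hypothetical preparation step: create the input directory and upload a file.
hadoop fs -mkdir -p /wordcount/input
hadoop fs -put hello2019.sh /wordcount/input/
```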
In the shell, `sc` is the SparkContext that was created for us:

```
scala> sc
res4: org.apache.spark.SparkContext = org.apache.spark.SparkContext@25198ead
```
The complete character-count pipeline, first collected to the driver and then saved back to HDFS (the second save uses a single output partition):

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res0: Array[(String, Int)] = Array((t,8), (l,3), (a,3), (i,3), (y,3), (p,2), (e,2), (c,2), (0,1), (b,1), (h,1), (2,1), (" ",1), (k,1), (o,1), (9,1), (1,1))

scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525/out")

scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_,1).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525-2/out")
```
```
localhost:~ Sean$ hadoop fs -cat /wordcount/20190525/out/*
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
19/05/25 16:29:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(t,8)
(l,3)
(a,3)
(i,3)
(y,3)
(p,2)
(e,2)
(c,2)
(0,1)
(b,1)
(h,1)
(2,1)
( ,1)
(k,1)
(o,1)
(9,1)
(1,1)
```
```
localhost:~ Sean$ hadoop fs -cat /wordcount/20190525-2/out/*
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
19/05/25 16:30:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(t,8)
(a,3)
(i,3)
(y,3)
(l,3)
(e,2)
(p,2)
(c,2)
(0,1)
(k,1)
(b,1)
(h,1)
(2,1)
( ,1)
(o,1)
(9,1)
(1,1)
```
textFile()
```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").collect
res10: Array[String] = Array(hello 2019, cat, pitty, kitty, able, pitty, cat)
```
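textFile() is not limited to HDFS: with a file:// URI it reads local files as well (the path below is hypothetical, just to show the form):

```scala
// Read a local text file instead of an HDFS path (works in local mode).
sc.textFile("file:///tmp/words.txt").collect()
```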
map()
map() applies a function to every element; split() breaks a line apart; collect gathers the results back to the driver.
```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").map(_.split(" ")).collect
res12: Array[Array[String]] = Array(Array(hello, 2019), Array(cat), Array(pitty), Array(kitty), Array(able), Array(pitty), Array(cat))
```
flatMap() & map().flatten
map().flatten is the Scala-collections way of writing this; Spark RDDs do not appear to offer flatten, so flatMap is used instead.

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).collect
res17: Array[String] = Array(hello, 2019, cat, pitty, kitty, able, pitty, cat)
```
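For comparison, here is a plain Scala-collections sketch (run outside of any RDD) showing that map followed by flatten is equivalent to flatMap:

```scala
val lines = List("hello 2019", "cat")
lines.map(_.split(" ")).flatten   // List(hello, 2019, cat)
lines.flatMap(_.split(" "))       // same result in one step
```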
Pairing each token with a count of 1:

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).collect
res19: Array[(String, Int)] = Array((hello,1), (2019,1), (cat,1), (pitty,1), (kitty,1), (able,1), (pitty,1), (cat,1))
```
reduceByKey()
reduceByKey() merges the values that share the same key. It is an operator specific to RDDs; plain Scala collections do not have it. What does the 1 in reduceByKey(func, 1) mean? It is the number of partitions to use, which determines, for example, whether the saved output above is written as one file or several.

```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey((x,y) => (x+y)).collect
res20: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (cat,2), (kitty,1))
```
```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res21: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (cat,2), (kitty,1))

scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_,1).collect
res22: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (kitty,1), (cat,2))
```
What is the difference between reduceByKey and groupByKey?
reduceByKey first combines values within each partition and only then merges the partial results across partitions, whereas groupByKey shuffles all the values to one place and aggregates them there. See the sketch below.
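A small illustration of the two operators (a minimal sketch with made-up data; both give the same counts, but reduceByKey does part of the summing before the shuffle):

```scala
val pairs = sc.parallelize(Seq("cat", "cat", "dog")).map((_, 1))

// reduceByKey: partial sums are computed inside each partition before the shuffle
pairs.reduceByKey(_ + _).collect()            // e.g. Array((dog,1), (cat,2))

// groupByKey: every (word, 1) pair is shuffled, then summed afterwards
pairs.groupByKey().mapValues(_.sum).collect() // same counts, more shuffle traffic
```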
(See references [1] and [2] at the end of this section.)
sortBy(_._2,false)
In Spark's sortBy(keyFunc, ascending), the second argument controls whether the sort is ascending or descending; false gives a descending sort.
```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res0: Array[(String, Int)] = Array((t,8), (l,3), (a,3), (i,3), (y,3), (p,2), (e,2), (c,2), (0,1), (b,1), (h,1), (2,1), (" ",1), (k,1), (o,1), (9,1), (1,1))
```
saveAsTextFile()
```
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525/out")
```
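Note that saveAsTextFile fails if the target directory already exists on HDFS (the path above was already written to earlier in this article). To check the result from the shell itself, rather than with hadoop fs -cat, the output directory can simply be read back:

```scala
// Read back the part-* files written by saveAsTextFile.
sc.textFile("hdfs://localhost:9000/wordcount/20190525/out").collect()
```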
[1]. Understanding the difference between groupByKey and reduceByKey: essentially a reduce on the local machine
[2]. Examples of using reduceByKey