当前位置:   article > 正文

Spark集群安装和使用_export spark_library_path

export spark_library_path

本文主要记录 CDH5 集群中 Spark 集群模式的安装过程配置过程并测试 Spark 的一些基本使用方法。

安装环境如下:

  • 操作系统:CentOs 6.5
  • Hadoop 版本:cdh-5.3.0
  • Spark 版本:cdh5-1.2.0_5.3.0

关于 yum 源的配置以及 Hadoop 集群的安装,请参考 使用yum安装CDH Hadoop集群

1. 安装

首先查看 Spark 相关的包有哪些:

  1. $ yum list |grep spark
  2. spark-core.noarch 1.2.0+cdh5.3.0+364-1.cdh5.3.0.p0.36.el6 @cdh
  3. spark-history-server.noarch 1.2.0+cdh5.3.0+364-1.cdh5.3.0.p0.36.el6 @cdh
  4. spark-master.noarch 1.2.0+cdh5.3.0+364-1.cdh5.3.0.p0.36.el6 @cdh
  5. spark-python.noarch 1.2.0+cdh5.3.0+364-1.cdh5.3.0.p0.36.el6 @cdh
  6. spark-worker.noarch 1.2.0+cdh5.3.0+364-1.cdh5.3.0.p0.36.el6 @cdh
  7. hue-spark.x86_64 3.7.0+cdh5.3.0+134-1.cdh5.3.0.p0.24.el6 cdh

以上包作用如下:

  • spark-core: spark 核心功能
  • spark-worker: spark-worker 初始化脚本
  • spark-master: spark-master 初始化脚本
  • spark-python: spark 的 Python 客户端
  • hue-spark: spark 和 hue 集成包
  • spark-history-server

在已经存在的 Hadoop 集群中,选择一个节点来安装 Spark Master,其余节点安装 Sparl worker ,例如:在 cdh1 上安装 master,在 cdh1、cdh2、cdh3 上安装 worker:

  1. # 在 cdh1 节点上运行
  2. $ sudo yum install spark-core spark-master spark-worker spark-python spark-history-server -y
  3. # 在 cdh2、cdh3 上运行
  4. $ sudo yum install spark-core spark-worker spark-python -y

安装成功后,我的集群部署如下:

  1. cdh1节点: spark-master spark-history-server
  2. cdh2节点: spark-worker
  3. cdh3节点: spark-worker

2. 配置

2.1 修改配置文件

设置环境变量,在 .bashrc 中加入下面一行,并使其生效:

export SPARK_HOME=/usr/lib/spark

可以修改配置文件 /etc/spark/conf/spark-env.sh,其内容如下,你可以根据需要做一些修改,例如,修改 master 的主机名称。

  1. # 设置 master 主机名称
  2. export STANDALONE_SPARK_MASTER_HOST=cdh1
  3. export SPARK_MASTER_IP=$STANDALONE_SPARK_MASTER_HOST
  4. ### Let's run everything with JVM runtime, instead of Scala
  5. export SPARK_LAUNCH_WITH_SCALA=0
  6. export SPARK_LIBRARY_PATH=${SPARK_HOME}/lib
  7. export SCALA_LIBRARY_PATH=${SPARK_HOME}/lib
  8. export SPARK_MASTER_WEBUI_PORT=18080
  9. export SPARK_MASTER_PORT=7077
  10. export SPARK_WORKER_PORT=7078
  11. export SPARK_WORKER_WEBUI_PORT=18081
  12. export SPARK_WORKER_DIR=/var/run/spark/work
  13. export SPARK_LOG_DIR=/var/log/spark
  14. if [ -n "$HADOOP_HOME" ]; then
  15. export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${HADOOP_HOME}/lib/native
  16. fi
  17. export HIVE_CONF_DIR=${HIVE_CONF_DIR:-/etc/hive/conf}
  18. export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop/conf}
  19. ### Comment above 2 lines and uncomment the following if
  20. ### you want to run with scala version, that is included with the package
  21. #export SCALA_HOME=${SCALA_HOME:-/usr/lib/spark/scala}
  22. #export PATH=$PATH:$SCALA_HOME/bin

如果你和我一样使用的是虚拟机运行 spark,则你可能需要修改 spark 进程使用的 jvm 大小(关于 jvm 大小设置的相关逻辑见 /usr/lib/spark/bin/spark-class):

export SPARK_DAEMON_MEMORY=256m

修改完 cdh1 节点上的配置文件之后,需要同步到其他节点:

  1. scp -r /etc/spark/conf cdh2:/etc/spark
  2. scp -r /etc/spark/conf cdh3:/etc/spark

2.2 配置 Spark History Server

执行下面命令:

  1. $ sudo -u hdfs hadoop fs -mkdir /user/spark
  2. $ sudo -u hdfs hadoop fs -mkdir /user/spark/applicationHistory
  3. $ sudo -u hdfs hadoop fs -chown -R spark:spark /user/spark
  4. $ sudo -u hdfs hadoop fs -chmod 1777 /user/spark/applicationHistory

在 Spark 客户端创建 /etc/spark/conf/spark-defaults.conf

cp /etc/spark/conf/spark-defaults.conf.template /etc/spark/conf/spark-defaults.conf

/etc/spark/conf/spark-defaults.conf 添加两行:

  1. spark.eventLog.dir=/user/spark/applicationHistory
  2. spark.eventLog.enabled=true

如果想 YARN ResourceManager 访问 Spark History Server ,则添加一行:

spark.yarn.historyServer.address=http://HISTORY_HOST:HISTORY_PORT

最后,spark-defaults.conf 内容如下:

  1. spark.master=spark://cdh1:7077
  2. spark.eventLog.dir=/user/spark/applicationHistory
  3. spark.eventLog.enabled=true
  4. spark.yarn.historyServer.address=http://cdh1:19888

Spark History Server 中的 spark.history.provider 参数默认配置为 org.apache.spark.deploy.history.FsHistoryProvider 时,需要配置 spark.history.fs.logDirectory 参数,该参数在 spark-env.sh 中添加 SPARK_HISTORY_OPTS 环境变量:

  1. #这里配置的是本地目录,也可以改为 hdfs 上的目录
  2. export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Dspark.history.fs.logDirectory=/var/log/spark"

如果,集群配置了 kerberos ,则还需要开启 kerberos 认证,涉及到下面三个参数:

  • spark.history.kerberos.enabled:是否开启 kerberos 认证
  • spark.history.kerberos.principal:HistoryServer 的 kerberos 主体名称,注意:这里直接使用机器的 hostname 而不要使用 _HOST
  • spark.history.kerberos.keytab:HistoryServer 的kerberos keytab文件位置

另外,还开启了 spark.history.ui.acls.enable (授权用户查看应用程序信息的时候是否检查acl),在 spark-env.sh 中继续添加:

  1. HOSTNAME=`hostname -f`
  2. export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Dspark.history.kerberos.enabled=true -Dspark.history.kerberos.principal=spark/${HOSTNAME}@LASHOU.COM -Dspark.history.kerberos.keytab=/etc/spark/conf/spark.keytab -Dspark.history.ui.acls.enable=true"

关于 SPARK_HISTORY_OPTS 中的更多可配置的参数,见 http://spark.apache.org/docs/latest/monitoring.html

3. 启动和停止

使用系统服务管理集群

启动脚本:

  1. # 在 cdh1 节点上运行
  2. $ sudo service spark-master start
  3. # 在 cdh1 节点上运行,如果 hadoop 集群配置了 kerberos,则运行之前需要先获取 spark 用户的凭证
  4. # kinit -k -t /etc/spark/conf/spark.keytab spark/cdh1@JAVACHEN.COM
  5. $ sudo service spark-history-server start
  6. # 在 cdh1、cdh2、cdh3 节点上运行
  7. $ sudo service spark-worker start

停止脚本:

  1. $ sudo service spark-master stop
  2. $ sudo service spark-worker stop
  3. $ sudo service spark-history-server stop

当然,你还可以设置开机启动:

  1. $ sudo chkconfig spark-master on
  2. $ sudo chkconfig spark-worker on
  3. $ sudo chkconfig spark-history-server on

运行日志保存在 /var/log/spark,如果你配置了 kerberos,则 /var/log/spark/spark-history-server.out 日志如下:

  1. 15/04/02 11:31:56 INFO HistoryServer: Registered signal handlers for [TERM, HUP, INT]
  2. 15/04/02 11:31:57 INFO UserGroupInformation: Login successful for user spark/cdh1@JAVACHEN.COM using keytab file /etc/spark/conf/spark.keytab
  3. 15/04/02 11:31:57 INFO SecurityManager: Changing view acls to: spark
  4. 15/04/02 11:31:57 INFO SecurityManager: Changing modify acls to: spark
  5. 15/04/02 11:31:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark)
  6. 15/04/02 11:31:58 WARN Utils: Service could not bind on port 18080. Attempting port 18081.
  7. 15/04/02 11:31:58 INFO Utils: Successfully started service on port 18081.
  8. 15/04/02 11:31:58 INFO HistoryServer: Started HistoryServer at http://cdh1:18081

你可以通过 http://cdh1:18080/ 访问 spark master 的 web 界面。

spark-master-web-ui

Ports Used by Spark:

  • 7077 – Default Master RPC port
  • 7078 – Default Worker RPC port
  • 18080 – Default Master web UI port
  • 18081 – Default Worker web UI port

注意:
我这里使用的是 CDH 版本的 Spark,spark master UI 的端口为18080,不是 Apache Spark 的 8080 端口。

使用 spark 自带脚本管理集群

另外,你也可以使用 spark 自带的脚本来启动和停止,这些脚本在 /usr/lib/spark/sbin 目录下:

  1. $ ls /usr/lib/spark/sbin
  2. slaves.sh spark-daemons.sh start-master.sh stop-all.sh
  3. spark-config.sh spark-executor start-slave.sh stop-master.sh
  4. spark-daemon.sh start-all.sh start-slaves.sh stop-slaves.sh

这时候,还需要修改 /etc/spark/conf/slaves 文件:

  1. # A Spark Worker will be started on each of the machines listed below.
  2. cdh1
  3. cdh2
  4. cdh3

然后,你也可以通过下面脚本启动 master:

  1. $ cd /usr/lib/spark/sbin
  2. $ ./start-master.sh

通过下面命令启动所有节点上的 worker:

$ ./start-slaves.sh

当然,你也可以通过下面方式启动:

$ ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://cdh1:18080

4. 测试

4.1 运行测试例子

你可以在官方站点查看官方的例子。 除此之外,Spark 在发布包的 examples 的文件夹中包含了几个例子( ScalaJava、Python)。运行 Java 和 Scala 例子时你可以传递类名给 Spark 的 bin/run-example脚本, 例如:

  1. $ ./bin/run-example SparkPi 10
  2. 15/04/02 11:45:50 INFO SecurityManager: Changing view acls to: root
  3. 15/04/02 11:45:50 INFO SecurityManager: Changing modify acls to: root
  4. 15/04/02 11:45:50 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
  5. Pi is roughly 3.141808

说明:以上省略了一些日志,请注意观察日志中的 SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root) 这段输出:运行用户为 root,而不是当前获取 kerberos 凭证的用户。

pyspark

通过 Python API 来运行交互模式:

  1. # 使用2个 Worker 线程本地化运行 Spark(理想情况下,该值应该根据运行机器的 CPU 核数设定)
  2. $ ./bin/pyspark --master local[2]
  3. Python 2.6.6 (r266:84292, Nov 22 2013, 12:16:22)
  4. [GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
  5. Type "help", "copyright", "credits" or "license" for more information.
  6. Welcome to
  7. ____ __
  8. / __/__ ___ _____/ /__
  9. _\ \/ _ \/ _ / __/ _/
  10. /__ / .__/\_,_/_/ /_/\_\ version 1.2.0
  11. /_/
  12. Using Python version 2.6.6 (r266:84292, Nov 22 2013 12:16:22)
  13. SparkContext available as sc.
  14. >>>

spark shell

你还可以运行 spark shell 的交互模式,在 spark shell 中,已经自动引入了 SparkContext 变量 sc:

spark-shell 不添加 master 参数情况下,默认是本地运行模式,例如,创建一个 RDD:

  1. $ ./bin/spark-shell --master local[2]
  2. 15/04/02 11:49:26 INFO SecurityManager: Changing view acls to: root
  3. 15/04/02 11:49:26 INFO SecurityManager: Changing modify acls to: root
  4. 15/04/02 11:49:26 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
  5. 15/04/02 11:49:26 INFO HttpServer: Starting HTTP Server
  6. 15/04/02 11:49:26 INFO Utils: Successfully started service 'HTTP class server' on port 47031.
  7. Welcome to
  8. ____ __
  9. / __/__ ___ _____/ /__
  10. _\ \/ _ \/ _ / __/ _/
  11. /___/ .__/\_,_/_/ /_/\_\ version 1.2.0
  12. /_/
  13. Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_45)
  14. scala> val textFile = sc.textFile("README.md")
  15. textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

说明:因为我这里部署的是基于 HDFS 的 Spark 集群模式,textFile 方法中的文件为 HDFS 上的路径,故需要将 README.md 上传到 HDFS 。

RDD 有一些转换和动作(请参考 Spark编程指南笔记),下面执行一些动作:

  1. scala> textFile.count() // Number of items in this RDD
  2. res0: Long = 126
  3. scala> textFile.first() // First item in this RDD
  4. res1: String = # Apache Spark

下面运行 filter 转换返回一个新的 RDD:

  1. scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
  2. linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

上面的代码可以连起来:

  1. scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
  2. res3: Long = 15

spark-shell 后面设置 master 参数,可以支持更多的模式,请参考 http://spark.apache.org/docs/latest/submitting-applications.html#master-urls

运行 spark-shell --help 可以查看更多的参数,例如:spark-shell 连接到 master 运行:

$ spark-shell --master spark://cdh1:7077 

也可以设置运行核数和增加 jars 参数:

$ spark-shell --master spark://cdh1:7077 --cores 2 --jars code.jar

spark-shell 在 yarn 集群上以客户端方式运行:

$ spark-shell --deploy-mode client --master yarn

需要说明的是,Standalone mode does not support talking to a kerberized HDFS,如果你以 spark-shell --master spark://cdh1:7077 方式访问安装有 kerberos 的 HDFS 集群上访问数据时,会出现下面异常:

  1. 15/04/02 11:58:32 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
  2. org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, bj03-bi-pro-hdpnamenn): java.io.IOException: Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "cdh1/192.168.56.121"; destination host is: "192.168.56.121":8020;
  3. org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)
  4. org.apache.hadoop.ipc.Client.call(Client.java:1415)
  5. org.apache.hadoop.ipc.Client.call(Client.java:1364)
  6. org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
  7. com.sun.proxy.$Proxy17.getBlockLocations(Unknown Source)

至于具体原因,待以后根据源码进行分析和追踪;Spark 什么版本的 Standalone 模式会支持访问配置有 kerberos 的 HDFS 集群呢?如果谁知道这两点,欢迎告诉我!

另外,如果 Spark 运行在 YARN 之上,是可以访问配置有 kerberos 的 HDFS 集群的,命令为:

$ spark-shell --deploy-mode client --master yarn

spark-submit

对于 python 程序,我们可以直接使用 spark-submit:

  1. $ mkdir -p /usr/lib/spark/examples/python
  2. $ tar zxvf /usr/lib/spark/lib/python.tar.gz -C /usr/lib/spark/examples/python
  3. $ ./bin/spark-submit examples/python/pi.py 10

对于 Java 程序,我们需要先编译代码然后打包运行:

$ spark-submit --class "SimpleApp" --master local[4] simple-project-1.0.jar

4.2 在集群上运行

Spark 目前支持三种集群管理模式:

  • Standalone – 即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。
  • Apache Mesos – 这是很多公司采用的模式,官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。
  • Hadoop YARN – 这是一种最有前景的部署模式。

另外 Spark 的 EC2 launch scripts 可以帮助你容易地在Amazon EC2上启动standalone cluster.

  • 在集群不是特别大,并且没有 mapReduce 和 Spark 同时运行的需求的情况下,用 Standalone 模式效率最高。
  • Spark可以在应用间(通过集群管理器)和应用中(如果一个 SparkContext 中有多项计算任务)进行资源调度。

Standalone 模式

该模式,包括一个 Spark master 进程和多个 Spark worker 进程,可单独部署到一个集群中,也可以部署到一个节点,以方便测试。该模式无需依赖任何其他资源管理系统,如 Yarn、Mesos。

提交应用

接下来,你可以使用下面的交互式命令连接到集群:

$ spark-shell --master spark://cdh1:7077

也可以使用 spark-submit script 脚本来提交一个 Spark 应用到集群。对于 Standalone 模式,Spark 当前支持两种部署模式,一种是 client 模式, driver 程序在客户端提交应用的进程中启动;一种是 cluster 模式, driver 程序在集群中的 一个 worker 进程中启动,客户端程序在提交任务之后就立即退出。

如果你的应用是通过 Spark submit 启动,则应用程序 jar 包 会自动分发到各个 worker 节点。对于你的应用依赖的额外 jar 包,你需要通过 --jars参数来定义,例如:--jars jar1,jar2,更多参数定义,请参考 Spark Configuration

另外,Standalone 的 cluster 模式支持你的应用在以非 0 状态退出后自动重启。为了使用该特性,你需要添加 --supervise 参数到 spark-submit 之后。这时候,如果你愿意,你可以杀掉运行失败的任务:

$ ./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>

Driver ID 可以从 Master web UI 查看。

资源调度

Standalone 模式目前仅仅支持 FIFO 的资源调度器。为了控制多个并发的用户,你可以设置每一个应用使用的最大资源。默认情况下, 每次启动一个时,其会使用一个集群中的所有 core。你可以设置 spark.cores.max 参数来控制应用使用的 core 数:

  1. val conf = new SparkConf()
  2. .setMaster(...)
  3. .setAppName(...)
  4. .set("spark.cores.max", "10")
  5. val sc = new SparkContext(conf)

另外,你也可以在 conf/spark-env.sh 配置一个全局的默认参数 spark.deploy.defaultCores

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"
高可用

请参考 https://spark.apache.org/docs/latest/spark-standalone.html#high-availability

测试

你可以通过 spark-shel l 运行下面的 wordcount 例子:

  1. $ echo "hello world" >test.txt
  2. $ hadoop fs -put test.txt /tmp
  3. $ spark-shell --master spark://cdh1:7077
  4. scala> val file = sc.textFile("hdfs://cdh1:8020/tmp/test.txt")
  5. scala> file.count()

如果出现下面异常,可能是因为 系统可用内存不够

/usr/lib/spark/bin/spark-shell: line 48:  5385 Killed                  "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"

运行过程中,还可能会出现下面的错误:

  1. 14/10/24 14:51:40 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
  2. 14/10/24 14:51:40 ERROR lzo.GPLNativeCodeLoader: Could not load native gpl library
  3. java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
  4. at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1738)
  5. at java.lang.Runtime.loadLibrary0(Runtime.java:823)
  6. at java.lang.System.loadLibrary(System.java:1028)
  7. at com.hadoop.compression.lzo.GPLNativeCodeLoader.<clinit>(GPLNativeCodeLoader.java:32)
  8. at com.hadoop.compression.lzo.LzoCodec.<clinit>(LzoCodec.java:71)
  9. at java.lang.Class.forName0(Native Method)
  10. at java.lang.Class.forName(Class.java:249)
  11. at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1836)
  12. at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1801)
  13. at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)

该异常的解决方法可以参考 Spark连接Hadoop读取HDFS问题小结 这篇文章。

解决方法:

  1. cp /usr/lib/hadoop/lib/native/libgplcompression.so $JAVA_HOME/jre/lib/amd64/
  2. cp /usr/lib/hadoop/lib/native/libhadoop.so $JAVA_HOME/jre/lib/amd64/
  3. cp /usr/lib/hadoop/lib/native/libsnappy.so $JAVA_HOME/jre/lib/amd64/

更复杂的一个例子,运行 mapreduce 统计单词数:

  1. $ spark-shell --master spark://cdh1:7077
  2. scala> val file = sc.textFile("hdfs://cdh1:8020/tmp/data.txt")
  3. scala> val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
  4. scala> counts.saveAsTextFile("hdfs://cdh1:8020/tmp/output")

运行完成之后,你可以查看 hdfs://cdh1:8020/tmp/output 目录下的文件内容。

  1. $ hadoop fs -cat /tmp/output/part-00000
  2. (hello,1)
  3. (world,1)

使用 spark-submit 以 Standalone 模式运行 SparkPi 程序,并制定 master 参数为连接某一个 master 节点:

$ spark-submit --class org.apache.spark.examples.SparkPi  --master spark://cdh1:7077 /usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar 10

Spark On Mesos 模式

参考 http://dongxicheng.org/framework-on-yarn/apache-spark-comparing-three-deploying-ways/

Spark on Yarn 模式

Spark on Yarn 模式同样也支持两种在 Yarn 上启动 Spark 的方式,一种是 cluster 模式,Spark driver 在 Yarn 的 application master 进程中运行,客户端在应用初始化完成之后就会退出;一种是 client 模式,Spark driver 运行在客户端进程中。

Spark on Yarn 模式是可以访问配置有 kerberos 的 HDFS 文件的。

以 cluster 模式启动,命令如下:

$ spark-submit --class path.to.your.Class --deploy-mode cluster --master yarn [options] <app jar> [app options]

以 client 模式启动,命令如下:

$ spark-submit --class path.to.your.Class --deploy-mode client --master yarn [options] <app jar> [app options]

举例:

  1. $ spark-submit --class org.apache.spark.examples.SparkPi \
  2. --deploy-mode cluster \
  3. --master yarn \
  4. --num-executors 3 \
  5. --driver-memory 4g \
  6. --executor-memory 2g \
  7. --executor-cores 1 \
  8. --queue thequeue \
  9. /usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar \
  10. 10

注意:Apache 版本的 spark 中的启动命令为:

$ spark-submit --class path.to.your.Class --master yarn-cluster [options] <app jar> [app options]

运行在 YARN 集群之上的时候,可以手动把 spark-assembly 相关的 jar 包拷贝到 hdfs 上去,然后设置 SPARK_JAR 环境变量:

  1. $ hdfs dfs -mkdir -p /user/spark/share/lib
  2. $ hdfs dfs -put $SPARK_HOME/lib/spark-assembly.jar /user/spark/share/lib/spark-assembly.jar
  3. $ SPARK_JAR=hdfs://<nn>:<port>/user/spark/share/lib/spark-assembly.jar

5. Spark-SQL

Spark 安装包中包括了 Spark-SQL ,运行 spark-sql 命令,在 cdh5.2 中会出现下面异常:

  1. $ cd /usr/lib/spark/bin
  2. $ ./spark-sql
  3. java.lang.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
  4. at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
  5. at java.security.AccessController.doPrivileged(Native Method)
  6. at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
  7. at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
  8. at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
  9. at java.lang.Class.forName0(Native Method)
  10. at java.lang.Class.forName(Class.java:247)
  11. at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:319)
  12. at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
  13. at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  14. Failed to load Spark SQL CLI main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.
  15. You need to build Spark with -Phive.

在 cdh5.3 中会出现下面异常:

  1. Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.cli.CliDriver
  2. at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  3. at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  4. at java.security.AccessController.doPrivileged(Native Method)
  5. at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  6. at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  7. at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
  8. at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  9. ... 18 more

从上可以知道 Spark-SQL 编译时没有集成 Hive,故需要重新编译 spark 源代码。

编译 Spark-SQL

下载代码:

  1. $ git clone git@github.com:cloudera/spark.git
  2. $ cd spark
  3. $ git checkout -b origin/cdh5-1.2.0_5.3.0

编译代码,集成 yarn 和 hive,有三种方式:

$ sbt/sbt -Dhadoop.version=2.5.0-cdh5.3.0 -Pyarn -Phive assembly

等很长很长一段时间,会提示错误。

改为 maven 编译:

修改根目录下的 pom.xml,添加一行 <module>sql/hive-thriftserver</module>

  1. <modules>
  2. <module>core</module>
  3. <module>bagel</module>
  4. <module>graphx</module>
  5. <module>mllib</module>
  6. <module>tools</module>
  7. <module>streaming</module>
  8. <module>sql/catalyst</module>
  9. <module>sql/core</module>
  10. <module>sql/hive</module>
  11. <module>sql/hive-thriftserver</module> <!--添加的一行-->
  12. <module>repl</module>
  13. <module>assembly</module>
  14. <module>external/twitter</module>
  15. <module>external/kafka</module>
  16. <module>external/flume</module>
  17. <module>external/flume-sink</module>
  18. <module>external/zeromq</module>
  19. <module>external/mqtt</module>
  20. <module>examples</module>
  21. </modules>

然后运行:

  1. $ export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
  2. $ mvn -Pyarn -Dhadoop.version=2.5.0-cdh5.3.0 -Phive -Phive-thriftserver -DskipTests clean package

如果编译成功之后, 会在 assembly/target/scala-2.10 目录下生成:spark-assembly-1.2.0-cdh5.3.0.jar,在 examples/target/scala-2.10 目录下生成:spark-examples-1.2.0-cdh5.3.0.jar,然后将 spark-assembly-1.2.0-cdh5.3.0.jar 拷贝到 /usr/lib/spark/lib 目录,然后再来运行 spark-sql。

但是,经测试 cdh5.3.0 版本中的 spark 的 sql/hive-thriftserver 模块存在编译错误,最后无法编译成功,故需要等到 cloudera 官方更新源代码或者等待下一个 cdh 版本集成 spark-sql。

虽然 spark-sql 命令用不了,但是我们可以在 spark-shell 中使用 SQLContext 来运行 sql 语句,限于篇幅,这里不做介绍,你可以参考 http://www.infoobjects.com/spark-sql-schemardd-programmatically-specifying-schema/

6. 总结

本文主要介绍了 CDH5 集群中 Spark 的安装过程以及集群运行模式:

  • Standalone – spark-shell --master spark://host:port 或者 spark-shell --master local
  • Apache Mesos – spark-shell --master mesos://host:port
  • Hadoop YARN – spark-shell --master yarn

关于 Spark 的更多介绍可以参考官网或者一些中文翻译的文章

7. 参考文章

转载: http://blog.javachen.com/2014/07/01/spark-install-and-usage/
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/66422
推荐阅读
相关标签
  

闽ICP备14008679号