how-to-run-python-script-on-spark
Why this post: I recently wanted to learn Spark MLlib. "Machine Learning in Action" recommends Python as the programming language, and Spark also provides a Python API, so I wanted to test running Python scripts on Spark. The tests turned up quite a few problems, so I am recording them here.
Reference: Self-Contained Applications (Python), from the Spark Quick Start guide.
Test results (2015-03-18):
* Running the Python script in local mode works.
* Running the Python script in Spark standalone mode fails. On spark1:8080 the executor state shows EXITED, and the logs report:
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
15/03/18 19:15:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, spark2): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)
... 10 more
15/03/18 19:15:36 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 2, spark2, NODE_LOCAL, 1305 bytes)
15/03/18 19:15:36 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 2) on executor spark2: org.apache.spark.SparkException (Python worker exited unexpectedly (crashed)) [duplicate 1]
15/03/18 19:15:36 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 3, spark2, NODE_LOCAL, 1305 bytes)
15/03/18 19:15:36 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 3) on executor spark2: org.apache.spark.SparkException (Python worker exited unexpectedly (crashed)) [duplicate 2]
15/03/18 19:15:36 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 4, spark2, NODE_LOCAL, 1305 bytes)
15/03/18 19:15:36 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 4) on executor spark2: org.apache.spark.SparkException (Python worker exited unexpectedly (crashed)) [duplicate 3]
15/03/18 19:15:36 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
15/03/18 19:15:36 INFO cluster.YarnScheduler: Cancelling stage 0
15/03/18 19:15:36 INFO cluster.YarnScheduler: Stage 0 was cancelled
15/03/18 19:15:36 INFO scheduler.DAGScheduler: Job 0 failed: count at /Volumes/2/data/datadir_github/spark/datadir_test/python/SimpleApp2.py:10, took 39.603014 s
Traceback (most recent call last):
  File "/Volumes/2/data/datadir_github/spark/datadir_test/python/SimpleApp2.py", line 10, in <module>
    numAs = logData.filter(lambda s: 'a' in s).count()
  File "/data01/data/datadir_github/spark/python/pyspark/rdd.py", line 933, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/data01/data/datadir_github/spark/python/pyspark/rdd.py", line 924, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/data01/data/datadir_github/spark/python/pyspark/rdd.py", line 740, in reduce
    vals = self.mapPartitions(func).collect()
  File "/data01/data/datadir_github/spark/python/pyspark/rdd.py", line 701, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/data01/data/datadir_github/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/data01/data/datadir_github/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o26.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 4, spark2): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)
… 10 more
Spark development environment: omitted (notes to be organized later).
How to submit a Spark app written in Python: see the Self-Contained Applications (Python) section referenced above.
cd /data01/data/datadir_github/spark/
mkdir -p datadir_test/python
1) Create the test scripts:
vi datadir_test/python/SimpleApp.py
"""SimpleApp.py"""
from pyspark import SparkContext
logFile = "file:/data01/data/datadir_github/spark/README.md" # Should be some file on your system
sc = SparkContext(appName="Simple App")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
vi datadir_test/python/SimpleApp2.py
from pyspark import SparkContext
if __name__ == "__main__":
    """SimpleApp2.py"""
    logFile = "file:/data01/data/datadir_github/spark/README.md"  # Should be some file on your system
    sc = SparkContext(appName="Simple App2")
    logData = sc.textFile(logFile).cache()
    numAs = logData.filter(lambda s: 'a' in s).count()
    numBs = logData.filter(lambda s: 'b' in s).count()
    print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
cd /data01/data/datadir_github/spark/
bin/spark-submit \
--master local[2] \
datadir_test/python/SimpleApp.py
cd /data01/data/datadir_github/spark/
bin/spark-submit \
--master local[2] \
datadir_test/python/SimpleApp2.py
Problem 1 (resolved): running Python scripts on Spark requires Spark to be built with Java 6; a build compiled with Java 7 fails with the error below. After rebuilding with Java 6, bin/pyspark runs normally.
Error message:
15/03/18 15:34:11 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, spark3): org.apache.spark.SparkException:
Error from python worker:
/usr/bin/python: No module named pyspark
PYTHONPATH was:
/home/hadoop/data/hadoop_tmp_dir/nm-local-dir/usercache/tsingfu/filecache/18/spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163)
at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:86)
at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:62)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:123)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Fix 1: rebuild Spark with Java 6.
References:
No module named pyspark - latest built
[SPARK-2172] PySpark cannot import mllib modules in YARN-client mode
(this PR has already been merged, so it is not the cause here)
hadoop + yarn + spark
That mail mentions:
There is an issue with PySpark-on-YARN that requires users build with
Java 6. The issue has to do with how Java 6 and 7 package jar files
differently.
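Before rebuilding, a quick sanity check (my addition, not part of the original notes; the jar path is an assumption based on the build layout used here) is to ask whether Python's zipimport can read the assembly jar at all, since the YARN executors import pyspark from that jar via PYTHONPATH:

import zipimport

# Path is an assumption; adjust to wherever the assembly jar was built.
assembly = "assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar"
try:
    importer = zipimport.zipimporter(assembly)
    # Best-effort check: the executors expect the pyspark package at the jar root.
    print "zipimport can read the jar; pyspark found:", importer.find_module("pyspark") is not None
except zipimport.ZipImportError as e:
    print "zipimport cannot read the jar:", e

A jar that fails this check reproduces the "No module named pyspark" symptom; the Java 6 vs. Java 7 difference reportedly comes down to Java 7 producing zip64-style jars that zipimport cannot read.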
Rebuild with Java 6:
mvn clean
mvn generate-sources
mvn package -Dhadoop.version=2.3.0-cdh5.1.0 -DskipTests -Phadoop-2.3 -Pyarn -Phive-0.13.1 -Phive-thriftserver -Pspark-ganglia-lgpl
Result: the "/usr/bin/python: No module named pyspark" error is gone, and running Python on Spark in local mode works normally. Next, test the YARN modes:
bin/spark-submit \
--master yarn-client \
datadir_test/python/SimpleApp2.py
bin/spark-submit \
--master yarn-cluster \
datadir_test/python/SimpleApp2.py
Problem 2: FileNotFoundException
15/03/18 18:04:44 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, spark3): java.io.FileNotFoundException: File file:/data01/data/datadir_github/spark/README.md does not exist
Fix attempt 1: pass --py-files
bin/spark-submit \
--master local[2] \
--py-files file:/data01/data/datadir_github/spark/README.md \
datadir_test/python/SimpleApp2.py
Result: the problem persists.
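My understanding of why this did not work (not from the original notes): --py-files only ships Python code dependencies (.py, .zip, .egg) onto the executors' PYTHONPATH; it does not distribute data files, and a file: path passed to textFile must exist at the same location on every worker. If the data cannot go to HDFS, SparkContext.addFile plus SparkFiles.get is one way to copy it to every node; a minimal sketch, with the counting done inside a task:

from pyspark import SparkContext, SparkFiles

sc = SparkContext(appName="Simple App2 (addFile sketch)")
# Ship the local file to the working directory of every executor.
sc.addFile("file:/data01/data/datadir_github/spark/README.md")

def count_lines_with(letter):
    def run(_):
        # SparkFiles.get resolves the node-local copy on whichever executor runs the task.
        with open(SparkFiles.get("README.md")) as f:
            return sum(1 for line in f if letter in line)
    return run

numAs = sc.parallelize([0], 1).map(count_lines_with('a')).collect()[0]
print "Lines with a: %i" % numAs
sc.stop()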
Fix attempt 2: put the file on HDFS
hadoop fs -put /data01/data/datadir_github/spark/README.md /user/tsingfu/
vi datadir_test/python/SimpleApp2.py
logFile = "/user/tsingfu/README.md" # Should be some file on your system
sc = SparkContext(appName="Simple App2")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Result: the FileNotFoundException is resolved, but other problems remain.
Problem 3: Python version problem when running a Python script in spark-on-yarn mode
bin/spark-submit \
--master yarn-client \
datadir_test/python/SimpleApp2.py
Error:
java.io.IOException (Cannot run program "python2.7": error=2, No such file or directory) [duplicate 7]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 (TID 215, spark2): java.io.IOException: Cannot run program "python2.7": error=2, No such file or directory
Result: bin/pyspark --master yarn-client works fine.
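The error suggests the executors try to launch a "python2.7" executable that is not installed on the worker nodes. One possible workaround (a sketch I did not test as part of these notes): point PySpark at an interpreter that exists on every node via the PYSPARK_PYTHON environment variable; in Spark 1.x the value seen by the driver when the SparkContext is created decides which executable the executors launch for their Python workers. The usual place to set it is conf/spark-env.sh or the shell before spark-submit; setting it from inside the script would look like this (the /usr/bin/python path is an assumption):

import os
# Must be set before the SparkContext is created.
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python"

from pyspark import SparkContext

sc = SparkContext(appName="Simple App2")
logData = sc.textFile("/user/tsingfu/README.md").cache()
print "Lines with a: %i" % logData.filter(lambda s: 'a' in s).count()
sc.stop()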
cd /data01/data/datadir_github/spark/
bin/spark-submit \
--master spark://spark1:7077 \
datadir_test/python/SimpleApp2.py
Problem 4: the Spark app hangs, with the warning:
15/03/26 19:17:02 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
Fix attempt 1: check the worker settings in conf/spark-env.sh and limit the resources requested at submit time.
SPARK_WORKER_CORES=4
SPARK_WORKER_MEMORY=1500m
SPARK_WORKER_INSTANCES=1
SPARK_WORKER_DIR=${SPARK_HOME}/worker/
#SPARK_WORKER_OPTS
SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=600 -Dspark.worker.cleanup.appDataTtl=1200"
#SPARK_EXECUTOR_MEMORY=300m
cd /data01/data/datadir_github/spark/
bin/spark-submit \
--master spark://spark1:7077 \
--total-executor-cores 2 \
--executor-memory 300m \
datadir_test/python/SimpleApp2.py
or
vi conf/spark-env.sh
SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS -Dspark.deploy.defaultCores=2 -Dspark.cores.max=2"
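A third variant (a sketch, not one of the attempts recorded here) is to cap the request from inside the script itself via SparkConf, which has the same effect as the submit-time flags:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("Simple App2")
        .set("spark.cores.max", "2")            # same effect as --total-executor-cores 2
        .set("spark.executor.memory", "300m"))  # same effect as --executor-memory 300m
sc = SparkContext(conf=conf)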
Result: no effect with either variant; the problem persists.
Fix attempt 2
Root-cause analysis: the executor reports an error at startup:
15/03/26 20:13:13 WARN security.UserGroupInformation: PriviledgedActionException as:tsingfu (auth:SIMPLE) cause:java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
Reference: Re: can't submit my application on standalone spark cluster
That mail suggests the failure may be caused by a version mismatch of some libraries (e.g. netty) in the jars used when submitting the app.
Test to verify whether the problem is related to a version difference of some library inside spark/lib/spark-assembly*.jar.
# on spark1
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://spark1:7077 \
--total-executor-cores 1 \
--executor-memory 300m \
lib/spark-examples-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar
cd /data01/data/datadir_github/spark/
cd assembly/target/scala-2.10/
mv spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar.org
mv /Users/Users_datadir_docker/app-libs/spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar .
# on mac os x
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://spark1:7077 \
--total-executor-cores 1 \
--executor-memory 300m \
examples/target/scala-2.10/spark-examples-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar
#on mac os x
cd /data01/data/datadir_github/spark/
cd assembly/target/scala-2.10/
mv spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar.from-cluster
mv spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar.org spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar
cp spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar /Users/Users_datadir_docker/app-libs/
#on spark-standalone
cd ~/app/spark/lib
mv spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar.org
ln -s /docker_vol01/app-libs/spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar .
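To confirm that the driver-side and cluster-side assemblies really are different builds before/after the swap, comparing checksums is enough; a small sketch (paths are assumptions taken from the steps above):

import hashlib

def sha1(path):
    h = hashlib.sha1()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(1 << 20), b''):
            h.update(chunk)
    return h.hexdigest()

# Adjust per host.
driver_jar = "assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar"
cluster_jar = "/docker_vol01/app-libs/spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.1.0.jar"
print driver_jar, sha1(driver_jar)
print cluster_jar, sha1(cluster_jar)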
How to run a Python script as a Spark job?
vi datadir_test/python/PythonPi.py
import sys
from random import random
from operator import add
from pyspark import SparkContext
if __name__ == "__main__":
    """
    Usage: pi [partitions]
    """
    sc = SparkContext(appName="PythonPi")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        # A point drawn uniformly from [-1, 1] x [-1, 1] lands inside the unit
        # circle with probability pi/4, so count/n approximates pi/4.
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add)
    print "Pi is roughly %f" % (4.0 * count / n)
    sc.stop()
cd /data01/data/datadir_github/spark/
bin/spark-submit \
--master local[2] \
datadir_test/python/PythonPi.py
bin/spark-submit \
--master spark://spark1:7077 \
datadir_test/python/PythonPi.py 1
Result: runs normally.