Under normal conditions, the pyspark shell shows the following screen after a successful start:
- [admin@datacenter4 ~]$ pyspark
- Python 2.7.5 (default, Nov 16 2020, 22:23:17)
- [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux2
- Type "help", "copyright", "credits" or "license" for more information.
- Welcome to
-       ____              __
-      / __/__  ___ _____/ /__
-     _\ \/ _ \/ _ `/ __/  '_/
-    /__ / .__/\_,_/_/ /_/\_\   version 2.3.2.3.1.4.0-315
-       /_/
- Using Python version 2.7.5 (default, Nov 16 2020 22:23:17)
- SparkSession available as 'spark'.
2.1.1 Normal pyspark shell startup
In this case you cannot create sc with the approach in structure_1(): the pyspark shell has already created a SparkContext, and trying to create another one raises an error. In other words, after a normal startup the default sc variable can be used directly.
- [admin@datacenter4 ~]$ pyspark
- Python 2.7.5 (default, Nov 16 2020, 22:23:17)
- [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux2
- Type "help", "copyright", "credits" or "license" for more information.
- Welcome to
-       ____              __
-      / __/__  ___ _____/ /__
-     _\ \/ _ \/ _ `/ __/  '_/
-    /__ / .__/\_,_/_/ /_/\_\   version 2.3.2.3.1.4.0-315
-       /_/
- Using Python version 2.7.5 (default, Nov 16 2020 22:23:17)
- SparkSession available as 'spark'.
- >>>
- >>> import numpy
- >>> from pyspark.mllib.fpm import PrefixSpan
- >>> data = [[["a", "b"], ["c"]],[["a"], ["c", "b"], ["a", "b"]],[["a", "b"], ["e"]],
- [["f"]]]
- >>> # the default sc provided by the shell is used here
- >>> rdd = sc.parallelize(data)
Note: after a normal startup, if you still want to obtain sc yourself, you can use the approach in structure_2(); a short sketch of both cases follows.
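A minimal sketch of how the two approaches behave inside an already-running pyspark shell. The error text is abbreviated and may differ slightly between Spark versions; structure_1() and structure_2() refer to the helper functions defined in the script further below:
- >>> # structure_1() style: constructing a second SparkContext fails,
- >>> # because the shell already owns one
- >>> from pyspark import SparkConf, SparkContext
- >>> SparkContext(conf=SparkConf().setAppName("My App"))
- ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(...) ...
- >>> # structure_2() style: getOrCreate() does not fail; it returns the session
- >>> # the shell already created, so the existing sc is reused rather than replaced
- >>> from pyspark.sql import SparkSession
- >>> spark2 = SparkSession.builder.appName("sdl-test").getOrCreate()
- >>> spark2.sparkContext is sc
- True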
2.1.2 Abnormal pyspark shell startup
If the pyspark shell is forcibly interrupted before the "Spark" banner appears, it sometimes still drops into the pyspark prompt, as shown below:
- [admin@datacenter3 site-packages]$ pyspark
- Python 2.7.5 (default, Aug 7 2019, 00:51:29)
- [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux2
- Type "help", "copyright", "credits" or "license" for more information.
- 22/03/30 10:16:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
-
- # Ctrl+C is pressed here to force an exit; note that ">>>" still appears at the end, meaning we still land at the pyspark prompt
- ^CTraceback (most recent call last):
- File "/usr/hdp/current/spark2-client/python/pyspark/shell.py", line 45, in <module>
- spark = SparkSession.builder\
- File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 173, in getOrCreate
- sc = SparkContext.getOrCreate(sparkConf)
- File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 358, in getOrCreate
- SparkContext(conf=conf or SparkConf())
- File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 119, in __init__
- conf, jsc, profiler_cls)
- File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 181, in _do_init
- self._jsc = jsc or self._initialize_context(self._conf._jconf)
- File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 297, in _initialize_context
- return self._jvm.JavaSparkContext(jconf)
- File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1523, in __call__
- File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
- File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1152, in send_command
- File "/usr/lib64/python2.7/socket.py", line 447, in readline
- data = self._sock.recv(self._rbufsize)
- KeyboardInterrupt
- >>>
- >>> import numpy
In this case the default sc has not been created yet, so referencing sc directly raises an error; either structure_1() or structure_2() can then be used to create it, as sketched below.
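A minimal sketch of recovering in such a half-started shell. The NameError traceback is omitted, and these interactive lines are illustrative rather than copied from the original session:
- >>> sc                      # the shell never finished creating it
- NameError: name 'sc' is not defined
- >>> # create it manually, e.g. the structure_1() way
- >>> from pyspark import SparkConf, SparkContext
- >>> conf = SparkConf().setMaster("local[*]").setAppName("My App")
- >>> sc = SparkContext(conf=conf)
- >>> sc.parallelize(range(10)).count()
- 10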
- #!/usr/bin/env python
- # coding=utf-8
- # Author: chenfeng
- 
- def structure_1():
-     # build and return a SparkContext the classic way: SparkConf + SparkContext
-     from pyspark import SparkConf, SparkContext
-     conf = SparkConf().setMaster("local[*]").setAppName("My App")  # configure the application
-     sc = SparkContext(conf=conf)  # create the SparkContext (the "commander")
-     # logFile = "file:///usr/local/spark/README.md"  # note: local files need the file:// prefix (three slashes in total)
-     return sc
-
-
- '''
- The following code is adapted from https://blog.csdn.net/u013719780/article/details/51822346
- '''
- def structure_2():
-     from pyspark.sql import SparkSession, HiveContext
-     spark = SparkSession.builder.master("local[*]")\
-         .appName("sdl-test")\
-         .config("spark.executor.memory", "3g")\
-         .config("spark.driver.memory", "1g")\
-         .enableHiveSupport()\
-         .getOrCreate()
- 
-     sc = spark.sparkContext
-     sql = spark.sql
-     hive_sql = HiveContext(sc)
-     print(sc.master)
-     return sc
-
- # !!! In pyspark shell mode this can stay commented out; in cluster spark-submit mode it must be uncommented !!!
- #sc = structure_1()
- 
- # For a local file the path must start with file:// (i.e. file:///path/to/file); otherwise it is treated as an HDFS path
- path = '/cf_sdl/hour_noheader.csv'
- raw_data = sc.textFile(path)
- num_data = raw_data.count()
- records = raw_data.map(lambda x: x.split(','))
- first = records.first()
- print('First record:', first)
- print('Number of records:', num_data)
- 
- # records is reused many times below, so cache it:
- records.cache()
-
- # To represent the categorical features in binary (one-hot) form, we map each feature value to a
- # non-zero position in a binary vector. Define the mapping function:
- def get_mapping(rdd, idx):
-     return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()
- 
- 
- print('Category encoding of the third feature: %s' % get_mapping(records, 2))
- 
- mappings = [get_mapping(records, i) for i in range(2, 10)]  # apply the mapping to the categorical columns (2-9)
- print('Encoding dictionaries for the categorical features:', mappings)
- cat_len = sum(map(len, mappings))  # total one-hot length of the categorical features
-
- num_len = len(records.first()[10:14])  # number of numeric features (columns 10-13)
- total_len = num_len + cat_len  # total feature-vector length
- print('One-hot length of the categorical features: %d' % cat_len)
- print('Number of numeric features: %d' % num_len)
- print('Total feature-vector length: %d' % total_len)
-
-
- # 1.1 Create feature vectors for the linear model
- from pyspark.mllib.regression import LabeledPoint
- import numpy as np
- 
- def extract_features(record):
-     cat_vec = np.zeros(cat_len)
-     step = 0
-     for i, raw_feature in enumerate(record[2:10]):  # the 8 categorical columns (2-9)
-         dict_code = mappings[i]
-         index = dict_code[raw_feature]
-         cat_vec[index + step] = 1
-         step = step + len(dict_code)
-     num_vec = np.array([float(raw_feature) for raw_feature in record[10:14]])  # the 4 numeric columns (10-13)
-     return np.concatenate((cat_vec, num_vec))
- 
- def extract_label(record):
-     return float(record[-1])
-
-
- data = records.map(lambda point: LabeledPoint(extract_label(point),extract_features(point)))
- first_point = data.first()
-
- print('Raw feature vector: ' + str(first[2:]))
- print('Label: ' + str(first_point.label))
- print('Feature vector after one-hot encoding the categorical features: \n' + str(first_point.features))
- print('Length of the one-hot encoded feature vector: ' + str(len(first_point.features)))
-
-
- # 1.2 Create feature vectors for the decision tree
- def extract_features_dt(record):
-     # return np.array(map(float, record[2:14]))     # python2.x
-     return np.array(record[2:14], dtype=float)      # python3.x
-
- data_dt = records.map(lambda point: LabeledPoint(extract_label(point), extract_features_dt(point)))
- first_point_dt = data_dt.first()
-
- print('Decision tree feature vector: ' + str(first_point_dt.features))
- print('Decision tree feature vector length: ' + str(len(first_point_dt.features)))
-
- # 2 Training and applying the models
- from pyspark.mllib.regression import LinearRegressionWithSGD
- from pyspark.mllib.tree import DecisionTree
- # help(LinearRegressionWithSGD.train)
-
- # 2.1 Train a regression model on the bike sharing data
- linear_model = LinearRegressionWithSGD.train(data, iterations=10, step=0.1, intercept=False)
- true_vs_predicted = data.map(lambda point: (point.label, linear_model.predict(point.features)))
- print('Linear regression predictions for the first 5 samples: ' + str(true_vs_predicted.take(5)))
-
- # 2.2 Decision tree
- dt_model = DecisionTree.trainRegressor(data_dt,{})
- preds = dt_model.predict(data_dt.map(lambda p: p.features))
- actual = data.map(lambda p:p.label)
- true_vs_predicted_dt = actual.zip(preds)
- print('Decision tree predictions for the first 5 samples: ' + str(true_vs_predicted_dt.take(5)))
- print('Decision tree depth: ' + str(dt_model.depth()))
- print('Number of nodes in the decision tree: ' + str(dt_model.numNodes()))
-
-
- # 3 Evaluating regression model performance
- # 3.1 Mean squared error (and root mean squared error)
- def squared_error(actual, pred):
-     return (pred - actual)**2
- 
- # 3.2 Mean absolute error
- def abs_error(actual, pred):
-     return np.abs(pred - actual)
- 
- # 3.3 Root mean squared log error
- def squared_log_error(pred, actual):
-     return (np.log(pred + 1) - np.log(actual + 1))**2
- 
- # 3.5 Compute performance under the different metrics
- # 3.5.1 Linear model
- # mse = true_vs_predicted.map(lambda (t, p): squared_error(t, p)).mean() #python2.x
- mse = true_vs_predicted.map(lambda t_p: squared_error(t_p[0], t_p[1])).mean()
- mae = true_vs_predicted.map(lambda t_p: abs_error(t_p[0], t_p[1])).mean()
- rmsle = np.sqrt(true_vs_predicted.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
- print('Linear Model - Mean Squared Error: %2.4f' % mse)
- print('Linear Model - Mean Absolute Error: %2.4f' % mae)
- print('Linear Model - Root Mean Squared Log Error: %2.4f' % rmsle)
-
-
- # 3.5.2 Decision tree
- mse_dt = true_vs_predicted_dt.map(lambda t_p: squared_error(t_p[0], t_p[1])).mean()
- mae_dt = true_vs_predicted_dt.map(lambda t_p: abs_error(t_p[0], t_p[1])).mean()
- rmsle_dt = np.sqrt(true_vs_predicted_dt.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
- print('Decision Tree - Mean Squared Error: %2.4f' % mse_dt)
- print('Decision Tree - Mean Absolute Error: %2.4f' % mae_dt)
- print('Decision Tree - Root Mean Squared Log Error: %2.4f' %rmsle_dt)
-
- # 4 Improving model performance and tuning parameters
- import matplotlib
- from matplotlib.pyplot import hist
- import numpy as np
- import matplotlib.pyplot as plt
-
- targets = records.map(lambda r: float(r[-1])).collect()
- # hist(targets, bins=40, color='lightblue', normed=True) # python2.x
- hist(targets, bins=40, color='lightblue')
- fig = matplotlib.pyplot.gcf()
- fig.set_size_inches(12, 6)
- plt.show()
-
- # Plot the distribution of the log-transformed target variable.
- log_targets = records.map(lambda r : np.log(float(r[-1]))).collect()
- # plt.hist(log_targets, bins = 40, color ='lightblue', normed =True)
- plt.hist(log_targets, bins = 40, color ='lightblue')
- fig = plt.gcf()
- fig.set_size_inches(12,6)
- plt.show()
-
- # Plot the distribution of the square-root-transformed target variable.
- sqrt_targets = records.map(lambda r: np.sqrt(float(r[-1]))).collect()
- plt.hist(sqrt_targets, bins=40, color='lightblue')
- # plt.hist(sqrt_targets, bins=40, color='lightblue', normed=True)
- fig = matplotlib.pyplot.gcf()
- fig.set_size_inches(12, 6)
- plt.show()
-
-
- # 4.1 Effect of transforming the target variable
- # Linear regression
- data_log = data.map(lambda lp:LabeledPoint(np.log(lp.label),lp.features))
- model_log =LinearRegressionWithSGD.train(data_log, iterations=10, step=0.1)
- true_vs_predicted_log = data_log.map(lambda p:(np.exp(p.label),np.exp(model_log.predict(p.features))))
-
- # compute the model's MSE, MAE and RMSLE
- mse_log = true_vs_predicted_log.map(lambda t_p: squared_error(t_p[0], t_p[1])).mean()
- mae_log = true_vs_predicted_log.map(lambda t_p: abs_error(t_p[0], t_p[1])).mean()
- rmsle_log = np.sqrt(true_vs_predicted_log.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
-
- print('Linear Model - Mean Squared Error: %2.4f' % mse_log)
- print('Linear Model - Mean Absolute Error: %2.4f' % mae_log)
- print('Linear Model - Root Mean Squared Log Error: %2.4f' % rmsle_log)
- 
- print('Linear Model - Non log-transformed predictions:\n' + str(true_vs_predicted.take(3)))
- print('Linear Model - Log-transformed predictions:\n' + str(true_vs_predicted_log.take(3)))
-
-
- # Decision tree
- data_dt_log = data_dt.map(lambda lp:LabeledPoint(np.log(lp.label), lp.features))
- dt_model_log = DecisionTree.trainRegressor(data_dt_log,{})
- preds_log = dt_model_log.predict(data_dt_log.map(lambda p:p.features))
- actual_log = data_dt_log.map(lambda p: p.label)
- true_vs_predicted_dt_log = actual_log.zip(preds_log).map(lambda t_p:(np.exp(t_p[0]), np.exp(t_p[1])))
-
- # compute the model's MSE, MAE and RMSLE
- mse_log_dt = true_vs_predicted_dt_log.map(lambda t_p: squared_error(t_p[0], t_p[1])).mean()
- mae_log_dt = true_vs_predicted_dt_log.map(lambda t_p: abs_error(t_p[0], t_p[1])).mean()
- rmsle_log_dt = np.sqrt(true_vs_predicted_dt_log.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
-
- print('Decision Tree - Mean Squared Error: %2.4f' % mse_log_dt)
- print('Decision Tree - Mean Absolute Error: %2.4f' % mae_log_dt)
- print('Decision Tree - Root Mean Squared Log Error: %2.4f' % rmsle_log_dt)
- 
- print('Decision Tree - Non log-transformed predictions:\n' + str(true_vs_predicted_dt.take(3)))
- print('Decision Tree - Log-transformed predictions:\n' + str(true_vs_predicted_dt_log.take(3)))
-
-
- # 4.2 Tuning model parameters
- '''
- So far we have covered the basic concepts of training and evaluating MLlib regression models on a single data set.
- Next we use held-out data to evaluate how different parameter settings affect model performance.
- First, split the raw data proportionally into train and test sets. The pyspark version used in the original
- book did not yet have randomSplit, so the book handled the split as follows:
- '''
- # 4.2.1 Linear regression
- # the book's approach, kept for reference (sample a keyed test set and subtract it to get the train set)
- data_with_idx = data.zipWithIndex().map(lambda k_v: (k_v[1], k_v[0]))
- test = data_with_idx.sample(False, 0.2, 42)
- train = data_with_idx.subtractByKey(test)
- 
- # here randomSplit is used for the actual split
- train_test_data_split = data.randomSplit([0.8, 0.2], 123)
- train = train_test_data_split[0]
- test = train_test_data_split[1]
- print('Number of test samples:', test.count())
- print('Number of training samples:', train.count())
-
- # Decision tree
- train_test_data_dt_split = data_dt.randomSplit([0.8,0.2],123)
- train_dt = train_test_data_dt_split[0]
- test_dt = train_test_data_dt_split[1]
-
- '''
- We now have train and test sets. Next we study how different parameter settings affect model performance.
- First we need an evaluation routine for the linear model: a helper function that trains the model under a
- given parameter setting and evaluates its performance on the test set.
- As in the Kaggle competition, RMSLE is used as the metric, so results can be compared with the competition leaderboard.
- '''
- # The evaluation function is defined as follows:
- def evaluate(train, test, iterations, step, regParam, regType, intercept):
-     model = LinearRegressionWithSGD.train(train, iterations, step, regParam=regParam,
-                                           regType=regType, intercept=intercept)
-     testLabel_vs_testPrediction = test.map(lambda point: (point.label, model.predict(point.features)))
-     rmsle = np.sqrt(testLabel_vs_testPrediction.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
-     return rmsle
-
- # 4.2.1.1 Effect of the number of iterations:
- params = [1, 5, 10, 20, 50, 100]
- metrics = [evaluate(train, test, param, 0.01, 0.0, 'l2', False) for param in params]
- print(params)
- print(metrics)
-
- # Plot the number of iterations vs. RMSLE:
- plt.plot(params, metrics)
- fig = matplotlib.pyplot.gcf()
- fig.set_size_inches(12, 6)
- plt.xscale('log')
- plt.show()
-
-
- # 4.2.1.2 Effect of the step size
- '''
- The results show why the default step size (1.0) is not used to train the linear model: the resulting RMSLE is nan,
- which means SGD has converged to a very poor local solution. This tends to happen when the step size is large,
- because the optimization moves too aggressively to reach a good solution.
- In addition, a small step size combined with a relatively small number of iterations (such as the 10 used above)
- usually gives a weak model, whereas a small step size with a larger number of iterations generally converges to a better result.
- '''
- params = [0.01, 0.025, 0.05, 0.1, 0.5, 1.0]
- metrics = [evaluate(train, test, 10, param, 0.0, 'l2', False) for param in params]
- for i in range(len(params)):
-     print('RMSLE: %f when step = %f' % (metrics[i], params[i]))
- 
- # Plot step size vs. RMSLE:
- plt.plot(params, metrics)
- fig = matplotlib.pyplot.gcf()
- fig.set_size_inches(12, 6)
- plt.xscale('log')
- plt.xlabel('step')
- plt.ylabel('RMSLE')
- plt.show()
-
- # 4.2.1.3 Effect of the regularization parameter
- '''
- As regularization increases, performance on the training set drops, because the model cannot fit the data as well.
- What we want is a regularization setting that gives the best performance on the test set,
- i.e. the model with the best generalization.
- '''
- # (1) First, the effect of the L2 regularization parameter
- params = [0.0, 0.01, 0.1, 1.0, 5.0, 10.0, 20.0]
- metrics = [evaluate(train, test, 10, 0.1, param, 'l2', False) for param in params]
- for i in range(len(params)):
-     print('RMSLE: %f when regParam = %f' % (metrics[i], params[i]))
- 
- # Plot the L2 regularization parameter vs. RMSLE:
- plt.plot(params, metrics)
- fig = matplotlib.pyplot.gcf()
- fig.set_size_inches(12, 8)
- plt.xscale('log')
- plt.xlabel('regParam')
- plt.ylabel('RMSLE')
- plt.show()
-
- # (2) Next, the effect of the L1 regularization parameter
- params = [0.0, 0.01, 0.1, 1.0, 10.0, 100.0, 1000.0]
- metrics = [evaluate(train, test, 10, 0.1, param, 'l1', False) for param in params]
- for i in range(len(params)):
-     print('RMSLE: %f when regParam = %f' % (metrics[i], params[i]))
- 
- # Plot the L1 regularization parameter vs. RMSLE:
- plt.plot(params, metrics)
- fig = matplotlib.pyplot.gcf()
- fig.set_size_inches(12, 8)
- plt.xscale('log')
- plt.xlabel('regParam')
- plt.ylabel('RMSLE')
- plt.show()
-
- '''
- The figure above shows that RMSLE degrades sharply once a large regularization parameter is used.
- It is well known that L1 regularization produces sparse weight vectors; let's check whether the
- L1-regularized models we just trained really behave that way.
- The results below match expectations: as the L1 regularization parameter grows,
- more and more entries of the model's weight vector become zero.
- '''
- model_l1 = LinearRegressionWithSGD.train(train,10,0.1,regParam=1.0, regType='l1', intercept=False)
- model_l2 = LinearRegressionWithSGD.train(train,10,0.1,regParam=1.0, regType='l2', intercept=False)
- model_l1_10 = LinearRegressionWithSGD.train(train,10,0.1,regParam=10.0, regType='l1', intercept=False)
- model_l2_10 = LinearRegressionWithSGD.train(train,10,0.1,regParam=10.0, regType='l2', intercept=False)
- model_l1_100 = LinearRegressionWithSGD.train(train,10,0.1,regParam=100.0, regType='l1', intercept=False)
- model_l2_100 = LinearRegressionWithSGD.train(train,10,0.1,regParam=100.0, regType='l2', intercept=False)
-
-
- # model_l1.weights.array converts the DenseVector to a numpy array
- print('L1 (1.0) number of zero weights: ' + str(sum(model_l1.weights.array == 0)))
- print('L2 (1.0) number of zero weights: ' + str(sum(model_l2.weights.array == 0)))
- print('L1 (10.0) number of zero weights: ' + str(sum(model_l1_10.weights.array == 0)))
- print('L2 (10.0) number of zero weights: ' + str(sum(model_l2_10.weights.array == 0)))
- print('L1 (100.0) number of zero weights: ' + str(sum(model_l1_100.weights.array == 0)))
- print('L2 (100.0) number of zero weights: ' + str(sum(model_l2_100.weights.array == 0)))
-
-
- # 4.2.1.4 Effect of the intercept
- params = [False, True]
- metrics = [evaluate(train, test, 10, 0.1, 1.0, 'l2', param) for param in params]
- for i in range(len(params)):
-     print('RMSLE: %f when intercept = %s' % (metrics[i], params[i]))
- 
- # Plot the intercept setting vs. RMSLE:
- plt.bar(params, metrics, color='r')
- fig = matplotlib.pyplot.gcf()
- fig.set_size_inches(12, 8)
- plt.xlabel('intercept')
- plt.ylabel('RMSLE')
- plt.show()
-
- # 4.2.2 Decision tree
- def evaluate_dt(train, test, maxDepth, maxBins):
-     model = DecisionTree.trainRegressor(train, {}, impurity='variance', maxDepth=maxDepth, maxBins=maxBins)
-     predictions = model.predict(test.map(lambda point: point.features))
-     actual = test.map(lambda point: point.label)
-     actual_vs_predictions = actual.zip(predictions)
-     rmsle = np.sqrt(actual_vs_predictions.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
-     return rmsle
-
-
- # 4.2.2.1 Effect of the maximum tree depth:
- '''
- We usually hope that a more complex (deeper) tree improves model performance. A smaller tree depth
- acts as a form of regularization, much like L2 and L1 regularization for linear models, and there is
- an optimal depth that gives the best performance on the test set.
- '''
- params = [1, 2, 3, 4, 5, 10, 20]
- metrics = [evaluate_dt(train_dt, test_dt, param, 32) for param in params]
- for i in range(len(params)):
-     print('RMSLE: %f when maxDepth = %d' % (metrics[i], params[i]))
- 
- # Plot maximum tree depth vs. RMSLE:
- plt.plot(params, metrics)
- fig = matplotlib.pyplot.gcf()
- fig.set_size_inches(12, 8)
- plt.xlabel('maxDepth')
- plt.ylabel('RMSLE')
- plt.show()
-
-
- # 4.2.2.2 Effect of the maximum number of bins (the maximum number of splits considered per node)
- '''
- Finally, let's look at how the number of bins affects decision tree performance. As with tree depth,
- more bins make the model more complex and can help when features take many values, but beyond a certain
- point they add little, and overfitting can actually make test-set performance worse.
- The results show that the maximum number of bins does affect performance, but once it reaches about 30
- there is essentially no further improvement; the best setting lies roughly between 30 and 35.
- '''
- params = [2, 4, 8, 16, 32, 64, 100]
- metrics = [evaluate_dt(train_dt, test_dt, 5, param) for param in params]
- for i in range(len(params)):
-     print('RMSLE: %f when maxBins = %d' % (metrics[i], params[i]))
- 
- # Plot maximum number of bins vs. RMSLE:
- plt.plot(params, metrics)
- fig = matplotlib.pyplot.gcf()
- fig.set_size_inches(12, 8)
- plt.xlabel('maxBins')
- plt.ylabel('RMSLE')
- plt.show()
Save the code from section 2.2 as spark_test.py and upload it to one of the machines in the Spark cluster (path: /root/cf_temp/spark_test.py).
Then run the following on that machine (for the meaning of each parameter, see the Spark documents in the "Big Data" bookmarks folder):
- # --num-executors    total number of executors across the cluster
- # --executor-memory  memory per executor
- # --executor-cores   maximum number of tasks each executor runs in parallel
- # --driver-memory    memory used by the driver of this application
- # --driver-cores     number of CPU cores used by the driver
- nohup /usr/hdp/current/spark2-client/bin/spark-submit --master yarn \
-     --num-executors 6 \
-     --executor-memory 1g \
-     --executor-cores 2 \
-     --driver-memory 1G \
-     --driver-cores 1 \
-     /root/cf_temp/spark_test.py &
Note:
Running a Python program on Spark may require some rewriting, and the import section needs extra care. If the test.py we submit needs to call another module common.py, we must add import common inside test.py, but this import common cannot sit at the top of the file: it has to come after the SparkContext has been created. We also need to pass the --py-files option to spark-submit to point at the referenced module, as in the command and sketch below.
- nohup /usr/hdp/current/spark2-client/bin/spark-submit --master yarn \
- --num-executors 6 \
- --executor-memory 1g \
- --executor-cores 2 \
- --driver-memory 1G \
- --driver-cores 1 \
- --py-files /xx/xx/common.py \
- /xx/xx/test.py &
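A minimal sketch of this pattern; the module name common, its parse_line helper, and the file paths are illustrative assumptions, not taken from the original post:
- # common.py (shipped to the executors via --py-files)
- def parse_line(line):
-     # hypothetical helper: split a CSV line into fields
-     return line.split(',')
- 
- # test.py
- from pyspark import SparkConf, SparkContext
- 
- conf = SparkConf().setAppName("py-files demo")
- sc = SparkContext(conf=conf)
- 
- # import the shipped module only after the SparkContext exists, as described in the note above
- import common
- 
- rdd = sc.textFile('/cf_sdl/hour_noheader.csv').map(common.parse_line)
- print(rdd.count())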