当前位置:   article > 正文

pyspark模型训练_pyspark训练模型

pyspark训练模型

1、pyspark启动

部署文档:pyspark部署

正常情况pyspark shell的启动成功后的界面:

  1. [admin@datacenter4 ~]$ pyspark
  2. Python 2.7.5 (default, Nov 16 2020, 22:23:17)
  3. [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux2
  4. Type "help", "copyright", "credits" or "license" for more information.
  5. Welcome to
  6.       ____              __
  7.      / __/__  ___ _____/ /__
  8.     _\ \/ _ \/ _ `/ __/  '_/
  9.    /__ / .__/\_,_/_/ /_/\_\   version 2.3.2.3.1.4.0-315
  10.       /_/
  11. Using Python version 2.7.5 (default, Nov 16 2020 22:23:17)
  12. SparkSession available as 'spark'.

2、pyspark shell脚本测试

2.1 sc的生成方式

2.1.1 正常启动pyspark  shell

这时候不能用structure_1()中的方式生成sc,会报pyspark shell已创建了sc,不能重复生成,也就是这种情况下默认是可以直接适用sc变量的。 

  1. [admin@datacenter4 ~]$ pyspark
  2. Python 2.7.5 (default, Nov 16 2020, 22:23:17)
  3. [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux2
  4. Type "help", "copyright", "credits" or "license" for more information.
  5. Welcome to
  6.       ____              __
  7.      / __/__  ___ _____/ /__
  8.     _\ \/ _ \/ _ `/ __/  '_/
  9.    /__ / .__/\_,_/_/ /_/\_\   version 2.3.2.3.1.4.0-315
  10.       /_/
  11. Using Python version 2.7.5 (default, Nov 16 2020 22:23:17)
  12. SparkSession available as 'spark'.
  13. >>>
  14. >>> import numpy
  15. >>> from pyspark.mllib.fpm import PrefixSpan
  16. >>> data = [[["a", "b"], ["c"]],[["a"], ["c", "b"], ["a", "b"]],[["a", "b"], ["e"]],
  17. [["f"]]]
  18. >>> # 这里的sc用的默认的
  19. >>> rdd = sc.parallelize(data)

备注:正常启动下,如果还想自定sc,可以用structure_2()中的方式生成sc 

2.1.2 非正常启动pyspark shell

pyspark shell启动还未显示“spark”标志就被强制终止,有时会仍然进入pyspark指令页面,如下: 

  1. [admin@datacenter3 site-packages]$ pyspark
  2. Python 2.7.5 (default, Aug  7 2019, 00:51:29)
  3. [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux2
  4. Type "help", "copyright", "credits" or "license" for more information.
  5. 22/03/30 10:16:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
  6. # 这里ctrl+c强制退出,可以看到最后仍出现">>>",说明仍然进入pyspark指令页面
  7. ^CTraceback (most recent call last):
  8.   File "/usr/hdp/current/spark2-client/python/pyspark/shell.py", line 45, in <module>
  9.     spark = SparkSession.builder\
  10.   File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 173, in getOrCreate
  11.     sc = SparkContext.getOrCreate(sparkConf)
  12.   File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 358, in getOrCreate
  13.     SparkContext(conf=conf or SparkConf())
  14.   File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 119, in __init__
  15.     conf, jsc, profiler_cls)
  16.   File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 181, in _do_init
  17.     self._jsc = jsc or self._initialize_context(self._conf._jconf)
  18.   File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 297, in _initialize_context
  19.     return self._jvm.JavaSparkContext(jconf)
  20.   File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1523, in __call__
  21.   File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
  22.   File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1152, in send_command
  23.   File "/usr/lib64/python2.7/socket.py", line 447, in readline
  24.     data = self._sock.recv(self._rbufsize)
  25. KeyboardInterrupt
  26. >>>
  27. >>> import numpy

这种情况下,默认的sc是还没有生成的,直接调用sc会报错,此时structure_1()、structure_2()都可以用来生成sc。  

 2.2 模型训练-代码测试

  1. # Autor chenfeng
  2. #!/usr/bin/env Python
  3. # coding=utf-8
  4. def structure_1():
  5. #计算文件中包含a和b的行数
  6. from pyspark import SparkConf,SparkContext
  7. conf = SparkConf().setMaster("local[*]").setAppName("My App")#配置环境信息
  8. sc = SparkContext(conf=conf)#创建指挥官
  9. # logFile = "file:///usr/local/spark/README.md" #注意本地file文件需要///
  10. return sc
  11. '''
  12. 以下代码来源 https://blog.csdn.net/u013719780/article/details/51822346
  13. '''
  14. def structure_2():
  15. from pyspark.sql import SparkSession, HiveContext
  16. spark = SparkSession.builder.master("local[*]")\
  17. .appName("sdl-test")\
  18. .config("spark.executor.memory", "3g")\
  19. .config("spark.driver.memory","1g")\
  20. .enableHiveSupport()\
  21. .getOrCreate()
  22. sc = spark.sparkContext
  23. sql = spark.sql
  24. hive_sql = HiveContext(sc)
  25. print(sc.master)
  26. return sc
  27. # !!!!!!!!spark-shell模式,这里可以不用写;集群命令提交模式,需要放开!!!!!!!!
  28. #sc = structure_1()
  29. # 这里如果是本地文件,需要以‘file///’开头,否则默认就是hdfs地址
  30. path = '/cf_sdl/hour_noheader.csv'
  31. raw_data = sc.textFile(path)
  32. num_data = raw_data.count()
  33. records = raw_data.map(lambda x: x.split(','))
  34. first = records.first()
  35. print('数据的第一行:', first)
  36. print('数据样本数:', num_data)
  37. # 因为变量records下文经常要用到,此处对其进行缓存:
  38. records.cache()
  39. # 为了将类型特征表示成二维形式,我们将特征值映射到二元向量中非0的位置。下面定义这样一个映射函数:
  40. def get_mapping(rdd, idx):
  41. return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()
  42. print('第三个特征的类别编码: %s '%get_mapping(records,2))
  43. mappings = [get_mapping(records, i) for i in range(2,10)] #对类型变量的列(第2~9列)应用映射函数
  44. print('类别特征打编码字典:',mappings)
  45. cat_len = sum(map(len,[i for i in mappings])) #类别特征的个数
  46. #cat_len = sum(map(len, mappings))
  47. #print(map(len,mappings)
  48. num_len = len(records.first()[11:15]) #数值特征的个数
  49. total_len = num_len+cat_len #所有特征的个数
  50. print('类别特征的个数: %d'% cat_len)
  51. print('数值特征的个数: %d'% num_len)
  52. print('所有特征的个数::%d' % total_len)
  53. # 1.1 为线性模型创建特征向量
  54. from pyspark.mllib.regression import LabeledPoint
  55. import numpy as np
  56. def extract_features(record):
  57. cat_vec = np.zeros(cat_len)
  58. step = 0
  59. for i,raw_feature in enumerate(record[2:9]):
  60. dict_code = mappings[i]
  61. index = dict_code[raw_feature]
  62. cat_vec[index+step] = 1
  63. step = step+len(dict_code)
  64. num_vec = np.array([float(raw_feature) for raw_feature in record[10:14]])
  65. return np.concatenate((cat_vec, num_vec))
  66. def extract_label(record):
  67. return float(record[-1])
  68. data = records.map(lambda point: LabeledPoint(extract_label(point),extract_features(point)))
  69. first_point = data.first()
  70. print('原始特征向量:' +str(first[2:]))
  71. print('标签:' + str(first_point.label))
  72. print('对类别特征进行独热编码之后的特征向量: \n' + str(first_point.features))
  73. print('对类别特征进行独热编码之后的特征向量长度:' + str(len(first_point.features)))
  74. # 1.2 为决策树创建特征向量
  75. def extract_features_dt(record):
  76. # return np.array(map(float, record[2:14])) # python2.x
  77. return np.array(record[2:14]) # python3.x
  78. data_dt = records.map(lambda point: LabeledPoint(extract_label(point), extract_features_dt(point)))
  79. first_point_dt = data_dt.first()
  80. print('决策树特征向量: '+str(first_point_dt.features))
  81. print('决策树特征向量长度: '+str(len(first_point_dt.features)))
  82. # 2 模型的训练和应用
  83. from pyspark.mllib.regression import LinearRegressionWithSGD
  84. from pyspark.mllib.tree import DecisionTree
  85. # help(LinearRegressionWithSGD.train)
  86. # 2.1 在bike sharing 数据上训练回归模型
  87. linear_model = LinearRegressionWithSGD.train(data, iterations=10, step=0.1, intercept =False)
  88. true_vs_predicted = data.map(lambda point:(point.label,linear_model.predict(point.features)))
  89. print('线性回归模型对前5个样本的预测值: '+ str(true_vs_predicted.take(5)))
  90. # 2.2 决策树
  91. dt_model = DecisionTree.trainRegressor(data_dt,{})
  92. preds = dt_model.predict(data_dt.map(lambda p: p.features))
  93. actual = data.map(lambda p:p.label)
  94. true_vs_predicted_dt = actual.zip(preds)
  95. print('决策树回归模型对前5个样本的预测值: '+str(true_vs_predicted_dt.take(5)))
  96. print('决策树模型的深度: ' + str(dt_model.depth()))
  97. print('决策树模型的叶子节点个数: '+str(dt_model.numNodes()))
  98. # 3 评估回归模型的性能
  99. # 3.1 均方误差和均方根误差
  100. def squared_error(actual, pred):
  101. return (pred-actual)**2
  102. # 3.2 平均绝对误差
  103. def abs_error(actual, pred):
  104. return np.abs(pred-actual)
  105. # 3.3 均方根对数误差
  106. def squared_log_error(pred, actual):
  107. return (np.log(pred+1)-np.log(actual+1))**2
  108. # 3.5 计算不同度量下的性能
  109. # 3.5.1 线性模型
  110. # mse = true_vs_predicted.map(lambda (t, p): squared_error(t, p)).mean() #python2.x
  111. mse = true_vs_predicted.map(lambda t_p: squared_error(t_p[0], t_p[1])).mean()
  112. mae = true_vs_predicted.map(lambda t_p: abs_error(t_p[0], t_p[1])).mean()
  113. rmsle = np.sqrt(true_vs_predicted.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
  114. print('Linear Model - Mean Squared Error: %2.4f' % mse)
  115. print('Linear Model - Mean Absolute Error: %2.4f' % mae)
  116. print('Linear Model - Root Mean Squared Log Error: %2.4f' % rmsle)
  117. # 3.5.2 决策树
  118. mse_dt = true_vs_predicted_dt.map(lambda t_p: squared_error(t_p[0], t_p[1])).mean()
  119. mae_dt = true_vs_predicted_dt.map(lambda t_p: abs_error(t_p[0], t_p[1])).mean()
  120. rmsle_dt = np.sqrt(true_vs_predicted_dt.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
  121. print('Decision Tree - Mean Squared Error: %2.4f' % mse_dt)
  122. print('Decision Tree - Mean Absolute Error: %2.4f' % mae_dt)
  123. print('Decision Tree - Root Mean Squared Log Error: %2.4f' %rmsle_dt)
  124. # 4 改进模型性能和参数调优
  125. import matplotlib
  126. from matplotlib.pyplot import hist
  127. import numpy as np
  128. import matplotlib.pyplot as plt
  129. targets = records.map(lambda r: float(r[-1])).collect()
  130. # hist(targets, bins=40, color='lightblue', normed=True) # python2.x
  131. hist(targets, bins=40, color='lightblue')
  132. fig = matplotlib.pyplot.gcf()
  133. fig.set_size_inches(12, 6)
  134. plt.show()
  135. # 绘制对目标变量进行对数变换后的分布直方图。
  136. log_targets = records.map(lambda r : np.log(float(r[-1]))).collect()
  137. # plt.hist(log_targets, bins = 40, color ='lightblue', normed =True)
  138. plt.hist(log_targets, bins = 40, color ='lightblue')
  139. fig = plt.gcf()
  140. fig.set_size_inches(12,6)
  141. plt.show()
  142. # 绘制对目标变量进行平方根变换后的分布直方图。
  143. sqrt_targets = records.map(lambda r: np.sqrt(float(r[-1]))).collect()
  144. plt.hist(sqrt_targets, bins=40, color='lightblue')
  145. # plt.hist(sqrt_targets, bins=40, color='lightblue', normed=True)
  146. fig = matplotlib.pyplot.gcf()
  147. fig.set_size_inches(12, 6)
  148. plt.show()
  149. # 4.1 目标变量变换对模型的影响
  150. # 线性回归
  151. data_log = data.map(lambda lp:LabeledPoint(np.log(lp.label),lp.features))
  152. model_log =LinearRegressionWithSGD.train(data_log, iterations=10, step=0.1)
  153. true_vs_predicted_log = data_log.map(lambda p:(np.exp(p.label),np.exp(model_log.predict(p.features))))
  154. #计算模型的MSE,MAE,RMSLE
  155. mse_log = true_vs_predicted_log.map(lambda t_p: squared_error(t_p[0], t_p[1])).mean()
  156. mae_log = true_vs_predicted_log.map(lambda t_p: abs_error(t_p[0], t_p[1])).mean()
  157. rmsle_log = np.sqrt(true_vs_predicted_log.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
  158. print('Linear Model —— Mean Squared Error:%2.4f'% mse_log)
  159. print('Linear Model —— Mean Absolue Error:%2.4f'% mae_log)
  160. print('Linear Model —— Root Mean Squared Log Error:%2.4f'% rmsle_log)
  161. print('Linear Model —— Non log-transformed predictions:\n'+ str(true_vs_predicted.take(3)))
  162. print('Linear Model —— Log-transformed predictions:\n'+ str(true_vs_predicted_log.take(3)))
  163. # 决策树
  164. data_dt_log = data_dt.map(lambda lp:LabeledPoint(np.log(lp.label), lp.features))
  165. dt_model_log = DecisionTree.trainRegressor(data_dt_log,{})
  166. preds_log = dt_model_log.predict(data_dt_log.map(lambda p:p.features))
  167. actual_log = data_dt_log.map(lambda p: p.label)
  168. true_vs_predicted_dt_log = actual_log.zip(preds_log).map(lambda t_p:(np.exp(t_p[0]), np.exp(t_p[1])))
  169. #计算模型的MSE,MAE,RMSLE
  170. mse_log_dt = true_vs_predicted_dt_log.map(lambda t_p: squared_error(t_p[0], t_p[1])).mean()
  171. mae_log_dt = true_vs_predicted_dt_log.map(lambda t_p: abs_error(t_p[0], t_p[1])).mean()
  172. rmsle_log_dt = np.sqrt(true_vs_predicted_dt_log.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
  173. print('Decision Tree —— Mean Squared Error:%2.4f'% mse_log_dt)
  174. print('Decision Tree —— Mean Absolue Error:%2.4f'% mae_log_dt)
  175. print('Decision Tree —— Root Mean Squared Log Error:%2.4f'% rmsle_log_dt)
  176. print('Decision Tree —— Non log-transformed predictions:\n'+ str(true_vs_predicted_dt.take(3)))
  177. print('Decision Tree —— Log-transformed predictions:\n'+str(true_vs_predicted_dt_log.take(3)))
  178. # 4.2 模型参数调优
  179. '''
  180. 到目前为止,本文讨论了同一个数据集上对MLlib中的回归模型进行训练和评估的基本概率。接下来,我们使用交叉验证方法来评估不同参数对模型性能的影响。
  181. 首先,我们将原始数据按比率划分为train,test数据集,原书当中pyspark版本还没有randomSplit这个函数,所以用如下的方式处理:
  182. '''
  183. # 4.2.1 线性回归
  184. data_with_idx = data.zipWithIndex().map(lambda k_v: (k_v[1],k_v[0]))
  185. test = data_with_idx.sample(False,0.2,42)
  186. train = data_with_idx.subtractByKey(test)
  187. train_test_data_split = data.randomSplit([0.8,0.2],123)
  188. train = train_test_data_split[0]
  189. test = train_test_data_split[1]
  190. print('测试集的样本数:',test.count())
  191. print('训练集的样本数:',train.count())
  192. # 决策树
  193. train_test_data_dt_split = data_dt.randomSplit([0.8,0.2],123)
  194. train_dt = train_test_data_dt_split[0]
  195. test_dt = train_test_data_dt_split[1]
  196. '''
  197. 前面已经得到了训练集和测试集,下面研究不同参数设置对模型性能的影响,首先需要为线性模型设置一个评估方法,
  198. 同时创建一个辅助函数,实现在不同参数设置下评估训练集和测试集上的性能。
  199. 本文依然使用Kaggle竞赛中的RMSLE作为评价指标。这样可以和在竞赛排行榜的成绩进行比较。
  200. '''
  201. # 评估函数定义如下:
  202. def evaluate(train, test, iterations, step, regParam, regType, intercept):
  203. model =LinearRegressionWithSGD.train(train, iterations, step, regParam=regParam,
  204. regType=regType,intercept=intercept)
  205. testLabel_vs_testPrediction = test.map(lambda point:(point.label, model.predict(point.features)))
  206. rmsle = np.sqrt(testLabel_vs_testPrediction.map(lambda t_p:squared_log_error(t_p[0],t_p[1])).mean())
  207. return rmsle
  208. # 4.2.1.1 迭代次数对模型的影响:
  209. params = [1, 5, 10, 20, 50, 100]
  210. metrics = [evaluate(train, test, param, 0.01, 0.0, 'l2', False) for param in params]
  211. print(params)
  212. print(metrics)
  213. # 绘制迭代次数与RMSLE的关系图:
  214. plt.plot(params, metrics)
  215. fig = matplotlib.pyplot.gcf()
  216. fig.set_size_inches(12, 6)
  217. plt.xscale('log')
  218. plt.show()
  219. # 4.2.1.2 评估step对模型的影响
  220. '''
  221. 从结果可以看出为什么不使用默认步长(默认1.0)来训练线性模型,因为其得到的RMSLE结果为nan。这说明SGD模型收敛到了最差的局部最优解。这种情况在步长较大的时候容易出现,原因是算法收敛太快导致不能得到最优解。
  222. 另外,小的步长与相对较小的迭代次数(比如上面的10次)对应的训练模型性能一般较差,而较小的步长与较大的迭代次数通常可以收敛到较好的结果。
  223. '''
  224. params=[0.01,0.025,0.05,0.1,0.5,1.0]
  225. metrics =[evaluate(train, test,10,param,0.0,'l2',False)for param in params]
  226. for i in range(len(params)):
  227. print('the rmsle:%f when step :%f'%(metrics[i],params[i]))
  228. #绘制步长与RMSLE的关系图:
  229. plt.plot(params, metrics)
  230. fig = matplotlib.pyplot.gcf()
  231. fig.set_size_inches(12, 6)
  232. plt.xscale('log')
  233. plt.xlabel('step')
  234. plt.ylabel('RMSLE')
  235. plt.show()
  236. # 4.2.1.3 不同正则化系数对模型的影响
  237. '''
  238. 我们知道随着正则化的提高,训练集的预测性能会下降,因为模型不能很好的拟合数据。
  239. 但是我们希望设置合适的正则化参数,能够在测试集上达到最好的性能,最终得到一个泛化能力最优的模型。
  240. '''
  241. # (1) 先看L2正则化系数对模型的影响
  242. params=[0.0,0.01,0.1,1.0,5.0,10.0,20.0]
  243. metrics =[evaluate(train, test,10,0.1, param,'l2',False) for param in params]
  244. for i in range(len(params)):
  245. print('the rmsle:%f when regParam :%f'%(metrics[i],params[i]))
  246. #绘制L2正则化系数与RMSLE的关系图:
  247. plt.plot(params, metrics)
  248. fig = matplotlib.pyplot.gcf()
  249. fig.set_size_inches(12, 8)
  250. plt.xscale('log')
  251. plt.xlabel('regParam')
  252. plt.ylabel('RMSLE')
  253. plt.show()
  254. # (2) 再看L1正则化系数对模型的影响
  255. params=[0.0,0.01,0.1,1.0,10.0,100.0,1000.0]
  256. metrics =[evaluate(train, test,10,0.1, param,'l1',False) for param in params]
  257. for i in range(len(params)):
  258. print('the rmsle:%f when regParam :%f'%(metrics[i],params[i]))
  259. #绘制L2正则化系数与RMSLE的关系图:
  260. plt.plot(params, metrics)
  261. fig = matplotlib.pyplot.gcf()
  262. fig.set_size_inches(12, 8)
  263. plt.xscale('log')
  264. plt.xlabel('regParam')
  265. plt.ylabel('RMSLE')
  266. plt.show()
  267. '''
  268. 从上图可以看到,当使用一个较大的正则化参数时,RMSLE性能急剧下降。
  269. 想必大家都知道,使用L1正则化可以得到稀疏的权重向量,我们看看刚刚得到的L1正则化模型是否真是如此呢?
  270. 从下面的结果可以看到,与我们预料的基本一致。随着L1正则化参数越来越大,模型的权重向量中0的个数越来越多。
  271. '''
  272. model_l1 = LinearRegressionWithSGD.train(train,10,0.1,regParam=1.0, regType='l1', intercept=False)
  273. model_l2 = LinearRegressionWithSGD.train(train,10,0.1,regParam=1.0, regType='l2', intercept=False)
  274. model_l1_10 = LinearRegressionWithSGD.train(train,10,0.1,regParam=10.0, regType='l1', intercept=False)
  275. model_l2_10 = LinearRegressionWithSGD.train(train,10,0.1,regParam=10.0, regType='l2', intercept=False)
  276. model_l1_100 = LinearRegressionWithSGD.train(train,10,0.1,regParam=100.0, regType='l1', intercept=False)
  277. model_l2_100 = LinearRegressionWithSGD.train(train,10,0.1,regParam=100.0, regType='l2', intercept=False)
  278. # model_l1.weights.array 把DenseVector转array
  279. print('L2 (1.0) number of zero weights:'+ str(sum(model_l1.weights.array == 0))) # 这里可以正常运行
  280. print('L2 (1.0) number of zero weights:'+ str(sum(model_l2.weights.array == 0)))
  281. print('L1 (10.0) number of zeros weights:'+ str(sum(model_l1_10.weights.array == 0)))
  282. print('L2 (10.0) number of zeros weights:'+ str(sum(model_l2_10.weights.array == 0)))
  283. print('L1 (100.0) number of zeros weights:'+ str(sum(model_l1_100.weights.array == 0)))
  284. print('L2 (100.0) number of zeros weights:'+ str(sum(model_l2_100.weights.array == 0)))
  285. # 4.2.1.4 截距对模型的影响
  286. params=[False, True]
  287. metrics =[evaluate(train, test, 10, 0.1, 1.0,'l2', param) for param in params]
  288. for i in range(len(params)):
  289. print('the rmsle:%f when intercept:%f'%(metrics[i],params[i]))
  290. #绘制L2正则化系数与RMSLE的关系图:
  291. plt.bar(params, metrics, color='r')
  292. fig = matplotlib.pyplot.gcf()
  293. fig.set_size_inches(12, 8)
  294. plt.xlabel('intercept')
  295. plt.ylabel('RMSLE')
  296. plt.show()
  297. # 4.2.2 决策树
  298. def evaluate_dt(train, test, maxDepth, maxBins):
  299. model =DecisionTree.trainRegressor(train,{},impurity='variance', maxDepth=maxDepth, maxBins=maxBins)
  300. predictions = model.predict(test.map(lambda point: point.features))
  301. actual = test.map(lambda point: point.label)
  302. actual_vs_predictions = actual.zip(predictions)
  303. rmsle = np.sqrt(actual_vs_predictions.map(lambda t_p: squared_log_error(t_p[0],t_p[1])).mean())
  304. return rmsle
  305. # 4.2.2.1 树的不同最大深度对性能影响:
  306. '''
  307. 我们通常希望用更复杂(更深)的决策树提升模型的性能。而较小的树深度类似正则化形式,
  308. 如线性模型的L2正则化和L1正则化,存在一个最优的树深度能在测试集上获得最优的性能。
  309. '''
  310. params=[1,2,3,4,5,10,20]
  311. metrics =[evaluate_dt(train_dt, test_dt, param,32) for param in params]
  312. for i in range(len(params)):
  313. print('the rmsle:%f when maxDepth :%d'%(metrics[i],params[i]))
  314. #绘制树的最大深度与RMSLE的关系图:
  315. plt.plot(params, metrics)
  316. fig = matplotlib.pyplot.gcf()
  317. fig.set_size_inches(12, 8)
  318. plt.xlabel('maxDepth')
  319. plt.ylabel('RMSLE')
  320. plt.show()
  321. # 4.2.2.2 最大划分数(每个节点分支时最大bin数)对模型的影响
  322. '''
  323. 最后,我们来看看划分数对决策树性能的影响。和树的深度一样,更多的划分数会使模型变得更加复杂,并且有助于提升特征维度较大的模型性能。划分数到一定程度之后,对性能的提升帮助不大。
  324. 实际上, 由于过拟合的原因会导致测试集的性能变差。
  325. 从结果可以看出,最大划分数会影响模型的性能,但是当最大划分数达到30之后,模型性能基本上没有获得提升。最优的最大划分数是在30到35之间。
  326. '''
  327. params=[2,4,8,16,32,64,100]
  328. metrics =[evaluate_dt(train_dt, test_dt,5, param) for param in params]
  329. for i in range(len(params)):
  330. print('the rmsle:%f when maxBins :%d'%(metrics[i],params[i]))
  331. #绘制树的最大划分数与RMSLE的关系图:
  332. plt.plot(params, metrics)
  333. fig = matplotlib.pyplot.gcf()
  334. fig.set_size_inches(12, 8)
  335. plt.xlabel('maxDepth')
  336. plt.ylabel('RMSLE')
  337. plt.show()

3、集群提交python文件

把2.2中的代码写在spark-test.py文件,并上传到spark集群中的某台上(目录:/root/cf_temp/spark_test.py)

在这台机器上执行:(每个参数的含义可以看“大数据相关”收藏夹里有关spark的文档)

  1. nohup /usr/hdp/current/spark2-client/bin/spark-submit --master yarn \
  2. --num-executors 6 \ # 集群所有机器总共的executor数量
  3. --executor-memory 1g \ # 每个executor的内存
  4. --executor-cores 2 \ # 每个executor执行task的最大并行度
  5. --driver-memory 1G \ # 提交当前application的driver占有的内存
  6. --driver-cores 1G \ # 提交当前application的driver占有的cpu cores个数
  7. /root/cf_temp/spark_test.py &

备注:

在spark平台上执行Python算法涉及到Python程序的改写,其中import部分需要额外注意。如果我们在执行某个 test.py 程序时需要调用另外一个 common.py,需要在 test.py 内部添加 import common ,而此时的 import common 不能放在程序头部,需要放在context之后。同时在执行程序时需要--py-files 参数指定引用模块的位置。

  1. nohup /usr/hdp/current/spark2-client/bin/spark-submit --master yarn \
  2. --num-executors 6 \
  3. --executor-memory 1g \
  4. --executor-cores 2 \
  5. --driver-memory 1G \
  6. --driver-cores 1G \
  7. --py-files /xx/xx/common.py \
  8. /xx/xx/test.py &


 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/955385
推荐阅读
相关标签
  

闽ICP备14008679号