当前位置:   article > 正文

Python3:Python+spark编程实战 总结_spark与pytorch的集成案例

spark与pytorch的集成案例

 

win10安装spark:https://blog.csdn.net/songhaifengshuaige/article/details/79480491

不宜妄自菲薄,引喻失义。

0、前提

0.1 配置

可参考:
windows上配置 Python+spark开发环境

0.2 有关spark

说明:
安装注意版本

一、实例分析

1.1 数据 student.txt

  1. yang 85 90 30
  2. wang 20 60 50
  3. zhang 90 90 100
  4. zhang 90 90 100
  5. li 100 54 0
  6. li 100 54 0
  7. yanf 0 0 0

1.2 代码

  1. #r = reduce(lambda x, y: x+y, [4,4,5,5]) # 使用 lambda 匿名函数
  2. from functools import reduce
  3. def add(x, y) : # 两数相加
  4. return x + y
  5. print(reduce(add, [1,2,3,4,5]))
  6. print(reduce(lambda x,y:x+y,[1,2,3,4,5]))
  7. print(type(1))
  8. print(type([2]))
  9. class y(object):
  10. z=5
  11. x=type('y',(object,),dict(z=5))
  12. print(x)
  13. '''
  14. def map_func(x):
  15. s = x.split()
  16. return (s[0], [int(s[1]),int(s[2]),int(s[3])]) #返回为(key,vaklue)格式,其中key:x[0],value:x[1]且为有三个元素的列表
  17. #return (s[0],[int(s[1],s[2],s[3])]) #注意此用法不合法
  18. from pyspark import SparkContext #导入模块
  19. sc=SparkContext(appName='Student') #命名
  20. lines=sc.textFile("student.txt").map(lambda x:map_func(x)).cache() #导入数据且保持在内存中,其中cache():数据保持在内存中
  21. count=lines.count() #对RDD中的数据个数进行计数;其中,RDD一行为一个数据集
  22. print(count)
  23. '''
  24. #studentExample 例子 练习
  25. def map_func(x):
  26. s = x.split()
  27. return (s[0], [int(s[1]),int(s[2]),int(s[3])]) #返回为(key,vaklue)格式,其中key:x[0],value:x[1]且为有三个元素的列表
  28. #return (s[0],[int(s[1],s[2],s[3])]) #注意此用法不合法
  29. def has100(x):
  30. for y in x:
  31. if(y == 100): #把x、y理解为 x轴、y轴
  32. return True
  33. return False
  34. def allis0(x):
  35. if(type(x)==list and sum(x) == 0): #类型为list且总分为0 者为true;其中type(x)==list :判断类型是否相同
  36. return True
  37. return False
  38. def subMax(x,y):
  39. m = [x[1][i] if(x[1][i] > y[1][i]) else y[1][i] for i in range(3)]
  40. return('Maximum subject score', m)
  41. def sumSub(x,y):
  42. n = [x[1][i]+y[1][i] for i in range(3)]
  43. #或者 n = ([x[1][0]+y[1][0],x[1][1]+y[1][0],x[1][2]+y[1][2]])
  44. return('Total subject score', n)
  45. def sumPer(x):
  46. return (x[0],sum(x[1]))
  47. #停止之前的SparkContext,不然重新运行或者创建工作会失败;另外,只有 sc.stop()也可以,但是首次运行会有误
  48. try:
  49. sc.stop()
  50. except:
  51. pass
  52. from pyspark import SparkContext #导入模块
  53. sc=SparkContext(appName='Student') #命名
  54. lines=sc.textFile("student.txt").map(lambda x:map_func(x)).cache() #导入数据且保持在内存中,其中cache():数据保持在内存中
  55. count=lines.count() #对RDD中的数据个数进行计数;其中,RDD一行为一个数据集
  56. #RDD'转换'运算 (筛选 关键字filter)
  57. whohas100 = lines.filter(lambda x: has100(x[1])).collect() #注意:处理的是value列表,也就是x[1]
  58. whois0 = lines.filter(lambda x: allis0(x[1])).collect()
  59. sumScore = lines.map(lambda x: (x[0],sum(x[1]))).collect()
  60. #‘动作’运算
  61. maxScore = max(sumScore,key=lambda x: x[1]) #总分最高者
  62. minScore = min(sumScore,key=lambda x: x[1]) #总分最低者
  63. sumSubScore = lines.reduce(lambda x,y: sumSub(x,y))
  64. avgScore = [x/count for x in sumSubScore[1]]#单科成绩平均值
  65. #RDD key-value‘转换’运算
  66. subM = lines.reduce(lambda x,y: subMax(x,y))
  67. redByK = lines.reduceByKey(lambda x,y: [x[i]+y[i] for i in range(3)]).collect() #合并key相同的value值x[0]+y[0],x[1]+y[1],x[2]+y[2]
  68. #RDD'转换'运算
  69. sumPerSore = lines.map(lambda x: sumPer(x)).collect() #每个人的总分 #sumSore = lines.map(lambda x: (x[0],sum(x[1]))).collect()
  70. sorted = lines.sortBy(lambda x: sum(x[1])) #总成绩低到高的学生成绩排序
  71. sortedWithRank = sorted.zipWithIndex().collect()#按总分排序
  72. first3 = sorted.takeOrdered(3,key=lambda x:-sum(x[1])) #总分前三者
  73. #限定以空格的形式输出到文件中
  74. first3RDD = sc.parallelize(first3)\
  75. .map(lambda x:str(x[0])+' '+str(x[1][0])+' '+str(x[1][1])+' '+str(x[1][2])).saveAsTextFile("result")
  1. #print(lines.collect())
  2. print("数据集个数(行):",count)
  3. print("单科满分者:",whohas100)
  4. print("单科零分者:",whois0)
  5. print("单科最高分者:",subM)
  6. print("单科总分:",sumSubScore)
  7. print("合并名字相同的分数:",redByK)
  8. print("总分/(人)",sumPerSore)
  9. print("最高总分者:",maxScore)
  10. print("最低总分者:",minScore)
  11. print("每科平均成绩:",avgScore)
  12. print("总分倒序:",sortedWithRank)
  13. print("总分前三者:",first3)
  14. print(first3RDD)
  15. sc.stop()

1.3 结果展示

  1. 数据集个数(行): 7
  2. 单科满分者: [('li', [100, 54, 0]), ('li', [100, 54, 0])]
  3. 单科零分者: [('yanf', [0, 0, 0])]
  4. 单科最高分者: ('Maximum subject score', [100, 90, 100])
  5. 单科总分: ('Total subject score', [485, 438, 280])
  6. 合并名字相同的分数: [('zhang', [180, 180, 200]), ('li', [200, 108, 0]), ('yang', [85, 90, 30]), ('wang', [20, 60, 50]), ('yanf', [0, 0, 0])]
  7. 总分/(人) [('yang', 205), ('wang', 130), ('zhang', 280), ('zhang', 280), ('li', 154), ('li', 154), ('yanf', 0)]
  8. 最高总分者: ('zhang', 280)
  9. 最低总分者: ('yanf', 0)
  10. 每科平均成绩: [69.28571428571429, 62.57142857142857, 40.0]
  11. 总分倒序: [(('yanf', [0, 0, 0]), 0), (('wang', [20, 60, 50]), 1), (('li', [100, 54, 0]), 2), (('li', [100, 54, 0]), 3), (('yang', [85, 90, 30]), 4), (('zhang', [90, 90, 100]), 5), (('zhang', [90, 90, 100]), 6)]
  12. 总分前三者: [('zhang', [90, 90, 100]), ('zhang', [90, 90, 100]), ('yang', [85, 90, 30])]
  13. None

二、代码解析

2.1函数解析

2.1.1 collect()

RDD的特性

RDD特性

在进行基本RDD“转换”运算时不会立即执行,结果不会显示在显示屏中,collect()是一个“动作”运算,会立刻执行,显示结果。

2.1.2 reduce()

说明

reduce()函数会对参数序列中的元素进行累积。

语法

reduce(function, iterable[, initializer])

参数

  • function – 函数,有两个参数
  • iterable – 可迭代对象
  • initializer – 可选,初始参数

实例

说明:Python3的内建函数移除了reduce函数,reduce函数放在functools模块

  1. In [24]:
  2. #r = reduce(lambda x, y: x+y, [4,4,5,5]) # 使用 lambda 匿名函数
  3. from functools import reduce
  4. def add(x, y) : # 两数相加
  5. return x + y
  6. reduce(add, [1,2,3,4,5])
  7. Out[24]:
  8. 15
  9. In [25]:
  10. reduce(lambda x, y: x+y, [1,2,3,4,5]) # 使用 lambda 匿名函数
  11. Out[25]:
  12. 15

2.1.3 type()

语法

class type(name, bases, dict)

参数

  • name – 类的名称。
  • bases – 基类的元组。
  • dict – 字典,类内定义的命名空间变量。

返回值

一个参数返回对象类型, 三个参数,返回新的类型对象。

实例

  1. # 一个参数实例
  2. In [1]:
  3. type(1)
  4. Out[1]:
  5. int
  6. In [2]:
  7. type([2])
  8. Out[2]:
  9. list
  10. In [3]:
  11. type({3:'three'})
  12. Out[3]:
  13. dict
  14. In [5]:
  15. x = 5
  16. type(x) == list #判断x的类型是否为list
  17. Out[5]:
  18. False
  1. # 三个参数实例
  2. class y(object):
  3. z = 5
  4. x = type('y',(object,),dict(z=5))
  5. print(x)
  6. <class '__main__.y'> #产生一个新的类型

三、问题分析

  1. An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
  2. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 5, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

解析

1、检查拼写是否有误
2、检查缩进是否合规
3、检查()是否一一配对

四、实例 小练

4.1 数据 user_small(注意没有后缀)

  1. 1441900799.728000 1441900802.452000 8618245698655 0134730038729312 2 1 1 IPHONE_5 17999 20693 10.67.23.157 111.13.34.100 6 58986 80 GET mmsns.qpic.cn /mmsns/PdibpV1sFDHdaOTqNXb8VGSNicyYpOVa9R7icxSr4BkwbsSyzJbBTmE5Zz5aZichejbkKuia7twzraqk/150?tp=webp&length=1136&width=640 weixin.qq.com/?version=369229843&uin=2925174340&nettype=0&scene=moment WeChat/6.2.0.19 CFNetwork/711.3.18 Darwin/14.0.0 200 59 image/webp 7504 706 8212 7 1827
  2. 1441900750.023000 1441900754.063000 8613836044032 0136210021269713 2 1 1 IPHONE_5 17752 25632 10.67.21.71 117.144.242.26 6 52941 80 POST short.weixin.qq.com http://short.weixin.qq.com/cgi-bin/micromsg-bin/tenpay - MicroMessenger Client - - - - 715 0 7 1827
  3. 1441900755.480472 1441900756.762000 8618246899077 0131830068670612 2 1 1 IPHONE_4S 17875 61433 10.67.43.51 120.192.84.86 6 58684 31271 GET i.gtimg.cn http://i.gtimg.cn/qqshow/admindata/comdata/vip_emoji_aio_ios_new_config/xydata.json - QQ/5.7.0.469 CFNetwork/672.0.8 Darwin/14.0.0 304 83 x-json - 0 0 18 1041
  4. 1441900754.860000 1441900755.480472 8618246899077 0131830068670612 2 1 1 IPHONE_4S 17875 61433 10.67.43.51 120.192.84.86 6 58684 31271 GET i.gtimg.cn http://i.gtimg.cn/club/item/avatar/zip/0/i0/all.zip - QQ/5.7.0.469 CFNetwork/672.0.8 Darwin/14.0.0 404 210 text/html 85 487 411 18 1041
  5. 1441900753.786000 1441900755.726000 8618246195634 9900026543899411 2 1 1 IPHONE_4S 17783 19302 10.67.29.55 111.40.194.207 6 49412 80 GET sb.symcd.com /MFYwVKADAgEAME0wSzBJMAkGBSsOAwIaBQAEFDmvGLQcAh85EJZW%2FcbTWO90hYuZBBROQ8gddu83U3pP8lhvlPM44tW93wIQd9jUM82by0%2FVy957MNapGQ%3D%3D - securityd (unknown version) CFNetwork/672.0.2 Darwin/14.0.0 - - - - 522 0 18 1041
  6. 1441900761.308739 1441900761.408000 8615045213668 0127590050857822 2 1 1 IPHONE_4 17772 50621 10.67.63.219 183.232.95.61 6 49337 80 POST szminorshort.weixin.qq.com http://szminorshort.weixin.qq.com/cgi-bin/micromsg-bin/rtkvreport - MicroMessenger Client - - - - 500 16 7 1827
  7. 1441900696.427624 1441900761.308739 8615045213668 0127590050857822 2 1 1 IPHONE_4 17772 50621 10.67.63.219 183.232.95.61 6 49337 80 POST szminorshort.weixin.qq.com http://szminorshort.weixin.qq.com/cgi-bin/micromsg-bin/rtkvreport - MicroMessenger Client - - - - 500 16 7 1827
  8. 1441900693.219000 1441900696.427624 8615045213668 0127590050857822 2 1 1 IPHONE_4 17772 50621 10.67.63.219 183.232.95.61 6 49337 80 POST szminorshort.weixin.qq.com http://szminorshort.weixin.qq.com/cgi-bin/micromsg-bin/rtkvreport - MicroMessenger Client - - - - 502 16 7 1827
  9. 1441900750.845345 1441900753.537000 8618246195634 9900026543899411 2 1 1 IPHONE_4S 17783 19302 10.67.29.55 117.135.169.124 6 49411 80 GET b227.photo.store.qq.com /psb?/V12jlwSP30SPej/VE1V5LlXFMzHeg5gTzpyuCueaEVEGV*0X6BbSyJZRhs!/b/dCWGUIc.HQAA&ek=1&kp=1&pt=0&bo=yAD6AAAAAAABBxI!&t=5 v1_iph_sq_5.6.0_1_app_a-4-2 QQ/5.6.0.438 CFNetwork/672.0.2 Darwin/14.0.0 - - - - 792 0 18 1041
  10. 1441900748.094000 1441900750.845345 8618246195634 9900026543899411 2 1 1 IPHONE_4S 17783 19302 10.67.29.55 117.135.169.124 6 49411 80 GET b227.photo.store.qq.com /psb?/V12jlwSP30SPej/VE1V5LlXFMzHeg5gTzpyuCueaEVEGV*0X6BbSyJZRhs!/b/dCWGUIc.HQAA&ek=1&kp=1&pt=0&bo=yAD6AAAAAAABBxI!&t=5 v1_iph_sq_5.6.0_1_app_a-4-2 QQ/5.6.0.438 CFNetwork/672.0.2 Darwin/14.0.0 - - - - 792 0 18 1041

4.2 用户上网记录统计(一行为一条记录).(用户:第3列)

  1. #test 1_1 用户上网记录统计
  2. from pyspark import SparkContext
  3. sc = SparkContext(appName='test1')
  4. rdd = sc.textFile('user_small')\
  5. .map(lambda x:x.split('\t'))\
  6. .map(lambda x:(x[3],1))\
  7. .reduceByKey(lambda x,y:x+y)\
  8. .map(lambda x:str(x[0])+' '+str(x[0][1])).collect()
  9. #.saveAsTextFile('text1_1') #限定为空格键输出到文件
  10. print(rdd)
  11. sc.stop()

运行结果: 

['0127590050857822 1', '0134730038729312 1', '0136210021269713 1', '0131830068670612 1', '9900026543899411 9']

4.2用户流量统计。分别统计上行流量及下行流量并将结果各列以空格键隔开输出到文件。(用户:第3列;上行流量:第25列;下行流量:第26列)

  1. #test 1_2 统计用户上网 分别为上、下行流量
  2. def map_func(x):
  3. s = x.split('\t')
  4. return (s[2],[int(s[24]),int(s[25])])#返回为(key,vaklue)格式,其中key:x[0],value:x[1]且为有三个元素的列表
  5. #return (s[0],[int(s[1],s[2],s[3])]) #注意此用法不合法
  6. try:
  7. sc.stop() #停止之前的SparkContext,不然重新运行或者创建工作会失败
  8. except:
  9. pass
  10. from pyspark import SparkContext
  11. sc=SparkContext(appName='test')
  12. lines=sc.textFile("user_small").map(lambda x:map_func(x)).cache()
  13. redByK = lines.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))
  14. sum_flow = redByK.map(lambda x:str(x[0])+' '+str(x[1][0])+' '+str(x[1][1]))\
  15. .saveAsTextFile('text1_2')
  16. sc.stop()

4.3 统计用户总流量

  1. #test 1_2 统计用户上网 总流量
  2. try:
  3. sc.stop() #停止之前的SparkContext,不然重新运行或者创建工作会失败
  4. except:
  5. pass
  6. from pyspark import SparkContext
  7. sc = SparkContext(appName='test1')
  8. rdd = sc.textFile('user_small')\
  9. .map(lambda x:x.split('\t'))\
  10. .map(lambda x:(x[2],int(x[24])+int(x[25])))\
  11. .reduceByKey(lambda x,y:x+y)\
  12. .map(lambda x:str(x[0])+' '+str(x[1])).collect()
  13. print(rdd)
  14. sc.stop()

运行结果:

['8618246899077 898', '8615045213668 1550', '8618245698655 8918', '8613836044032 715', '8618246195634 2106']

4.4、微信APP流量统计。(微信APP特征MicroMessenger,位于第20列,统计对应的下行流量值——第26列的数值。)

  1. # test 1_3
  2. from pyspark import SparkContext
  3. sc = SparkContext(appName='test1')
  4. rdd = sc.textFile('user_small') \
  5. .map(lambda x: x.split('\t')) \
  6. .map(lambda x: (x[19], int(x[25]))) \
  7. .filter(lambda x: 'WeChat' or 'MicroMessenger' in x[1])\
  8. .reduceByKey(lambda x, y: x + y) \
  9. .map(lambda x: str(x[0]) + ' ' + str(x[1])).collect()
  10. print(rdd)

运行结果:

['QQ/5.7.0.469 CFNetwork/672.0.8 Darwin/14.0.0 411', 'securityd (unknown version) CFNetwork/672.0.2 Darwin/14.0.0 0', 'QQ/5.6.0.438 CFNetwork/672.0.2 Darwin/14.0.0 0', 'WeChat/6.2.0.19 CFNetwork/711.3.18 Darwin/14.0.0 8212', 'MicroMessenger Client 48']

 
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/620330
推荐阅读
相关标签
  

闽ICP备14008679号