赞
踩
本文的内容参考Spark编程基础(Python版) 厦门大学 林子雨
>>> data = [1, 2, 3, 4, 5]
>>> for i in map((lambda x: x+5), data):
... print(i)
...
6
7
8
9
10
>>> from functools import reduce
>>> data = [1, 2, 3, 4, 5]
>>> reduce((lambda x, y: x+y), data)
15
若使用Python运行代码,首先安装spark库,该库不是软件,两百多M,换源下载快一点
pip3 install spark -i https://pypi.douban.com/simple
注意:在Ubuntu环境中一定要安装有spark环境,否则报错,并且在导入SparkConf和SparkText对象前要初始化环境。下面演示代码,具体代码的讲解后面会讲到
# test.py
import findspark
findspark.init() # 初始化找到本机安装的spark的环境
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster('local').setAppName('MyApp')
sc = SparkContext(conf=conf)
array = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(array)
rdd.foreach(print)
写好代码之后使用Python解释器运行
python3 test.py
使用Python运行代码,首先安装spark库,该库不是软件,两百多M,换源下载快一点
pip3 install spark -i https://pypi.douban.com/simple
注意,在Windows中直接操作时,不依赖Hadoop和spark软件的限制,运行时使用的计算机资源是由Windows提供,只是单机模式运行,且只是利用了spark的计算优势,在大数据的其它方面的组件的优势没有使用并发挥到,所以只能当做是练习
# test.py
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster('local').setAppName('MyApp')
sc = SparkContext(conf=conf)
array = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(array)
rdd.foreach(print)
写好代码之后使用Python解释器运行
python test.py
# test.py
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster('local').setAppName('MyApp')
sc = SparkContext(conf=conf)
sc.setLogLevel('ERROR') # 设置日志输出级别,能有效减少信息的输出
array = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(array)
rdd.foreach(print)
写好代码之后使用spark-submit运行
spark-submit test.py
在命令行直接敲pyspark
就可以进入里面,下面的所有代码都是在该环境下运行
我并没有使用该方法执行,但微软官网上有提供类似的方式的安装教程,我使用的是在vscode中远程连接,然后在vscode中打开远程主机的文件然后编写python代码,然后使用第一种方式提交运行我的spark代码
总结,如果只是平常的练习可以使用第2和第4种方法运行,在开发中推荐使用第1种方式运行,不推荐使用第3种方式,因为第3种方式输出的无关的调试信息太多,扰乱视线
弹性可复原的分布式数据集
单从逻辑上的表现来说,他就是一个数据集合
如果在pyspark中写代码,系统自动创建sc对象,如果使用自己的代码,那么就需要自己写sc对象
import findspark
findspark.init() # 初始化找到本机安装的spark的环境
from pyspark import SparkContext, SparkConf
# conf = SparkConf().setMaster('local').setAppName('MyApp')
# sc = SparkContext(conf=conf)
sc = SparkContext('local', 'MyApp') # 设置运行模式和工程名称,和上面两行的效果是一样的
sc.setLogLevel('ERROR') # 设置日志输出信息级别
导入之后,默认每一行就是一个RDD,也就是下面的lines
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") # 导入本地文件
lines.foreach(print) # 执行函数
lines = sc.textFile("hdfs://localhost:9000/user/zhong/word.txt") # 导入hdfs文件
lines.foreach(print) # 执行函数
lines = sc.textFile("/user/zhong/word.txt") # 导入hdfs文件,默认选择hdfs文件系统
lines.foreach(print) # 执行函数
lines = sc.textFile("word.txt") # 导入hdfs文件,默认指向了/user/zhong/目录,zhong是我的用户名
lines.foreach(print) # 执行函数
也可以自行创建,下面的foreach( )
函数接收一个函数名,然后执行该函数,后面常使用以查看结果,但不是重点
array = [1, 2, 3, 4, 5, 6, 7]
rdd = sc.parallelize(array)
rdd.foreach(print)
# 1
# 2
# ...
# 7
操作 | 含义 |
---|---|
filter( func ) | 筛选出满足函数func的元素,并返回一个新的数据集 |
map( func ) | 将每个元素传递到函数func中,并将结果返回为一个新的数据集 |
flatMap( func ) | 每个输入元素都可以映射到0或多个输出结果 |
groupByKey() | 应用于( K, V)键值对的数据集时,返回一个新的(K, Iterable形式的数据集 |
reduceByKey( func ) | 应用于(K, V)键值对的数据集时返回一个新的(K, V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果 |
下面实现从文件中读取数据,然后筛选出含有Spark字符串的RDD
lines = sc.textFile("word.txt") # 导入hdfs文件,默认指向了/user/zhong/目录,zhong是我的用户名
linesWithSpark = lines.filter(lambda line:"Spark" in line)
linesWithSpark.foreach(print) # 执行输出函数
下面先创建元素,然后将数据转为RDD,最后实现每个元素加10操作
data= [1, 2, 3, 4, 5]
rdd1 = sc.parallelize(data)
rdd2 = rdd1.map(lambda x: x+10)
rdd2.foreach(print)
# 11
# 12
# 13
# 14
# 15
下面先导入数据集,然后实现分割每一行的数据,分割标志为一个分号
lines = sc.textFile("word.txt")
words = lines.map(lambda line: line.split(","))
words.foreach(print) # 执行输出函数
# ['Hadoop', 'is', 'good']
# ['Spark', 'is', 'fast']
# ['Spark', 'is', 'better']
下面先导入数据集,然后实现分割每一行的数据,分割标志为一个分号,最后再将一个RDD“拍扁”,内部形成多个子元素
lines = sc.textFile("word.txt")
words = lines.flatMap(lambda line: line.split(","))
words.foreach(print) # 执行输出函数
# [['Hadoop'], ['is'], ['good']]
# [['Spark'], ['is'], ['fast']
# [['Spark'], ['is'], ['better']
下面先创建数据集,然后对其根据键来分组,相同的键作为一组,它的值由pyspark.resultiterable.Resultiterable
对象封装为一个可迭代对象
words=sc.parallelize([("Hadoop", 1), ("is", 1), ("good", 1), ("Spark", 1), ("is", 1), ("fast", 1), ("Spark", 1), ("is", 1), ("better", 1)])
wordsl= words.groupByKey()
wordsl.foreach(print)
# ('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x0000019C1701CAC8>)
# ('is', <pyspark.resultiterable.ResultIterable object at 0x0000019C1701C788>)
# ('good', <pyspark.resultiterable.ResultIterable object at 0x0000019C1701C848>)
# ('Spark', <pyspark.resultiterable.ResultIterable object at 0x0000019C1701CAC8>)
# ('fast', <pyspark.resultiterable.ResultIterable object at 0x0000019C1701C788>)
# ('better', <pyspark.resultiterable.ResultIterable object at 0x0000019C1701C848>)
# 可以理解为这样:('is', (1, 1, 1))
下面先实现数据集的创建,然后根据键进行分组,然后再对根据键分组生成的值操作,每次从一个值的可迭代对象中取两个值出来,代入匿名函数中进行计算,第一次计算得到的值作为第二次计算的第一个值,然后和第三个值再次进行计算,依次循环直到可迭代对象迭代完毕(本质上是迭代器抛异常然后结束),最后形成新的值。通俗理解就是在groupByKey( )的基础上添加函数对可迭代对象进行操作
words=sc.parallelize([("Hadoop", 1), ("is", 1), ("good", 1), ("Spark", 1), ("is", 1), ("fast", 1), ("Spark", 1), ("is", 1), ("better", 1)])
wordsl= words.reduceByKey(lambda x,y: x+y)
words1.forseach(print)
# ('good', 1)
# ('Hadoop', 1)
# ('better', 1)
# ('Spark', 2)
# ('fast', 1)
# ('is', 3)
上面所讲的都是转换操作(除forseach( )函数),实际上,spark执行转换操作的时候并不会真的去取数据并对数据进行操作,而是记录你要进行的转换行为,记录一步步的转换,然后形成有关系的有向无环图结构。
为什么是有关系的,看上面的例子,先是words=sc.parallelize(……),然后wordsl= words.reduceByKey(……),那么words是不是就是words1的父节点,如果对一个数据进行多次转换,那么就形成了爷父子孙……。为什么是有向的,爷到父,父到子,这不就是有向的吗。为什么是无环的图,一个爷可以有多个父,一个父可以有多个子,但无论怎样,子都不会变成父或爷,在思维图上看也就是无环,一直转换下去就类似一个树形图,所以称为图结构。
直到遇到行动操作才会从头开始,取数据,一步步进行转换,然后执行行动操作进行计算,形成新的RDD。所以这整个从头到尾的计算过程是一种惰性机制
行动操作计算得到的RDD可以选择保存。正是由于是有有关系的有向无环图结构,所以当在后面进行行动操作时发生了不可预料的错误,那么就可以退回到上一个行动操作节点,然后再次计算,这样体现了安全性
行动操作函数 | 含义 |
---|---|
*.count( ) | 返回数据集中的元素个数 |
*.collect( ) | 以数组的形式返回数据集中的所有元素 |
*.first( ) | 返回数据集中的第一个元素 |
*.take( n ) | 以数组的形式返回数据集中的前n个元素 |
*.reduce( func ) | 通过函数 func(输入两个参数并返回一个值)聚合数据集中的元素 |
*.foreach( func ) | 将数据集中的每个元素传递到函数func中运行 |
rdd = sc.parallelize([1, 2, 3, 4, 5]) rdd.count() # 5 rdd.first() # 1 rdd.take(3) # [1, 2, 3] rdd.reduce(lambda x, y: x+y) # 15 rdd.collect() # [1, 2, 3, 4, 5] rdd.foreach(lambda x: print(x)) # 1 # 2 # 3 # 4 # 5
首先得知道,当一个数据过大时(超128M)hdfs对数据进行切割分块存储到多个节点当中,当mapreduce或spark要取数据进行计算时,就需要将每一个数据块发送到各个节点,各个节点才能进行计算,具体哪块数据到哪个节点,都是靠yarn进行资源的调度,而yarn考虑的是资源的平均分配。所以会发现每个Datanode节点会将数据分发到各个节点,这样会造成通信的开销。
如果使用分区,简单的说就是,将存放该数据的Datanode节点设为计算节点,当要进行计算时,数据就可以不用跑到其它节点了,这样减少了通信的开销,同时也增加了计算的并行度
在local模式下默认为CPU数目,启动时若设置为local[ N ]则默认为N
在Apache Messos模式下默认为8
在Standalone和Yarn模式下,默认值为,“集群中所有CPU核心数目总和”和“2”之间取一个最大值
手动设置分区个数
# 在加载文件的方法中,设置分区数目为5
sc.textFile("/user/zhong/test.txt", 5)
# 在自定义数据集的方法中,设置分区数目为2
array = [1, 2, 3, 4]
rdd = sc.parallelize(array, 2)
# 在已经定义过分区数目后,还可以动态重定义分区数目
array = [1, 2, 3, 4]
rdd = sc.parallelize(array, 2)
len(rdd.glom().collect()) # 显示rdd这个RDD分区的数量
# 2
rdd1 = rdd.repartition(3) # 对rdd这个RDD进行重新分区
len(rdd.glom().collect()) # 再次显示rdd这个RDD分区的数量
# 3
在进行对此转换操作,然后进行行动操作后,生成的RDD如果不进行保存,在后面的子节点下再次进行行动操作时,就会从头开始取数据和计算,而不是从当前节点取RDD结果进行操作,所以要对每次行动操作生成的RDD进行持久化保存
使用persist( )方法在进行行动操作之前对一个RDD标记为持久化,等真正遇到第一个进行行动操作之后,系统自动对生成的RDD进行保存
保存时,是以放序列化对象的形式保存在JVM当中。
*.persist(MEMORY_ONLY),代表如果内存不足,就以最近最少使用的原则替换
*.persist(MEMORY_AND_DISK),代表如果内存不足,就保存到磁盘
如果每次都要写*.persist(MEMORY_ONLY)这么长的代码,就有点麻烦,可以使用*.cache()方法代替
如果数据太大内存保存不下,或者后面不再使用到了,可以该RDD从缓存中移除。使用*.unpersist( )方法
strlist = ["Hadoop", "Spark", "Hive"]
rdd = sc.parallelize(strlist)
rdd.cache() # 会调用persist(MEMORY_ONLY)方法标记为持久化,但是不会触发计算,也不会进行缓存到内存
print(','.join(rdd.collect())) # 行动操作,触发计算,然后将计算结果保存到内存
# Hadoop,Spark,Hive
也可以直接保存到磁盘,并手动指定保存磁盘位置。位置是目录地址,不是具体文件位置,有多少个分区就产生有多少个文件
strlist = ["Hadoop", "Spark", "Hive"]
rdd = sc.parallelize(strlist)
rdd1 = ','.join(rdd.collect())
rdd1.saveAsFile("file:///usr/local/spark/mycode/rdd/partitioner/")
在操作过程中用的最多的是键值对的操作,如何将一个普通的数据转化为一个键值对就显得很重要
lines = ["I Love Python"]
rdd = lines.flatMap(lambda x: x.split(' ')).map(lambda y: (y, 1))
rdd.foreach(print)
# ('I', 1)
# ('Love', 1)
# ('Python', 1)
命令行使用pyspark
命令启动spark之后,在浏览器输入http://localhost:4040进入web页面。注意一定是本地访问,不能远程访问,而且一定是开启了pyspark之后才能访问web页面
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。