赞
踩
Spark是一个大规模数据处理的统一分析引擎。
迅速、通用、易用、支持多种资源管理器
Spark用十分之一的计算资源,获得了比Hadoop快3倍的速度。
可以用Spark进行sql查询、流式计算、机器学习、图计算。
支持多种编程语言API,包括Java、Scala、Python、R
Spark可以使用单机集群模式来运行,也可以在Hadoop YARN、Apache Mesos、Kubernates上运行,或者在“云”里运行。
Spark可以访问HDFS、Alluxio、Apache Cassandra、Apache HBase、Apache Hive等上百种数据源。
Hadoop是分布式数据设施。
Spark只是一个专门的工具,不会进行分布式数据的存储。
Hadoop可用自身的MapReduce来代替Spark
Spark可不依赖Hadoop,而选择其他基于云的数据系统平台。
Hadoop:两步计算、磁盘存储
Spark:多步计算、内存存储
Hadoop:使用HDFS
Spark:使用RDD
大多数的人会认为Spark都是基于内存的计算的,但是基于如下两个情况,Spark会落地于磁盘
Spark避免不了shuffle
如果数据过大(比服务器的内存还大)也会落地于磁盘
在比较短的作业确实能快上100倍,但是在真实的生产环境下,一般只会快 2.5x ~ 3x!
目前备受追捧的Spark还有很多缺陷,比如:
稳定性方面,由于代码质量问题,Spark长时间运行会经常出错,在架构方面,由于大量数据被缓存在RAM中,Java回收垃圾缓慢的情况严重,导致Spark性能不稳定,在复杂场景中SQL的性能甚至不如现有的Map/Reduce。
不能处理大数据,单独机器处理数据过大,或者由于数据出现问题导致中间结果超过RAM的大小时,常常出现RAM空间不足或无法得出结果。然而,Map/Reduce运算框架可以处理大数据,在这方面,Spark不如Map/Reduce运算框架有效。
不能支持复杂的SQL统计;目前Spark支持的SQL语法完整程度还不能应用在复杂数据分析中。在可管理性方面,SparkYARN的结合不完善,这就为使用过程中埋下隐忧,容易出现各种难题。
推荐系统
实时日志系统
快速查询系统
定制广告系统
用户图计算系统
Spark Core提供Spark SQL、Spark Streaming、MLlib、GraphX四大模块,进行离线计算,产生RDD弹性分布式数据集。
Spark SQL是一种结构化的数据处理模块。
DataFrame是Spark SQL提供的一个编程抽象,相当于一个列数据的分布式的采集组织,在关系数据库或R/Python中的概念相当于一个表。
Spark Streaming处理实时数据流并容错。
MLlib是Spark提供的可扩展的机器学习库
MLlib提供的API主要分为以下两类:
GraphX是Spark面向图计算提供的框架与算法库
2.x基本上是基于1.x进行了更多的功能和模块的扩展以及性能的提升:
主要思想:在运行时使用优化后的字节码,将整体查询合成为单个函数,不再使用虚拟函数调用,而是利用CPU来注册中间数据。
start-all.sh
(已设置好环境变量)
stop-all.sh
(已设置好环境变量)
spark-submit //提交任务命令
--master spark://master:7077 //提交集群的地址
--deploy-mode client //部署模式为client模式
--executor-memory 512M //设置每个执行单元使用512Mb的内存空间
--total-executor-cores 4 //每个执行单元为4个核
demo.py //实际提交的应用程序,具体以实际为准
不用多说。。。
local、standalone、yarn、mesos
pyspark --master local[4]
pyspark --master yarn-client
pyspark --master spark://Spark:7077
pyspark --master mesos://Mesos:7077
日志级别包括:ALL,DEBUG,ERROR,FATAL,INFO,OFF,TRACE,WARN
控制日志输出内容的方式有两种:
修改log4j.properties
,默认控制台输出INFO
及以上级别信息
log4j.rooCategory=INFO,console
代码中使用setLogLevel(logLevel)
控制日志输出
from pyspark import SparkContext
sc = SparkContext("local", "First App")
sc.setLogLevel("WARN")
就是安装环境,编译器可以用Anaconda,Jupyter notebook,pycharm,pyspark是一个python的第三方库,可以通过pip安装,但是如果安装了Spark包,bin目录里会包含pyspark
初代RDD:真实数据的分区信息,单个分区的读取方法
子代RDD:初代RDD产生子代RDD的原因(动作),初代RDD的引用
task在executor上运行的时候
分区信息(Partition) | 数据集的基本组成单位 |
---|---|
Compute函数 | 对于给定的数据集,需要做哪些计算 |
Partitioner函数 | 对于计算出来的数据结果如何分发 |
优先位置列表 | 对于data partition的位置偏好 |
依赖关系 | 描述了RDD之间的Lineage |
下面代码都是Python API,使用pyspark
distFile = sc.textFile("file:///FILE_TO_PATH")
#textFile支持从多种源创建RDD,如hdfs://,s3n://
distFile.count()
#计算文本的行数
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data) #通过并行化创建RDD
#parallelize可以传入分片个数参数,否则采用defaultParallelism
distData.count() #返回RDD中元素的个数
两种算子(Operation)
转换(transformation) | 在一个已存在的RDD上创建一个新的RDD,但实际的计算并没有执行,仅仅记录操作过程 |
---|---|
行动(action) | 执行RDD记录的所有运行transformations操作,并计算结果,结果可返回到driver程序 |
如何区分?
通过对这个RDD的每个元素应用一个函数来返回一个新的RDD。
>>> rdd = sc.parallelize(['b', 'a', 'c'])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a',1), ('b',1), ('c',1)]
将函数应用于该RDD的所有元素,然后将结果平坦化(压扁),从而返回新的RDD。
>>> rdd = sc.parallelize([2, 3, 4])
>>> rdd2 = rdd.map(lambda x: range(1, x))
>>> rdd2.collect()
[[1], [1, 2], [1, 2, 3]]
>>> rdd1 = rdd.flatMap(lambda x: range(1, x))
>>> rdd1.collect()
[1, 1, 2, 1, 2, 3]
map | 映射 |
---|---|
flatMap | 先映射,后扁平化 |
– | – |
map对每一次(func)都产生一个对象,分别产生一个列表 | flatMap多一步,最后会将所有对象合并为一个列表返回 |
它的输入函数应用于每个分区,也就是把每个分区中的内容作为整体来处理的
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
# 上面第二个参数是分区数,所以分成了[1, 2]和[3, 4]。
# 不管分区数为多少,都是取下界。比如上面假如分区数为3,则界限分别在4/3和8/3,取下界则分成[1], [2], [3, 4]。
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]
注
与mapPartitions的区别:
>>> rdd = sc.parallelize([1, 2, 3, 4], 4) # [1] [2] [3] [4]
>>> def f(splitIndex, iterator): yield splitIndex
>>> rdd.mapPartitionsWithIndex(f).sum()
6 # 0+1+2+3
对每个元素应用f函数,返回值为true的元素在RDD中保留,返回值为false的元素将被过滤掉。
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]
将RDD中的元素进行去重操作
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd.distinct().collect()
[1, 2, 3]
合并两个RDD,结果中包含两个RDD中的所有元素
>>> rdd1 = sc.parallelize([1, 2, 3, 4])
>>> rdd2 = sc.parallelize([5, 6, 7, 8])
>>> rdd1.union(rdd2).collect()
[1, 2, 3, 4, 5, 6, 7, 8]
返回这个RDD和另一个RDD的交集,输出将不包含任何重复的元素
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.intersectioni(rdd2).collect()
[1, 2, 3]
返回RDD1中出现,但是不在RDD2中出现的元素,不去重
>>> rdd1 = sc.parallelize([('a', 1), ('b', 4), ('b', 5), ('a', 3)])
>>> rdd2 = sc.parallelize([('a', 3), ('c', None)])
>>> rdd1.subtract(rdd2).collect()
[('a', 1), ('b', 4), ('b', 5)]
根据指定的KeyFunc对标准RDD进行排序
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
这类RDD称为PairRDD
针对(Key, Value)型数据中的Value进行Map操作,而不对Key进行处理。
>>> rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])
>>> rdd.mapValues(lambda value: value + 2).glom().collect() # glom()将同一分区的元素合并到一个列表里
[[('a', 3), ('b', 4), ('c', 5)]]
完成mapValues处理后,再对结果进行扁平化处理。
>>> rdd = sc.parallelize([('a', ['x', 'y']), ('b', ['p', 'r'])])
>>> rdd.flatMapValues(lambda x: x).collect()
[('a', 'x'), ('a', 'y'), ('b', 'p'), ('b', 'r')]
相同Key值的value值进行对应函数运算,类似于hdp得combiner操作。
>>> from operator import add
>>> rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])
>>> rdd.reduceByKey(add).collect()
[('a', 4), ('b', 2)]
将Pair RDD中相同Key的值放在一个序列中
>>> rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
>>> rdd.groupByKey().mapValues(len).collect()
[('a', 2), ('b', 1)]
>>> rdd.groupByKey().mapValues(list).collect()
[('a', [1 1]), ('b', [1])]
注:如果分组的目的是为了聚合(比如求综合或求平均),那还是建议使用reduceByKey或aggregateByKey,二者会提供更好的性能。另外groupBy系列算子涉及到一个将数据打乱而又重新组和的操作,这个操作我们称之为shuffle
根据key值进行排序,默认升序
>>> tmp = [('a', 1), ('B', 2), ('1', 3), ('d', 4)]
>>> sc.parallelize(tmp).sortByKey()
[('1', 3), ('B', 2), ('a', 1), ('d', 4)]
>>> sc.parallelize(tmp).sortByKey(True, None, keyfunc=lambda k: k.lower()).collect()
[('1', 3), ('a', 1), ('B', 2), ('d', 4)]
注:sortByKey只能操作PairRDD(由(key, value)对组成),而sortBy范围更广,操作标准RDD。查看sortBy源码可以发现,后者包含前者。
返回一个仅包含键的RDD
>>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
>>> m.collect()
[1, 3]
返回一个仅包含值的RDD
>>> m = sc.parallelize([(1, 2), (3, 4)]).values()
>>> m.collect()
[2, 4]
可以将两个RDD按照相同的Key值join起来
>>> x = sc.parallelize([('a', 1), ('b', 4)])
>>> y = sc.parallelize([('a', 2), ('a', 3)])
>>> x.join(y).collect()
[('a', (1, 2)), ('a', (1, 3))]
左外连接,与SQL中的左外连接一致
>>> x = sc.parallelize([('a', 1), ('b', 4)])
>>> y = sc.parallelize([('a', 2)])
>>> x.leftOuterJoin(y).collect()
[('a', (1, 2)), ('b', (4, None))]
右外连接,与SQL中的右外连接一致
>>> x = sc.parallelize([('a', 1), ('b', 4)])
>>> y = sc.parallelize([('a', 2)])
>>> x.rightOuterJoin(y).collect()
[('a', (1, 2))]
返回RDD中的所有元素。
>>> sc.parallelize([1, 2]).collect()
[1, 2]
返回RDD中的所有元素的个数。
>>> sc.parallelize([1, 2]).count()
2
通过指定的聚合方法来对RDD中元素进行聚合。
>>> from operator import add
>>> sc.parallelize([1, 2, 3 ,4 ,5]).reduce(add)
15
>>> sc.parallelize([]).reduce(add)
Traceback (most recent call last):
ValueError: Can not reduce() empty RDD
从RDD中返回前num个元素的列表
>>> sc.parallelize([4, 6, 8, 2, 9]).take(2)
[4, 6]
>>> sc.parallelize([4, 6, 8, 2, 9]).take(10)
[4, 6, 8, 2, 9]
从RDD中返回前num个最小的元素的列表,结果默认升序排列
>>> sc.parallelize([4,6,8,2,9]).takeOrdered(2)
[2, 4]
>>> sc.parallelize([4,6,8,2,9]).takeOrdered(10)
[2, 4, 6, 8, 9]
从RDD中返回第一个元素
>>> sc.parallelize([2,3,4,5,6]).first()
2
从RDD中返回最大的前num个元素列表,结果默认降序排列
如果Key参数有值,则先对各元素进行对应处理
注:会把所有数据都加载到内存,所以该方法只有在数据很小时使用
>>> sc.parallelize([10,4,2,12,3]).top(1)
[12]
>>> sc.parallelize([2,3,4,5,6],2).top(2)
[6, 5)
>>> sc.parallelize([10, 4, 2, 12, 3]).top(4, key=str)
[4, 3, 2, 12] # 字符串的大小比较是从左往右
遍历RDD的每个元素,并执行f函数操作,无返回值。
python API 中的foreach是相当特殊,有别于Scala API。
你若是习惯性地运行rdd.collect().foreach()
必定会报错,因为collect()返回一个list,list怎么会有foreach方法?python API 不存在RDD集合对象的操作方法。
所以,只能使用rdd.foreach(f)
,通过运行,你会发现没有打印输出:
那输出在哪儿呢?
look,输出在我的pyspark服务器端,并且还是无序、不全的输出。
我们要知道rdd.foreach(f)
是在Executor
端进行分布式操作。由于Spark的分布式特性,不能保证打印命令将所有数据发送到Driver
端的输出流中。自然也无法决定输出顺序。
另外,在Scala API 中,支持rdd.collect().foreach(println)
循环遍历操作,并且是在Driver
端输出,因此输出是完整而有序的。
对每个分区执行f函数操作,无返回值
>>> def f(iterator):
... s = sum(iterator)
... print(s)
>>> sc.parallelize([1,2,3,4,5],3).foreachPartition(f) # 1 2+3+4 5
1
9
5
将RDD中的元素以字符串的格式存储在文件系统中。
>>> rdd = sc.parallelize(['foo', 'bar'], 2)
>>> rdd.saveAsTextFile('/home/...')
>>> rdd.saveAsTextFile('hdfs://host:8020/...')
以字典形式,返回PairRDD中的键值对。如果key重复,则后面的value覆盖前面的。
>>> rdd = sc.parallelize([(1, 2), (3, 4)])
>>> rdd.collectAsMap()
{1: 2, 3: 4}
>>> rdd = sc.parallelize([(1, 2), (3, 4), (1, 4)])
{1: 4, 3: 4}
以字典形式,返回PairRDD中key值出现的次数
>>> rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
>>> rdd.countByKey()
[('a', 2), ('b', 1)]
为甚么要引入累加器?
看下面这段代码:
counter = 0
rdd = sc.parallelize(range(10))
def increment(x):
global counter
counter += x
rdd.foreach(increment)
print("Counter value: ", counter)
# Counter value: 0
在输出之前,我们按惯性思维肯定会觉得答案应该是45,但结果却是0。
实际上Spark分配的多个Executor的确执行了累加操作,但是它们并没有返回值,即最后并没有把结果返回给Driver,如下图:
欲累加1,2,3,4,Executor1执行了1+2=3,Executor2执行了3+4=7,但是并没能把3和7返回给Driver。
于是引入了累加器,它告诉Spark,最后要把Executor的结果返回给Driver,并执行Merge合并操作,更新sum值。
引入accumulator,并对上面代码做如下修改:
accumulator:一个全局共享变量, 可以完成对信息进行聚合操作。
counter = sc.accumulator(0) #初始值为0的累加器
rdd = sc.parallelize(range(10))
def increment(x):
global counter
counter += x
rdd.foreach(increment)
print("Counter value: ", counter.value)
# Counter value: 45
注意事项!!!
rdd.map(increment)
则结果为0rdd = rdd.map(increment)
rdd.collect() #计算元素总和
rdd.collect()
为什么要引入广播变量?
闭包数据,都是以Task为单位发送的,每个任务中包含闭包数据。这样可能会导致,一个Executor中含有大量重复的数据,并且占用大量的内存。
Executor其实就是一个JVM,所以在启动时,会自动分配内存。完全可以将任务中的闭包数据放置在Executor的内存中,达到共享的目的。
Spark中的广播变量就可以将闭包的数据保存到Executor的内存中。
Spark中的广播变量不能够更改:分布式共享只读变量。
概念
Spark1.x:HttpBroadcast、TorrentBroadcast
Spark2.x:TorrentBroadcast、TorrentBroadcast:点到点的传输,有效避免单点故障,提高网络利用率,减少节点压力。
Broadcast:
#eg
>>> b = sc.broadcast([1,2,3,4,5]) # b是广播变量
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()
# 空
注意事项!!!
DAG(有向无环图),原始的RDD通过一系列的转换就形成了DAG,在Spark里每一个操作生成了一个RDD,RDD之间连一条边,最后这些RDD和他们之间的边组成了一个有向无环图。
对数据进行持久化,是为了避免多次计算同一个RDD。
持久化方法
persist方法 | 支持StorageLevel,可以持久化一个RDD在内存或磁盘中 |
---|---|
cache方法 | 仅缓存到内存,本质上是persist(MEMORY_ONLY)的别名 |
持久化存储等级
StorageLevel类型 | 类型描述 |
---|---|
MEMORY_ONLY(默认级别) | 将RDD以JAVA对象的形式保存到JVM内存。如果分片太大,内存缓存不下,就不缓存 |
MEMORY_ONLY_SER | 将RDD以序列化的JAVA对象形式保存到内存 |
DISK_ONLY | 将RDD持久化到硬盘 |
MEMORY_AND_DISK | 将RDD数据集以JAVA对象的形式保存到JVM内存中,如果分片太大不能保存到内存中,则保存到磁盘上,并在下次用时重新从磁盘读取。 |
MEMORY_AND_DISK_SER | 与MEMORY_ONLY_SER类似,但当分片太大,不能保存到内存中,会将其保存到磁盘中 |
XXX_2 | 上述5种level,后缀添加2代表两副本 |
OFF_HEAP | RDD实际被保存到Tachyon |
持久化实例
>>> from pyspark import StorageLevel
>>> rdd = sc.parallelize(['b', 'a', 'c'])
>>> rdd.persist()
ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:475
>>> rdd.getStorageLevel() # 取得存储等级
StorageLevel(False, True, False, False, 1)
>>> rdd.unpersist() # 解持久化
ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:475
>>> rdd.persist(StorageLevel.DISK_ONLY)
ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:475
>>> rdd.getStorageLevel()
StorageLevel(True, False, False, False, 1)
Spark RDD 结束,参考我的另一篇博客Spark RDD 编程指导进行编程练习。
-----------------------------未完待续-------------------------------
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。