
RDD Programming Guide (Part 3): Which Python Version Does Spark 3.2.1 Require?


Overview

At a high level, every Spark application consists of a driver program that runs the user's main function and executes various parallel operations on a cluster. The main abstraction Spark provides is the resilient distributed dataset (RDD), a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.

The second abstraction in Spark is shared variables that can be used in parallel operations. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only "added" to, such as counters and sums.

Linking with Spark

Spark 3.2.1 works with Python 3.6+. It can use the standard CPython interpreter, so C libraries such as NumPy can be used. Support for Python 2, 3.4, and 3.5 was removed in Spark 3.1.0, and Python 3.6 support is deprecated as of Spark 3.2.0.

If you have not installed PySpark with pip, run your script with bin/spark-submit from the Spark installation directory. That script loads Spark's Java/Scala libraries and lets you submit applications to a cluster. You can also use bin/pyspark to launch an interactive Python shell.

The driver and the workers must run the same Python version. Spark uses the default python found on PATH; you can point Spark at a specific interpreter with the PYSPARK_PYTHON environment variable:

$ PYSPARK_PYTHON=python3.8 bin/pyspark

Initializing Spark

The first thing a Spark program must do is create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext, you first need to build a SparkConf object that contains information about your application.

from pyspark import SparkConf, SparkContext
# appName is your application's name; master is a Spark, Mesos or YARN cluster URL, or "local"
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

Using the Shell

Run bin/pyspark on four cores:

$ ./bin/pyspark --master local[4]

You can also launch PySpark with IPython (version 1.0.0 or later) by setting an environment variable:

$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), a fault-tolerant collection of elements that can be operated on in parallel; RDDs automatically recover from node failures.

There are two ways to create RDDs:

  • Parallelized collections: parallelize an existing collection in your driver program.
  • External datasets: reference a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

Parallelized Collections

Parallelized collections are created by calling SparkContext's parallelize method on an existing iterable or collection in your driver program. For example:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

Once created, the distributed dataset (distData) can be operated on in parallel. For example, we can call distData.reduce(lambda a, b: a + b) to add up the elements of the list:

>>> data = [1, 2, 3, 4, 5]
>>> distData = sc.parallelize(data)
>>> res = distData.reduce(lambda a, b: a + b)
>>> res
15

One important parameter for parallelized collections is the number of partitions to cut the dataset into. Spark runs one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster, but you can also set it manually by passing it as a second argument to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
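As a minimal sketch (assuming an active SparkContext named sc; the exact element-to-partition split can vary by cluster), you can request a partition count explicitly and inspect it:

rdd = sc.parallelize([1, 2, 3, 4, 5], 4)   # ask for 4 partitions explicitly
print(rdd.getNumPartitions())              # 4
print(rdd.glom().collect())                # glom() lists each partition's elements, e.g. [[1], [2], [3], [4, 5]]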

External Datasets

PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, and so on. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3a://, etc. URI) and reads it as a collection of lines. Here is an example invocation:

>>> distFile = sc.textFile("data.txt")

Once created, distFile can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the map and reduce operations: distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b).
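A minimal sketch of that pipeline, assuming data.txt exists and sc is an active SparkContext:

distFile = sc.textFile("data.txt")                     # lazy: nothing is read yet
lineLengths = distFile.map(lambda s: len(s))           # transformation: length of each line
totalLength = lineLengths.reduce(lambda a, b: a + b)   # action: sums the lengths across partitions
print(totalLength)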

Some notes on reading files with Spark:

  • If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes.

  • All of Spark's file-based input methods, including textFile, also support running on directories, compressed files, and wildcards. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

  • The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks are 128 MB by default in HDFS), but you can also request a higher number of partitions by passing a larger value; see the sketch after this list. Note that you cannot have fewer partitions than blocks. In other words: blocks are an HDFS storage concept, partitions are a Spark computation concept, and the number of partitions is always ≥ the number of blocks.
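For example (a sketch; the hdfs:// path is hypothetical, and the resulting partition count depends on the file's block count):

distFile = sc.textFile("hdfs:///my/data.txt", 10)   # ask for at least 10 partitions
print(distFile.getNumPartitions())                  # at least 10 here; never fewer than the number of blocks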

RDD Operations

RDDs support two types of operations:

  • transformations: create a new dataset from an existing one.
  • actions: return a value to the driver program after running a computation on the dataset.

All transformations in Spark are lazy: they do not compute their results right away. A transformation is only computed when an action requires a result to be returned to the driver program. This design lets Spark run more efficiently.
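A short illustration of this laziness (a sketch, assuming sc is available):

nums = sc.parallelize(range(1000))
squares = nums.map(lambda x: x * x)         # nothing runs yet; Spark only records the transformation
total = squares.reduce(lambda a, b: a + b)  # the map is actually executed here, when the action runs
print(total)                                # 332833500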

Transformations

The following list covers some of the common transformations supported by Spark; a short example combining a few of them follows it. Refer to the RDD API doc (Scala, Java, Python, R) and the pair RDD functions doc (Scala, Java) for details.

  • map(func): Return a new distributed dataset formed by passing each element of the source through a function func.
  • filter(func): Return a new dataset formed by selecting those elements of the source on which func returns true.
  • flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
  • mapPartitions(func): Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
  • mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
  • sample(withReplacement, fraction, seed): Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
  • union(otherDataset): Return a new dataset that contains the union of the elements in the source dataset and the argument.
  • intersection(otherDataset): Return a new RDD that contains the intersection of elements in the source dataset and the argument.
  • distinct([numPartitions]): Return a new dataset that contains the distinct elements of the source dataset.
  • groupByKey([numPartitions]): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
    Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
    Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.
  • reduceByKey(func, [numPartitions]): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
  • aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]): When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
  • sortByKey([ascending], [numPartitions]): When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
  • join(otherDataset, [numPartitions]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
  • cogroup(otherDataset, [numPartitions]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
  • cartesian(otherDataset): When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
  • pipe(command, [envVars]): Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
  • coalesce(numPartitions): Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
  • repartition(numPartitions): Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
  • repartitionAndSortWithinPartitions(partitioner): Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
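As a small sketch combining a few of these transformations (flatMap, map, reduceByKey), assuming sc is an active SparkContext and words.txt is a hypothetical input file:

lines = sc.textFile("words.txt")
words = lines.flatMap(lambda line: line.split(" "))   # 0..n output items per input line
pairs = words.map(lambda w: (w, 1))                   # build (K, V) pairs
counts = pairs.reduceByKey(lambda a, b: a + b)        # aggregate the values for each key
print(counts.take(5))                                 # take is an action; see the next list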

Actions

The following list covers some of the common actions supported by Spark; a short example follows it. Refer to the RDD API doc (Scala, Java, Python, R) and the pair RDD functions doc (Scala, Java) for details.

  • reduce(func): Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
  • collect(): Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
  • count(): Return the number of elements in the dataset.
  • first(): Return the first element of the dataset (similar to take(1)).
  • take(n): Return an array with the first n elements of the dataset.
  • takeSample(withReplacement, num, [seed]): Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
  • takeOrdered(n, [ordering]): Return the first n elements of the RDD using either their natural order or a custom comparator.
  • saveAsTextFile(path): Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
  • saveAsSequenceFile(path) (Java and Scala): Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
  • saveAsObjectFile(path) (Java and Scala): Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
  • countByKey(): Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
  • foreach(func): Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
    Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
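A brief sketch exercising a few of these actions (assuming an active SparkContext sc):

rdd = sc.parallelize([5, 3, 1, 4, 2])
print(rdd.count())          # 5
print(rdd.take(3))          # [5, 3, 1]
print(rdd.takeOrdered(3))   # [1, 2, 3] (natural ordering)
rdd.foreach(lambda x: None) # runs on the executors for side effects; it does not return data to the driver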

Shuffle operations 

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's mechanism for redistributing data so that it is grouped differently across partitions. This typically involves copying data across executors and machines, which makes the shuffle a complex and costly operation.

Operations which can cause a shuffle include repartition operations like repartition and coalesce, 'ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

The shuffle is an expensive operation because it involves disk I/O, data serialization, and network I/O. To organize the data for a shuffle, Spark generates sets of tasks: map tasks to organize the data, and a set of reduce tasks to aggregate it.
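For example, reduceByKey below triggers a shuffle: map-side tasks pre-aggregate within each input partition, and the records are then redistributed by key to the reduce-side tasks (a sketch; the partition counts are arbitrary):

pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 1)], 2)
counts = pairs.reduceByKey(lambda a, b: a + b, 4)   # the shuffle regroups records by key into 4 partitions
print(counts.collect())                             # e.g. [('b', 2), ('a', 2)]; ordering is not guaranteed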

RDD Persistence

An RDD can be persisted to memory or to disk; each node stores the partitions that it computes so they can be reused in later actions on that dataset.
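A minimal sketch (assuming data.txt exists and sc is active): persist an intermediate RDD so that later actions reuse it instead of recomputing it.

from pyspark import StorageLevel

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
lineLengths.persist(StorageLevel.MEMORY_ONLY)   # or simply lineLengths.cache()
lineLengths.reduce(lambda a, b: a + b)          # the first action computes and caches the partitions
lineLengths.count()                             # reuses the cached partitions instead of re-reading the file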

Shared Variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machines are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. Spark does, however, provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

Broadcast Variables

Spark automatically broadcasts the common data needed by tasks within each stage. Data broadcast this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data, or when caching the data in deserialized form is important.

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]
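A sketch of the typical use (the lookup table and RDD here are hypothetical): tasks read the broadcast value through .value, and each executor keeps one cached copy instead of receiving the data with every task.

lookup = sc.broadcast({"a": 1, "b": 2, "c": 3})
rdd = sc.parallelize(["a", "c", "b", "a"])
print(rdd.map(lambda k: lookup.value[k]).collect())   # [1, 3, 2, 1]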