当前位置:   article > 正文

python spark数据分析_Spark数据分析之pyspark

python sparkcomputation

慕课笔记

一、大数据简史,从hadoop到Spark

1.hadoop的出现:

(1)问题:1990年,电商爆发以及机器产生了大量数据,单一的系统无法承担

(2)办法:为了解决(1)的问题许多公司,尤其是大公司领导了普通硬件集群的水平扩展

(3)执行:hadoop应运而生

2.spark的出现:

(1)hadoop面临问题:

- 硬件瓶颈:多年来,内存技术突飞猛进,而硬盘技术没有太大的变化。hadoop主要

运用的是硬盘,没有利用好内存技术。

- 编程困难,hadoop的MapReduce编程不太容易,后续才出现了Pig、Hive

- 场景不适,批处理要根据不同场景进行开发

(2)spark应运而生

3.集群的强大之处:

(1)存储:切割 HDFS的Block

(2)计算:切割 【分布式并行计算】 MapReduce/Spark

(3)存储 + 计算: HDFS/S3 + MapReduce/Spark

二、Spark简介

(1)目标:企业数据处理的统一引擎

(2)特点:

- 广支持:一套系统解决多种环境

- 高速度:内存上运行Hadoop快100倍,硬盘上运行比Hadoop快10倍

- 多接口:如:Python、Java、R...

- 多应用:sparkSQL、sparkStreaming、MLlib、GraphX

(3)为啥spark解决了hadoop慢的问题呢?

- 减少网络使用:Spark设计思想是减少节点间的数据交互

- 运用内存技术:Spark是和内存进行交互,Hadoop是磁盘进行交互

(4)大数据处理的三种情况:

- 复杂的批量处理:时间长,跨度为10min~N hr

- 历史数据为基础的交互式查询:时间通常为10sec~N min

- 实时数据为基础的流式数据:时间通常为N ms~N sec

(5)Spark的对应方案:

- Spark Core:以RDD为基础提供了丰富的基础操作接口,

使得Spark可以灵活处理类似MR的批处理作业

- Spark SQL:兼容HIVE数据,提供比Hive更快的查询速度

(10~100x)的分布式SQL引擎

- Spark Streaming:将流式计算分解成一系列小的批处理作业

利用spark轻量级低时延的框架来支持流数

据处理,目前已经支持Kafka,Flume等

- GraphX:提供图形计算框架,与Pregel/GraphLab兼容

- Milb:提供基于Spark的机器学习算法库

三、Spark Core核心之RDD

(1)RDD是什么:

①RDD是一个抽象类

②RDD支持多种类型,即泛形

③RDD:弹性分布式数据集

弹性 【容错,自动进行数据的修复】

分布式 【不同节点运行】

数据集

- 不可变 (1,2,3,4,5,6)

- 可分区 (1,2,3)(4,5,6)

(2)RDD的特性:

①一个RDD由多个分区/分片组成

②对RDD进行一个函数操作,会对RDD的所有分区都执行相同函数操作

③一个RDD依赖于其他RDD,RDD1->RDD2->RDD3->RDD4->RDD5,若RDD1中某节点数据丢失,

后面的RDD会根据前面的信息进行重新计算

④对于Key-Value的RDD可以制定一个partitioner,告诉他如何分片。常用hash/range

⑤移动数据不如移动计算,注:移动数据,不如移动计算。考虑磁盘IO和网络资源传输等

(3)图解RDD:RDD图解

(4)SparkContext&SparkConf

SparkContext意义:主入口点

SparkContext作用:连接Spark集群

SparkConf作用:创建SparkContext前得使用SparkConf进行配置,以键值对形式进行

①创建SparkContext

- 连接到Spark“集群”:local、standalone、yarn、mesos

- 通过SparkContext来创建RDD、广播变量到集群

②创建SparkContext前还需要创建SparkConf对象

③SparkConf().setAppName(appname).setMaster('local')这个设置高于系统设置

④pyspark.SparkContext连接到Spark‘集群’即master(local[单机]、Standalone[标准]

、yarn(hadoop上)、mesos),创建RDD,广播变量到集群

⑤代码:

conf = SparkConf().setAppName(appname).setMaster('local')

sc = SparkContext(Conf=Conf)

(5)pyspark的使用:

①默认:pySpark在启动时就会创建一个SparkContext,别名为sc

②参数的使用:

./bin/pyspark --master local[4]

./bin/pyspark --master[4] --py-files code.py

(6)RDD的创建:

①集合转RDD

data = [1,2,3]

distData = sc.parallelize(data,3) #这行代码可以将列表转为RDD数据集

distData.collect() #这行代码可以打印输出RDD数据集#【触发一个job】

distData.reduce(lambda a,b :a+b) #【触发一个job】

注意:一个CPU可以设置2~4个partition

②外部数据集转RDD

distFile = sc.textFile("hello.txt") #将外部数据转换为RDD对象

distFile.collect()

(7)提交pyspark作业到服务器上运行

./spark-submit --master local[2] --name spark0301 /home/hadoop/scrip

t/spark0301.py

(8)PySpark实战之Spark Core核心RDD常用算子:

【两个算子】

①transfermation: map、filter【过滤】、group by、distinct

map()是将传入的函数依次作用到序列的每个元素,每个元素都是独自被函数“作用”一次

②action: count, reduce, collect

注意:(1)

1)All transformations in Spark are lazy,in that they do not

compute their results right away.

-- Spark的transformations很懒,因为他们没有马上计算出结果

2)Instead they just remember the transformations applied to some

base dataset

-- 相反,他们只记得应用于基本数据集

(2)

1)action triggers the computation

-- 动作触发计算

2) action returns values to driver or writes data to external storage

-- action将返回值数据写入外部存储

【单词记忆】

applied to:施加到

Instead:相反

in that:因为

external storage:外部存储

map()是将传入的函数依次作用到序列的每个元素,每个元素都是独自被函数“作用”一次

(1)map

map(func)

#将func函数作用到数据集每一个元素上,生成一个新的分布式【数据集】返回

(2)filter

filter(func)

返回所有func返回值为true的元素,生成一个新的分布式【数据集】返回

(3)flatMap #flat压扁以后做map

flatMap(func)

输入的item能够被map或0或多个items输出,返回值是一个【Sequence】

(4)groupByKey:把相同的key的数据分发到一起

['hello', 'spark', 'hello', 'world', 'hello', 'world']

('hello',1) ('spark',1)........

(5)reduceByKey: 把相同的key的数据分发到一起并进行相应的计算

mapRdd.reduceByKey(lambda a,b:a+b)

[1,1] 1+1

[1,1,1] 1+1=2+1=3

[1] 1

(6)左连接:以左表为基准

右连接:以右表为基准

全连接:以左右都为基准

练习1:Transformation算子编程

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':

conf = SparkConf().setMaster("local[2]").setAppName("spark0401")

sc = SparkContext(conf=conf)

'''

map:

map(func)

将func函数作用到数据集的每个元素上,生成一个新的分布式数据集返回

'''

print("***************************map***************************")

def my_map():

# 创建一个序列

data = [1,2,3,4,5]

# 将序列转换为RDD

rdd1 = sc.parallelize(data)

# 使用函数对RDD进行作用,生成RDD2

rdd2 = rdd1.map(lambda x:x*2)

# 使用collect()讲结果输出

print(rdd2.collect())

my_map()

def my_map2():

a = sc.parallelize(["dog","tiger","lion","cat","panter","eagle"])

b = a.map(lambda x:(x,1)) #进来一个x,返回一个(x,1)的形式

print(b.collect())

my_map2()

print("***************************filter***************************")

def my_filter():

#给一个数据

data = [1,2,3,4,5]

rdd1 = sc.parallelize(data)

mapRdd = rdd1.map(lambda x:x**2)

filterRdd = mapRdd.filter(lambda x:x>5)

print(filterRdd.collect())

'''

filter:

filter(func)

返回所有func返回值为true的元素,生成一个新的分布式数据集返回

'''

def my_filter():

data = [1,2,3,4,5]

rdd1 = sc.parallelize(data)

mapRdd = rdd1.map(lambda x:x*2)

filterRdd = mapRdd.filter(lambda x:x > 5)

print(filterRdd.collect())

print(sc.parallelize(data).map(lambda x:x*2).filter(lambda x:x>5).collect())

my_filter()

print("***************************flatMap()***************************")

#Wordcount第一步:

def my_flatMap():

#flatMap,将东西压扁/拆开 后做map

data = ["hello spark","hello world","hello world"]

rdd = sc.parallelize(data)

print(rdd.flatMap(lambda line:line.split(" ")).collect())

my_flatMap()

print("***************************groupBy()***************************")

def my_groupBy():

data = ["hello spark","hello world","hello world"]

rdd = sc.parallelize(data)

mapRdd = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))

groupByRdd = mapRdd.groupByKey()

print(groupByRdd.collect())

print(groupByRdd.map(lambda x:{x[0]:list(x[1])}).collect())

my_groupBy()

print("***************************reduceByKey()***************************")

#出现Wordcount结果

def my_reduceByKey():

data = ["hello spark", "hello world", "hello world"]

rdd = sc.parallelize(data)

mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))

reduceByKeyRdd = mapRdd.reduceByKey(lambda a,b:a+b)

print(reduceByKeyRdd.collect())

my_reduceByKey()

print("***************************sortByKey()***************************")

#将Wordcount结果中数字出现的次数进行降序排列

def my_sort():

data = ["hello spark", "hello world", "hello world"]

rdd = sc.parallelize(data)

mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))

reduceByKeyRdd = mapRdd.reduceByKey(lambda a, b: a + b)

#reduceByKeyRdd.sortByKey().collect() 此时是按照字典在排序

#reduceByKeyRdd.sortByKey(False).collect()

#先对对键与值互换位置,再排序,再换位置回来

reduceByKey=reduceByKeyRdd.map(lambda x:(x[1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0])).collect()

print(reduceByKey)

my_sort()

print("***************************union()***************************")

def my_union():

a = sc.parallelize([1,2,3])

b = sc.parallelize([3,4,5])

U = a.union(b).collect()

print(U)

my_union()

print("***************************union_distinct()***************************")

def my_distinct():

#这个和数学并集一样了

a = sc.parallelize([1, 2, 3])

b = sc.parallelize([3, 4, 2])

D = a.union(b).distinct().collect()

print(D)

my_distinct()

print("***************************join()***************************")

def my_join():

a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")])

b = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])

J = a.fullOuterJoin(b).collect

print(J)

my_join()

sc.stop()

'''

Spark Core核心算子回顾

-- Transformation算子编程:

map、filter、groupByKey、flatMap、reduceByKey、sortByKey、join等

'''

练习2:Action算子编程

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':

conf = SparkConf().setMaster("local[2]").setAppName("spark0401")

sc = SparkContext(conf=conf)

def my_action():

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

rdd = sc.parallelize(data)

rdd.collect()

rdd.count()

rdd.take(3) #取前三个

rdd.max() #最大值

rdd.min() #最小值

rdd.sum() #求和

rdd.reduce(lambda x, y: x + y) #相邻两个相加

rdd.foreach(lambda x: print(x))

my_action()

sc.stop()

四、PySpark运行模式【一份代码可在多个模式上运行】:

(1)local模式:主要是开发和测试时使用

--master 集群

--name 应用程序名称

--py-file

例子:

./spark-submit --master local[2] --name spark-local /home/hadoop/

script/spark0402.py file:///home/hadoop/data/hello.txt file:///home/hadoop/

wc/output

注意:

local:运行在一个线程上

local[k]:运行在k个线程上

local[K,F]:运行在K线程上,和最大错误设置

local[*]:用本地尽可能多的线程运行

将上述例子中的local[2]改为其他模式名即可在对应模式上运行

(2)standalone模式

hdfs: NameNode DataNode

yarn: ResourceManager NodeManager

master:

worker:

$SPARK_HOME/conf/slaves

hadoop000

假设你有5台机器,就应该进行如下slaves的配置

hadoop000

hadoop001

hadoop002

hadoop003

hadoop005

如果是多台机器,那么每台机器都在相同的路径下部署spark

启动spark集群

$SPARK_HOME/sbin/start-all.sh

ps: 要在spark-env.sh中添加JAVA_HOME,否则会报错

检测:jps: Master和Worker进程,就说明我们的standalone模式安装成功

(3)yarn模式:

- spark作为作业客户端而已,他需要做的事情就是提交作业到yarn上去执行

- yarn vs standalone

yarn: 你只需要一个节点,然后提交作业即可

这时是不需要spark集群的(不需要启动master和worker的)

standalone:你的spark集群上每个节点都需要部署spark

然后需要启动spark集群(需要master和worker)

- 例子:

./spark-submit --master yarn --name spark-yarn /home/hadoop/

script/spark0402.py hdfs://hadoop000:8020/wc.txt hdfs://hadoop000:8020/

wc/output

- 指定hadoop_conf或者yarn_conf_dir是为了指定加载其路径下面的配置文件,spark 想要跑在yarn

上势必要知道HDFS 和 yarn 的信息,不然 spark怎么找到yarn

- yarn支持client和cluster模式:那么driver运行在哪里呢?

本地时是client【默认】:提交作业的进程是不能停止的,否则作业就挂了

集群时是cluster:提交完作业,那么提交作业端就可以断开了,因为driver是运行在

am(application master)里面的

- yarn相关的报错信息:

Error: Cluster deploy mode is not applicable to Spark shells

pyspark/spark-shell : 交互式运行程序 client

如何查看已经运行完的yarn的日志信息: yarn logs -applicationId

五、Spark核心概述

1.名词解析:

Application:基于Spark的应用程序 = 1 driver + executors

pyspark、spark-shell都是应用程序

Driver program:在py文件的主方法__main__下创建一个SparkContext

Cluster manager :从外部去获取资源,同时可以设置申请多少资源

spark-submit --master local[2]/spark://hadoop000:7077/yarn

Deploy mode:区分driver在什么地方启动

In "cluster" mode, the framework launches the driver inside of the cluster.

In "client" mode, the submitter launches the driver outside of the cluster.

Worker node:工作节点,就像manage节点

Any node that can run application code in the cluster

standalone: slave节点 slaves配置文件

yarn: nodemanager

Executor:为工作节点上的应用程序启动的进程

- runs tasks

- keeps data in memory or disk storage across them

- Each application has its own executors.

Task :一个工作单元,将被发送给一个执行者

Job :一个action对应一个job

①Spark=a driver + executors

②driver = main方法 + sparkContext

③executors是一个进程启动在worknode上,能够运行任务能够缓存数据,而且每个应用程序有一组独立的executor

④申请资源时是通过Cluster manager去申请的,可以自定义本地或集群

⑤自定义运行时,Deploy mode可以跑在cluster上也可以跑在client上

⑥Executor运行在worknode上,task运行在Executor上,task(map、flatMap等属于task)从driver上发起

⑦Job是一个并行的计算,由多个①Spark=a driver + executors

②driver = main方法 + sparkContext

③executors是一个进程启动在worknode上,能够运行任务能够缓存数据,而且每个应用程序有一组独立的executor

④申请资源时是通过Cluster manager去申请的,可以自定义本地或集群

⑤自定义运行时,Deploy mode可以跑在cluster上也可以跑在client上组成,spark中一个action(save,collect)对应一个job

⑥Stage:一个job会被拆分为一个小的任务集,一个stage的边界往往是从某个地方取数据开始,到shuffle结束

六、Spark SQL

(1)Spark SQL前世今生:

SQL:MySQL、Oracle、DB2、SQLService

- 我们很熟悉的数据处理语言是SQL

- 但是数据量越来越大 ==> 大数据(Hive、Spark Core)

-- hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,

并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。

-- Spark Core:得熟悉Java、Python等语言

- 综上:能通过SQL语言处理大数据问题是人们最喜欢的啦

出现了:SQL on Hadoop

Hive on Map Reduce

Shark【没有了。。。】

Impala:比较吃内存,Cloudera

Presto:京东再用

发展:

Hive on Map Reduce

Shark on Spark

Spark SQL:on Spark:Spark社区的

共同点:metastore mysql,基于源数据建表

Hive on Spark:Hive社区的不同于Spark SQL,在Hive能运行的SparkSQL不一定可以

(2)官方描述:Spark SQL是Apache Spark的一个模块,是用来处理结构化数据的

①编程和SQL可以无缝对接:

支持SQL和DATa Frame API(Java、Scala、Python、R)

代码示例:results = spark.sql("SELCET * FROM people")

names = results.map(lambda p:p.name)

②统一的数据访问:可以直接将Hive、ORC、JSON、JDBC结果做连接

spark.read.json("s3n://...").registerTempTable("json")

results = spark.sql(

"""SELECT *

FROM people

JOIN hson ...""")

查询和连接不同数据源【Spark SQL不仅仅是SQL】

③Spark SQL 可以使用已经存在的Hive仓库matastores,UDFs等

④提供了标准的JDBC、ODBC接口,外部工具可以直接访问Spark

结:Spark SQL 强调的是“结构化数据”而非“SQL”

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/918891
推荐阅读
相关标签
  

闽ICP备14008679号