赞
踩
Apache Spark是一个快速的通用集群计算框架 / 殷勤。它提供Java,Scala,Python和R中的高级API,以及支持常规执行图的优化引擎。它还支持一组丰富的更高级别的工具,包括Spark SQL用于SQL和结构化数据的处理,MLlib机器学习,GraphX用于图形处理和Spark Streaming. 。
作为Apache的顶级项目之一, 它的官网为 http://spark.apache.org
快速
运行工作负载的速度提高了100倍。
Apache Spark使用最先进的DAG调度程序,查询优化器和物理执行引擎,为批处理数据和流数据提供了高性能。
易用
使用Java,Scala,Python,R和SQL快速编写应用程序。
Spark提供了80多个算子,可轻松构建并行应用程序。我们可以 从Scala,Python,R和SQL Shell 交互使用它。
通用
结合使用SQL,流和复杂的分析。
Spark拥有一系列强大的库,包括 SQL和DataFrames,MLlib机器学习, GraphX和Spark Streaming。我们可以在同一应用程序中无缝组合这些库。
到处运行
Spark可在Hadoop,Apache Mesos,Kubernetes,Standalone或云服务器中运行。它可以访问各种数据源。
我们可以在EC2,Hadoop YARN,Mesos或Kubernetes上使用其独立集群模式运行Spark 。访问HDFS, Alluxio, Apache Cassandra, Apache HBase, Apache Hive和数百种其他数据源中的数据。
The Berkeley Data Analytics Stack
回顾hadoop
要基于Yarn来进行资源调度,必须实现AppalicationMaster接口,Spark实现了这个接口,所以可以基于Yarn。
面试如果问Spark的RDD
我们可以介绍RDD的五大特性以及相关注意实现
可以将下图中的每一个猴看成是一个RDD
RDD Lineage依靠他们之间的依赖关系形成了一个有向无环图DAG
但在复杂的逻辑中, 可能是多条lineage组成一个DAG
Driver: 发送task, 资源回收. 如果task的计算结果非常大就不要回收了。会造成oom。
Worker: 资源管理从节点
Master: 资源管理主节点, 管理多个Worker
注意:
Spark的Driver如果回收多个Worker可能会出现OOM问题
OOM问题 ; out of memery 内存溢出
这些角色都是以JVM进程形式存在
注意:
分布式文件系统(File system)–加载RDD
transformations延迟执行–针对RDD的操作 , 是某一类算子(函数)
Action触发执行 , Action也是一类算子(函数)
Transformations类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。
Transformation类算子:
filter
过滤符合条件的记录数,true保留,false过滤掉。
map
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
特点:输入一条,输出一条数据。
flatMap
先map后flat。与map类似,每个输入项可以映射为0到多个输出项( 1对多 )。
sample
随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。
reduceByKey
将相同的Key根据相应的逻辑进行处理。
sortByKey/sortBy
作用在K,V格式的RDD上,对key进行升序或者降序排序。可选参数: false:降序, true:默认,升序
join,leftOuterJoin,rightOuterJoin,fullOuterJoin
作用在K,V格式的RDD上。根据K进行连接,对(K,V)join(K,W)返回(K,(V,W))
join后的分区数与父RDD中分区数多的那一个相同。
union
合并两个数据集。两个数据集的类型要一致。
返回新的RDD的分区数是合并RDD分区数的总和。
intersection
取两个数据集的交集,返回新的RDD与父RDD分区多的一致
subtract
取两个数据集的差集,结果RDD的分区数与subtract前面的RDD的分区数一致。
mapPartitions
与map类似,遍历的单位是每个partition上的数据。
distinct(map+reduceByKey+map)
去重
cogroup
当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>))
,子RDD的分区与父RDD多的一致。
mapPartitionWithIndex
类似于mapPartitions,除此之外还会携带分区的索引值。
repartition
增加或减少分区。会产生shuffle。(多个分区分到一个分区不会产生shuffle)
coalesce
coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle。
true为产生shuffle,false不产生shuffle。默认是false。
如果coalesce设置的分区数比原来的RDD的分区数还多的话,第二个参数设置为false不会起作用,如果设置成true,效果和repartition一样。即repartition(numPartitions) = coalesce(numPartitions,true)
groupByKey
作用在K,V格式的RDD上。根据Key进行分组。作用在(K,V),返回(K,Iterable <V>
)。
zip
将两个RDD中的元素(KV格式/非KV格式)变成一个KV格式的RDD,两个RDD的每个分区元素个数必须相同。
zipWithIndex
该函数将RDD中的元素和这个元素在RDD中的索引号(从0开始)组合成(K,V)对。
Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。
Action类算子
count
返回数据集中的元素数。会在结果计算完成后回收到Driver端。
take(n)
返回一个包含数据集前n个元素的集合。
first
first=take(1),返回数据集中的第一个元素。
foreach
循环遍历数据集中的每个元素,运行相应的逻辑。
collect
将计算结果回收到Driver端。
foreachPartition
遍历的数据是每个partition的数据。
countByKey
作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。
countByValue
根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
reduce
根据聚合逻辑聚合数据集中的每个元素。
控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。
cache
默认将RDD的数据持久化到内存中。cache是懒执行。
注意:chche () = persist()=persist(StorageLevel.Memory_Only)
测试cache文件:
测试代码:
/** * 验证控制算子cache :默认将RDD的数据持久化到内存中。cache是懒执行。 * Author TimePause * Create 2019-12-13 19:55 */ object CacheTest { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("cache") val sc = new SparkContext(conf) val lines = sc.textFile("./data/persistData.txt") // val lines1: lines.type = lines.cache() val lines1 = lines.persist(StorageLevel.MEMORY_ONLY) val t1 = System.currentTimeMillis() val l: Long = lines1.count() val t2 = System.currentTimeMillis() println("第一次count:"+l+s",time=${t2-t1}") //用时5833ms val t3 = System.currentTimeMillis() val l1 = lines1.count() val t4 = System.currentTimeMillis() println("第二次count:"+l1+s",time=${t4-t3}") //用时183ms, 由此可见基于内存速度要快很多 } }
persist:
可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。
持久化级别如下:
cache() : 默认将数据存在内存中
且 cache() = persist() = persist(StorageLevel.MEMORY_ONLY)
persist(): 可以手动指定持久化的级别, 我们经常使用的persist级别(推荐程度由高到低)
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
MEMORY_AND_DISK
尽量避免使用“_2” 和 “DISK_ONLY” 级别
cache和persist的注意事项:
checkpoint 的执行原理:
/** * 验证控制算子checkPoint * Author TimePause * Create 2019-12-13 19:55 */ object CheckPointTest { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("cache") val sc = new SparkContext(conf) sc.setCheckpointDir("./ck/word.txt") val lines = sc.textFile("./data/word.txt") lines.checkpoint() lines.count() } }
运行结果
1).Spark官网下载安装包,解压
2).进入安装包的conf目录下,复制slaves.template文件,复制后名称为slaves
添加从节点(worker)名称。保存。
3).复制spark-env.sh.template文件,复制后名称为spark-env.sh
修改spark-env.sh
JAVA_HOME jdk所在目录
SPARK_MASTER_IP:master的ip
SPARK_MASTER_PORT:提交任务的端口,默认是7077
SPARK_WORKER_CORES:每个worker从节点能够支配的core的个数
SPARK_WORKER_MEMORY:每个worker从节点能够支配的内存数
4).同步到其他节点上
5).启动集群
进入sbin目录下,执行当前目录下的./start-all.sh
6)访问集群的图形化界面, 默认为8080
这是使用的jar是Spark自带的一个jar, 用于计算圆周率, 无需自己手动编写, 执行运行即可, 在此用于测试Spark能否正常提交任务
通过bin,目录下的 spark-submit来提交(在那一个节点都可以,命令都如下,不会改变)
# ./spark-submit --master spark提交任务的ip和端口 提交的jar的全限定路径 提交的jar的名称 运行jar/任务的task数(图1)
./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
图1
因此如果我们使用方法二, 会在任务提交时一直占用当前shell以及网卡资源,为了消除这个影响我们选择方法二
将spark安装包原封不动的拷贝到一个新的节点上,然后,在新的节点上提交任务即可。
在bin目录下, 命令依旧如下
# ./spark-submit --master spark提交任务的ip和端口 提交的jar的全限定路径 提交的jar的名称 运行jar/任务的task数(图1)
./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
这样占用的就不是Spark集群节点的网卡和shell, 而是无关的节点的相关资源了
基于Standalone每次提交任务时,都会在Worker节点Spark安装目录的/work目录下生成一个命名为app-xxx-xxx的目录,这个目录下存放程序运行时所需的依赖的jar包。每次提交任务都会在这个work目录下生成一个application目录且不会自动清理。如果时间长了就有可能占用大量的磁盘空间。
清理:可以在worker节点的Spark-env.sh中配置如下参数,定期清理work目录。
export SPARK_WORKER_OPTS="
-Dspark.worker.cleanup.enabled=true #是否开启自动清理
-Dspark.worker.cleanup.interval=1800 #每隔多长时间清理一次,单位s
-Dspark.worker.cleanup.appDataTtl=604800" # 保留最近多长时间的数据,单位s
以上参数中:
spark.worker.cleanupenabled=true 只有运行完成的application才会被清理。
spark.worker.cleanup.interval 清理周期,单位s,默认值为30分钟。
spark.worker.clearnup.appDataTtl 保存多长时间的数据,单位s,默认是一周
可以将依赖的jar包全部打入一个jar包中,直接提交任务,jar包相对大。这个jar包会被复制到每个work节点的work目录app-xx-xx目录下。
./spark-submit
--master spark://node1:7077
--class com.bjsxt.spark.areaRoadFlow.AreaTop3RoadFlowAnalyze
/root/test/TrafficProject-1.0-SNAPSHOT-jar-with-dependencies.jar
可以将依赖的jar包在提交任务时使用–jars 来指定,这种方式每次都会给每个work节点的worker目录中application-xx-xx路径复制一份依赖的jar包(提交的任务jar包也会被复制到这个路径下)。提交任务时,使用不含有依赖的jar包。
./spark-submit
--master spark://node1:7077
--jars /root/test/mysql-connector-java-5.1.47.jar,/root/test/fastjson-1.2.11.jar
--class com.bjsxt.spark.areaRoadFlow.AreaTop3RoadFlowAnalyze
/root/test/TrafficProject-1.0-SNAPSHOT.jar 4
提交任务时使用不含有依赖的jar包,将依赖的jar包放入每台worker节点的spark安装目录下jars目录下(如果客户端时单独一台,客户端这个目录下也要放jar包),这种模式不会将依赖的jar包复制到worker节点worke下application-xxx-xxx目录,class所在的提交jar包会在这个目录下:
./spark-submit
--master spark://node1:7077
--class com.bjsxt.spark.areaRoadFlow.AreaTop3RoadFlowAnalyze
/root/test/TrafficProject-1.0-SNAPSHOT.jar
提交任务时将所有依赖包打入一个jar包,使用含有依赖的jar包,这个jar包会被复制到每台worker节点的worke目录app-xx-xx下。
./spark-submit
--master spark://node1:7077
--deploy-mode cluster
--class com.bjsxt.spark.areaRoadFlow.AreaTop3RoadFlowAnalyze
hdfs://node1:9000/spark/TrafficProject-1.0-SNAPSHOT-jar-with-dependencies.jar
提交任务时使用不含有依赖包的jar包,使用- -jars 来指定依赖的jar包。由于Driver会在worker节点启动,所以每台worker节点上都要含有所有- - jars指定的路径和包。-- jars 也可以指定hdfs中的路径,这样就不需要每台worker节点要含有 - -jars的路径和包,但是依赖的hdfs中的jar包会被复制到每台worker节点的work目录app-xx-xx下。
./spark-submit
--master spark://node1:7077
--deploy-mode cluster
--jars hdfs://node1:9000/spark/fastjson-1.2.11.jar,hdfs://node1:9000/spark/mysql-connector-java-5.1.47.jar
--class com.bjsxt.spark.areaRoadFlow.AreaTop3RoadFlowAnalyze
hdfs://node1:9000/spark/TrafficProject-1.0-SNAPSHOT.jar
提交任务时使用不含有依赖包的jar包,将所有依赖的jar包在每台worker节点的spark安装目录下的jas目录中,class所在的jar包会被复制到每台worker节点的work目录app-xx-xx下,依赖的jar包不会被放在这个路径下。
./spark-submit
--master spark://node1:7077
--deploy-mode cluster
--class com.bjsxt.spark.areaRoadFlow.AreaTop3RoadFlowAnalyze
hdfs://node1:9000/spark/TrafficProject-1.0-SNAPSHOT.jar
无论spark基于Yarn提交任务的client模式还是cluster模式,都会在HDFS中/user/${username}/.sparkStaging目录下产生一个application的目录,这个目录存放class所在的jar包以及- -jars 指定的依赖的jar包,客户端提交任务后,spark任务节点的spark安装目录jars目录下的所有jar包以__spark_libs__5619457283046496725.zip的形式上传到这个路径下。
如果spark_home/jars目录下的jar包很多,会上传很久,导致任务执行很慢,可以通过在客户端spark_home/conf/spark-defaults.conf中配置spark.yarn.jars hdfs://node1:9000/sparkjars/*
(sparkjars 目录需要在hdfs中创建,要配置访问权限755),将spark_home/jars下的所有jar包都上传到hdfs中sparkjars目录下,这样每次提交任务时,就不会从客户端的spark_home/jars下上传所有jar包,只是从hdfs中sparkjars下读取,速度会很快,省略了上传的过程。
每运行一个application都会在sparkStaging路径下产生一个目录,这个路径默认是没有读取权限的,如果读取查看,可以使用命令:hdfs dfs -chmod -R 755 /user/root/.sparkStaging 来更改权限查看。当application运行完成,这个路径会自动删除,可以通过参数spark.yarn.preserve.staging.files false来配置,这个参数配置在Spark提交任务节点的spark安装目录下的conf/spark-defaults.conf中,默认为false,每次运行完成任务之后会自动清除,如果改成true,每次运行完成任务之后不会清除目录。
提交任务有依赖jar包时,有以下三种方式选择:
提交命令
真实提交时必须将这个命令修改成一行然后运行
## 不指定参数 ./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100 ./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100 ## 添加一行参数--deploy-mode client ./spark-submit --master spark://node1:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100 ./spark-submit --master spark://node1:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
执行原理图解
执行流程
在client节点启动Driver进程后,Driver向Master申请资源,向woker发送task,并接受worker的执行结果
总结
client模式适用于测试调试程序 。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。
在Driver端可以看到task执行的情况。生产环境下不能使用client模式,是因为:假设要提交100个application到集群运行,Driver每次都会在client端启动,那么就会导致客户端100次网卡流量暴增的问题。
提交命令
真实提交时必须将这个命令修改成一行然后运行
./spark-submit
--master spark://node1:7077
--deploy-mode cluster
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar 100
./spark-submit --master spark://node1:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
执行原理图解
执行流程
在worker节点启动Driver进程后,Driver向Master申请资源,向woker发送task,并接受worker的执行结果
总结
cluster模式适合在生产模式(项目上线环境)使用, Driver进程是在集群某一台Worker上启动的,在客户端是无法查看task的执行情况(包括执行结果!!!)。假设要提交100个application到集群运行,每次Driver会随机在集群中某一台Worker上启动,那么这100次网卡流量暴增的问题就散布在集群上。
图1
图2
图3
需要有dhfs集群和yarn框架的支持, 但是无需启动 spark Standalone集群
使用前的步骤
启动Zookeeper集群 zKServer.sh start
启动hdfs集群 start-dfs.sh
启动yarn框架start-yarn.sh
启动resourccemanager yarn-daemon.sh start resourcemanager
修改用于提交任务Spark客户端的配置文件 spark-env.sh, 添加自己hadoop配置文件所在目录
注意: 下面三种方式效果相同, 但是在输入要整合成一行命令输入
格式: ./spark-submit --master yarn --class 类所在全限定路径 jar所在位置 task数量 举例: ./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100 # --master yarn–client ./spark-submit --master yarn–client --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100 # --deploy-mode client ./spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
结果输出在当前命令行
执行原理图解
执行流程
RS接收Driver发送的资源请求, 在NM上启动AM, 接收AM启动成功后的资源请求, 分配给NM给AM, 启动Executor.
Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端。
总结
Yarn-client模式同样是适用于测试 ,因为Driver运行在本地,Driver会与yarn集群中的Executor进行大量的通信,会造成客户机网卡流量的大量增加.
ApplicationMaster的作用:
提交命令
格式
./spark-submit (--master yarn-cluster 或者--deploy-mode cluster ) --class 类的全限定路径 jar所在位置 task数目
举例
方式一
./spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 1000
方式二
./spark-submit --master yarn --deploy-mode cluster ../examples/jars/spark-examples_2.11-2.3.1.jar 1000
停止集群任务命令:
yarn application -kill applicationID
执行原理图解
执行流程
RM接收客户端请求, 在NM上启动AM(相当于Driver), 接收AM请求, 返回AM一批NM节点
AM连接NM发送请求启动Executor, 接收Executor的反向注册, 最后发送任务到Executor
总结
Yarn-Cluster主要用于生产环境中, 因为Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。
ApplicationMaster的作用:
访问自己 ResourceManager所在节点的 8088端口, eg: http://node3:8088
任务正在执行时效果图
任务结束点击该id
查看该任务的 logs 日志文件
选择标准输出日志
结果就在日志内容中
简单关系图
RDD之间有一系列的依赖关系,依赖关系又分为窄依赖和宽依赖。
宽窄依赖图理解
Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务。然后将这些task以taskSet的形式提交给TaskScheduler运行。
stage是由一组并行的task组成。
切割规则:从后往前,遇到宽依赖就切割stage
pipeline管道计算模式,pipeline只是一种计算思想,模式。
粗粒度资源申请(Spark)
在Application执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的task执行完成后,才会释放这部分资源。
优点:在Application执行之前,所有的资源都申请完毕,每一个task直接使用资源就可以了,不需要task在执行前自己去申请资源,task启动就快了,task执行快了,stage执行就快了,job就快了,application执行就快了。
缺点:直到最后一个task执行完成才会释放资源,集群的资源无法充分利用。
细粒度资源申请(MapReduce)
Application执行之前不需要先去申请资源,而是直接执行,让job中的每一个task在执行前自己去申请资源,task执行完成就释放资源。
优点:集群的资源可以充分利用。
缺点:task自己去申请资源,task启动变慢,Application的运行就相应的变慢了。
Spark standalone with cluster deploy mode only:
Spark standalone and Mesos only:
Spark standalone and YARN only:
YARN-only:
资源请求简单图
资源调度Master路径:
路径:spark-1.6.0/core/src/main/scala/org.apache.spark/deploy/Master/Master.scala
提交应用程序,submit的路径:
路径:spark-1.6.0/core/src/main/scala/org.apache.spark/ deploy/SparkSubmit.scala
总结:
1. Executor在集群中分散启动,有利于task计算的数据本地化。
2. 默认情况下(提交任务的时候没有设置–executor-cores选项),每一个Worker为当前的Application启动一个Executor,这个Executor会使用这个Worker的所有的cores和1G内存。
3. 如果想在Worker上启动多个Executor,提交Application的时候要加–executor-cores这个选项。
4. 默认情况下没有设置–total-executor-cores,一个Application会使用Spark集群中所有的cores。
总结如下图:
Action算子开始分析
任务调度可以从一个Action类算子开始。因为Action类算子会触发一个job的执行。
划分stage,以taskSet形式提交任务DAGScheduler 类中getMessingParentStages()方法是切割job划分stage。可以结合以下这张图来分析:
java代码
public class wc3 { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("wc3"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("./data/word.txt");// {hello,world} JavaRDD<String> stringJavaRDD = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); JavaPairRDD<String, Integer> stringIntegerJavaPairRDD = stringJavaRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s, 1);//(hello,1),(world,1) } }); JavaPairRDD<String, Integer> stringIntegerJavaPairRDD1 = stringIntegerJavaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2;//(hello,27),(world,1) } }); JavaPairRDD<Integer, String> integerStringJavaPairRDD = stringIntegerJavaPairRDD1.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { return stringIntegerTuple2.swap();//(27,hello),(1,world) } }); JavaPairRDD<Integer, String> integerStringJavaPairRDD1 = integerStringJavaPairRDD.sortByKey(); JavaPairRDD<String, Integer> result = integerStringJavaPairRDD1.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception { return integerStringTuple2.swap();//正序(world,1),(hello,27) } }); result.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { System.out.println(stringIntegerTuple2); } }); System.out.println("qqq"); sc.stop(); } }
Scala代码
object Wc {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("wc1")
val sc = new SparkContext(conf)
sc.textFile("./data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).foreach(println)
sc.stop()
}
}
数据格式
运行结果
Scala代码
/** * 网站访问量统计, 多多练习和理解 * * Author TimePause * Create 2019-12-16 21:09 */ object PvUv { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("pvuv") val sc = new SparkContext(conf) val lines = sc.textFile("./data/pvuvdata") //pv页面访问量: 用户访问某个网站次数,无需去重 //这里没有flatmap, 直接通过map返回了split分割后形成的数组的第五个元素n,组成(n,1)的元组,然后进行wc操作 val result = lines.map(line => {(line.split("\t")(5), 1)}).reduceByKey(_ + _).sortBy(_._2,false) .foreach(tp=>{println(s"网站 ${tp._1} 的页面访问量为: ${tp._2}")}) //uv 独立访问用户数,一般为1天统计一次, 需要去重 //别老是忘记去去重过后的元素line2.split("_")(1) lines.map(line=>{(line.split("\t")(0)+"_"+line.split("\t")(5))}).distinct() .map(line2=>{(line2.split("_")(1),1)}).reduceByKey(_+_).sortBy(_._2,false) .foreach(tp=>{println(s"网站 ${tp._1} 的独立访问用户数为: ${tp._2}")}) } }
数据格式
运行结果
java代码
SparkConf sparkConf = new SparkConf() .setMaster("local") .setAppName("SecondarySortTest"); final JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD<String> secondRDD = sc.textFile("secondSort.txt"); JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() { /** * */ private static final long serialVersionUID = 1L; @Override public Tuple2<SecondSortKey, String> call(String line) throws Exception { String[] splited = line.split(" "); int first = Integer.valueOf(splited[0]); int second = Integer.valueOf(splited[1]); SecondSortKey secondSortKey = new SecondSortKey(first,second); return new Tuple2<SecondSortKey, String>(secondSortKey,line); } }); pairSecondRDD.sortByKey(false).foreach(new VoidFunction<Tuple2<SecondSortKey,String>>() { /** * */ private static final long serialVersionUID = 1L; @Override public void call(Tuple2<SecondSortKey, String> tuple) throws Exception { System.out.println(tuple._2); } });
public class SecondSortKey implements Serializable,Comparable<SecondSortKey>{ /** * */ private static final long serialVersionUID = 1L; private int first; private int second; public int getFirst() { return first; } public void setFirst(int first) { this.first = first; } public int getSecond() { return second; } public void setSecond(int second) { this.second = second; } public SecondSortKey(int first, int second) { super(); this.first = first; this.second = second; } @Override public int compareTo(SecondSortKey o1) { if(getFirst() - o1.getFirst() ==0 ){ return getSecond() - o1.getSecond(); }else{ return getFirst() - o1.getFirst(); } } }
Scala代码
/** * 样例类: 实现二次排序的逻辑实现 * @param first * @param second */ case class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] { def compare(that: SecondSortKey): Int = { if(this.first-that.first==0) this.second- that.second else this.first-that.first } } /** * 二次排序问题 */ object SecondSort { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("secondarySort") conf.setMaster("local") val sc = new SparkContext(conf) val lines = sc.textFile("./data/secondSort.txt") val transRDD: RDD[(SecondSortKey,String)] = lines.map(s=>{(SecondSortKey(s.split(" ")(0).toInt,s.split(" ")(1).toInt),s)}) transRDD.sortByKey(false).map(_._2).foreach(println) } }
数据样式
运行结果
java代码
SparkConf conf = new SparkConf() .setMaster("local") .setAppName("TopOps"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> linesRDD = sc.textFile("scores.txt"); JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() { /** * */ private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String str) throws Exception { String[] splited = str.split("\t"); String clazzName = splited[0]; Integer score = Integer.valueOf(splited[1]); return new Tuple2<String, Integer> (clazzName,score); } }); pairRDD.groupByKey().foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() { /** * */ private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception { String clazzName = tuple._1; Iterator<Integer> iterator = tuple._2.iterator(); Integer[] top3 = new Integer[3]; while (iterator.hasNext()) { Integer score = iterator.next(); for (int i = 0; i < top3.length; i++) { if(top3[i] == null){ top3[i] = score; break; }else if(score > top3[i]){ for (int j = 2; j > i; j--) { top3[j] = top3[j-1]; } top3[i] = score; break; } } } System.out.println("class Name:"+clazzName); for(Integer sscore : top3){ System.out.println(sscore); } } });
Scala代码
/** * 分组取topN * * Author TimePause * Create 2019-12-18 15:50 */ object TopNTest { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("topn") val sc = new SparkContext(conf) val lines = sc.textFile("./data/scores.txt") //class1 100=>(class1,int(100)) val pairInfo = lines.map(one => { (one.split(" ")(0), one.split(" ")(1).toInt) }) val value = pairInfo.groupByKey().map(tp => { val classname = tp._1 val iter = tp._2.iterator val top3Score = new Array[Int](3) val loop = new Breaks while (iter.hasNext) { val currScore = iter.next() loop.breakable { for (i <- 0.until(top3Score.size)) { if (top3Score(i) == 0) { top3Score(i) = currScore loop.break() } else if (currScore > top3Score(i)) { // 2到i, 步长为-1 for (j <- 2.until(i, -1)) { top3Score(j) = top3Score(j - 1) } top3Score(i) = currScore loop.break() } } } } (classname,top3Score.toBuffer) }).collect() value.foreach(println) } }
原理图
测试代码
object BroadCastTest { def main(args:Array[String]): Unit ={ val conf = new SparkConf().setMaster("local").setAppName("test") val sc = new SparkContext(conf) val list: List[String] = List[String]("hello timepause") // 将该数组定义成广播变量 val bcValue: Broadcast[List[String]] = sc.broadcast(list) val lines = sc.textFile("./data/word.txt") lines.filter(one=>{ val value: List[String] = bcValue.value //获取广播变量数组的值 // list.contains(one) value.contains(one) }).foreach(println) } }
注意事项
能不能将一个RDD使用广播变量广播出去?
不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。
广播变量只能在Driver端定义,不能在Executor端定义。
在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
原理图
演示代码
val conf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.accumulator(0)
sc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}}
println(accumulator.value)
sc.stop()
注意事项
累加器在Driver端定义赋初始值,累加器只能在Driver端读取,在Excutor端更新。
概念:
SparkShell是Spark自带的一个快速原型开发工具,也可以说是Spark的scala REPL(Read-Eval-Print-Loop),即交互式shell。支持使用scala语言来进行Spark的交互式编程。
使用:
启动Standalone集群,./start-all.sh ( sbin )
在客户端bin目录下启动 spark-shell:
./spark-shell --master spark://node1:7077
启动hdfs,创建目录spark/test,上传文件wc.txt
启动hdfs集群:
start-all.sh
创建目录:
hdfs dfs -mkdir -p /spark/test
上传wc.txt
hdfs dfs -put /root/test/wc.txt /spark/test/
word,txt部分内容
# 如果直接使用foreach进行输出, 结果会在执行的日志中显示,需要通过图形化界面查看 scala> sc.textFile("hdfs://node2:8020/spark/data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).foreach(println) # 如果通过其他的 Action算子触发执行将会显示结果, 如下 scala> sc.textFile("hdfs://node2:8020/spark/data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect() res2: Array[(String, Int)] = Array((hello,110), (myself,36), (world,36), (timepause,14), (ah,12), (sz,8), (worldd,4)) # 将hdfs文件赋给一个rdd变量 scala> var rdd=sc.textFile("hdfs://node2:8020/spark/data/word.txt") rdd: org.apache.spark.rdd.RDD[String] = hdfs://node2:8020/spark/data/word.txt MapPartitionsRDD[11] at textFile at <console>:24 # 可以这样进行导包 scala> import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel # 设置为只使用内存并序列化 scala> rdd.persist(StorageLevel.MEMORY_ONLY_SER) res1: org.apache.spark.rdd.RDD[String] = hdfs://node2:8020/spark/data/word.txt MapPartitionsRDD[11] at textFile at <console>:24
SparkUI界面介绍
可以指定提交Application的名称
./spark-shell --master spark://node1:7077 --name myapp
配置historyServer
临时配置,对本次提交的应用程序起作用
./spark-shell --master spark://node1:7077
--name myapp1
--conf spark.eventLog.enabled=true
--conf spark.eventLog.dir=hdfs://node1:9000/spark/test
停止程序,在Web Ui中Completed Applications对应的ApplicationID中能查看history。
永久配置, spark-default.conf配置文件中配置HistoryServer
对所有提交的Application都起作用, 在客户端节点,进入…/spark-2.3.1/conf/ spark-defaults.conf最后加入:
##开启记录事件日志的功能
spark.eventLog.enabled true
##设置事件日志存储的目录
spark.eventLog.dir hdfs://node1:9000/spark/test
##设置HistoryServer加载事件日志的位置
spark.history.fs.logDirectory hdfs://node1:9000/spark/test
##日志优化选项,压缩日志
spark.eventLog.compress true
或者像本人这样配置
spark.eventLog.enabled true
spark.eventLog.dir hdfs://mycluster/spark/log
spark.history.fs.logDirectory hdfs://mycluster/spark/log
spark.eventLog.compress true
mycluster为我的Hadoop集群名称. 如何查找自己Hadoop集群名称?
位于自己 /hadoop/etc/hadoop/hdfs-site.xml 文件下
启动HistoryServer:
./start-history-server.sh
访问HistoryServer:node4:18080,之后所有提交的应用程序运行状况都会被记录。
Standalone集群只有一个Master,如果Master挂了就无法提交应用程序,需要给Master进行高可用配置,Master的高可用可以使用fileSystem(文件系统)和zookeeper(分布式协调服务)。
fileSystem只有存储功能,可以存储Master的元数据信息,用fileSystem搭建的Master高可用,在Master失败时,需要我们手动启动另外的备用Master,这种方式不推荐使用。
zookeeper有选举和存储功能,可以存储Master的元素据信息,使用zookeeper搭建的Master高可用,当Master挂掉时,备用的Master会自动切换,推荐使用这种方式搭建Master的HA。
原理图
搭建步骤
在Spark Master节点上配置主Master,配置spark-env.sh
指定Zookeeper集群ip+port以及在ZK中存放Master状态行业选举信息的文件名称
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node2:2181,node3:2181,node4:2181 -Dspark.deploy.zookeeper.dir=/sparkmaster1219"
发送到其他worker节点上
不需要发送客户端所在节点node4
找一台节点(非主Master节点, 例如node2)配置备用 Master,修改spark-env.sh配置节点上的MasterIP
启动集群之前启动zookeeper集群:
../zkServer.sh start
启动spark Standalone集群,启动备用Master
打开主Master和备用Master WebUI页面,观察状态。
注意点
./start-master,sh
SparkShuffle概念
reduceByKey会将上一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是<key,value>对的形式,这样每一个key对应一个聚合起来的value。
问题:聚合之前,每一个key对应的value不一定都是在一个partition中,也不太可能在同一个节点上,因为RDD是分布式的弹性的数据集,RDD的partition极有可能分布在各个节点上。
如何聚合?
– Shuffle Write:上一个stage的每个map task就必须保证将自己处理的当前分区的数据相同的key写入一个分区文件中,可能会写入多个不同的分区文件中。
– Shuffle Read:reduce task就会从上一个stage的所有task所在的机器上寻找属于自己的那些分区文件,这样就可以保证每一个key所对应的value都会汇聚到同一个节点上去处理和聚合。
Spark中有两种Shuffle管理类型,HashShufflManager和SortShuffleManager,Spark1.2之前是HashShuffleManager, Spark1.2引入SortShuffleManager,在Spark 2.0+版本中已经将HashShuffleManager丢弃。
1) 普通机制
普通机制示意图
执行流程
总结
存在的问题
产生的磁盘小文件过多,会导致以下问题:
2) 合并机制
合并机制示意图
总结
产生磁盘小文件的个数:E(Executor的个数)*R(reduce的个数)
执行流程
5.01*2-5=5.02M
内存给内存数据结构。总结
产生磁盘小文件的个数: 2*M(map task的个数)
2) bypass机制
bypass机制示意图
总结
shuffle文件寻址图
BlockManager块管理者,是Spark架构中的一个模块,也是一个主从架构。
Spark执行应用程序时,Spark集群会启动Driver和Executor两种JVM进程,Driver负责创建SparkContext上下文,提交任务,task的分发等。Executor负责task的计算任务,并将结果返回给Driver。同时需要为需要持久化的RDD提供储存。Driver端的内存管理比较简单,这里所说的Spark内存管理针对Executor端的内存管理。
Spark内存管理分为静态内存管理和统一内存管理,Spark1.6之前使用的是静态内存管理,Spark1.6之后引入了统一内存管理。
静态内存管理中存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置。
统一内存管理与静态内存管理的区别在于储存内存和执行内存共享同一块空间,可以互相借用对方的空间。
Spark1.6以上版本默认使用的是统一内存管理,可以通过参数spark.memory.useLegacyMode
设置为true(默认为false)使用静态内存管理。
静态内存管理分布图
SparkShuffle调优配置项如何使用?
在代码中,不推荐使用,硬编码。
new SparkConf().set(“spark.shuffle.file.buffer”,”64”)
在提交spark任务的时候,推荐使用。
spark-submit --conf spark.shuffle.file.buffer=64 –conf ….
在conf下的spark-default.conf配置文件中,不推荐,因为是写死后, 所有应用程序都要用。
附其他的调优参数
spark.shuffle.file.buffer 默认值:32k 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。 spark.reducer.maxSizeInFlight 默认值:48m 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。 spark.shuffle.io.maxRetries 默认值:3 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。 shuffle file not find taskScheduler不负责重试task,由DAGScheduler负责重试stage spark.shuffle.io.retryWait 默认值:5s 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。 spark.shuffle.memoryFraction 默认值:0.2 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。 spark.shuffle.manager 默认值:sort|hash 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。 spark.shuffle.sort.bypassMergeThreshold 默认值:200 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。 spark.shuffle.consolidateFiles 默认值:false 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
Shark是基于Spark计算框架之上且兼容Hive语法的SQL执行引擎,由于底层的计算采用了Spark,性能比MapReduce的Hive普遍快2倍以上,当数据全部load在内存的话,将快10倍以上,因此Shark可以作为交互式查询应用服务来使用。
除了基于Spark的特性外,Shark是完全兼容Hive的语法,表结构以及UDF函数等,已有的HiveSql可以直接进行迁移至Shark上Shark底层依赖于Hive的解析器,查询优化器,但正是由于Shark的整体设计架构对Hive的依赖性太强,难以支持其长远发展,比如不能和Spark的其他组件进行很好的集成,无法满足Spark的一栈式解决大数据处理的需求。
Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制。
Spark on Hive和Hive on Spark
Spark on Hive: Hive只作为储存角色,Spark负责sql解析优化,执行。
Hive on Spark:Hive即作为存储又负责sql的解析优化,Spark负责执行。
DataFrame也是一个分布式数据容器。与RDD类似,然而DataFrame更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。
从API易用性的角度上 看, DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。
DataFrame就Row类型的DataSet。
SparkSQL的数据源可以是JSON类型的字符串,JDBC,Parquent,Hive,HDFS等。
首先拿到sql后解析一批未被解决的逻辑计划,再经过分析得到分析后的逻辑计划,
再经过一批优化规则转换成一批最佳优化的逻辑计划,再经过SparkPlanner的策略转化成一批物理计划,
随后经过消费模型转换成一个个的Spark任务执行。
注意:
java
SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("jsonfile"); SparkContext sc = new SparkContext(conf); //创建sqlContext SQLContext sqlContext = new SQLContext(sc); /** * DataFrame的底层是一个一个的RDD RDD的泛型是Row类型。 * 以下两种方式都可以读取json格式的文件 */ DataFrame df = sqlContext.read().format("json").load("sparksql/json"); // DataFrame df2 = sqlContext.read().json("sparksql/json.txt"); // df2.show(); /** * DataFrame转换成RDD */ RDD<Row> rdd = df.rdd(); /** * 显示 DataFrame中的内容,默认显示前20行。如果现实多行要指定多少行show(行数) * 注意:当有多个列时,显示的列先后顺序是按列的ascii码先后显示。 */ // df.show(); /** * 树形的形式显示schema信息 */ df.printSchema(); /** * dataFram自带的API 操作DataFrame */ //select name from table // df.select("name").show(); //select name age+10 as addage from table df.select(df.col("name"),df.col("age").plus(10).alias("addage")).show(); //select name ,age from table where age>19 df.select(df.col("name"),df.col("age")).where(df.col("age").gt(19)).show(); //select count(*) from table group by age df.groupBy(df.col("age")).count().show(); /** * 将DataFrame注册成临时的一张表,这张表临时注册到内存中,是逻辑上的表,不会雾化到磁盘 */ df.registerTempTable("jtable"); DataFrame sql = sqlContext.sql("select age,count(1) from jtable group by age"); DataFrame sql2 = sqlContext.sql("select * from jtable"); sc.stop();
Scala
val conf = new SparkConf() conf.setMaster("local").setAppName("jsonfile") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df = sqlContext.read.json("sparksql/json") //val df1 = sqlContext.read.format("json").load("sparksql/json") df.show() df.printSchema() //select * from table df.select(df.col("name")).show() //select name from table where age>19 df.select(df.col("name"),df.col("age")).where(df.col("age").gt(19)).show() //select count(*) from table group by age df.groupBy(df.col("age")).count().show(); /** * 注册临时表 */ df.registerTempTable("jtable") val result = sqlContext.sql("select * from jtable") result.show() sc.stop()
java
SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("jsonRDD"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<String> nameRDD = sc.parallelize(Arrays.asList( "{\"name\":\"zhangsan\",\"age\":\"18\"}", "{\"name\":\"lisi\",\"age\":\"19\"}", "{\"name\":\"wangwu\",\"age\":\"20\"}" )); JavaRDD<String> scoreRDD = sc.parallelize(Arrays.asList( "{\"name\":\"zhangsan\",\"score\":\"100\"}", "{\"name\":\"lisi\",\"score\":\"200\"}", "{\"name\":\"wangwu\",\"score\":\"300\"}" )); DataFrame namedf = sqlContext.read().json(nameRDD); DataFrame scoredf = sqlContext.read().json(scoreRDD); namedf.registerTempTable("name"); scoredf.registerTempTable("score"); DataFrame result = sqlContext.sql("select name.name,name.age,score.score from name,score where name.name = score.name"); result.show(); sc.stop();
Scala
val conf = new SparkConf() conf.setMaster("local").setAppName("jsonrdd") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val nameRDD = sc.makeRDD(Array( "{\"name\":\"zhangsan\",\"age\":18}", "{\"name\":\"lisi\",\"age\":19}", "{\"name\":\"wangwu\",\"age\":20}" )) val scoreRDD = sc.makeRDD(Array( "{\"name\":\"zhangsan\",\"score\":100}", "{\"name\":\"lisi\",\"score\":200}", "{\"name\":\"wangwu\",\"score\":300}" )) val nameDF = sqlContext.read.json(nameRDD) val scoreDF = sqlContext.read.json(scoreRDD) nameDF.registerTempTable("name") scoreDF.registerTempTable("score") val result = sqlContext.sql("select name.name,name.age,score.score from name,score where name.name = score.name") result.show() sc.stop()
java
/** * 注意: * 1.自定义类必须是可序列化的 * 2.自定义类访问级别必须是Public * 3.RDD转成DataFrame会把自定义类中字段的名称按assci码排序 */ SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("RDD"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<String> lineRDD = sc.textFile("sparksql/person.txt"); JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() { /** * */ private static final long serialVersionUID = 1L; @Override public Person call(String s) throws Exception { Person p = new Person(); p.setId(s.split(",")[0]); p.setName(s.split(",")[1]); p.setAge(Integer.valueOf(s.split(",")[2])); return p; } }); /** * 传入进去Person.class的时候,sqlContext是通过反射的方式创建DataFrame * 在底层通过反射的方式获得Person的所有field,结合RDD本身,就生成了DataFrame */ DataFrame df = sqlContext.createDataFrame(personRDD, Person.class); df.show(); df.registerTempTable("person"); sqlContext.sql("select name from person where id = 2").show(); /** * 将DataFrame转成JavaRDD * 注意: * 1.可以使用row.getInt(0),row.getString(1)...通过下标获取返回Row类型的数据,但是要注意列顺序问题---不常用 * 2.可以使用row.getAs("列名")来获取对应的列值。 * */ JavaRDD<Row> javaRDD = df.javaRDD(); JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() { /** * */ private static final long serialVersionUID = 1L; @Override public Person call(Row row) throws Exception { Person p = new Person(); //p.setId(row.getString(1)); //p.setName(row.getString(2)); //p.setAge(row.getInt(0)); p.setId((String)row.getAs("id")); p.setName((String)row.getAs("name")); p.setAge((Integer)row.getAs("age")); return p; } }); map.foreach(new VoidFunction<Person>() { /** * */ private static final long serialVersionUID = 1L; @Override public void call(Person t) throws Exception { System.out.println(t); } }); sc.stop();
Scala
val conf = new SparkConf() conf.setMaster("local").setAppName("rddreflect") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val lineRDD = sc.textFile("./sparksql/person.txt") /** * 将RDD隐式转换成DataFrame */ import sqlContext.implicits._ val personRDD = lineRDD.map { x => { val person = Person(x.split(",")(0),x.split(",")(1),Integer.valueOf(x.split(",")(2))) person } } val df = personRDD.toDF(); df.show() /** * 将DataFrame转换成PersonRDD */ val rdd = df.rdd val result = rdd.map { x => { Person(x.getAs("id"),x.getAs("name"),x.getAs("age")) } } result.foreach { println} sc.stop()
java:
SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("rddStruct"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<String> lineRDD = sc.textFile("./sparksql/person.txt"); /** * 转换成Row类型的RDD */ JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() { /** * */ private static final long serialVersionUID = 1L; @Override public Row call(String s) throws Exception { return RowFactory.create( String.valueOf(s.split(",")[0]), String.valueOf(s.split(",")[1]), Integer.valueOf(s.split(",")[2]) ); } }); /** * 动态构建DataFrame中的元数据,一般来说这里的字段可以来源自字符串,也可以来源于外部数据库 */ List<StructField> asList =Arrays.asList( DataTypes.createStructField("id", DataTypes.StringType, true), DataTypes.createStructField("name", DataTypes.StringType, true), DataTypes.createStructField("age", DataTypes.IntegerType, true) ); StructType schema = DataTypes.createStructType(asList); DataFrame df = sqlContext.createDataFrame(rowRDD, schema); df.show(); sc.stop();
Scala
val conf = new SparkConf() conf.setMaster("local").setAppName("rddStruct") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val lineRDD = sc.textFile("./sparksql/person.txt") val rowRDD = lineRDD.map { x => { val split = x.split(",") RowFactory.create(split(0),split(1),Integer.valueOf(split(2))) } } val schema = StructType(List( StructField("id",StringType,true), StructField("name",StringType,true), StructField("age",IntegerType,true) )) val df = sqlContext.createDataFrame(rowRDD, schema) df.show() df.printSchema() sc.stop()
注意:
可以将DataFrame存储成parquet文件。保存成parquet文件的方式有两种
df.write().mode(SaveMode.Overwrite)format("parquet")
.save("./sparksql/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
SaveMode指定文件保存时的模式。
java
SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("parquet"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<String> jsonRDD = sc.textFile("sparksql/json"); DataFrame df = sqlContext.read().json(jsonRDD); /** * 将DataFrame保存成parquet文件,SaveMode指定存储文件时的保存模式 * 保存成parquet文件有以下两种方式: */ df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet"); df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet"); df.show(); /** * 加载parquet文件成DataFrame * 加载parquet文件有以下两种方式: */ DataFrame load = sqlContext.read().format("parquet").load("./sparksql/parquet"); load = sqlContext.read().parquet("./sparksql/parquet"); load.show(); sc.stop();
Scala
val conf = new SparkConf() conf.setMaster("local").setAppName("parquet") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val jsonRDD = sc.textFile("sparksql/json") val df = sqlContext.read.json(jsonRDD) df.show() /** * 将DF保存为parquet文件 */ df.write.mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet") df.write.mode(SaveMode.Overwrite).parquet("./sparksql/parquet") /** * 读取parquet文件 */ var result = sqlContext.read.parquet("./sparksql/parquet") result = sqlContext.read.format("parquet").load("./sparksql/parquet") result.show() sc.stop()
两种方式创建DataFrame
java:
parkConf conf = new SparkConf(); conf.setMaster("local").setAppName("mysql"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); /** * 第一种方式读取MySql数据库表,加载为DataFrame */ Map<String, String> options = new HashMap<String,String>(); options.put("url", "jdbc:mysql://192.168.179.4:3306/spark"); options.put("driver", "com.mysql.jdbc.Driver"); options.put("user", "root"); options.put("password", "123456"); options.put("dbtable", "person"); DataFrame person = sqlContext.read().format("jdbc").options(options).load(); person.show(); person.registerTempTable("person"); /** * 第二种方式读取MySql数据表加载为DataFrame */ DataFrameReader reader = sqlContext.read().format("jdbc"); reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark"); reader.option("driver", "com.mysql.jdbc.Driver"); reader.option("user", "root"); reader.option("password", "123456"); reader.option("dbtable", "score"); DataFrame score = reader.load(); score.show(); score.registerTempTable("score"); DataFrame result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name"); result.show(); /** * 将DataFrame结果保存到Mysql中 */ Properties properties = new Properties(); properties.setProperty("user", "root"); properties.setProperty("password", "123456"); result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.4:3306/spark", "result", properties); sc.stop();
Scala
val conf = new SparkConf() conf.setMaster("local").setAppName("mysql") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) /** * 第一种方式读取Mysql数据库表创建DF */ val options = new HashMap[String,String](); options.put("url", "jdbc:mysql://192.168.179.4:3306/spark") options.put("driver","com.mysql.jdbc.Driver") options.put("user","root") options.put("password", "123456") options.put("dbtable","person") val person = sqlContext.read.format("jdbc").options(options).load() person.show() person.registerTempTable("person") /** * 第二种方式读取Mysql数据库表创建DF */ val reader = sqlContext.read.format("jdbc") reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark") reader.option("driver","com.mysql.jdbc.Driver") reader.option("user","root") reader.option("password","123456") reader.option("dbtable", "score") val score = reader.load() score.show() score.registerTempTable("score") val result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name") result.show() /** * 将数据写入到Mysql表中 */ val properties = new Properties() properties.setProperty("user", "root") properties.setProperty("password", "123456") result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.179.4:3306/spark", "result", properties) sc.stop()
./spark-submit
--master spark://node1:7077,node2:7077
--executor-cores 1
--executor-memory 2G
--total-executor-cores 1
--class com.bjsxt.sparksql.dataframe.CreateDFFromHive
/root/test/HiveTest.jar
java
SparkConf conf = new SparkConf(); conf.setAppName("hive"); JavaSparkContext sc = new JavaSparkContext(conf); //HiveContext是SQLContext的子类。 HiveContext hiveContext = new HiveContext(sc); hiveContext.sql("USE spark"); hiveContext.sql("DROP TABLE IF EXISTS student_infos"); //在hive中创建student_infos表 hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' "); hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos"); hiveContext.sql("DROP TABLE IF EXISTS student_scores"); hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'"); hiveContext.sql("LOAD DATA " + "LOCAL INPATH '/root/test/student_scores'" + "INTO TABLE student_scores"); /** * 查询表生成DataFrame */ DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score " + "FROM student_infos si " + "JOIN student_scores ss " + "ON si.name=ss.name " + "WHERE ss.score>=80"); hiveContext.sql("DROP TABLE IF EXISTS good_student_infos"); goodStudentsDF.registerTempTable("goodstudent"); DataFrame result = hiveContext.sql("select * from goodstudent"); result.show(); /** * 将结果保存到hive表 good_student_infos */ goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos"); Row[] goodStudentRows = hiveContext.table("good_student_infos").collect(); for(Row goodStudentRow : goodStudentRows) { System.out.println(goodStudentRow); } sc.stop();
Scala
val conf = new SparkConf() conf.setAppName("HiveSource") val sc = new SparkContext(conf) /** * HiveContext是SQLContext的子类。 */ val hiveContext = new HiveContext(sc) hiveContext.sql("use spark") hiveContext.sql("drop table if exists student_infos") hiveContext.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'") hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos") hiveContext.sql("drop table if exists student_scores") hiveContext.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'") hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores") val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name") hiveContext.sql("drop table if exists good_student_infos") /** * 将结果写入到hive表中 */ df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos") sc.stop()
在Spark客户端配置Hive On Spark
在Spark客户端安装包下spark-1.6.0/conf中创建文件hive-site.xml:(或者从hive配置文件复制)
配置hive的metastore路径(启动数据源服务hive --service metastore &
所在的节点)
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node3:9083</value>
</property>
</configuration>
启动Hive的metastore服务(node3)
# 阻塞式启动
hive --service metastore
# 后台启动
hive --service metastore &
启动zookeeper集群,启动HDFS集群。
启动SparkShell 读取Hive中的表总数,对比hive中查询同一表查询总数测试时间。
./spark-shell --master spark://node1:7077,node2:7077
--executor-cores 1
--executor-memory 1g
--total-executor-cores 1
## 方式一
import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)
hc.sql("show databases").show
hc.sql("user default").show
hc.sql("select count(*) from jizhan").show
## 方式二
spark.sql("show tables").show
注意:
如果使用Spark on Hive 查询数据时,出现错误:
找不到HDFS集群路径,要在客户端机器conf/spark-env.sh中设置HDFS的路径
将下面代码所在的项目打包, 将含有依赖的jar上传至虚拟机
/** * 读取Hive中的数据 * 要开启 :enableHiveSupport */ object CreateDataFrameFromHive { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("CreateDataFrameFromHive").enableHiveSupport().getOrCreate() spark.sql("use spark") spark.sql("drop table if exists student_infos") spark.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'") spark.sql("load data local inpath '/root/test/student_infos' into table student_infos") spark.sql("drop table if exists student_scores") spark.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'") spark.sql("load data local inpath '/root/test/student_scores' into table student_scores") // val frame: DataFrame = spark.table("student_infos") 可以将表转换成DataFrame // frame.show(100) val df = spark.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name") df.show(100) spark.sql("drop table if exists good_student_infos") /** * 将结果写入到hive表中 */ df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos") } }
hive创建表后所需的文件内容
student_infos
-------------------
zhangsan 18
lisi 19
wangwu 20
student_scores
---------------
zhangsan 100
lisi 200
wangwu 300
可以看到打包后会有两个, 第一个包没有相关的jar,只有404kb, 只含有代码; 第二个jar有209MB, 含有运行该代码所需要的所有jar包, 可以直接运行
2.通过相关命令上传并执行该jar任务
./spark-submit
--master spark://node1:7077
--class com.bjsxt.scalaspark.sql.DataSetAndDataFrame.CreateDataFrameFromHive /root/test/MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar
3.进入hive中, 进入相应的DB查看
注意: 如果没有该DB, 可以手动创建
可以自定义类实现UDFX接口。
java
SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("udf"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu")); JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() { /** * */ private static final long serialVersionUID = 1L; @Override public Row call(String s) throws Exception { return RowFactory.create(s); } }); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("name", DataTypes.StringType,true)); StructType schema = DataTypes.createStructType(fields); DataFrame df = sqlContext.createDataFrame(rowRDD,schema); df.registerTempTable("user"); /** * 根据UDF函数参数的个数来决定是实现哪一个UDF UDF1,UDF2。。。。UDF1xxx */ sqlContext.udf().register("StrLen", new UDF1<String,Integer>() { /** * */ private static final long serialVersionUID = 1L; @Override public Integer call(String t1) throws Exception { return t1.length(); } }, DataTypes.IntegerType); sqlContext.sql("select name ,StrLen(name) as length from user").show(); //sqlContext.udf().register("StrLen",new UDF2<String, Integer, Integer>() { // // /** // * // */ // private static final long serialVersionUID = 1L; // // @Override // public Integer call(String t1, Integer t2) throws Exception { //return t1.length()+t2; // } //} ,DataTypes.IntegerType ); //sqlContext.sql("select name ,StrLen(name,10) as length from user").show(); sc.stop();
scala:
val conf = new SparkConf() conf.setMaster("local").setAppName("udf") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc); val rdd = sc.makeRDD(Array("zhansan","lisi","wangwu")) val rowRDD = rdd.map { x => { RowFactory.create(x) } } val schema = DataTypes.createStructType(Array(StructField("name",StringType,true))) val df = sqlContext.createDataFrame(rowRDD, schema) df.registerTempTable("user") //sqlContext.udf.register("StrLen",(s : String)=>{s.length()}) //sqlContext.sql("select name ,StrLen(name) as length from user").show sqlContext.udf.register("StrLen",(s : String,i:Int)=>{s.length()+i}) sqlContext.sql("select name ,StrLen(name,10) as length from user").show sc.stop()
实现UDAF函数如果要自定义类要继承UserDefinedAggregateFunction类
java
SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("udaf"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu","zhangsan","zhangsan","lisi")); JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() { /** * */ private static final long serialVersionUID = 1L; @Override public Row call(String s) throws Exception { return RowFactory.create(s); } }); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); DataFrame df = sqlContext.createDataFrame(rowRDD, schema); df.registerTempTable("user"); /** * 注册一个UDAF函数,实现统计相同值得个数 * 注意:这里可以自定义一个类继承UserDefinedAggregateFunction类也是可以的 */ sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() { /** * */ private static final long serialVersionUID = 1L; /** * 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑 * buffer.getInt(0)获取的是上一次聚合后的值 * 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合 * 大聚和发生在reduce端. * 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算 */ @Override public void update(MutableAggregationBuffer buffer, Row arg1) { buffer.update(0, buffer.getInt(0)+1); } /** * 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理 * 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来 * buffer1.getInt(0) : 大聚和的时候 上一次聚合后的值 * buffer2.getInt(0) : 这次计算传入进来的update的结果 * 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作 */ @Override public void merge(MutableAggregationBuffer buffer1, Row buffer2) { buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0)); } /** * 指定输入字段的字段及类型 */ @Override public StructType inputSchema() { return DataTypes.createStructType( Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true))); } /** * 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果 */ @Override public void initialize(MutableAggregationBuffer buffer) { buffer.update(0, 0); } /** * 最后返回一个和DataType的类型要一致的类型,返回UDAF最后的计算结果 */ @Override public Object evaluate(Row row) { return row.getInt(0); } @Override public boolean deterministic() { //设置为true return true; } /** * 指定UDAF函数计算后返回的结果类型 */ @Override public DataType dataType() { return DataTypes.IntegerType; } /** * 在进行聚合操作的时候所要处理的数据的结果的类型 */ @Override public StructType bufferSchema() { return DataTypes.createStructType( Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true))); } }); sqlContext.sql("select name ,StringCount(name) from user group by name").show(); sc.stop();
Scala
class MyUDAF extends UserDefinedAggregateFunction { // 聚合操作时,所处理的数据的类型 def bufferSchema: StructType = { DataTypes.createStructType(Array(DataTypes.createStructField("aaa", IntegerType, true))) } // 最终函数返回值的类型 def dataType: DataType = { DataTypes.IntegerType } def deterministic: Boolean = { true } // 最后返回一个最终的聚合值 要和dataType的类型一一对应 def evaluate(buffer: Row): Any = { buffer.getAs[Int](0) } // 为每个分组的数据执行初始化值 def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0 } //输入数据的类型 def inputSchema: StructType = { DataTypes.createStructType(Array(DataTypes.createStructField("input", StringType, true))) } // 最后merger的时候,在各个节点上的聚合值,要进行merge,也就是合并 def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getAs[Int](0)+buffer2.getAs[Int](0) } // 每个组,有新的值进来的时候,进行分组对应的聚合值的计算 def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0) = buffer.getAs[Int](0)+1 } } object UDAF { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local").setAppName("udaf") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val rdd = sc.makeRDD(Array("zhangsan","lisi","wangwu","zhangsan","lisi")) val rowRDD = rdd.map { x => {RowFactory.create(x)} } val schema = DataTypes.createStructType(Array(DataTypes.createStructField("name", StringType, true))) val df = sqlContext.createDataFrame(rowRDD, schema) df.show() df.registerTempTable("user") /** * 注册一个udaf函数 */ sqlContext.udf.register("StringCount", new MyUDAF()) sqlContext.sql("select name ,StringCount(name) from user group by name").show() sc.stop() } }
注意:
row_number() 开窗函数是按照某个字段分组,然后取另一字段的前几个值,相当于分组取topN
如果SQL语句里面使用到了开窗函数,那么这个SQL语句必须使用HiveContext来执行,HiveContext默认情况下在本地无法创建。在MySql8之后也增加了开窗函数。(一般在Spark集群中运行,将任务提交至集群中运行)
开窗函数格式:
row_number() over (partitin by XXX order by XXX)
java代码
SparkConf conf = new SparkConf(); conf.setAppName("windowfun"); JavaSparkContext sc = new JavaSparkContext(conf); HiveContext hiveContext = new HiveContext(sc); hiveContext.sql("use spark"); hiveContext.sql("drop table if exists sales"); hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) " + "row format delimited fields terminated by '\t'"); hiveContext.sql("load data local inpath '/root/test/sales' into table sales"); /** * 开窗函数格式: * 【 rou_number() over (partitin by XXX order by XXX) 】 */ DataFrame result = hiveContext.sql("select riqi,leibie,jine " + "from (" + "select riqi,leibie,jine," + "row_number() over (partition by leibie order by jine desc) rank " + "from sales) t " + "where t.rank<=3"); result.show(); sc.stop();
Scala代码
val conf = new SparkConf() conf.setAppName("windowfun") val sc = new SparkContext(conf) val hiveContext = new HiveContext(sc) hiveContext.sql("use spark"); hiveContext.sql("drop table if exists sales"); hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) " + "row format delimited fields terminated by '\t'"); hiveContext.sql("load data local inpath '/root/test/sales' into table sales"); /** * 开窗函数格式: * 【 rou_number() over (partitin by XXX order by XXX) 】 */ val result = hiveContext.sql("select riqi,leibie,jine " + "from (" + "select riqi,leibie,jine," + "row_number() over (partition by leibie order by jine desc) rank " + "from sales) t " + "where t.rank<=3"); result.show(); sc.stop()
SparkStreaming是流式处理框架,是Spark API的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来源可以是 :Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,并且可以使用高级功能的复杂算子来处理流数据。例如:map,reduce,join,window 。最终,处理后的数据可以存放在文件系统,数据库等,方便实时展现。
storm 和 spark streaming 在实时性,吞吐量等方面的对比
1、实时性:一般Storm的时延性比spark streaming要低,原因是Spark Streaming是小的批处理,通过间隔时长生成批次,一个批次触发一次计算,比如我在程序里面设置间隔时长为5秒,那就是五秒接收到的数据触发一次计算,Storm是实时处理,来一条数据,触发一次计算,所以可以称spark streaming为流式计算,Storm 为实时计算,阿里的JStorm通过实现Trident,也支持小的批处理计算
2、吞吐量 :Storm的吞吐量要略差于Spark Streaming,原因一是Storm从spout组件
接收源数据,通过发射器发送到bolt,bolt对接收到的数据进行处理,处理完以后,写入到外部存储系统中或者发送到下个bolt进行再处理,所以storm是移动数据,不是移动计算;Spark Streaming获取Task要计算的数据在哪个节点上,然后TaskScheduler把task发送到对应节点上进行数据处理,所以Spark Streaming是移动计算不是移动数据,移动计算也是当前计算引擎的主流设计思想;原因二大家很容易看出来,一个是批处理,一个是实时计算,批处理的吞吐量一般要高于实时触发的计算
3、容错机制:storm是acker(ack/fail消息确认机制)确认机制确保一个tuple被完全处理,Spark Streaming是通过存储RDD转化逻辑进行容错,也就是如果数据从A数据集到B数据集计算错误了,由于存储的有A到B的计算逻辑,所以可以从A重新计算生成B,容错机制不一样,暂时无所谓好坏
注意:
receiver task是7*24小时一直在执行,一直接受数据,将一段时间内接收来的数据保存到batch中。
假设batchInterval为5s,那么会将接收来的数据每隔5秒封装到一个batch中,batch没有分布式计算特性,这一个batch的数据又被封装到一个RDD中,RDD最终封装到一个DStream中。
例如:假设batchInterval为5秒,每隔5秒通过SparkStreaming将得到一个DStream,在第6秒的时候计算这5秒的数据,假设执行任务的时间是3秒,那么第6~9秒一边在接收数据,一边在计算任务,9 ~10秒只是在接收数据。然后在第11秒的时候重复上面的操作。
如果job执行的时间大于batchInterval会有什么样的问题?
如果接受过来的数据设置的级别是仅内存,接收来的数据会越堆积越多,最后可能会导致OOM(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟 )。
netcat yum install -y nc
)foreachRDD
output operation算子,必须对抽取出来的RDD执行action类算子,代码才能执行。
transform
transformation类算子
可以通过transform算子,对Dstream做RDD到RDD的任意操作。
updateStateByKey
transformation算子updateStateByKey作用:
使用到updateStateByKey要开启checkpoint机制和功能。
多久会将内存中的数据写入到磁盘一份?
如果batchInterval设置的时间小于10秒,那么10秒写入磁盘一份。如果batchInterval设置的时间大于10秒,那么就会batchInterval时间间隔写入磁盘一份。
对一定间隔时间内的Wc,而不是全局的Wc
import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.{Durations, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} /** * 使用SparkStreaming进行WordCount: 注意这种wc只是对一定间隔时间内的Wc,而不是全局的Wc * 注意 * 1. batchDuration: 代表我们能够处理数据接收的延迟度, 批次数据处理的间隔时间, 可以集合WebUI调节 * 2. 创建StreamingContext的两种方式 * val ssc=new StreamingContext(SparkContext, batchDuration) * val ssc=new StreamingContext(SparkConf,batchDuration) * 3. SparkStreaming操作的是Dstream, 可以使用的DStream的TransFormation算子, 要使用outputOperation类算子触发执行 * 4. StreamingContext.start()后, 不能添加新的业务逻辑 * 5. StreamingContext.stop()后, 不能调用StreamingContext,start()重新启动, 因为对象已经被回收 * 6. StreaningContext.stop(stopSparkContext=true), 默认关闭关闭StreamingContext关闭时会将SparkContext * * Author TimePause * Create 2019-12-21 10:10 */ object SparkStreamingForWc { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("sswc") //创建2个线程,一个用于接收,一个用于处理 val sc = new SparkContext(conf) // 创建StreamingContext对象,Durations指定批处理间隔时间. 通过socketTextStream设置Socket通信ip和端口 val ssc = new StreamingContext(sc, Durations.seconds(5)) val socket: ReceiverInputDStream[String] = ssc.socketTextStream("node4", 9999) val words = socket.flatMap(one => { (one.split(" ")) }) val pairWords = words.map(word => { (word, 1) }) val reduceResult = pairWords.reduceByKey((v1, v2) => { v1 + v2 }) // print可以指定输出的行数 //reduceResult.print(10) /** * foreachRDD * 1. 拿到DataStream中的RDD.对RDD进行Transformation或者action操作 * 2. 只有foreachRDD这个operation算子,不会触发执行,必须还要有action算子的支持 * 3. foreachRDD算子内map算子外的地方的代码是在Driver执行的, 我们可以通过这里动态的改变广播变量, 实现对配置的热部署 */ val resultRDD = reduceResult.foreachRDD(rdd => { println("******************") //外部代码在Driver端执行 val resultRdd = rdd.map(one => { println(s"实时接收的数据为-------------${one}") //内部代码在Executor端执行 }) resultRdd.foreach(println) }) ssc.start() ssc.awaitTermination() ssc.stop() } }
全局的WordCount, 会将实时的结果持久化到磁盘中
import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Durations, StreamingContext} /** * UpdateStateByKey 根据key更新状态(全局) * 1、为Spark Streaming中每一个Key维护一份state状态,state类型可以是任意类型的, 可以是一个自定义的对象,那么更新函数也可以是自定义的。 * 2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新 */ object UpdateStateByKey { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local[2]") conf.setAppName("UpdateStateByKey") val ssc = new StreamingContext(conf,Durations.seconds(5)) //设置日志级别 ssc.sparkContext.setLogLevel("ERROR") val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node4",9999) val words: DStream[String] = lines.flatMap(line=>{line.split(" ")}) val pairWords: DStream[(String, Int)] = words.map(word => {(word, 1)}) /** * 根据key更新状态,需要设置 checkpoint来保存状态 * 默认key的状态在内存中 有一份,在checkpoint目录中有一份。 * * 多久会将内存中的数据(每一个key所对应的状态)写入到磁盘上一份呢? * 如果你的batchInterval小于10s 那么10s会将内存中的数据写入到磁盘一份 * 如果bacthInterval 大于10s,那么就以bacthInterval为准 * * 这样做是为了防止频繁的写HDFS */ ssc.checkpoint("./data/streamingCheckpoint") // ssc.sparkContext.setCheckpointDir("./data/streamingCheckpoint") /** * currentValues :当前批次某个 key 对应所有的value 组成的一个集合 * preValue : 以往批次当前key 对应的总状态值 */ val result: DStream[(String, Int)] = pairWords.updateStateByKey((currentValues: Seq[Int], preValue: Option[Int]) => { var totalValues = 0 if (!preValue.isEmpty) { totalValues += preValue.get } for(value <- currentValues){ totalValues += value } Option(totalValues) }) result.print() ssc.start() ssc.awaitTermination() ssc.stop() } }
/** * 使用transform算子实现指定信息的过滤 * * Author TimePause * Create 2019-12-21 16:20 */ object TransFormTest { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("windowfun") // 规定处理的时间为5秒 val ssc = new StreamingContext(conf, Durations.seconds(5)) // 通过获取sparkContext来设置Spark输出的日志级别 ssc.sparkContext.setLogLevel("error") val scoket: ReceiverInputDStream[String] = ssc.socketTextStream("node4", 9999) // transform可以将一种格式的DStream转换成另一种格式的DStream val transRDD: DStream[(String, String)] = scoket.transform(rdd => { val filterRDD: RDD[String] = rdd.filter(line => { println(s"====需要被过滤的字符====$line") //不加!是只显示所过滤的字符串,反之是显示除了过滤的字符串 !"world".equals(line.split(" ")(1)) }) val mapRDD: RDD[(String, String)] = filterRDD.map(one => { (one.split(" ")(0), one.split(" ")(1)) }) mapRDD }) transRDD.print() ssc.start() ssc.awaitTermination() ssc.stop() } }
窗口操作
实现每隔n秒, 打印nm秒的数据
/** * 窗口函数 * * Author TimePause * Create 2019-12-21 15:28 */ object reduceByKeyAndWindowTest { def main(args:Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("windowfun") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Durations.seconds(5)) ssc.sparkContext.setLogLevel("error") // 配置窗口函数优化后需要checkpoint保存数据 ssc.checkpoint("./data/window") val socket = ssc.socketTextStream("node4", 9999) val words: DStream[String] = socket.flatMap(line => { line.split(" ") }) // map 处理后成为一个元组(k,v)形式pairwords val pairwords = words.map(word => { (word, 1) }) // pairwords调用窗口函数, 结束后紧跟窗口函数参数, 第一个代表窗口时间长度, 第二个代表窗口的滑动间隔 /* val result = pairwords.reduceByKeyAndWindow((v1: Int, v2: Int) => { v1 + v2 }, Durations.seconds(15), Durations.seconds(5))*/ // 窗口函数优化, 在表窗口长度结束后, 仍保存其k, v设置为0 val result = pairwords.reduceByKeyAndWindow((v1: Int, v2: Int) => {v1 + v2}, (v1:Int,v2:Int)=>{v1-v2}, Durations.seconds(15), Durations.seconds(5)) // val value = pairwords.window(Durations.seconds(15), Durations.seconds(5)) //打印结果 result.print() // 启动和关闭StreamingContext对象 ssc.start() ssc.awaitTermination() ssc.stop() } }
因为SparkStreaming是7*24小时运行,Driver只是一个简单的进程,有可能挂掉,所以实现Driver的HA就有必要(如果使用的Client模式就无法实现Driver HA ,这里针对的是cluster模式)。Yarn平台cluster模式提交任务,AM(AplicationMaster)相当于Driver,如果挂掉会自动启动AM。这里所说的DriverHA针对的是Spark standalone和Mesos资源调度的情况下。
实现Driver的高可用有两个步骤:
第一:提交任务层面,在提交任务的时候加上选项 --supervise,当Driver挂掉的时候会自动重启Driver。
第二:代码层面,使用JavaStreamingContext.getOrCreate(checkpoint路径,JavaStreamingContextFactory)
Driver中元数据包括:
示例代码
/** * Driver HA : * 1.在提交application的时候 添加 --supervise 选项 如果Driver挂掉 会自动启动一个Driver * 2.代码层面恢复Driver(StreamingContext) * */ object SparkStreamingDriverHA { //设置checkpoint目录 val ckDir = "./data/streamingCheckpoint" def main(args: Array[String]): Unit = { /** * StreamingContext.getOrCreate(ckDir,CreateStreamingContext) * 这个方法首先会从ckDir目录中获取StreamingContext【 因为StreamingContext是序列化存储在Checkpoint目录中,恢复时会尝试反序列化这些objects。 * 如果用修改过的class可能会导致错误,此时需要更换checkpoint目录或者删除checkpoint目录中的数据,程序才能起来。】 * * 若能获取回来StreamingContext,就不会执行CreateStreamingContext这个方法创建,否则就会创建 */ val ssc: StreamingContext = StreamingContext.getOrCreate(ckDir,CreateStreamingContext) ssc.start() ssc.awaitTermination() ssc.stop() } def CreateStreamingContext() = { println("=======Create new StreamingContext =======") val conf = new SparkConf() conf.setMaster("local") conf.setAppName("DriverHA") val ssc: StreamingContext = new StreamingContext(conf,Durations.seconds(5)) ssc.sparkContext.setLogLevel("Error") /** * 默认checkpoint 存储: * 1.配置信息 * 2.DStream操作逻辑 * 3.batch执行的进度 或者【offset】 */ ssc.checkpoint(ckDir) val lines: DStream[String] = ssc.textFileStream("./data/streamingCopyFile") val words: DStream[String] = lines.flatMap(line=>{line.trim.split(" ")}) val pairWords: DStream[(String, Int)] = words.map(word=>{(word,1)}) val result: DStream[(String, Int)] = pairWords.reduceByKey((v1:Int, v2:Int)=>{v1+v2}) // result.print() /** * 更改逻辑 */ result.foreachRDD(pairRDD=>{ pairRDD.filter(one=>{ println("*********** filter *********") true }).foreach(println) }) ssc } }
kafka是什么?使用场景?
kafka是一个高吞吐的分布式消息队列系统。特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。消息列队常见场景:系统之间解耦合、峰值压力缓冲、异步通信
Kafka更多介绍以及安装请跳转至本页面查看
链接介绍的是Kafka 0.1.0版本, 这里介绍 kafka 0.8.2环境搭建
上传kafka_2.10-0.8.2.2.tgz包到三个不同节点上,解压。
配置…/ kafka_2.10-0.8.2.2/config/server.properties文件
节点编号:(不同节点按0,1,2,3整数来配置)
真实数据存储位置:
zookeeper的节点:
启动zookeeper集群。
# 进入zk集群
zkCli.sh
# 查看每个borker所对应的唯一id信息(可以看到当前kafka集群中启动了几个borker)
ls /brokers/ids
三个节点上,启动kafka:
bin/kafka-server-start.sh config/server.properties
最好使用自己写的脚本启动,将启动命令写入到一个文件:
(放在与bin同一级别下,注意创建后要修改权限:chmod 755 startkafka.sh)
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
kafka_2.11-0.11.0.3 安装同kafka 0.8.2
不过更换版本时需要删除zk中存放 kafka信息删除方式如下
import java.text.SimpleDateFormat import java.util.{Date, Properties} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import scala.util.Random /** * 向 kafka 中生产数据 */ object ProduceDataToKafka { def main(args: Array[String]): Unit = { val props = new Properties() props.put("bootstrap.servers", "node2:9092,node3:9092,node4:9092") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") //批次大小 props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384") //等待时间 props.put(ProducerConfig.LINGER_MS_CONFIG,"1") props.put("enable.auto.commit","true") val producer = new KafkaProducer[String,String](props) var counter = 0 var keyFlag = 0 while(true){ counter +=1 keyFlag +=1 val content: String = userlogs() print(content) producer.send(new ProducerRecord[String, String]("testKafka,mytopic1222", s"key-$keyFlag", content)) if(0 == counter%100){ counter = 0 Thread.sleep(5000) } } producer.close() } def userlogs()={ val userLogBuffer = new StringBuffer("") val timestamp = new Date().getTime(); var userID = 0L var pageID = 0L //随机生成的用户ID userID = Random.nextInt(2000) //随机生成的页面ID pageID = Random.nextInt(2000); //随机生成Channel val channelNames = Array[String]("Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML") val channel = channelNames(Random.nextInt(10)) val actionNames = Array[String]("View", "Register") //随机生成action行为 val action = actionNames(Random.nextInt(2)) val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) userLogBuffer.append(dateToday) .append("\t") .append(timestamp) .append("\t") .append(userID) .append("\t") .append(pageID) .append("\t") .append(channel) .append("\t") .append(action) System.out.println(userLogBuffer.toString()) userLogBuffer.toString() } }
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.{Durations, StreamingContext} /** * SparkStreaming2.3版本 读取kafka 中数据 : * 1.采用了新的消费者api实现,类似于1.6中SparkStreaming 读取 kafka Direct模式。并行度 一样。 * 2.因为采用了新的消费者api实现,所有相对于1.6的Direct模式【simple api实现】 ,api使用上有很大差别。未来这种api有可能继续变化 * 3.kafka中有两个参数: * heartbeat.interval.ms:这个值代表 kafka集群与消费者之间的心跳间隔时间,kafka 集群确保消费者保持连接的心跳通信时间间隔。这个时间默认是3s. * 这个值必须设置的比session.timeout.ms 小,一般设置不大于 session.timeout.ms 的1/3 * session.timeout.ms : * 这个值代表消费者与kafka之间的session 会话超时时间,如果在这个时间内,kafka 没有接收到消费者的心跳【heartbeat.interval.ms 控制】, * 那么kafka将移除当前的消费者。这个时间默认是30s。 * 这个时间位于配置 group.min.session.timeout.ms【6s】 和 group.max.session.timeout.ms【300s】之间的一个参数, * 如果SparkSteaming 批次间隔时间大于5分钟,也就是大于300s,那么就要相应的调大group.max.session.timeout.ms 这个值。 * 4.大多数情况下,SparkStreaming读取数据使用 LocationStrategies.PreferConsistent 这种策略,这种策略会将分区均匀的分布在集群的Executor之间。 * 如果Executor在kafka 集群中的某些节点上,可以使用 LocationStrategies.PreferBrokers 这种策略,那么当前这个Executor 中的数据会来自当前broker节点。 * 如果节点之间的分区有明显的分布不均,可以使用 LocationStrategies.PreferFixed 这种策略,可以通过一个map 指定将topic分区分布在哪些节点中。 * * 5.新的消费者api 可以将kafka 中的消息预读取到缓存区中,默认大小为64k。默认缓存区在 Executor 中,加快处理数据速度。 * 可以通过参数 spark.streaming.kafka.consumer.cache.maxCapacity 来增大,也可以通过spark.streaming.kafka.consumer.cache.enabled 设置成false 关闭缓存机制。 * "注意:官网中描述这里建议关闭,在读取kafka时如果开启会有重复读取同一个topic partition 消息的问题,报错:KafkaConsumer is not safe for multi-threaded access" * * 6.关于消费者offset * 1).如果设置了checkpoint ,那么offset 将会存储在checkpoint中。 * 这种有缺点: 第一,当从checkpoint中恢复数据时,有可能造成重复的消费。 * 第二,当代码逻辑改变时,无法从checkpoint中来恢复offset. * 2).依靠kafka 来存储消费者offset,kafka 中有一个特殊的topic 来存储消费者offset。新的消费者api中,会定期自动提交offset。这种情况有可能也不是我们想要的, * 因为有可能消费者自动提交了offset,但是后期SparkStreaming 没有将接收来的数据及时处理保存。这里也就是为什么会在配置中将enable.auto.commit 设置成false的原因。 * 这种消费模式也称最多消费一次,默认sparkStreaming 拉取到数据之后就可以更新offset,无论是否消费成功。自动提交offset的频率由参数auto.commit.interval.ms 决定,默认5s。 * *如果我们能保证完全处理完业务之后,可以后期异步的手动提交消费者offset. * 注意:这种模式也有弊端,这种将offset存储在kafka中方式,参数offsets.retention.minutes=1440控制offset是否过期删除,默认将offset信息保存一天, * 如果停机没有消费达到时长,存储在kafka中的消费者组会被清空,offset也就被清除了。 * 3).自己存储offset,这样在处理逻辑时,保证数据处理的事务,如果处理数据失败,就不保存offset,处理数据成功则保存offset.这样可以做到精准的处理一次处理数据。 * */ object SparkStreamingOnKafkaDirect { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local") conf.setAppName("SparkStreamingOnKafkaDirect") val ssc = new StreamingContext(conf,Durations.seconds(5)) //设置日志级别 // ssc.sparkContext.setLogLevel("ERROR") val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "node4:9092,node2:9092,node3:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "MyGroupId",// /** * * earliest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始 * latest:自动重置偏移量为最大偏移量【默认】* * none:没有找到以前的offset,抛出异常 */ "auto.offset.reset" -> "earliest", /** * 当设置 enable.auto.commit为false时,不会自动向kafka中保存消费者offset.需要异步的处理完数据之后手动提交 */ "enable.auto.commit" -> (false: java.lang.Boolean)//默认是true ) val topics = Array[String]("mytopic1222") val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent,//消费策略 Subscribe[String, String](topics, kafkaParams) ) val transStrem: DStream[String] = stream.map((record:ConsumerRecord[String, String]) => { val key_value = (record.key, record.value) println("receive message key = "+key_value._1) println("receive message value = "+key_value._2) key_value._2 }) val wordsDS: DStream[String] = transStrem.flatMap(line=>{line.split("\t")}) val result: DStream[(String, Int)] = wordsDS.map((_,1)).reduceByKey(_+_) result.print() /** * 以上业务处理完成之后,异步的提交消费者offset,这里将 enable.auto.commit 设置成false,就是使用kafka 自己来管理消费者offset * 注意这里,获取 offsetRanges: Array[OffsetRange] 每一批次topic 中的offset时,必须从 源头读取过来的 stream中获取,不能从经过stream转换之后的DStream中获取。 */ stream.foreachRDD { rdd => val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // some time later, after outputs have completed for(or <- offsetRanges){ println(s"current topic = ${or.topic},partition = ${or.partition},fromoffset = ${or.fromOffset},untiloffset=${or.untilOffset}") } stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } ssc.start() ssc.awaitTermination() ssc.stop() } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。