The pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Spark</artifactId>
    <version>1.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <hadoop.version>3.1.3</hadoop.version>
        <scala.version>2.12.15</scala.version>
        <scala.tools.version>2.12</scala.tools.version>
        <spark.version>3.2.1</spark.version>
        <mysql.version>5.1.47</mysql.version>
        <hive.version>3.1.2</hive.version>
    </properties>

    <repositories>
        <!-- Aliyun mirror repository -->
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>

        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- Spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Hadoop client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <!-- Spark-Hive integration -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
</project>
Use Maven to download the corresponding jar packages.
The log4j.properties is as follows:
log4j.rootCategory=error, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
With the root level set to info, routine logs are printed.
With it set to error, routine logs are suppressed and only errors are printed.
package com.peizk

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("My app")
    val sc = new SparkContext(conf)

    // Split each line on commas, map each word to (word, 1), then sum the counts per word
    sc.textFile("Data/Wcdata.txt")
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      .foreach(println)

    sc.stop()
  }
}
Run it. The same word count can be written two equivalent ways, with reduceByKey or with groupByKey:
sc.textFile("/spark-test/spark-test.txt").flatMap(x => x.split(",")).map(x =>(x,1)).reduceByKey(_+_).foreach(println)
sc.textFile("/spark-test/spark-test.txt").flatMap(_.split(",")).map((_, 1)).groupByKey().map(tuple => {(tuple._1, tuple._2.sum)}).collect().foreach(println)
map processes only one element at a time.
mapPartitions processes a whole partition's data at a time.
So when memory is ample, mapPartitions can be used to improve processing efficiency, as in the sketch below.
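A minimal sketch of the difference (the object name and sample data are made up for illustration):

package com.peizk.test

import org.apache.spark.{SparkConf, SparkContext}

object MapVsMapPartitions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("MapVsMapPartitions")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    // map: the function is invoked once per element
    rdd.map(_ * 2).foreach(println)

    // mapPartitions: the function is invoked once per partition and receives
    // the whole partition as an iterator, so per-partition setup work
    // (e.g. opening a database connection) is done only once
    rdd.mapPartitions(iter => iter.map(_ * 2)).foreach(println)

    sc.stop()
  }
}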
Relationship: both are used to change the number of partitions of an RDD.
Under the hood, repartition simply calls the coalesce method:
coalesce(numPartitions, shuffle = true)
Difference: repartition always triggers a shuffle, while coalesce decides whether to shuffle based on the parameter passed in.
In general, use repartition to increase an RDD's partition count and coalesce to decrease it, as in the sketch below.
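A minimal sketch of both operators (object name and sample data are illustrative):

package com.peizk.test

import org.apache.spark.{SparkConf, SparkContext}

object RepartitionVsCoalesce {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RepartitionVsCoalesce")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8), 4)

    // shrinking: no shuffle by default, partitions are merged locally
    val shrunk = rdd.coalesce(2)
    println(shrunk.getNumPartitions) // 2

    // growing: repartition == coalesce(n, shuffle = true), always shuffles
    val grown = rdd.repartition(8)
    println(grown.getNumPartitions) // 8

    sc.stop()
  }
}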
Requires the two RDDs to have the same number of partitions and the same number of elements; see the sketch below.
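This constraint matches Spark's zip operator, which this note presumably refers to; a minimal sketch under that assumption:

package com.peizk.test

import org.apache.spark.{SparkConf, SparkContext}

object ZipTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ZipTest")
    val sc = new SparkContext(conf)

    val a = sc.makeRDD(List(1, 2, 3, 4), 2)
    val b = sc.makeRDD(List("a", "b", "c", "d"), 2)

    // zip pairs elements position by position; it fails at runtime if the
    // two RDDs differ in partition count or in elements per partition
    a.zip(b).foreach(println) // (1,a) (2,b) (3,c) (4,d)

    sc.stop()
  }
}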
reduceByKey: aggregates by key, with a combine (pre-aggregation) step before the shuffle.
groupByKey: groups by key and shuffles directly, with no pre-aggregation.
reduceByKey is therefore recommended, as in the two word-count variants above.
// 17 join
val left = sc.makeRDD(List(("aa", 1), ("bb", 2)))
val right = sc.makeRDD(List(("aa", "dh")))
left.join(right).foreach(println)          // (aa,(1,dh))
left.leftOuterJoin(right).foreach(println) // (aa,(1,Some(dh))) (bb,(2,None))
left.fullOuterJoin(right).foreach(println) // (aa,(Some(1),Some(dh))) (bb,(Some(2),None))
reduceByKey supports pre-aggregation within each partition, which effectively reduces the amount of shuffle data spilled to disk.
However, this requires the intra-partition and inter-partition computation rules to be the same, as in word count,
where values for the same key are simply folded together pairwise.
When the two rules differ, e.g. taking the maximum within each partition but summing across partitions, reduceByKey no longer fits; aggregateByKey, shown below, handles this case.
package com.peizk.test

import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("My HdfsApp")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
    // First parameter list:
    //   takes one argument, the initial (zero) value, which is combined with
    //   the value the first time a key is seen within a partition
    // Second parameter list:
    //   first argument  - the intra-partition computation rule
    //   second argument - the inter-partition computation rule
    rdd.aggregateByKey(0)(
      (x, y) => math.max(x, y),
      (x, y) => x + y
    ).foreach(println) // (a,6): max is 2 in partition 0 and 4 in partition 1, then 2 + 4

    // val rdd1 = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("a", 3), ("a", 4)), 2)
    // rdd1.aggregateByKey(0)(
    //   (x, y) => math.max(x, y),
    //   (x, y) => x + y
    // ).foreach(println)

    sc.stop()
  }
}
package com.peizk.test

import org.apache.spark.{SparkConf, SparkContext}

object Test2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("My HdfsApp")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    // prints 40: intra-partition 10+1+2=13 and 10+3+4=17, then inter-partition 10+13+17=40
    println(rdd.aggregate(10)(_ + _, _ + _))

    sc.stop()
  }
}
With aggregate, the initial value participates in both the intra-partition and the inter-partition computation (hence the three 10s in the result above).
With aggregateByKey, the initial value participates only in the intra-partition computation, as the sketch below shows.
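A minimal sketch of that difference, reusing the sample data from the two programs above (the object name is illustrative; sum is used both within and between partitions so the effect of the initial value is visible):

package com.peizk.test

import org.apache.spark.{SparkConf, SparkContext}

object ZeroValueTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ZeroValueTest")
    val sc = new SparkContext(conf)

    // aggregate: the zero value 10 is applied in each partition AND in the
    // final merge: (10+1+2) + (10+3+4) + 10 = 40
    println(sc.makeRDD(List(1, 2, 3, 4), 2).aggregate(10)(_ + _, _ + _))

    // aggregateByKey: the zero value 10 is applied only within partitions:
    // partition 0 -> 10+1+2 = 13, partition 1 -> 10+3+4 = 17, merge -> 13+17 = 30
    sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
      .aggregateByKey(10)(_ + _, _ + _)
      .foreach(println) // (a,30)

    sc.stop()
  }
}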