赞
踩
package com.tipdm.sparkDemo
import org.apache.spark.{SparkConf, SparkContext}
object a1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List(('a',1),('b',99),('c',100),('d',101)))
val rdd2 = sc.parallelize(List(('e',120),('f',150)))
val rdd3 = rdd1.union(rdd2)
rdd3.filter(_._2 >= 100).collect
rdd3.filter(x => x._2 >= 100).collect.foreach(println)
val rdd4 = sc.parallelize(List(('a',1),('b',99),('c',100),('d',101),('c',100)))
rdd4.filter(_._2 >= 100).collect
val rdd5 = rdd4.distinct()
rdd5.filter(x => x._2 >= 100).collect.foreach(println)
}
}
创建rdd1与rdd2,用union()方法合并rdd1与rdd2放在rdd3中
对rdd3进行filter()方法过滤,去除100以下的数据
创建rdd4,先用filter()方法过滤去除100以下的数据,再用distinct()方法去重输出
package com.tipdm.sparkDemo
import org.apache.spark.{SparkConf, SparkContext}
object a2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List(1,2,3))
val rdd2 = sc.parallelize(List(3,4,5,6))
val rdd3 = sc.parallelize(List('a','b','c'))
val rdd4 = rdd1.union(rdd2)
val rdd5 = rdd4.filter(_ >= 3)
val rdd6 = rdd5.distinct()
rdd6.cartesian(rdd3).collect.foreach(println)
}
}
创建rdd1与rdd2,用union()方法合并rdd1与rdd2放在rdd4中
创建rdd5,用filter()方法过滤去除3以下的数据
创建rdd6,先用distinct()方法去再用cartesian()方法输出笛卡尔积
package com.tipdm.sparkDemo
import org.apache.spark.{SparkConf, SparkContext}
object a4 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(
List(('a',1),('a',2),('b',1),('c',1),('c',1))
)
val re_rdd1 = rdd1.reduceByKey((a,b) => a+b)
re_rdd1.collect.foreach(println)
val g_rdd1 = rdd1.groupByKey()
g_rdd1.collect.foreach(println)
g_rdd1.map(x => (x._1, x._2.size)).collect.foreach(println)
}
}
创建re_rdd1,用reduceByKey()方法合并统计键相同的值,将值相加输出
创建g_rdd1,用groupByKey()方法对具有相同的值进行分组,将相同键的值的数量输出
package com.tipdm.sparkDemo
import org.apache.spark.{SparkConf, SparkContext}
object a3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val first_half = sc.textFile("D:\\Employee_salary_first_half.csv")
val second_half = sc.textFile("D:\\Employee_salary_first_half.csv")
val drop_first = first_half.mapPartitionsWithIndex((ix ,it) => {
if (ix == 0) it.drop(1)
it
})
val drop_second = second_half.mapPartitionsWithIndex((ix, it) => {
if (ix == 0) it.drop(1)
it
})
val split_first = drop_first.map(
line => {val data = line.split(",");(data(1), data(6).toInt)}
)
val split_second = drop_second.map(
line => {val data = line.split(",");(data(1), data(6).toInt)}
)
val filter_first = split_first.filter(x => x._2 > 200000).map(x => x._1)
val filter_second = split_second.filter(x => x._2 > 200000).map(x => x._1)
val name = filter_first.union(filter_second).distinct()
name.collect.foreach(println)
}
}
用textFile()方法将文件内容创建为rdd
用map方法加入逗号
用filter方法过滤小于20万的员工
用distinct()方法去重
最后输出结果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。