赞
踩
一、数据读写
(1)从文件系统加载数据创建RDD
①本地文件:sc.textFile("file:///data/spark/buyer_favorite")
②HDFS文件:sc.textFile("hdfs://localhost:9000/spark/buyer_favorite")
(2)通过并行集合创建RDD
val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array)
(3)将RDD写入到文本文件
使用saveAsTextFile()函数,要求提供一个不存在的目录名称
*使用repartition重新设置分区个数
二、RDD常见操作
(1)转换操作
① filter(func):筛选出满足函数func的元素,并返回一个新的数据集。
② map(func):将每一个元素传递到函数func中,并将结果返回为一个新的数据集。
③ flatMap(func):类似map(),但是每个输入都可以映射0到多个输出结果。可以理解为:
第一步:执行map()
第二步:把map操作得到的数组中每个元素“拍扁”(flat)
④ groupByKey():对具有相同键的值进行分组,返回一个(key,Iterable)形式数据集。
⑤ reduceByKey(func):先执行groupByKey()操作得到(key,value-list),根据func对value-list进行操作。
(2)行动操作
① count():返回数据集中元素个数。
② collect():以数组的形式返回数据集中的所有元素。
③ first():返回数据集第一个元素。
④ take(n):以数组的形式返回数据集中的前n个元素。
⑤ reduce(func):通过函数func(输入两个参数,返回一个值)聚合数据集中的元素。
⑥ foreach(func):将数据集中每个元素传递到func中运行。
三、键值对RDD
(1)创建键值对RDD
①val lines = sc.textTextFile(file:///usr/local/word.txt)
var pairRDD=lines.flatMap(_.split("\t")).map(word=>(word,1))
②var pairRDD=rdd.map(word=>(word,1))
(2)常用键值对RDD操作
reduceByKey(func)
groupByKey()
keys:返回一个新的RDD.
values
sortByKey()
sortBy()
mapValues(func)
join():内连接,对于给定的两个数据集(k,v1)和(k,v2),当两个数据集都存在key才会被输出,最注重得到一个(k,(v1,v2))类型的数据集。
四、几个简单例子
(1)
WordCount统计:某电商网站记录了大量的用户对商品的收藏数据,并将数据存储在名为buyer_favorite的文本文件中。文本数据格式如下:
用户id(buyer_id),商品id(goods_id),收藏日期(dt);现要求统计用户收藏数据中,每个用户收藏商品数量。
rdd.map(line=> (line.split('\t')(0),1)).reduceByKey(_+_).collect
(2)
去重:使用spark-shell,对上述实验中,用户收藏数据文件进行统计。根据商品ID进行去重,统计用户收藏数据中都有哪些商品被收藏。
rdd.map(line => line.split('\t')(1)).distinct.collect 相当于
rdd.map(line=>(line.split('\t')),1).reduceByKey(_+_).map(_._1).collect
(3)排序:电商网站都会对商品的访问情况进行统计,现有一个goods_visit文件,存储了电商网站中的各种商品以及此各个商品的点击次数。
商品id(goods_id) 点击次数(click_num)现根据商品的点击次数进行排序,并输出所有商品。
rdd1.map(line => ( line.split('\t')(1).toInt, line.split('\t')(0) ) ).sortByKey(true).map(_._2).collect
(4)Join:现有某电商在2011年12月15日的部分交易数据。数据有订单表orders和订单明细表order_items,表结构及数据分别为:
orders表:(订单id order_id, 订单号 order_number, 买家ID buyer_id, 下单日期 create_dt)
order_items表:(明细ID item_id, 订单ID order_id, 商品ID goods_id )
orders表和order_items表,通过订单id进行关联,是一对多的关系。
下面开启spark-shell,查询在当天该电商网站,都有哪些用户购买了什么商品
rdd1.map(line=> (line.split('\t')(0), line.split('\t')(2)) )
join
rdd2.map(line=> (line.split('\t')(1), line.split('\t')(2)) )
.collect
(5)求平均值:电商网站都会对商品的访问情况进行统计。现有一个goods_visit文件,存储了全部商品及各商品的点击次数。还有一个文件goods,记录了商品的基本信息。两张表的数据结构如下:
goods表:商品ID(goods_id),商品状态(goods_status),商品分类id(cat_id),评分(goods_score)
goods_visit表:商品ID(goods_id),商品点击次数(click_num)
商品表(goods)及商品访问情况表(goods_visit)可以根据商品id进行关联。现在统计每个分类下,商品的平均点击次数是多少?
val rdd11 = rdd1.map(line=> (line.split('\t')(0), line.split('\t')(2)) )
val rdd22 = rdd2.map(line=> (line.split('\t')(0), line.split('\t')(1)) )
val rddjoin = rdd11 join rdd22
rddjoin.map(x=>{(x._2._1, (x._2._2.toLong, 1))}).reduceByKey((x,y)=>{(x._1+y._1, x._2+y._2)}).map(x=>
{(x._1, x._2._1*1.0/x._2._2)}).collect
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。