赞
踩
基于Spark框架实现TopN
获取蜀国武将中武力值最高的5位,即通过分布式计算框架实现从原始数据查询出武力最高的Top5
1、安装Hadoop和Spark
具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作:
Hadoop的安装:https://blog.csdn.net/weixin_47580081/article/details/108647420
Scala及Spark的安装:https://blog.csdn.net/weixin_47580081/article/details/114250894
提示:如果IDEA未构建Spark项目,可以转接到以下的博客:
IDEA使用Maven构建Spark项目:https://blog.csdn.net/weixin_47580081/article/details/115435536
2、
查看3个节点的进程
master
slave1
slave2
3、创建 rank.txt 文件
Shell命令:
[root@master ~]# cd /opt/data
[root@master data]# vim rank.txt
4、将 rank.txt 文件上传到 HDFS 上
Shell命令:
[root@master data]# hadoop dfs -mkdir /spark
[root@master data]# hadoop dfs -mkdir /spark/input
[root@master data]# hadoop dfs -put rank.txt /spark/input/
5、实现TopN计算
package com.John.SparkProject
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* @author John
* @Date 2021/5/24 16:41
*/
object project01 {
def main(args: Array[String]): Unit = {
// 创建一个SparkSession对象
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("Project01_TOP")
.set("spark.sql.crossJoin.enabled", "true")
val spark = new SparkSession.Builder()
.config(conf)
.getOrCreate()
// 加载文件
val text = spark.read.textFile("hdfs://192.168.254.122:9000/spark/input/rank.txt")
// 跳过表头
import spark.implicits._
val header = text.first()
val dsData = text.filter(_ != header)
// 读取每一行,设置分隔符为空格,转换数据类型,并设置表头
val dfData = dsData.map(line => {
val Array(id, name, experience,country) = line.split(' ')//以空格分隔每一行,并将数据转换为int类型
(id.toInt, name, experience.toInt,country) // 设置字段名称
}).toDF("序号", "姓名", "武力值","国家")
// 根据指定列倒序排列,并取前五条数据
val setRes = dfData.orderBy(-dfData("武力值")).limit(5)
//展示数据
setRes.show()
// 以json格式保存数据
setRes.write.format("json").save("hdfs://192.168.254.122:9000/spark/output/")
}
}
6、查看 HDFS 上的结果
Shell命令:
[root@master ~]# hadoop dfs -cat /spark/output/part-00000-af1dda43-b059-4da1-997e-2196db88178e-c000.json
实验主要考察了大家对HDFS的操作,和Spark对数据操作以及关联HDFS。
实验难度不是特别大,大家可以进一步的推广尝试!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。