当前位置:   article > 正文

【spark数据处理】文件上传+文件读取+文件筛选_spark读取文件

spark读取文件

环境介绍(hadoop、spark、jdk)

  1. HDFS常用命令:
  2. hadoop fs & hdfs dfs
  3. 注:path 为路径 src为文件路径 dist 为文件夹
  4. -help[cmd] 显示命令的帮助信息
  5. -ls(r) 显示当前目录下的所有文件 -R层层循出文件夹
  6. -du(s) 显示目录中所有文件大小
  7. -count[-q] 显示当前目录下的所有文件大小
  8. -mv 移动多个文件目录到目标目录
  9. -cp 复制多个文件到目标目录
  10. -rm(r) 删除文件(夹)
  11. -put 本地文件复制到hdfs
  12. -copyFromLocal 本地文件复制到hdfs
  13. -moveFromLocal 本地文件移动到hdfs
  14. -get[-ignoreCrc] 复制文件到本地,可以忽略crc校验
  15. -getmerge 将源目录中的所有文件排序合并到一个文件中
  16. -appendToFile 将内容追加到指定文档中
  17. -cat 在终端显示文件内容
  18. -text 在终端显示文件内容
  19. -copyToLocal[-ignoreCrc] 复制文件到本地
  20. -moveToLocal 移动文件到本地
  21. -mkdir 创建文件夹 后跟-p 可以创建不存在的父路径
  22. -touchz 创建一个空文件

一、文件上传

hdfs上传:

hadoop fs -mkdir -p /data  //创建目录

hadoop fs -put /data/wenjian1.csv /data   //上传文件

(hadoop fs或者hdfs dfs上传都可以,命令区别不大)

二、修改文件名(根据自身情况而定)

这边建议有中文名的都改为英文名,有特殊字符的都改成标准的格式,以免后续报错

下面假设我们上传的文件名称为(文件1.csv)

hadoop fs -mv /data/文件1.csv /data/wenjian1.csv

三、多文件合并

(1)该命令为data1文件和data2文件末尾追加到data3.csv文件中,做到多文件合并成一个新的文件

hadoop fs  -appendToFile /data/data1.csv /data/data2.csv /data/data3.csv

(2)在这个例子中,file1.txt 的内容会首先出现在 merged_data.txt 中,然后是 file2.txt 的内容,最后是 file3.txt 的内容。达成合并文件目的

hdfs dfs -getmerge /user/data/file1.txt /user/data/file2.txt /user/data/file3.txt merged_data.txt

 四、读取文件

  1. Text文件:这是最基本的文件格式,Spark可以通过sc.textFile("path")来读取Text文件,并且可以使用saveAsTextFile(path)将RDD保存为Text文件。
  2. Json文件:对于Json文件,Spark提供了多种方式来读取和解析,例如使用SparkSQL或第三方库如fastjson。读取Json文件时,通常需要解析其Json格式。
  3. Sequence文件:这是针对key-value类型RDD的文件格式。可以通过sc.sequenceFile[KeyClass, ValueClass]("path")来读取,并可以创建RDD保存为Sequence文件。
  4. Object文件:可以将pairRDD保存为Object文件,并使用sc.objectFile[KeyClass, ValueClass]("path")来读取
  5. Csv文件:可以用spark.read.csv()来实现读取

val df = spark.read.option("header", "true").option("sep", ",").csv("/data/wenjian1.csv")

  1. val df =:这是Scala中的声明变量的方式。这里声明了一个名为df的变量,它将被赋予一个DataFrame类型的值。
  2. spark.readspark是一个SparkSession的实例,它是Spark所有功能的入口点。read方法用于从外部数据源读取数据。
  3. .option("header", "true"):这是为读取操作设置的一个选项。header选项指定CSV文件的第一行是否包含列名。在这个例子中,我们设置header"true",意味着CSV文件的第一行是列名,并且这些列名将作为DataFrame的列名。
  4. .option("sep", ","):这也是为读取操作设置的一个选项。sep选项指定字段之间的分隔符。在这个例子中,我们设置分隔符为逗号(,),这是CSV文件的标准分隔符。
  5. .csv("/data/wenjian1.csv"):这指定了要读取的CSV文件的路径。在这个例子中,文件位于/data/目录下,名为wenjian1.csv

五、显示数据

show()显示语句

df.show(5)显示前五行数据 

六、处理数据

常用语句

select 选择 :可以选择指定行,指定类,并且可以重名

where 筛选 :给筛选的列值等于给定值的行

filter 筛选:可以对数据进行筛选

groupby 分组:对数据进行分组

distinct 去重 :对重复数据进行清洗

select:

对“学校”列进行去重:

val uniquewenjian = df.select("学校").distinct()

选择指定列

val df1 = df.select("columnName1", "columnName2")

选择指定列,并重命名

val df1 =df.select($"columnName",$"columnName")

where:

 筛选指定列值等于给定值的行

val df1= df.where("columnName = 'value'")

fitter:

筛选列值大于 5的行

val df1 = df.filter("columnName > 5")  

groupBy():

按指定列分组,并对每组 进行聚合计数

 val df1 = df.groupBy("columnName").agg(count("columnName")) 

对多列进行分组,并对每组进行聚合计数,这段代码首先创建了一个SparkSession实例,然后创建了一个包含三列("columnName1", "columnName2", "col")的DataFrame。之后,使用groupBy方法按照"columnName1"和"columnName2"进行分组,然后调用agg方法进行聚合,并使用mean函数计算"col"列的平均值,将结果列命名为"m"。最后,使用show方法打印出分组聚合后的结果。

val df1 = df.groupBy("columnName1", "columnName2").agg(mean("student").alias("s1"))

(补充)以下是一些常用的聚合函数及其在DataFrame中的应用:

  1. count(): 用于计算分组中的记录数。
  2. sum(): 计算分组中某列的总和。
  3. avg(): 计算分组中某列的平均值。
  4. max(): 获取分组中某列的最大值。
  5. min(): 获取分组中某列的最小值。
  6. collect_list() 和 collect_set(): 分别收集分组中某列的所有值到一个列表或集合中,去除重复项。
  7. first() 和 last(): 分别获取分组中某列的第一个和最后一个值。
  8. group_concat(): 连接分组中某列的所有值。注意,这不是Spark SQL的内置函数,但可以使用concat_ws函数达到类似的效果。
  9. approx_count_distinct(): 计算近似不同的值数,这对于大数据集来说通常比count(DISTINCT column)更快,但结果可能不是精确的。
  1. val countDF = df.groupBy("columnName").agg(count("*").alias("count"))
  2. val sumDF = df.groupBy("columnName").agg(sum("numericColumn").alias("sum"))
  3. val avgDF = df.groupBy("columnName").agg(avg("numericColumn").alias("avg"))
  4. val maxDF = df.groupBy("columnName").agg(max("numericColumn").alias("max"))
  5. val minDF = df.groupBy("columnName").agg(min("numericColumn").alias("min"))
  6. val listDF = df.groupBy("columnName").agg(collect_list("anotherColumn").alias("list"))
  7. val setDF = df.groupBy("columnName").agg(collect_set("anotherColumn").alias("set"))
  8. val firstDF = df.groupBy("columnName").agg(first("anotherColumn").alias("firstValue"))
  9. val lastDF = df.groupBy("columnName").agg(last("anotherColumn").alias("lastValue"))
  10. val concatDF = df.groupBy("columnName").agg(concat_ws(",", collect_list("anotherColumn")).alias("concatenated"))
  11. val approxCountDF = df.groupBy("columnName").agg(approx_count_distinct("anotherColumn").alias("approxCount"))

distinct:

获取指定列的唯一值

df.select("columnName").distinct()

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号