赞
踩
环境介绍(hadoop、spark、jdk)
- HDFS常用命令:
- hadoop fs & hdfs dfs
- 注:path 为路径 src为文件路径 dist 为文件夹
- -help[cmd] 显示命令的帮助信息
- -ls(r) 显示当前目录下的所有文件 -R层层循出文件夹
- -du(s) 显示目录中所有文件大小
- -count[-q] 显示当前目录下的所有文件大小
- -mv 移动多个文件目录到目标目录
- -cp 复制多个文件到目标目录
- -rm(r) 删除文件(夹)
- -put 本地文件复制到hdfs
- -copyFromLocal 本地文件复制到hdfs
- -moveFromLocal 本地文件移动到hdfs
- -get[-ignoreCrc] 复制文件到本地,可以忽略crc校验
- -getmerge 将源目录中的所有文件排序合并到一个文件中
- -appendToFile 将内容追加到指定文档中
- -cat 在终端显示文件内容
- -text 在终端显示文件内容
- -copyToLocal[-ignoreCrc] 复制文件到本地
- -moveToLocal 移动文件到本地
- -mkdir 创建文件夹 后跟-p 可以创建不存在的父路径
- -touchz 创建一个空文件
一、文件上传
hdfs上传:
hadoop fs -mkdir -p /data //创建目录
hadoop fs -put /data/wenjian1.csv /data //上传文件
(hadoop fs或者hdfs dfs上传都可以,命令区别不大)
二、修改文件名(根据自身情况而定)
这边建议有中文名的都改为英文名,有特殊字符的都改成标准的格式,以免后续报错
下面假设我们上传的文件名称为(文件1.csv)
hadoop fs -mv /data/文件1.csv /data/wenjian1.csv
三、多文件合并
(1)该命令为data1文件和data2文件末尾追加到data3.csv文件中,做到多文件合并成一个新的文件
hadoop fs -appendToFile /data/data1.csv /data/data2.csv /data/data3.csv
(2)在这个例子中,file1.txt
的内容会首先出现在 merged_data.txt
中,然后是 file2.txt
的内容,最后是 file3.txt
的内容。达成合并文件目的
hdfs dfs -getmerge /user/data/file1.txt /user/data/file2.txt /user/data/file3.txt merged_data.txt
四、读取文件
sc.textFile("path")
来读取Text文件,并且可以使用saveAsTextFile(path)
将RDD保存为Text文件。sc.sequenceFile[KeyClass, ValueClass]("path")
来读取,并可以创建RDD保存为Sequence文件。sc.objectFile[KeyClass, ValueClass]("path")
来读取val df = spark.read.option("header", "true").option("sep", ",").csv("/data/wenjian1.csv")
val df =
:这是Scala中的声明变量的方式。这里声明了一个名为df
的变量,它将被赋予一个DataFrame类型的值。spark.read
:spark
是一个SparkSession的实例,它是Spark所有功能的入口点。read
方法用于从外部数据源读取数据。.option("header", "true")
:这是为读取操作设置的一个选项。header
选项指定CSV文件的第一行是否包含列名。在这个例子中,我们设置header
为"true"
,意味着CSV文件的第一行是列名,并且这些列名将作为DataFrame的列名。.option("sep", ",")
:这也是为读取操作设置的一个选项。sep
选项指定字段之间的分隔符。在这个例子中,我们设置分隔符为逗号(,
),这是CSV文件的标准分隔符。.csv(
"/data/wenjian1.csv")
:这指定了要读取的CSV文件的路径。在这个例子中,文件位于/data/
目录下,名为wenjian1.csv
。五、显示数据
show()显示语句
df.show(5)显示前五行数据
六、处理数据
常用语句
select 选择 :可以选择指定行,指定类,并且可以重名
where 筛选 :给筛选的列值等于给定值的行
filter 筛选:可以对数据进行筛选
groupby 分组:对数据进行分组
distinct 去重 :对重复数据进行清洗
select:
对“学校”列进行去重:
val uniquewenjian = df.select("学校").distinct()
选择指定列
val df1 = df.select("columnName1", "columnName2")
选择指定列,并重命名
val df1 =df.select($"columnName",$"columnName")
where:
筛选指定列值等于给定值的行
val df1= df.where("columnName = 'value'")
fitter:
筛选列值大于 5的行
val df1 = df.filter("columnName > 5")
groupBy():
按指定列分组,并对每组 进行聚合计数
val df1 = df.groupBy("columnName").agg(count("columnName"))
对多列进行分组,并对每组进行聚合计数,这段代码首先创建了一个SparkSession实例,然后创建了一个包含三列("columnName1", "columnName2", "col")的DataFrame。之后,使用groupBy
方法按照"columnName1"和"columnName2"进行分组,然后调用agg
方法进行聚合,并使用mean
函数计算"col"列的平均值,将结果列命名为"m"。最后,使用show
方法打印出分组聚合后的结果。
val df1 = df.groupBy("columnName1", "columnName2").agg(mean("student").alias("s1"))
(补充)以下是一些常用的聚合函数及其在DataFrame中的应用:
- count(): 用于计算分组中的记录数。
- sum(): 计算分组中某列的总和。
- avg(): 计算分组中某列的平均值。
- max(): 获取分组中某列的最大值。
- min(): 获取分组中某列的最小值。
- collect_list() 和 collect_set(): 分别收集分组中某列的所有值到一个列表或集合中,去除重复项。
- first() 和 last(): 分别获取分组中某列的第一个和最后一个值。
- group_concat(): 连接分组中某列的所有值。注意,这不是Spark SQL的内置函数,但可以使用
concat_ws
函数达到类似的效果。- approx_count_distinct(): 计算近似不同的值数,这对于大数据集来说通常比
count(DISTINCT column)
更快,但结果可能不是精确的。
- val countDF = df.groupBy("columnName").agg(count("*").alias("count"))
- val sumDF = df.groupBy("columnName").agg(sum("numericColumn").alias("sum"))
- val avgDF = df.groupBy("columnName").agg(avg("numericColumn").alias("avg"))
- val maxDF = df.groupBy("columnName").agg(max("numericColumn").alias("max"))
- val minDF = df.groupBy("columnName").agg(min("numericColumn").alias("min"))
- val listDF = df.groupBy("columnName").agg(collect_list("anotherColumn").alias("list"))
- val setDF = df.groupBy("columnName").agg(collect_set("anotherColumn").alias("set"))
- val firstDF = df.groupBy("columnName").agg(first("anotherColumn").alias("firstValue"))
- val lastDF = df.groupBy("columnName").agg(last("anotherColumn").alias("lastValue"))
- val concatDF = df.groupBy("columnName").agg(concat_ws(",", collect_list("anotherColumn")).alias("concatenated"))
- val approxCountDF = df.groupBy("columnName").agg(approx_count_distinct("anotherColumn").alias("approxCount"))
distinct:
获取指定列的唯一值
df.select("columnName").distinct()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。