This post walks through reading text files from HDFS with Spark, joining them with Spark SQL, and writing the result back to HDFS. The code was developed against a Hadoop installation on Windows; when submitting to YARN, the HDFS root prefix can be omitted from the file paths. Without further ado, the code:
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkHdfsOps")
val ss = SparkSession.builder()
  .config(conf)
  .getOrCreate()
import ss.implicits._
val sc = ss.sparkContext
The file can be read through either the sparkContext or the sparkSession, but the return types differ, and the downstream handling differs slightly as well.
sparkContext.textFile returns an RDD[String].
sparkSession (ss.read.text) returns a DataFrame with a single column. To process it as an RDD it must first be converted, which yields an RDD[Row]; if you then want to map over the RDD[Row] and work on each line, one more step is needed to turn each Row into a String:
RDD[Row].map(_.mkString(""))
RDD[Row].map(_.getString(0))
Note: if a line ends with an empty (null) field, split will not return an element for the last position, so each such line has to be padded with null or an empty string after the map (see the sketch below).
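As an aside, String.split also accepts a limit argument; passing -1 keeps trailing empty strings, which is another way around this pitfall. A minimal sketch (the sample line is made up for illustration):

val line = "科比\tSG\t"      // the last (team) field is empty
line.split("\t")             // Array(科比, SG): the last position is lost
line.split("\t", -1)         // Array(科比, SG, ""): limit -1 keeps trailing empty fields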
//local HDFS root path
val bp = "hdfs://localhost:9000/fenghuo/test/"

//sc.textFile returns an RDD[String]
val math_rdd = sc.textFile(bp + "math/math.txt")
  .map(line => {
    val fields = line.split("\t")
    (fields(0), fields(1), fields(2))
  })
math_rdd.foreach(println)

//ss.read.text returns a single-column DataFrame
val math_df = ss.read.text(bp + "math/math.txt")
  .rdd // convert to RDD[Row]
  .map(line => {
    val ls = line.getString(0)
    val fields = ls.split("\t")
    if (ls.endsWith("\t")) { // pad the missing last field with null
      (fields(0), fields(1), null)
    } else {
      (fields(0), fields(1), fields(2))
    }
  }).toDF("name", "role", "team")
math_df.show(10)
The results are as follows:
Result of sc.textFile():
(罗斯,PG,KNICKS)
(威少,PG,LAKERS)
(霍华德,C,LAKERS)
(哈登,SG,NETS)
(锡安,PF,PELICANS)
(老詹,SF,LAKERS)
(科比,SG,LAKERS)

Result of ss.read.text():
+------+----+--------+
|  name|role|    team|
+------+----+--------+
|  威少|  PG|  LAKERS|
|  哈登|  SG|    NETS|
|  老詹|  SF|  LAKERS|
|  科比|  SG|  LAKERS|
|  罗斯|  PG|  KNICKS|
|霍华德|   C|  LAKERS|
|  锡安|  PF|PELICANS|
+------+----+--------+
Null values have to be handled when joining. When the result is written to an HDFS file, nulls are simply not written out; in particular, when the file is later imported into Hive this can shift columns out of alignment. The usual fix is to convert nulls to empty strings ("").
math_df.createOrReplaceTempView("math_df_table")
sports_df.createOrReplaceTempView("sports_df_table")
val join_df = ss.sql(
  "select " +
    "nvl(a.name,'') as name," +
    "nvl(a.age,'') as age," +
    "nvl(a.skills,'') as skills," +
    "nvl(b.role,'') as role," +
    "nvl(b.team,'') as team" +
    " from sports_df_table a left join math_df_table b on " +
    "a.name=b.name " +
    "order by age desc")
join_df.show(10)
The result is as follows:
+------+---+----------+----+------+
| name|age| skills|role| team|
+------+---+----------+----+------+
|艾弗森| 41| CROSSOVER| | |
| 麦迪| 41| | | |
| 科比| 40| FADEWAY| SG|LAKERS|
| 韦德| 33|LIGHTENING| | |
| 老詹| 33| KING| SF|LAKERS|
| 威少| 30| DUNK| PG|LAKERS|
|杜兰特| 30| SCORE| | |
| | 30| | | |
| 罗斯| 30| SPEED| PG|KNICKS|
| 哈登| 29| | SG| NETS|
+------+---+----------+----+------+
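The same null-to-empty-string handling can also be expressed with the DataFrame API instead of nvl in SQL. A rough sketch, assuming the math_df and sports_df DataFrames defined above (desc comes from org.apache.spark.sql.functions):

// DataFrame-API sketch, roughly equivalent to the nvl-based SQL above
import org.apache.spark.sql.functions.desc

val join_df2 = sports_df
  .join(math_df, Seq("name"), "left")   // left join on the name column
  .na.fill("")                          // null -> "" so Hive/text output does not misalign
  .orderBy(desc("age"))

join_df2.show(10)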
The result is written to HDFS with DataFrame.write.text().
Note: when writing a text file directly via dataframe.write, the DataFrame must contain exactly one column, otherwise the write throws an error.
Workaround: merge the DataFrame's columns into a single column.
//Method 1: build the column list from the DataFrame's schema
val allColumnsLine = join_df.columns.mkString(",")
val join_result_df = join_df.selectExpr(s"concat_ws('\t',$allColumnsLine) " +
  s"as allColumns2OneColumn")
//Method 2: spell out the column names directly
val join_result_df1: DataFrame = join_df.selectExpr(s"concat_ws('\t',name,age,skills,role,team) " +
  s"as allColumns2OneColumn")
join_result_df1.repartition(1)
  .write.mode("overwrite")
  .text(path = bp + "spark/joinResult")
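If concatenating the columns by hand is undesirable, another option (not used in this post) is the csv writer, which accepts multi-column DataFrames directly. A sketch; the output directory name here is just an example:

// Alternative sketch: the csv writer handles multiple columns, so concat_ws can be skipped
join_df.repartition(1)
  .write.mode("overwrite")
  .option("sep", "\t")              // tab-separated, matching the concat_ws('\t', ...) layout
  .csv(bp + "spark/joinResultCsv")  // hypothetical output directory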
The complete code is as follows:
package com.zero.scala.sparkCore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadAndWriteHdfs {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkHdfsOps")
    val ss = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    val sc = ss.sparkContext
    import ss.implicits._

    //local HDFS root path
    val bp = "hdfs://localhost:9000/fenghuo/test/"

    //sc.textFile returns an RDD[String]
    val math_rdd = sc.textFile(bp + "math/math.txt")
      .map(line => {
        val fields = line.split("\t")
        (fields(0), fields(1), fields(2))
      })

    val math_df = ss.read.text(bp + "math/math.txt")
      .rdd // convert to RDD[Row]
      .map(line => {
        val ls = line.getString(0)
        val fields = ls.split("\t")
        if (ls.endsWith("\t")) {
          (fields(0), fields(1), null)
        } else {
          (fields(0), fields(1), fields(2))
        }
      }).toDF("name", "role", "team")

    val sports_df = ss.read.text(bp + "sports/*")
      .rdd
      .map(line => {
        val ls = line.getString(0)
        val fields = ls.split("\t")
        val length = fields.length
        if (ls.endsWith("\t")) {
          (fields(0), fields(1), null)
        } else {
          (fields(0), fields(1), fields(2))
        }
      }).toDF("name", "age", "skills")

    //register temp views and run the join
    println("creating temp view math_df_table")
    math_df.createOrReplaceTempView("math_df_table")
    println("creating temp view sports_df_table")
    sports_df.createOrReplaceTempView("sports_df_table")

    println("join result: ")
    ss.sql("select a.name,a.age,a.skills,b.role,b.team from sports_df_table a left join math_df_table b on " +
      "a.name=b.name").show(10)

    val join_df = ss.sql(
      "select " +
        "nvl(a.name,'') as name," +
        "nvl(a.age,'') as age," +
        "nvl(a.skills,'') as skills," +
        "nvl(b.role,'') as role," +
        "nvl(b.team,'') as team" +
        " from sports_df_table a left join math_df_table b on " +
        "a.name=b.name " +
        "order by age desc")

    //Method 1: collapse the df into one column, each row a tab-separated String
    val allColumnsLine = join_df.columns.mkString(",")
    val join_result_df = join_df.selectExpr(s"concat_ws('\t',$allColumnsLine) " +
      s"as allColumns2OneColumn")
    join_result_df.show(10)

    //Method 2: spell out the column names directly
    val join_result_df1: DataFrame = join_df.selectExpr(s"concat_ws('\t',name,age,skills,role,team) " +
      s"as allColumns2OneColumn")
    join_result_df1.repartition(1)
      .write.mode("overwrite")
      .text(path = bp + "spark/joinResult")

    // Thread.sleep(20000)
    ss.close()
  }
}
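For a quick sanity check of what was written, the output directory can be read back before ss.close() is called; a minimal sketch:

// Read the written directory back and print a few rows without truncation
ss.read.text(bp + "spark/joinResult").show(10, truncate = false)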