当前位置:   article > 正文

spark基于dataFrame和sparksql对hdfs文件夹下多个文件进行读、写、join等操作_pspark读写hdfs

pspark读写hdfs

  本文主要介绍spark读取hdfs文本文件,并利用spark-sql进行join操作,最后将结果写入hdfs文件系统,话不多说,直接上代码。代码是基于在windows上安装的hadoop,提交到yarn上可以不加hdfs文件的根路径。

1,准备数据文件

math.txt

在这里插入图片描述
在这里插入图片描述

sports.txt

在这里插入图片描述

在这里插入图片描述

2,代码实现

2.1,初始化sparkContext或者sparkSession

    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkHdfsOps")
    val ss  = SparkSession.builder()
      .config(conf)
      .getOrCreate()
	import ss.implicits._
	
    val sc = ss.sparkContext
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2.2,读取hdfs上的文件

  可以通过sparkContext或者sparkSession来读取,但是各自的返回值类型不一样,后续的处理也有细微差别。
  sparkContext读取文本后返回的是RDD[String]
  sparkSession返回的是DataFrame,且只有一列,需要将DataFrame转成RDD,转换之后变成:RDD[Row]。如果需要对RDD[Row]作map操作,并且还需要对每一行数据进行处理的话,还得增加一个步骤,将RDD[Row]转成String:
  RDD[Row].map(_.mkString(’’))

  RDD[Row].map(_.getString(0))

注意:如果line是以空(null)结尾,分割之后会读不出来最后一个位置的元素,需要对map之后的每一行作处理,将最后一个位置补null或者空字符串

    //本地hdfs路径
    val bp = "hdfs://localhost:9000/fenghuo/test/"
    //sc.textFile返回的是rdd[String]
    val math_rdd = sc.textFile(bp+"math/math.txt")
        .map(line => {
          val fields = line.split("\t")
          (fields(0),fields(1),fields(2))
        }).foreach(println)

    val math_df = ss.read.text(bp+"math/math.txt")
      .rdd //转成rdd[ROW]
      .map(line => {
        val ls = line.getString(0)
        val fields = ls.split("\t")
        if(ls.endsWith("\t")){
          (fields(0),fields(1),null)
        }else{
          (fields(0),fields(1),fields(2))
        }
      }).toDF("name","role","team").show(10)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

结果如下:

**'sc.textFile()的结果:'**
(罗斯,PG,KNICKS)
(威少,PG,LAKERS)
(霍华德,C,LAKERS)
(哈登,SG,NETS)
(锡安,PF,PELICANS)
(老詹,SF,LAKERS)
(科比,SG,LAKERS)

**'ss.read.text()的结果:'**
+------+----+--------+
|  name|role|    team|
+------+----+--------+
|  威少|  PG|  LAKERS|
|  哈登|  SG|    NETS|
|  老詹|  SF|  LAKERS|
|  科比|  SG|  LAKERS|
|  罗斯|  PG|  KNICKS|
|霍华德|   C|  LAKERS|
|  锡安|  PF|PELICANS|
+------+----+--------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

2.3,建临时表,做join操作

  join时候需要做null值处理,在写入hdfs文件时,如果有null值,null不会被写入,特别是导入hive,可能会导致数据错位,常用方法是将null值转成空字符串:""。

    math_df.createOrReplaceTempView("math_df_table")
    sports_df.createOrReplaceTempView("sports_df_table")
    val join_df = ss.sql(
      "select " +
        "nvl(a.name,'') as name," +
        "nvl(a.age,'') as age," +
        "nvl(a.skills,'') as skills," +
        "nvl(b.role,'') as role," +
        "nvl(b.team,'') as team" +
        " from sports_df_table a left join math_df_table b on " +
        "a.name=b.name " +
        "order by age desc").show(10)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

结果如下:

+------+---+----------+----+------+
|  name|age|    skills|role|  team|
+------+---+----------+----+------+
|艾弗森| 41| CROSSOVER|    |      |
|  麦迪| 41|          |    |      |
|  科比| 40|   FADEWAY|  SG|LAKERS|
|  韦德| 33|LIGHTENING|    |      |
|  老詹| 33|      KING|  SF|LAKERS|
|  威少| 30|      DUNK|  PG|LAKERS|
|杜兰特| 30|     SCORE|    |      |
|      | 30|          |    |      |
|  罗斯| 30|     SPEED|  PG|KNICKS|
|  哈登| 29|          |  SG|  NETS|
+------+---+----------+----+------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2.4,写入hdfs文件

  通过DataFrame.write.text()写入hdfs文件。
注意:直接用dataframe.write方式写入文本文件时候,需要注意dataframe只能有一列,否则会报错。
解决方法:将DataFrame合并成一列

	//方法一:
	val allColumnsLine = join_df.columns.mkString(",")
    val join_result_df = join_df.selectExpr(s"concat_ws('\t',$allColumnsLine) " +
      s"as allColumns2OneColumn")
    //方法二:
    val join_result_df1:DataFrame = join_df.selectExpr(s"concat_ws('\t',name,age,skills,role,team) " +
      s"as allColumns2OneColumn")
    join_result_df1.repartition(1)
      .write.mode("overwrite")
      .text(path = bp+"spark/joinResult")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

结果如下:
在这里插入图片描述
在这里插入图片描述

3,项目整体代码如下,希望能帮到你

package com.zero.scala.sparkCore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadAndWriteHdfs {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkHdfsOps")
    val ss  = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    val sc = ss.sparkContext
    import ss.implicits._

    //本地hdfs路径
    val bp = "hdfs://localhost:9000/fenghuo/test/"
    //sc.textFile返回的是rdd[String]
    val math_rdd = sc.textFile(bp+"math/math.txt")
        .map(line => {
          val fields = line.split("\t")
          (fields(0),fields(1),fields(2))
        })

    val math_df = ss.read.text(bp+"math/math.txt")
      .rdd //转成rdd[ROW]
      .map(line => {
        val ls = line.getString(0)
        val fields = ls.split("\t")
        if(ls.endsWith("\t")){
          (fields(0),fields(1),null)
        }else{
          (fields(0),fields(1),fields(2))
        }
      }).toDF("name","role","team")

    val sports_df = ss.read.text(bp+"sports/*")
      .rdd
      .map(line => {
        val ls = line.getString(0)
        val fields = ls.split("\t")
        val length = fields.length
          if(ls.endsWith("\t")){
          (fields(0),fields(1),null)
        }else{
          (fields(0),fields(1),fields(2))
        }
      }).toDF("name","age","skills")

    //创建表,作join操作
    println("创建临时表 math_df_table")
    math_df.createOrReplaceTempView("math_df_table")
    println("创建临时表 sports_df_table")
    sports_df.createOrReplaceTempView("sports_df_table")
    println("join结果如下: ")
    ss.sql("select a.name,a.age,a.skills,b.role,b.team from sports_df_table a left join math_df_table b on " +
      "a.name=b.name").show(10)

    val join_df = ss.sql(
      "select " +
        "nvl(a.name,'') as name," +
        "nvl(a.age,'') as age," +
        "nvl(a.skills,'') as skills," +
        "nvl(b.role,'') as role," +
        "nvl(b.team,'') as team" +
        " from sports_df_table a left join math_df_table b on " +
        "a.name=b.name " +
        "order by age desc")
    //方法一:将df变成一列,且每一行都是以","分割的String
    val allColumnsLine = join_df.columns.mkString(",")
    val join_result_df = join_df.selectExpr(s"concat_ws('\t',$allColumnsLine) " +
      s"as allColumns2OneColumn").show(10)
    //直接拼接列名
    val join_result_df1:DataFrame = join_df.selectExpr(s"concat_ws('\t',name,age,skills,role,team) " +
      s"as allColumns2OneColumn")
    join_result_df1.repartition(1)
      .write.mode("overwrite")
      .text(path = bp+"spark/joinResult")
//    Thread.sleep(20000)
    ss.close()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/662995
推荐阅读
相关标签
  

闽ICP备14008679号