
spark-shell: creating a table from Parquet files and writing to it

The following function, meant to be pasted into spark-shell, infers a schema from one Parquet partition, (re)creates an external Hive table over the data, and registers its partitions:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{ArrayType, MapType}

// Note: save_format is accepted for future use, but the DDL below is hard-coded to Parquet.
def save_table(spark: SparkSession, partitionCondition: String, path: String, database: String, table_name: String, save_format: String = "PARQUET"): Unit = {
  // Read one concrete partition (e.g. "day=xxxx/hour=yyyy") just to infer the schema.
  val df: DataFrame = spark.read.parquet(s"$path/$partitionCondition")
  // Derive the partition columns from the partition path; each is declared as string.
  val parts = partitionCondition.replaceAll("\\s", "").split("/")
  val partBuilder = new StringBuilder()
  for (p <- parts) {
    partBuilder.append(p.split("=")(0)).append(" string").append(",")
  }
  if (partBuilder.nonEmpty) {
    partBuilder.deleteCharAt(partBuilder.length - 1) // drop the trailing comma
  }
  // Map each Spark type to a Hive DDL type. Only top-level arrays and maps are
  // translated here; nested complex element types are left as Spark type names.
  val schema = df.schema.map(x => x.name.toLowerCase.concat(" ").concat(
    x.dataType.typeName match {
      case "integer" => "int"
      case "array" => "array<" + x.dataType.asInstanceOf[ArrayType].elementType.typeName + ">"
      case "map" =>
        val mt = x.dataType.asInstanceOf[MapType]
        "map<" + mt.keyType.typeName + "," + mt.valueType.typeName + ">"
      case other => other
    })).mkString(",\n")
  val dropSql = s"drop table if exists ${database.toLowerCase}.${table_name.toLowerCase}"
  println(s"dropSql:\n$dropSql")
  spark.sql(dropSql)
  // "stored as parquet" already implies the Hive MapredParquetInputFormat and
  // MapredParquetOutputFormat, so they need not be spelled out.
  val createSql = if (partBuilder.nonEmpty) {
    s"""
       |create external table ${database.toLowerCase}.${table_name.toLowerCase} ($schema)
       |partitioned by (${partBuilder.toString()})
       |stored as parquet
       |location '$path'
     """.stripMargin
  } else {
    s"""
       |create external table ${database.toLowerCase}.${table_name.toLowerCase} ($schema)
       |stored as parquet
       |location '$path'
     """.stripMargin
  }
  println(s"createSql:\n$createSql")
  spark.sql(createSql)
  if (partBuilder.nonEmpty) {
    // Register the existing partition directories under the table location with the metastore.
    val refreshSql = s"msck repair table $database.$table_name"
    println(s"refreshSql:\n$refreshSql")
    spark.sql(refreshSql)
  }
}
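
A minimal usage sketch for spark-shell follows; the warehouse path, database, and table names are hypothetical placeholders, not values from the original post.

// In spark-shell the SparkSession is already available as `spark`.
// Suppose the data is laid out as hdfs:///warehouse/events/day=20240101/hour=00/*.parquet
save_table(spark, "day=20240101/hour=00", "hdfs:///warehouse/events", "mydb", "events")

// The printed createSql will then look roughly like:
//   create external table mydb.events (<inferred columns>)
//   partitioned by (day string,hour string)
//   stored as parquet
//   location 'hdfs:///warehouse/events'

// Verify the table and its partitions:
spark.sql("show partitions mydb.events").show(false)
spark.sql("select * from mydb.events limit 10").show(false)

As a design note, Spark can already render Hive-compatible DDL types (including nested ones) via DataType.catalogString, so the hand-written type match above could be reduced to a one-liner:

val schema = df.schema.map(f => s"${f.name.toLowerCase} ${f.dataType.catalogString}").mkString(",\n")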