import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{ArrayType, MapType}

/**
 * Reads one concrete Parquet partition to infer the data schema, then (re)creates
 * a Hive external table over `path` and repairs its partition metadata.
 * `partitionCondition` looks like "day=xxxx/hour=yyyy" and is used both to locate
 * the sample partition and to derive the partition columns.
 * Note: `save_format` is currently unused; the table is always created as Parquet.
 */
def save_table(spark: SparkSession,
               partitionCondition: String,
               path: String,
               database: String,
               table_name: String,
               save_format: String = "PARQUET"): Unit = {
  // Infer the data schema from one partition directory, e.g. $path/day=xxxx/hour=yyyy
  val df: DataFrame = spark.read.parquet(s"$path/$partitionCondition")

  // Derive partition columns from the condition: "day=xxxx/hour=yyyy" -> "day string,hour string"
  val parts = partitionCondition.replaceAll("\\s", "").split("/")
  val partBuilder = new StringBuilder()
  for (p <- parts) {
    partBuilder.append(p.split("=")(0)).append(" string").append(",")
  }
  if (partBuilder.nonEmpty) {
    partBuilder.deleteCharAt(partBuilder.length - 1) // drop trailing comma
  }

  // Map Spark SQL type names to Hive DDL type names.
  // Only one level of nesting is handled; deeply nested types (e.g. array<struct<...>>)
  // would need a recursive mapping.
  val schema = df.schema.map(x => x.name.toLowerCase.concat(" ").concat(
    x.dataType.typeName match {
      case "integer" => "int"
      case "array"   => "array<" + x.dataType.asInstanceOf[ArrayType].elementType.typeName + ">"
      case "map"     => "map<" + x.dataType.asInstanceOf[MapType].keyType.typeName + "," +
                        x.dataType.asInstanceOf[MapType].valueType.typeName + ">"
      case smt       => smt
    })).mkString(",\n")

  val dropSql = s"drop table if exists ${database.toLowerCase}.${table_name.toLowerCase}"
  println(s"dropSql:\n$dropSql")
  spark.sql(dropSql)

  val createSql =
    if (partBuilder.nonEmpty) {
      s"""
         |create external table ${database.toLowerCase}.${table_name.toLowerCase} ($schema)
         |partitioned by (${partBuilder.toString()})
         |stored as parquet
         |-- inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
         |-- outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
         |location '$path'
       """.stripMargin
    } else {
      s"""
         |create external table ${database.toLowerCase}.${table_name.toLowerCase} ($schema)
         |stored as parquet
         |-- inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
         |-- outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
         |location '$path'
       """.stripMargin
    }
  println(s"createSql:\n$createSql")
  spark.sql(createSql)

  // Register the existing partition directories under `path` with the metastore
  if (partBuilder.nonEmpty) {
    val refreshSql = s"msck repair table $database.$table_name"
    println(s"refreshSql:\n$refreshSql")
    spark.sql(refreshSql)
  }
}
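A minimal invocation sketch, assuming a SparkSession built with Hive support (required for the DDL statements to take effect against the metastore) and a hypothetical HDFS layout like hdfs:///warehouse/events/day=20240101/hour=00; the path, database, and table names below are placeholders:

val spark = SparkSession.builder()
  .appName("save_table_demo")
  .enableHiveSupport() // needed so spark.sql can create Hive tables
  .getOrCreate()

// Point at one existing partition; its schema becomes the table schema,
// and "day"/"hour" become the partition columns.
save_table(
  spark,
  partitionCondition = "day=20240101/hour=00",
  path = "hdfs:///warehouse/events",
  database = "demo_db",
  table_name = "events"
)

Because the function drops and recreates the table before running msck repair, rerunning it is idempotent with respect to the metastore but will briefly leave the table missing between the drop and the create.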