
[Spark SQL] Spark SQL: Read from Kudu, Write to Hive


SparkUnit

Function: obtain a SparkSession

package com.example.unitl

import org.apache.spark.sql.SparkSession

object SparkUnit {
  // Local SparkSession without Hive support
  def getLocal(appName: String): SparkSession = {
    SparkSession.builder().appName(appName).master("local[*]").getOrCreate()
  }

  // Local SparkSession, optionally with Hive support
  def getLocal(appName: String, supportHive: Boolean): SparkSession = {
    if (supportHive) getLocal(appName, "local[*]", supportHive = true)
    else getLocal(appName)
  }

  // SparkSession with an explicit master URL, optionally with Hive support
  def getLocal(appName: String, master: String, supportHive: Boolean): SparkSession = {
    if (supportHive) SparkSession.builder().appName(appName).master(master).enableHiveSupport().getOrCreate()
    else SparkSession.builder().appName(appName).master(master).getOrCreate()
  }

  // Stop the session if it was created
  def stopSs(ss: SparkSession): Unit = {
    if (ss != null) {
      ss.stop()
    }
  }
}

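For reference, a minimal usage sketch of this helper (the object name SparkUnitDemo and the app name "demo" are made up for illustration):

import com.example.unitl.SparkUnit

object SparkUnitDemo {
  def main(args: Array[String]): Unit = {
    // Hive-enabled local session via the two-argument overload
    val ss = SparkUnit.getLocal("demo", supportHive = true)
    println(ss.version) // confirm the session is up
    SparkUnit.stopSs(ss)
  }
}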

log4j.properties

Function: set the console logging level

# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
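If you prefer not to ship a log4j.properties file, the root log level can also be set in code through the log4j 1.x API bundled with Spark 2.x; a minimal sketch, to be run before any Spark action:

import org.apache.log4j.{Level, Logger}

// quiet everything except errors, mirroring log4j.rootCategory=ERROR above
Logger.getRootLogger.setLevel(Level.ERROR)
// or narrow it to Spark's own packages only
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)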

KTV

Function: read from Kudu and write to Hive (Kudu_To_Hive, KTV for short)

package com.example.dao

import com.example.unitl.SparkUnit
import org.apache.spark.sql.SparkSession

object KTV {
  def getKuduTableDataFrame(ss: SparkSession): Unit = {
    // read the Kudu table into a DataFrame
    val kuduTb = ss.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.master", "10.168.1.12:7051")
      .option("kudu.table", "impala::realtimedcs.bakup_db") // tip: include the database (Impala-created Kudu tables are named impala::db.table)
      .load()

    // register the DataFrame as a temporary view
    kuduTb.createTempView("v1")

    val kudu_unit1_df = ss.sql(
      """
        |SELECT * FROM `v1`
        |WHERE `splittime` = "2021-07-11"
        |""".stripMargin)

    // print
    kudu_unit1_df.printSchema()
    kudu_unit1_df.show()

    // keep the filtered rows as a temp view for the insert step
    kudu_unit1_df.createOrReplaceTempView("v2")
  }

  def insertHive(ss: SparkSession): Unit = {
    // create table
    ss.sql(
      """
        |USE `bakup_db`
        |""".stripMargin)

    ss.sql(
      """
        |  CREATE TABLE IF NOT EXISTS `bak_tb1`(
        |   `id` int,
        |   `packtimestr` string,
        |   `dcs_name` string,
        |   `dcs_type` string,
        |   `dcs_value` string,
        |   `dcs_as` string,
        |   `dcs_as2` string)
        | PARTITIONED BY (
        |   `splittime` string)
        |""".stripMargin)
    println("创建表成功!")

    // insert the Kudu data (view v2) into the Hive table;
    // with dynamic partitioning, the partition column `splittime` must be the last column of the SELECT
    ss.sql(
      """
        |INSERT INTO TABLE `bak_tb1` PARTITION (`splittime`)
        |SELECT * FROM v2
        |""".stripMargin)
    println("Insert finished!")
  }

  def main(args: Array[String]): Unit = {
    // get a Hive-enabled SparkSession
    val ss = SparkUnit.getLocal("KTV", true)
    // this is a dynamic-partition insert, so enable the partition settings first;
    // hive.exec.dynamic.partition defaults to false and must be switched on explicitly
    ss.sqlContext.setConf("hive.exec.dynamic.partition", "true")
    ss.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

    // run the two steps
    getKuduTableDataFrame(ss)
    insertHive(ss)

    // stop the session
    SparkUnit.stopSs(ss)
  }
}
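The same Kudu-to-Hive copy can also be written without the intermediate views, going through the DataFrame API directly. This is only a sketch under the same assumptions as above (same Kudu master and table, target table bakup_db.bak_tb1 already created, dynamic partitioning enabled, and splittime being the last column of the DataFrame):

import com.example.unitl.SparkUnit
import org.apache.spark.sql.functions.col

val ss = SparkUnit.getLocal("KTV-df", supportHive = true) // "KTV-df" is an arbitrary app name

val df = ss.read.format("org.apache.kudu.spark.kudu")
  .option("kudu.master", "10.168.1.12:7051")
  .option("kudu.table", "impala::realtimedcs.bakup_db")
  .load()
  .where(col("splittime") === "2021-07-11")

// insertInto matches columns by position and reuses the target table's
// partitioning, so `splittime` must be the last column of df
df.write.mode("append").insertInto("bakup_db.bak_tb1")

SparkUnit.stopSs(ss)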

Run:

Before running, copy Hive's configuration file hive-site.xml into the project's resources directory.
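If copying hive-site.xml into the project is inconvenient, the metastore location can in principle be passed when building the session instead; the URI below is a placeholder assumption, not taken from this setup:

import org.apache.spark.sql.SparkSession

val ss = SparkSession.builder()
  .appName("KTV")
  .master("local[*]")
  // placeholder metastore address; normally supplied by hive-site.xml
  .config("hive.metastore.uris", "thrift://10.168.1.12:9083")
  .enableHiveSupport()
  .getOrCreate()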

Viewing the written data in Hue:

(omitted)
