package com.sparkSql

import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Demo07 {
  def main(args: Array[String]): Unit = {
    // 1. Spark session
    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.eventLog.enabled", "false")
      .config("spark.driver.memory", "2g")
      .config("spark.executor.memory", "2g")
      .appName("SparkDemoFromS3")
      .getOrCreate()
    // log level
    spark.sparkContext.setLogLevel("WARN")

    // 2. load the resource file with the S3 connection settings
    val properties = new Properties()
    val path = Thread.currentThread().getContextClassLoader.getResource("conf/s3.properties").getPath
    properties.load(new FileInputStream(path))

    // 3. configure the data source (S3)
    val sc: SparkContext = spark.sparkContext
    sc.hadoopConfiguration.set("fs.s3a.access.key", properties.getProperty("fs.s3a.access.key"))
    sc.hadoopConfiguration.set("fs.s3a.secret.key", properties.getProperty("fs.s3a.secret.key"))
    sc.hadoopConfiguration.set("fs.s3a.endpoint", properties.getProperty("fs.s3a.endpoint"))

    // 4. DataFrame operations
    // implicit conversions (kept from the original; not strictly required for createDataFrame)
    import spark.implicits._

    // 4.1 read the file as an RDD of lines
    val rdd01: RDD[String] = spark.sparkContext.textFile("s3a://lifecyclebigdata/test/data/sfl/jia.csv")

    // 4.2 map each line to a Row
    val rowRdd: RDD[Row] = rdd01.map(line => {
      // split the line on tab
      val arr: Array[String] = line.split("\t")
      Row(arr(0).toInt, arr(1), arr(2), arr(3).toInt, arr(4))
    })

    // 4.3 schema (metadata)
    val fields: Array[StructField] = Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("sex", StringType, true),
      StructField("age", IntegerType, true),
      StructField("hobby", StringType, true)
    )
    val schema: StructType = StructType(fields)

    // 4.4 build the DataFrame
    val df01: DataFrame = spark.createDataFrame(rowRdd, schema)

    // 4.5 register a temporary view
    df01.createOrReplaceTempView("jia")

    // 4.6 SQL: all rows
    val df02: DataFrame = spark.sql("select * from jia")
    println("all rows =================================")
    df02.show()

    // 4.7 aggregation: total row count
    println("row count =================================")
    spark.sql("select count(1) cnt from jia").show()

    // 4.8 aggregation: number of distinct sexes
    println("distinct sexes =================================")
    spark.sql("select sex from jia group by sex").show()
    spark.sql("select count(1) sexNum from (select sex from jia group by sex) sexGroup").show()

    // 4.9 aggregation: min, avg, max
    println("min, avg, max =================================")
    spark.sql("select min(id) minid, avg(id) avgid, max(id) maxid from jia").show()

    spark.stop()
  }
}
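Demo07 builds the DataFrame by hand from an RDD[Row] plus a StructType. For comparison, here is a minimal sketch (not from the original post) of the same load using Spark's built-in CSV reader; it assumes the same tab-separated file layout and schema, the same s3a bucket path, and that the s3a credentials are set on the Hadoop configuration exactly as in Demo07. The object name Demo07ReaderSketch is hypothetical.

package com.sparkSql

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical variant of Demo07: reuses its bucket path and schema
object Demo07ReaderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SparkDemoFromS3Reader")
      .getOrCreate()
    // the fs.s3a.* credentials would still need to be set on
    // spark.sparkContext.hadoopConfiguration, as in Demo07

    val schema = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("sex", StringType, true),
      StructField("age", IntegerType, true),
      StructField("hobby", StringType, true)
    ))

    // spark.read parses the file and applies the schema directly,
    // so no manual RDD[Row] mapping step is needed
    val df: DataFrame = spark.read
      .option("sep", "\t") // the file is tab-separated
      .schema(schema)
      .csv("s3a://lifecyclebigdata/test/data/sfl/jia.csv")

    df.createOrReplaceTempView("jia")
    spark.sql("select * from jia").show()

    spark.stop()
  }
}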
package com.sparkSql

import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object Demo08 {
  def main(args: Array[String]): Unit = {
    // 1. Spark session
    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.eventLog.enabled", "false")
      .config("spark.driver.memory", "2g")
      .config("spark.executor.memory", "2g")
      .appName("SparkDemoFromS3")
      .getOrCreate()
    // log level
    spark.sparkContext.setLogLevel("WARN")

    // 2. load the resource file with the S3 connection settings
    val properties = new Properties()
    val path = Thread.currentThread().getContextClassLoader.getResource("conf/s3.properties").getPath
    properties.load(new FileInputStream(path))

    // 3. configure the data source (S3)
    val sc: SparkContext = spark.sparkContext
    sc.hadoopConfiguration.set("fs.s3a.access.key", properties.getProperty("fs.s3a.access.key"))
    sc.hadoopConfiguration.set("fs.s3a.secret.key", properties.getProperty("fs.s3a.secret.key"))
    sc.hadoopConfiguration.set("fs.s3a.endpoint", properties.getProperty("fs.s3a.endpoint"))

    // 4. DataFrame operations
    // implicit conversions (kept from the original; not strictly required for createDataFrame)
    import spark.implicits._

    // 4.1 read the file as an RDD of lines
    val rdd01: RDD[String] = spark.sparkContext.textFile("s3a://lifecyclebigdata/test/data/sfl/jia.csv")

    // 4.2 map each line to a Row
    val rowRdd: RDD[Row] = rdd01.map(line => {
      // split the line on tab
      val arr: Array[String] = line.split("\t")
      Row(arr(0).toInt, arr(1), arr(2), arr(3).toInt, arr(4))
    })

    // 4.3 schema (metadata)
    val fields: Array[StructField] = Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("sex", StringType, true),
      StructField("age", IntegerType, true),
      StructField("hobby", StringType, true)
    )
    val schema: StructType = StructType(fields)

    // 4.4 build the DataFrame
    val df01: DataFrame = spark.createDataFrame(rowRdd, schema)

    // 4.5 register a temporary view
    df01.createOrReplaceTempView("jia")

    // 4.6 SQL: all rows
    val df02: DataFrame = spark.sql("select * from jia")
    println("all rows =================================")
    df02.show()

    // 5. register a custom UDF that returns the length of a string
    spark.udf.register("len", (str: String) => str.length())

    // 6. use the UDF in SQL
    spark.sql("select id, name, len(name) lenname from jia").show()

    spark.stop()
  }
}
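Demo08 registers the len UDF with spark.udf.register and calls it from SQL. A minimal equivalent sketch (not from the original post) using the udf helper from org.apache.spark.sql.functions on the DataFrame side; it assumes the df01 DataFrame built above, and lenUdf / withLen are names introduced here for illustration.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// assumes df01 is the DataFrame built in Demo08 (columns: id, name, sex, age, hobby)
val lenUdf = udf((s: String) => if (s == null) 0 else s.length)

// equivalent to: select id, name, len(name) lenname from jia
val withLen: DataFrame = df01.select(col("id"), col("name"), lenUdf(col("name")).as("lenname"))
withLen.show()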