赞
踩
dataframe是什么
DataFrame与RDD的区别
dataFrame和rdd优缺点
DSL语法风格
它就是dataFrame自身提供的API
1、打印DataFrame的schema printSchema 2、查看dataFrame中的数据 show 3、取出第一位 first head(N) 取出前N个 4、查看某个字段 peopleDF.select("name").show peopleDF.select(col("name")).show peopleDF.select($"name").show peopleDF.select(peopleDF("name")).show 5、取出多个字段 peopleDF.select("name","age").show 6、让age字段+1 peopleDF.select(col("age")+1).show 7、过滤出年龄大于30的人数 peopleDF.filter($"age" > 30).count
SQL风格语法
通过将一个dataFrame注册成一张表,接下来就可以通过sql语句操作dataFrame
1、先需要将DataFrame注册成一张临时表
personDF.createOrReplaceTempView("t_person")（Spark 2.x 中 registerTempTable 已废弃，推荐使用 createOrReplaceTempView）
2、然后通过sparkSession.sql(sql语句)操作DataFrame
sparkSession.sql("select * from t_person").show
导包
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.2</version>
</dependency>
1、 通过定义case class样例类利用反射机制推断Schema
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

// Schema is inferred via reflection on this case class.
case class Person(id: Int, name: String, age: Int)

// Demonstrates DataFrame DSL and SQL syntax, with the schema
// derived from the Person case class by reflection.
object CaseClassSchema {

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession (local mode, 2 cores).
    val spark: SparkSession = SparkSession.builder()
      .appName("CaseClassSchema")
      .master("local[2]")
      .getOrCreate()

    // 2. Grab the underlying SparkContext and quiet the logs.
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 3. Read the text file and split each line on spaces.
    val tokens: RDD[Array[String]] = sc.textFile("d:\\person.txt").map(x => x.split(" "))

    // 4. Bind each record to the Person case class.
    val personRDD: RDD[Person] = tokens.map(fields => Person(fields(0).toInt, fields(1), fields(2).toInt))

    // 5. Convert to a DataFrame (requires the session implicits in scope).
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF

    // --------------- DSL syntax --------------- start
    // 1. Print the DataFrame's schema metadata.
    personDF.printSchema()

    // 2. Show the result rows (all, then just the first two).
    personDF.show()
    personDF.show(2)

    // 3. Print the first row.
    println(personDF.head())

    // 4. Select the "name" column, three equivalent ways.
    personDF.select("name").show()
    personDF.select($"name").show()
    personDF.select(new Column("name")).show()

    // 5. Project all columns plus age incremented by one.
    personDF.select($"id", $"name", $"age", $"age" + 1).show()

    // 6. Keep only rows where age is greater than 30.
    personDF.filter($"age" > 30).show()

    // 7. Group by age and count each group.
    personDF.groupBy("age").count().show()
    // --------------- DSL syntax --------------- end

    // --------------- SQL syntax --------------- start
    // Register the DataFrame as a temporary view, then query it with SQL.
    personDF.createTempView("t_person")
    spark.sql("select * from t_person").show()
    spark.sql("select * from t_person where name='lisi'").show()
    spark.sql("select * from t_person order by age desc ").show()
    // --------------- SQL syntax --------------- end

    // Shut down.
    sc.stop()
    spark.stop()
  }
}
2、通过StructType指定schema
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Converts an RDD to a DataFrame using an explicitly declared StructType schema
// (instead of case-class reflection).
object SparkSqlSchema {

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession (local mode, 2 cores).
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkSqlSchema")
      .master("local[2]")
      .getOrCreate()

    // 2. Grab the SparkContext and quiet the logs.
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 3. Read the data file and split each line on spaces.
    val tokens: RDD[Array[String]] = sc.textFile("d:\\person.txt").map(_.split(" "))

    // 4. Wrap each record in a generic Row.
    val rowRDD: RDD[Row] = tokens.map(fields => Row(fields(0).toInt, fields(1), fields(2).toInt))

    // 5. Declare the schema by hand: id is nullable, name and age are not.
    val schema: StructType = StructType(
      StructField("id", IntegerType, true) ::
      StructField("name", StringType, false) ::
      StructField("age", IntegerType, false) :: Nil)

    val personDF: DataFrame = spark.createDataFrame(rowRDD, schema)

    // Inspect schema and contents.
    personDF.printSchema()
    personDF.show()

    // Register as a temporary view and query it through SQL.
    personDF.createTempView("t_person")
    spark.sql("select * from t_person").show()

    // Shut down.
    sc.stop()
    spark.stop()
  }
}
导包
<!--引入 spark-hive依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.0.2</version>
</dependency>
代码开发
import org.apache.spark.sql.SparkSession

// Runs HiveQL statements through SparkSQL with Hive support enabled.
object HiveSupport {

  def main(args: Array[String]): Unit = {
    // 1. Build a SparkSession with Hive support switched on.
    val spark: SparkSession = SparkSession.builder()
      .appName("HiveSupport")
      .master("local[2]")
      .enableHiveSupport() // enables HiveQL execution
      .getOrCreate()

    // 2. Operate on Hive through the session.
    // 2.1 Create the hive table (uncomment on first run):
    //spark.sql("create table if not exists student(id int,name string,age int) row format delimited fields terminated by ',' ")
    // 2.2 Load local data into the table (uncomment as needed):
    //spark.sql("load data local inpath './data/student.txt' into table student")
    // 2.3 Query the table's contents.
    spark.sql("select * from student").show()

    // 3. Shut down.
    spark.stop()
  }
}
代码开发
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

// Reads a MySQL table into a DataFrame through the JDBC data source.
object DataFromMysql {

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession (local mode, 2 cores).
    val spark: SparkSession = SparkSession.builder()
      .appName("DataFromMysql")
      .master("local[2]")
      .getOrCreate()

    // 2. JDBC connection settings: url, table name, credentials.
    val url = "jdbc:mysql://192.168.200.100:3306/spark"
    val tableName = "iplocation"
    val connProps = new Properties
    connProps.setProperty("user", "root")
    connProps.setProperty("password", "123456")

    // Load the table over JDBC.
    val jdbcDataFrame: DataFrame = spark.read.jdbc(url, tableName, connProps)

    // Inspect schema and contents.
    jdbcDataFrame.printSchema()
    jdbcDataFrame.show()

    // Shut down.
    spark.stop()
  }
}
spark-shell 操作读取mysql表中
启动spark-shell脚本
spark-shell \
--master spark://hdp-node-01:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
--jars /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar \
--driver-class-path /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar
执行代码
// Read the MySQL table through the JDBC data source (spark-shell snippet;
// line breaks stay inside open parentheses so the REPL keeps reading).
val mysqlDF = spark.read.format("jdbc").options(Map(
  "url"      -> "jdbc:mysql://192.168.200.100:3306/spark",
  "driver"   -> "com.mysql.jdbc.Driver",
  "dbtable"  -> "iplocation",
  "user"     -> "root",
  "password" -> "123456")).load()
代码开发
import java.util.Properties
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

// Schema for the input records; inferred via reflection.
case class Student(id: Int, name: String, age: Int)

// Writes SparkSQL query results into a MySQL table.
// Expects two program arguments: the input file path and the target table name.
object Data2Mysql {

  def main(args: Array[String]): Unit = {
    // Guard: the original code crashed with ArrayIndexOutOfBoundsException
    // when launched without both arguments — fail fast with a usage message.
    if (args.length < 2) {
      System.err.println("Usage: Data2Mysql <inputPath> <mysqlTableName>")
      sys.exit(1)
    }

    // 1. Build the SparkSession (master is supplied by spark-submit).
    val spark: SparkSession = SparkSession.builder()
      .appName("Data2Mysql")
      .getOrCreate()

    // 2. Grab the SparkContext and quiet the logs.
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    // 3. Read the data file (args(0)) and split each line on spaces.
    val rdd: RDD[Array[String]] = sc.textFile(args(0)).map(_.split(" "))

    // 4. Bind each record to the Student case class.
    val studentRDD: RDD[Student] = rdd.map(x => Student(x(0).toInt, x(1), x(2).toInt))

    // 5. Convert to a DataFrame (requires the session implicits in scope).
    import spark.implicits._
    val studentDF: DataFrame = studentRDD.toDF

    // Show what was loaded.
    studentDF.show()

    // Register as a temporary view and query it through SQL.
    studentDF.createTempView("t_student")
    val result: DataFrame = spark.sql("select * from t_student order by age desc")

    // Write the result into the MySQL table named by args(1).
    val url = "jdbc:mysql://192.168.200.100:3306/spark"
    val tableName = args(1)
    val properties = new Properties
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")

    // Save modes:
    //   overwrite — replace the table (creates it first)
    //   append    — append rows (creates the table if missing)
    //   ignore    — no-op if the table already exists
    //   error     — fail if the table already exists
    result.write.mode("append").jdbc(url, tableName, properties)

    // Shut down.
    sc.stop()
    spark.stop()
  }
}
把程序打成jar包 提交到集群中运行
spark-submit --master spark://node1:7077 --class cn.包名.sql.Data2Mysql --executor-memory 1g --total-executor-cores 2 --jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar --driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar original-spark_class06-2.0.jar /person.txt student100
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。