1. Create a Scala project from a Maven archetype

Template:
group: net.alchim31.maven
artifact: scala-archetype-simple
version: 1.7
repository: https://maven.aliyun.com/repository/central
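For reference, a minimal command-line sketch of generating the project from this archetype (the groupId and artifactId below are placeholders; choose your own):

mvn archetype:generate \
  -DarchetypeGroupId=net.alchim31.maven \
  -DarchetypeArtifactId=scala-archetype-simple \
  -DarchetypeVersion=1.7 \
  -DgroupId=com.antg \
  -DartifactId=spark-demo \
  -DinteractiveMode=false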
2. Configure the project

In the project root, create the directory null/bin and put winutils.exe in it. (On Windows, Spark looks for winutils.exe under %HADOOP_HOME%\bin; when HADOOP_HOME is not set the path resolves to null\bin, so this directory satisfies the lookup.)

Add the required dependencies and adjust the Scala version:
<properties>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <encoding>UTF-8</encoding>
  <scala.version>2.11.11</scala.version>
  <scala.compat.version>2.11</scala.compat.version>
  <spec2.version>4.2.0</spec2.version>
</properties>

<!-- Scala standard library -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>${scala.version}</version>
</dependency>

<!-- Spark Core -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_${scala.compat.version}</artifactId>
  <version>2.3.2</version>
  <scope>provided</scope>
</dependency>

<!-- Spark SQL -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_${scala.compat.version}</artifactId>
  <version>2.3.2</version>
  <scope>provided</scope>
</dependency>

<!-- Log4j 1.x, the logging backend Spark 2.3.2 itself uses -->
<dependency>
  <groupId>log4j</groupId>
  <artifactId>log4j</artifactId>
  <version>1.2.17</version>
</dependency>

Note: spark-core and spark-sql are declared with provided scope. To run the examples directly from the IDE, either enable the run-configuration option that includes provided-scope dependencies, or temporarily drop the provided scope.

Under src/main, create a resources directory and put the log4j configuration file in it.
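A minimal log4j.properties sketch, assuming you only want console output and quieter Spark logging (the log level is your choice):

log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n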
Spark 1.6 style: create a DataFrame through SQLContext.

package com.antg.main

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object DataFrames1_6 {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("DataFrame 1.6 style test")
    // Create the context
    val sc = new SparkContext(conf)
    // Create the SQL context (deprecated since Spark 2.0, but still works)
    val sqlContext = new SQLContext(sc)
    // Read the data
    val df = sqlContext.read.json("C:\\Users\\Administrator\\Desktop\\data.json")
    // Show all rows
    df.show()
    // Stop the context
    sc.stop()
  }
}
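Both DataFrame examples read data.json; its contents are not shown in the original, but Spark's JSON reader expects one object per line, so a hypothetical sample would look like:

{"name":"tom","age":22}
{"name":"jerry","age":20}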

Spark 2.x style: the same work through SparkSession.

package com.antg.main

import org.apache.spark.sql.SparkSession

object DataFrames2_3 {
  def main(args: Array[String]): Unit = {
    // Create the session
    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("dataframes2.3")
      .getOrCreate()
    // Create the DataFrame
    val df = sparkSession.read.json("C:\\Users\\Administrator\\Desktop\\data.json")

    // Register a temporary view (createTempView returns Unit, so there is nothing to assign)
    df.createTempView("vrTable")

    sparkSession.sql("select * from vrTable").show()

    // Persist the data as parquet
    df.repartition(2).write.format("parquet").save("./data")

    // Shut down
    sparkSession.stop()
  }
}
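To check the persisted output, a minimal sketch that reads the parquet directory back (the object name is hypothetical; the ./data path is the one written above):

package com.antg.main

import org.apache.spark.sql.SparkSession

object ReadParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("read parquet back")
      .getOrCreate()
    // Load the directory written by DataFrames2_3 and show its contents
    spark.read.parquet("./data").show()
    spark.stop()
  }
}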

Converting an RDD to a DataFrame by attaching an explicit schema:

package com.antg.main

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object RDD_DF {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("test_rdd to df")
      .master("local[*]")
      .getOrCreate()

    // Build the schema: four nullable string columns
    val schema = StructType(
      "stdno name classId className".split(" ").map(t => StructField(t, StringType, true))
    )
    // Read the raw lines, split each on tab, and wrap the fields in Rows
    val lineRDD = sparkSession.sparkContext.textFile("C:\\Users\\Administrator\\Desktop\\student_mysql.txt")
    val rowRDD = lineRDD.map(_.split("\t")).map(row => Row(row(0), row(1), row(2), row(3)))
    // Combine rows and schema into a DataFrame
    val df = sparkSession.createDataFrame(rowRDD, schema)
    df.show()
    df.printSchema()
    sparkSession.stop()
  }
}
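When the column layout is known at compile time, the same conversion can also be done by reflection with a case class and toDF. A sketch, assuming the same four tab-separated columns (the case class and object names are hypothetical):

package com.antg.main

import org.apache.spark.sql.SparkSession

// Hypothetical case class mirroring the four columns above
case class StudentRecord(stdno: String, name: String, classId: String, className: String)

object RDD_DF_Reflection {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("rdd to df via reflection")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Map each line into the case class, then let Spark infer the schema
    val df = spark.sparkContext
      .textFile("C:\\Users\\Administrator\\Desktop\\student_mysql.txt")
      .map(_.split("\t"))
      .map(f => StudentRecord(f(0), f(1), f(2), f(3)))
      .toDF()
    df.printSchema()
    df.show()
    spark.stop()
  }
}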

Dataset (DS) example:
package com.antg.main

import org.apache.spark.sql.SparkSession

// age is BigInt because Spark infers JSON integers as long, which upcasts safely to BigInt
case class Student(name: String, age: BigInt)

object TestDS {
  def main(args: Array[String]): Unit = {
    // Create the session
    val sparkSession = SparkSession.builder()
      .appName("ds test")
      .master("local[*]")
      .getOrCreate()
    // Bring the implicit encoders and conversions into scope
    import sparkSession.implicits._

    // Create a Dataset from primitive values
    val a = Seq(1, 2, 3).toDS()

    // Operate on the Dataset
    a.map(_ + 1).collect.foreach(println)
    a.show()

    // Create a Dataset from a case class
    val b = Seq(Student("tom", 22)).toDS()
    b.show()

    // Create a Dataset from a file, typed by the case class
    val path = "C:\\Users\\Administrator\\Desktop\\student_data.txt"
    val c = sparkSession.read.json(path).as[Student]
    c.show()
    // Because the Dataset is strongly typed, fields can be accessed directly
    c.foreach(x => println(x.age))

    sparkSession.stop()
  }
}

student_data.txt:

{"name":"张一","age":10,"address":"国际庄"}
{"name":"张二","age":20}
{"name":"张三","age":30}
{"name":"张四","age":40}
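Typed operations compose as on ordinary collections. A short sketch of filtering and dropping back to an untyped DataFrame, reusing the Student case class above (the object name is hypothetical):

package com.antg.main

import org.apache.spark.sql.SparkSession

object TestDSOps {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ds typed ops")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val ds = spark.read.json("C:\\Users\\Administrator\\Desktop\\student_data.txt").as[Student]
    // Typed filter: the predicate sees Student objects, not Rows
    val adults = ds.filter(_.age > 15)
    adults.show()
    // A Dataset can be converted back to an untyped DataFrame at any point
    adults.toDF().printSchema()
    spark.stop()
  }
}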