A Parquet file is a columnar storage format, stored in binary; each file contains both the data and its metadata.
Code in IDEA:
// Write and read a Parquet file
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object ParDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("parquetDemo")
      .getOrCreate()
    val sc = spark.sparkContext
    val list = List(
      ("张三", "red", Array(3, 4, 5)),
      ("李四", "black", Array(13, 14, 55)),
      ("王五", "orange", Array(23, 64, 45)),
      ("赵六", "blue", Array(33, 34, 35))
    )
    val rdd = sc.parallelize(list)
    // Define the table schema
    val schema = StructType(
      Array(
        StructField("name", StringType),
        StructField("color", StringType),
        StructField("num", ArrayType(IntegerType))
      )
    )
    val rowRdd = rdd.map(x => Row(x._1, x._2, x._3))
    val df = spark.createDataFrame(rowRdd, schema)
    // Write the Parquet file
    df.write.parquet("out/color")
    // Read it back
    val frame = spark.read.parquet("out/color")
    frame.printSchema()
    frame.show()
  }
}
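Because a Parquet file carries its own schema in its metadata, Spark SQL can also query it directly without creating a DataFrame first. A minimal sketch, reusing the out/color path written above:

// Query the Parquet file directly; the schema is read from the file's own metadata
spark.sql("select name, color from parquet.`out/color`").show()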
Reading Hive tables
Method 1: operate directly in the spark-shell:
// Enter the command in the spark-shell
val df = spark.table("table_name")
Note: the default database is `default`; tables in other databases can be referenced as "database.table".
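For example, reading from a database other than default (the database and table names below are placeholders):

// "database.table" form; mydb and mytable are made-up names for illustration
val df2 = spark.table("mydb.mytable")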
Method 2: operate from IDEA
// Add the dependencies to pom.xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.36</version>
</dependency>

// Code
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("onHive")
  .config("hive.metastore.uris", "thrift://hadoop001:9083")
  .enableHiveSupport()
  .getOrCreate()
spark.sql("show databases").show()
// Spark connects to Hive's default database unless told otherwise
val df = spark.sql("select * from toronto")
val ds = df.where(df("ssn").startsWith("158"))
val ds2 = df.filter(df("ssn").startsWith("111"))
ds.show()
ds2.show()
Before running, the Hive metastore service must be started first:
nohup hive --service metastore &
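With enableHiveSupport turned on, the same session can also write a DataFrame back into Hive via saveAsTable. A minimal sketch, assuming the df from the example above; the target table name toronto_bak is made up for illustration:

// Write df back into Hive; toronto_bak is a hypothetical table name
df.write.mode("overwrite").saveAsTable("default.toronto_bak")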
Reading MySQL tables via JDBC
Method 1: operate directly in the spark-shell
// Change the host IP/hostname, table name, username, and password to match your environment
val url = "jdbc:mysql://hadoop001:3306/hive"
val tbname = "TBLS"
val prop = new java.util.Properties
prop.setProperty("user", "root")
prop.setProperty("password", "ok")
prop.setProperty("driver", "com.mysql.jdbc.Driver")
val jdbcDF = spark.read.jdbc(url, tbname, prop)
jdbcDF.show
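spark.read.jdbc also accepts a parenthesized subquery in place of a table name, which pushes the filtering down to MySQL. A sketch under the same connection settings (the columns used here come from the standard Hive metastore schema):

// Only managed tables are fetched from MySQL; "t" is the required subquery alias
val query = "(select TBL_ID, TBL_NAME from TBLS where TBL_TYPE = 'MANAGED_TABLE') t"
val managedDF = spark.read.jdbc(url, query, prop)
managedDF.show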
Method 2: connect from IDEA
import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("onMysql")
  .getOrCreate()
val url = "jdbc:mysql://hadoop001:3306/school"
val user = "root"
val password = "ok"
val driver = "com.mysql.jdbc.Driver"
val prop = new Properties()
prop.setProperty("user", user)
prop.setProperty("password", password)
prop.setProperty("driver", driver)
val df = spark.read.jdbc(url, "Score", prop)
df.show()
// Count rows whose s_score starts with "8"
val cnt = df.where(df("s_score").startsWith("8")).count()
println(cnt) // 7
// Count rows per student id
df.groupBy("s_id").count().show()
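Writing back to MySQL uses the same url and Properties object. A minimal sketch that saves the grouped counts from above; the target table name score_cnt is made up, and write.jdbc creates it if it does not exist:

// Persist the per-student counts into a new MySQL table
val grouped = df.groupBy("s_id").count()
grouped.write.mode("overwrite").jdbc(url, "score_cnt", prop)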