package com.bigdata.spark.day1021

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql._

/**
  * JDBC data source
  */
object JDBCDataSource {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val spark: SparkSession = SparkSession.builder().appName("SQLIPLocation1").master("local[*]").getOrCreate()
    import spark.implicits._
    // Read the metadata of the source table, then create a DataFrame from it
    val products: DataFrame = spark.read.format("jdbc").options(
      Map(
        "url" -> "jdbc:mysql://bigdata01:3306/test",
        "driver" -> "com.mysql.jdbc.Driver",
        "user" -> "root",
        "password" -> "*******",
        "dbtable" -> "products"
      )
    ).load()
    products.show()
  }
}
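Equivalently, the read can go through the jdbc() shortcut on DataFrameReader, which takes the URL, table name, and a java.util.Properties object. A minimal sketch with the same connection details as above (the connProps name is only for illustration):

// Sketch: the same read via the jdbc() shortcut (same host, database, and credentials assumed)
val connProps = new Properties()
connProps.put("user", "root")
connProps.put("password", "*******")
connProps.put("driver", "com.mysql.jdbc.Driver")
val products2: DataFrame = spark.read.jdbc("jdbc:mysql://bigdata01:3306/test", "products", connProps)
products2.show()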
// Filter with a typed lambda: column index 2 is read as a Double and compared to 80
val filter: Dataset[Row] = products.filter(a => {
  a.getAs[Double](2) <= 80
})
// The same filter expressed with a Column expression on the price column
val filter1 = products.filter($"price" <= 80)
filter.show()
filter1.show()
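A third equivalent way, sketched here under the assumption that the same products DataFrame is still in scope: register it as a temporary view and express the filter in plain SQL.

// Sketch: register a temporary view and run the same filter as SQL
products.createOrReplaceTempView("products")
spark.sql("select * from products where price <= 80").show()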
val props = new Properties()
props.put("user","root")
props.put("password","********")
//Save the data to MySQL; SaveMode.Ignore leaves the table untouched if it already exists
products.write.mode(SaveMode.Ignore).jdbc("jdbc:mysql://bigdata01:3306/test","products1",props)
Note that Chinese text written to MySQL may come out garbled. This can be fixed by changing the character-set settings in the MySQL configuration file; a client-side workaround is sketched below.
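A minimal sketch of the client-side workaround, assuming the same test database and products1 table as above: pass useUnicode and characterEncoding in the JDBC URL so the connector transfers UTF-8 (SaveMode.Append is used here so rows are added even if the table already exists).

// Sketch: force UTF-8 on the connection via JDBC URL parameters
val utf8Url = "jdbc:mysql://bigdata01:3306/test?useUnicode=true&characterEncoding=utf8"
products.write.mode(SaveMode.Append).jdbc(utf8Url, "products1", props)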
products.select($"id",$"name",$"price").write.save("D:\\testFile\\product")
As the output shows, Spark SQL's default save format is Parquet (a columnar storage format).
products.select($"id",$"name",$"price").write.format("csv").save("D:\\testFile\\product1")
products.write.json("D:\\testFile\\product2")
val result: DataFrame = spark.read.json("D:\\testFile\\product2")
result.write.parquet("D:\\testFile\\product3")
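To confirm the round trip, the Parquet output can be read back. A minimal sketch, assuming the paths written above:

// Sketch: read the Parquet output back; load() defaults to Parquet, so no format needs to be given
val reloaded: DataFrame = spark.read.parquet("D:\\testFile\\product3")
reloaded.show()
spark.read.load("D:\\testFile\\product").show()   // the default-format save from above is also Parquet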
package com.bigdata.spark.day1021

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkDataSource {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val spark: SparkSession = SparkSession.builder().appName("Parquet").master("local[*]").getOrCreate()
    import spark.implicits._
    // Create the first Parquet file (RDD -> Parquet)
    val df1: DataFrame = spark.sparkContext.makeRDD(1 to 5).map(a => (a, a * 2)).toDF("single", "double")
    df1.write.parquet("D:\\testFile\\product4\\key=1")
    // Create the second Parquet file with a different schema
    val df2: DataFrame = spark.sparkContext.makeRDD(6 to 10).map(a => (a, a * 3)).toDF("single", "trible")
    df2.write.parquet("D:\\testFile\\product4\\key=2")
    // Merge the schemas of the generated Parquet files: the result unions the columns
    // (single, double, trible), and the directory names key=1 / key=2 surface as a partition column named key
    val df3: DataFrame = spark.read.option("mergeSchema", "true").parquet("D:\\testFile\\product4")
    //spark.read.option("mergeSchema","true").parquet("D:\\testFile\\product6")
    df3.show()
    df3.printSchema()
  }
}
// Two equivalent ways to read JSON
spark.read.json("D:\\testFile\\product2")
spark.read.format("json").load("D:\\testFile\\product2")
hive-site.xml
core-site.xml
hdfs-site.xml
bin/spark-shell --master spark://bigdata01:7077 --jars /root/mysql-connector-java-5.1.44.jar
mysql> GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '***********' WITH GRANT OPTION;
mysql> flush privileges;
For Hive to run on Spark, Spark must connect to Hive's metastore database at startup to obtain table schema information, so it needs to know where that metastore lives. This is why a Hive configuration file (hive-site.xml) is required.
Spark SQL also needs to know where HDFS is, i.e. where the NameNode is (hence core-site.xml and hdfs-site.xml).
hive-site.xml
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
    <description>username to use against metastore database</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>******************</value>
    <description>password to use against metastore database</description>
  </property>
</configuration>
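With hive-site.xml in place and the MySQL connector on the classpath (the spark-shell command above), the metastore connection can be checked from the Scala prompt. A minimal sketch, assuming the wjf.src table used later actually exists:

// Sketch: run inside spark-shell to confirm Spark can read the Hive metastore
spark.sql("show databases").show()
spark.sql("select * from wjf.src limit 10").show()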
./spark-sql --master spark://bigdata01:7077 --driver-class-path /root/mysql-connector-java-5.1.44.jar
package com.bigdata.spark.day1021

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object HIveTableSource {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    // To run Hive on Spark, Hive support must be enabled on the SparkSession
    val spark = SparkSession.builder()
      .appName("HiveOnSpark")
      .master("spark://bigdata01:7077")
      .enableHiveSupport() // enable Spark's Hive support, which is compatible with Hive syntax
      .getOrCreate()
    // To use Hive's metastore, its location must be known: add hive-site.xml to this program's classpath
    import spark.sql
    sql("select * from wjf.src").show()
    spark.close()
  }
}