Notes:
1. JSON-format files can be read in two ways.
2. df.show() displays the first 20 rows by default.
3. The native DataFrame API can be used to operate on a DataFrame.
4. When registered as a temporary table, the table's columns are displayed in ASCII order by default. The view-creation methods (local vs. global temporary views are sketched after this list):
- df.createTempView("mytable")
- df.createOrReplaceTempView("mytable")
- df.createGlobalTempView("mytable")
- df.createOrReplaceGlobalTempView("mytable")
- session.sql("select * from global_temp.mytable").show()
5. A DataFrame is an RDD of Row under the hood; obtain it with df.rdd() / df.javaRDD().
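A minimal Scala sketch of the local vs. global distinction, assuming an existing SparkSession named session and a DataFrame df: a local temporary view is scoped to the session that created it, while a global temporary view lives in the reserved global_temp database and is visible to other sessions of the same application.
scala:
- // Minimal sketch, assuming `session` is a SparkSession and `df` a DataFrame.
- df.createOrReplaceTempView("mytable")             // session-scoped; gone when the session ends
- session.sql("select * from mytable").show()
-
- df.createOrReplaceGlobalTempView("mytable")       // application-scoped, stored under `global_temp`
- session.sql("select * from global_temp.mytable").show()
-
- // A new session in the same application still sees the global view, but not the local one.
- val other = session.newSession()
- other.sql("select * from global_temp.mytable").show()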
java:
- SparkConf conf = new SparkConf();
- conf.setMaster("local").setAppName("jsonfile");
- SparkContext sc = new SparkContext(conf);
-
- // create the SQLContext
- SQLContext sqlContext = new SQLContext(sc);
-
- /**
-  * A DataFrame is backed by RDDs whose element type is Row.
-  * Either of the following two ways reads a JSON-format file.
-  */
- DataFrame df = sqlContext.read().format("json").load("sparksql/json");
- // DataFrame df2 = sqlContext.read().json("sparksql/json.txt");
- // df2.show();
-
- /**
-  * Convert the DataFrame to an RDD
-  */
- RDD<Row> rdd = df.rdd();
- /**
-  * Display the DataFrame's contents; the first 20 rows are shown by default. To display more rows, pass the count: show(n).
-  * Note: with multiple columns, the columns are displayed in ASCII order of their names.
-  */
- // df.show();
- /**
-  * Print the schema as a tree
-  */
- df.printSchema();
- /**
-  * Operate on the DataFrame with its built-in API
-  */
- //select name from table
- // df.select("name").show();
- //select name, age+10 as addage from table
- df.select(df.col("name"),df.col("age").plus(10).alias("addage")).show();
- //select name ,age from table where age>19
- df.select(df.col("name"),df.col("age")).where(df.col("age").gt(19)).show();
- //select age, count(*) from table group by age
- df.groupBy(df.col("age")).count().show();
-
- /**
-  * Register the DataFrame as a temporary table. It is registered in memory as a logical table and is not materialized to disk.
-  */
- df.registerTempTable("jtable");
-
- DataFrame sql = sqlContext.sql("select age,count(1) from jtable group by age");
- DataFrame sql2 = sqlContext.sql("select * from jtable");
-
- sc.stop();
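Note: SQLContext, the Java DataFrame type, and registerTempTable are the Spark 1.x API; in Spark 2.x and later the equivalents are SparkSession, Dataset&lt;Row&gt;, and createOrReplaceTempView, which the Scala version below uses.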
scala:
- val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
- // val frame: DataFrame = session.read.json("./data/json")
- val frame = session.read.format("json").load("./data/json")
- frame.show(100)
- frame.printSchema()
-
- /**
-  * DataFrame API operations
-  */
- //select name, age from table
- frame.select("name","age").show(100)
-
- //select name, age + 10 as addage from table
- frame.select(frame.col("name"),frame.col("age").plus(10).as("addage")).show(100)
-
- //select name, age from table where age >= 19
- frame.select("name","age").where(frame.col("age").>=(19)).show(100)
- frame.filter("age>=19").show(100)
-
- //select name, age from table order by name asc, age desc
- import session.implicits._
- frame.sort($"name".asc,frame.col("age").desc).show(100)
-
- //select name, age from table where age is not null
- frame.filter("age is not null").show()
-
- /**
-  * Create temporary views
-  */
- frame.createTempView("mytable")
- session.sql("select name ,age from mytable where age >= 19").show()
- frame.createOrReplaceTempView("mytable")
- frame.createGlobalTempView("mytable")
- frame.createOrReplaceGlobalTempView("mytable")
-
- /**
-  * Convert the DataFrame to an RDD
-  */
- val rdd: RDD[Row] = frame.rdd
- rdd.foreach(row=>{
-   val name = row.getAs[String]("name")
-   val age = row.getAs[Long]("age")
-   println(s"name is $name ,age is $age")
- })
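One detail worth noting: Spark's JSON schema inference reads integral numbers as LongType, which is why the loop above uses row.getAs[Long]("age"); fetching the value as Int would fail with a ClassCastException at runtime.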
java:
- SparkConf conf = new SparkConf();
- conf.setMaster("local").setAppName("jsonRDD");
- JavaSparkContext sc = new JavaSparkContext(conf);
- SQLContext sqlContext = new SQLContext(sc);
- JavaRDD<String> nameRDD = sc.parallelize(Arrays.asList(
- "{\"name\":\"zhangsan\",\"age\":\"18\"}",
- "{\"name\":\"lisi\",\"age\":\"19\"}",
- "{\"name\":\"wangwu\",\"age\":\"20\"}"
- ));
- JavaRDD<String> scoreRDD = sc.parallelize(Arrays.asList(
- "{\"name\":\"zhangsan\",\"score\":\"100\"}",
- "{\"name\":\"lisi\",\"score\":\"200\"}",
- "{\"name\":\"wangwu\",\"score\":\"300\"}"
- ));
-
- DataFrame namedf = sqlContext.read().json(nameRDD);
- DataFrame scoredf = sqlContext.read().json(scoreRDD);
- namedf.registerTempTable("name");
- scoredf.registerTempTable("score");
-
- DataFrame result = sqlContext.sql("select name.name,name.age,score.score from name,score where name.name = score.name");
- result.show();
-
- sc.stop();
scala:
- val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
- val jsonList = List[String](
-   "{'name':'zhangsan','age':'18'}",
-   "{'name':'lisi','age':'19'}",
-   "{'name':'wangwu','age':'20'}",
-   "{'name':'maliu','age':'21'}",
-   "{'name':'tainqi','age':'22'}"
- )
-
- import session.implicits._
- val jsds: Dataset[String] = jsonList.toDS()
- val df = session.read.json(jsds)
- df.show()
-
- /**
-  * Spark 1.6 way: from an RDD[String]
-  */
- val jsRDD: RDD[String] = session.sparkContext.parallelize(jsonList)
- val frame: DataFrame = session.read.json(jsRDD)
- frame.show()
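Note: since Spark 2.2 the RDD[String] overload of read.json is deprecated; the Dataset[String] overload shown first is the preferred form.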
java:
- /**
-  * Notes:
-  * 1. The custom class must be serializable.
-  * 2. The custom class must be public.
-  * 3. Converting an RDD to a DataFrame this way sorts the custom class's fields by ASCII order of their names.
-  */
- SparkConf conf = new SparkConf();
- conf.setMaster("local").setAppName("RDD");
- JavaSparkContext sc = new JavaSparkContext(conf);
- SQLContext sqlContext = new SQLContext(sc);
- JavaRDD<String> lineRDD = sc.textFile("sparksql/person.txt");
- JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Person call(String s) throws Exception {
- Person p = new Person();
- p.setId(s.split(",")[0]);
- p.setName(s.split(",")[1]);
- p.setAge(Integer.valueOf(s.split(",")[2]));
- return p;
- }
- });
- /**
-  * When Person.class is passed in, sqlContext creates the DataFrame via reflection:
-  * it reflects over all of Person's fields and, combined with the RDD itself, produces the DataFrame.
-  */
- DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
- df.show();
- df.registerTempTable("person");
- sqlContext.sql("select name from person where id = 2").show();
-
- /**
-  * Convert the DataFrame back to a JavaRDD
-  * Notes:
-  * 1. You can fetch values by index with row.getInt(0), row.getString(1), etc., but you must mind the column order; this form is rarely used.
-  * 2. You can use row.getAs("columnName") to fetch a column's value by name.
-  */
- JavaRDD<Row> javaRDD = df.javaRDD();
- JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Person call(Row row) throws Exception {
- Person p = new Person();
- //p.setId(row.getString(1));
- //p.setName(row.getString(2));
- //p.setAge(row.getInt(0));
-
- p.setId((String)row.getAs("id"));
- p.setName((String)row.getAs("name"));
- p.setAge((Integer)row.getAs("age"));
- return p;
- }
- });
- map.foreach(new VoidFunction<Person>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void call(Person t) throws Exception {
- System.out.println(t);
- }
- });
-
- sc.stop();
scala:
- case class MyPerson(id:Int,name:String,age:Int,score:Double)
-
- object Test {
-   def main(args: Array[String]): Unit = {
-     val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
-     val peopleInfo: RDD[String] = session.sparkContext.textFile("./data/people.txt")
-     val personRDD : RDD[MyPerson] = peopleInfo.map(info =>{
-       MyPerson(info.split(",")(0).toInt,info.split(",")(1),info.split(",")(2).toInt,info.split(",")(3).toDouble)
-     })
-     import session.implicits._
-     val ds = personRDD.toDS()
-     ds.createTempView("mytable")
-     session.sql("select * from mytable ").show()
-   }
- }
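Unlike the Java bean reflection path above, which sorts fields by ASCII order, the schema derived from a Scala case class keeps the constructor's declaration order (id, name, age, score here).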
java:
- SparkConf conf = new SparkConf();
- conf.setMaster("local").setAppName("rddStruct");
- JavaSparkContext sc = new JavaSparkContext(conf);
- SQLContext sqlContext = new SQLContext(sc);
- JavaRDD<String> lineRDD = sc.textFile("./sparksql/person.txt");
- /**
-  * Convert to an RDD of Row
-  */
- JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Row call(String s) throws Exception {
- return RowFactory.create(
- String.valueOf(s.split(",")[0]),
- String.valueOf(s.split(",")[1]),
- Integer.valueOf(s.split(",")[2])
- );
- }
- });
- /**
-  * Dynamically build the DataFrame's schema. In practice the fields may come from a string or from an external database.
-  */
- List<StructField> asList =Arrays.asList(
- DataTypes.createStructField("id", DataTypes.StringType, true),
- DataTypes.createStructField("name", DataTypes.StringType, true),
- DataTypes.createStructField("age", DataTypes.IntegerType, true)
- );
-
- StructType schema = DataTypes.createStructType(asList);
- DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
-
- df.show();
- sc.stop();
scala:
- val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
- val peopleInfo: RDD[String] = session.sparkContext.textFile("./data/people.txt")
-
- val rowRDD: RDD[Row] = peopleInfo.map(info => {
-   val id = info.split(",")(0).toInt
-   val name = info.split(",")(1)
-   val age = info.split(",")(2).toInt
-   val score = info.split(",")(3).toDouble
-   Row(id, name, age, score)
- })
- val structType: StructType = StructType(Array[StructField](
-   StructField("id", IntegerType),
-   StructField("name", StringType),
-   StructField("age", IntegerType),
-   StructField("score", DoubleType)
- ))
- val frame: DataFrame = session.createDataFrame(rowRDD,structType)
- frame.createTempView("mytable")
- session.sql("select * from mytable ").show()
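Note: StructField's nullable parameter defaults to true, so all four columns in the schema above are nullable.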
Saving a DataFrame as a parquet file can be done in two ways; SaveMode controls what happens when the target already exists:
- df.write().mode(SaveMode.Overwrite).format("parquet")
-   .save("./sparksql/parquet");
- df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
Overwrite: overwrite existing data
Append: append to existing data
ErrorIfExists: raise an error if the target exists (the default)
Ignore: skip the write if the target exists
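A minimal Scala sketch of the four modes (assuming an existing DataFrame df; "./data/out" is an illustrative path):
scala:
- df.write.mode(SaveMode.Overwrite).parquet("./data/out")        // replaces any existing output
- df.write.mode(SaveMode.Append).parquet("./data/out")           // adds new files alongside the old ones
- df.write.mode(SaveMode.Ignore).parquet("./data/out")           // silently does nothing: the path now exists
- // df.write.mode(SaveMode.ErrorIfExists).parquet("./data/out") // would throw, since the path already exists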
java:
- SparkConf conf = new SparkConf();
- conf.setMaster("local").setAppName("parquet");
- JavaSparkContext sc = new JavaSparkContext(conf);
- SQLContext sqlContext = new SQLContext(sc);
- JavaRDD<String> jsonRDD = sc.textFile("sparksql/json");
- DataFrame df = sqlContext.read().json(jsonRDD);
- /**
-  * Save the DataFrame as a parquet file; SaveMode specifies the save behavior when the target exists.
-  * There are two ways to save as parquet:
-  */
- df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet");
- df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
- df.show();
- /**
-  * Load a parquet file into a DataFrame.
-  * There are two ways to load a parquet file:
-  */
- DataFrame load = sqlContext.read().format("parquet").load("./sparksql/parquet");
- load = sqlContext.read().parquet("./sparksql/parquet");
- load.show();
-
- sc.stop();
scala:
- 1.val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
- 2.val frame: DataFrame = session.read.json("./data/json")
- 3.frame.show()
- 4.frame.write.mode(SaveMode.Overwrite).parquet("./data/parquet")
- 5.
- 6.val df: DataFrame = session.read.format("parquet").load("./data/parquet")
- 7.df.createTempView("mytable")
- 8.session.sql("select count(*) from mytable ").show()
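Since parquet is Spark's default data source (spark.sql.sources.default), session.read.load("./data/parquet") with no explicit format would also work.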
Creating a DataFrame from a MySQL table:
java:
- SparkConf conf = new SparkConf();
- conf.setMaster("local").setAppName("mysql");
- JavaSparkContext sc = new JavaSparkContext(conf);
- SQLContext sqlContext = new SQLContext(sc);
- /**
-  * First way: read a MySQL table and load it as a DataFrame
-  */
- Map<String, String> options = new HashMap<String,String>();
- options.put("url", "jdbc:mysql://192.168.179.4:3306/spark");
- options.put("driver", "com.mysql.jdbc.Driver");
- options.put("user", "root");
- options.put("password", "123456");
- options.put("dbtable", "person");
- DataFrame person = sqlContext.read().format("jdbc").options(options).load();
- person.show();
- person.registerTempTable("person");
- /**
-  * Second way: read a MySQL table and load it as a DataFrame
-  */
- DataFrameReader reader = sqlContext.read().format("jdbc");
- reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark");
- reader.option("driver", "com.mysql.jdbc.Driver");
- reader.option("user", "root");
- reader.option("password", "123456");
- reader.option("dbtable", "score");
- DataFrame score = reader.load();
- score.show();
- score.registerTempTable("score");
-
- DataFrame result =
- sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name");
- result.show();
- /**
-  * Save the DataFrame result back to MySQL
-  */
- Properties properties = new Properties();
- properties.setProperty("user", "root");
- properties.setProperty("password", "123456");
- result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.4:3306/spark", "result", properties);
-
- sc.stop();
scala:
- val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
-
- val prop = new Properties()
- prop.setProperty("user","root")
- prop.setProperty("password","123456")
- /**
-  * First way
-  */
- val df1 = session.read.jdbc("jdbc:mysql://192.168.179.14:3306/spark","person",prop)
- df1.show()
- df1.createTempView("person")
-
- /**
-  * Second way
-  */
- val map = Map[String,String](
-   "url" -> "jdbc:mysql://192.168.179.14:3306/spark",
-   "driver" -> "com.mysql.jdbc.Driver",
-   "user" -> "root",
-   "password" -> "123456",
-   "dbtable" -> "score"
- )
- val df2 = session.read.format("jdbc").options(map).load()
- df2.show()
-
- /**
-  * Third way
-  */
- val df3 = session.read.format("jdbc")
-   .option("url", "jdbc:mysql://192.168.179.14:3306/spark")
-   .option("driver", "com.mysql.jdbc.Driver")
-   .option("user", "root")
-   .option("password", "123456")
-   .option("dbtable", "score")
-   .load()
- df3.show()
- df3.createTempView("score")
-
- val result = session.sql("select person.id,person.name,person.age,score.score from person ,score where person.id = score.id")
-
- result.show()
- // save the result to MySQL
- result.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.14:3306/spark","result",prop)
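For large tables, the JDBC source can also read in parallel. A minimal sketch, assuming the person table's id column is numeric and its values span roughly 1 to 1000 (the bounds and partition count are illustrative):
scala:
- val partitioned = session.read.format("jdbc")
-   .option("url", "jdbc:mysql://192.168.179.14:3306/spark")
-   .option("driver", "com.mysql.jdbc.Driver")
-   .option("user", "root")
-   .option("password", "123456")
-   .option("dbtable", "person")
-   .option("partitionColumn", "id")   // numeric column to split on
-   .option("lowerBound", "1")         // with upperBound, defines the partition stride
-   .option("upperBound", "1000")
-   .option("numPartitions", "4")      // number of concurrent JDBC connections
-   .load()
- partitioned.show()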