当前位置:   article > 正文

Spark---创建DataFrame的方式_spark dataframe

spark dataframe

1、读取json格式的文件创建DataFrame

注意:

1、可以两种方式读取json格式的文件。

2、df.show()默认显示前20行数据。

3、DataFrame原生API可以操作DataFrame。

4、注册成临时表时,表中的列默认按 ASCII 顺序显示。

  1. df.createTempView("mytable")
  2. df.createOrReplaceTempView("mytable")
  3. df.createGlobalTempView("mytable")
  4. df.createOrReplaceGlobalTempView("mytable")
  5. session.sql("select * from global_temp.mytable").show()

5、DataFrame是一个Row类型的RDD,df.rdd()/df.javaRdd()。

java

  1. SparkConf conf = new SparkConf();
  2. conf.setMaster("local").setAppName("jsonfile");
  3. SparkContext sc = new SparkContext(conf);
  4. //创建sqlContext
  5. SQLContext sqlContext = new SQLContext(sc);
  6. /**
  7. * DataFrame的底层是一个一个的RDD RDD的泛型是Row类型。
  8. * 以下两种方式都可以读取json格式的文件
  9. */
  10. DataFrame df = sqlContext.read().format("json").load("sparksql/json");
  11. // DataFrame df2 = sqlContext.read().json("sparksql/json.txt");
  12. // df2.show();
  13. /**
  14. * DataFrame转换成RDD
  15. */
  16. RDD<Row> rdd = df.rdd();
  17. /**
  18. * 显示 DataFrame中的内容,默认显示前20行。如果要显示多行,需要指定行数:show(行数)
  19. * 注意:当有多个列时,显示的列先后顺序是按列的ascii码先后显示。
  20. */
  21. // df.show();
  22. /**
  23. * 树形的形式显示schema信息
  24. */
  25. df.printSchema();
  26. /**
  27. * dataFram自带的API 操作DataFrame
  28. */
  29. //select name from table
  30. // df.select("name").show();
  31. //select name age+10 as addage from table
  32. df.select(df.col("name"),df.col("age").plus(10).alias("addage")).show();
  33. //select name ,age from table where age>19
  34. df.select(df.col("name"),df.col("age")).where(df.col("age").gt(19)).show();
  35. //select count(*) from table group by age
  36. df.groupBy(df.col("age")).count().show();
  37. /**
  38. * 将DataFrame注册成临时的一张表,这张表临时注册到内存中,是逻辑上的表,不会物化到磁盘
  39. */
  40. df.registerTempTable("jtable");
  41. DataFrame sql = sqlContext.sql("select age,count(1) from jtable group by age");
  42. DataFrame sql2 = sqlContext.sql("select * from jtable");
  43. sc.stop();

scala:

  1. 1.val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
  2. 2.// val frame: DataFrame = session.read.json("./data/json")
  3. 3.val frame = session.read.format("json").load("./data/json")
  4. 4.frame.show(100)
  5. 5.frame.printSchema()
  6. 6.
  7. 7./**
  8. 8.* DataFrame API 操作
  9. 9.*/
  10. 10.//select name ,age from table
  11. 11.frame.select("name","age").show(100)
  12. 12.
  13. 13.//select name,age + 10 as addage from table
  14. 14.frame.select(frame.col("name"),frame.col("age").plus(10).as("addage")).show(100)
  15. 15.
  16. 16.//select name,age from table where age >= 19
  17. 17.frame.select("name","age").where(frame.col("age").>=(19)).show(100)
  18. 18.frame.filter("age>=19").show(100)
  19. 19.
  20. 20.//select name ,age from table order by name asc ,age desc
  21. 21.import session.implicits._
  22. 22.frame.sort($"name".asc,frame.col("age").desc).show(100)
  23. 23.
  24. 24.//select name ,age from table where age is not null
  25. 25.frame.filter("age is not null").show()
  26. 26.
  27. 27./**
  28. 28.* 创建临时表
  29. 29.*/
  30. 30.frame.createTempView("mytable")
  31. 31.session.sql("select name ,age from mytable where age >= 19").show()
  32. 32.frame.createOrReplaceTempView("mytable")
  33. 33.frame.createGlobalTempView("mytable")
  34. 34.frame.createOrReplaceGlobalTempView("mytable")
  35. 35.
  36. 36./**
  37. 37.* dataFrame 转换成RDD
  38. 38.*/
  39. 39.val rdd: RDD[Row] = frame.rdd
  40. 40.rdd.foreach(row=>{
  41. 41. val name = row.getAs[String]("name")
  42. 42. val age = row.getAs[Long]("age")
  43. 43. println(s"name is $name ,age is $age")
  44. 44.})

2、通过json格式的RDD创建DataFrame

java:

  1. SparkConf conf = new SparkConf();
  2. conf.setMaster("local").setAppName("jsonRDD");
  3. JavaSparkContext sc = new JavaSparkContext(conf);
  4. SQLContext sqlContext = new SQLContext(sc);
  5. JavaRDD<String> nameRDD = sc.parallelize(Arrays.asList(
  6. "{\"name\":\"zhangsan\",\"age\":\"18\"}",
  7. "{\"name\":\"lisi\",\"age\":\"19\"}",
  8. "{\"name\":\"wangwu\",\"age\":\"20\"}"
  9. ));
  10. JavaRDD<String> scoreRDD = sc.parallelize(Arrays.asList(
  11. "{\"name\":\"zhangsan\",\"score\":\"100\"}",
  12. "{\"name\":\"lisi\",\"score\":\"200\"}",
  13. "{\"name\":\"wangwu\",\"score\":\"300\"}"
  14. ));
  15. DataFrame namedf = sqlContext.read().json(nameRDD);
  16. DataFrame scoredf = sqlContext.read().json(scoreRDD);
  17. namedf.registerTempTable("name");
  18. scoredf.registerTempTable("score");
  19. DataFrame result = sqlContext.sql("select name.name,name.age,score.score from name,score where name.name = score.name");
  20. result.show();
  21. sc.stop();

scala:

  1. 1.val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
  2. 2.val jsonList = List[String](
  3. 3. "{'name':'zhangsan','age':'18'}",
  4. 4. "{'name':'lisi','age':'19'}",
  5. 5. "{'name':'wangwu','age':'20'}",
  6. 6. "{'name':'maliu','age':'21'}",
  7. 7. "{'name':'tainqi','age':'22'}"
  8. 8.)
  9. 9.
  10. 10.import session.implicits._
  11. 11.val jsds: Dataset[String] = jsonList.toDS()
  12. 12.val df = session.read.json(jsds)
  13. 13.df.show()
  14. 14.
  15. 15./**
  16. 16.* Spark 1.6
  17. 17.*/
  18. 18.val jsRDD: RDD[String] = session.sparkContext.parallelize(jsonList)
  19. 19.val frame: DataFrame = session.read.json(jsRDD)
  20. 20.frame.show()

3、非json格式的RDD创建DataFrame

1)、通过反射的方式将非json格式的RDD转换成DataFrame(不建议使用)

  • 自定义类要可序列化
  • 自定义类的访问级别是Public
  • RDD转成DataFrame后会根据映射将字段按 ASCII 码排序
  • 将DataFrame转换成RDD时获取字段有两种方式,一种是 row.getInt(0) 下标获取(不推荐使用),另一种是 row.getAs("列名") 获取(推荐使用)
  1. /**
  2. * 注意:
  3. * 1.自定义类必须是可序列化的
  4. * 2.自定义类访问级别必须是Public
  5. * 3.RDD转成DataFrame会把自定义类中字段的名称按 ASCII 码排序
  6. */
  7. SparkConf conf = new SparkConf();
  8. conf.setMaster("local").setAppName("RDD");
  9. JavaSparkContext sc = new JavaSparkContext(conf);
  10. SQLContext sqlContext = new SQLContext(sc);
  11. JavaRDD<String> lineRDD = sc.textFile("sparksql/person.txt");
  12. JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
  13. /**
  14. *
  15. */
  16. private static final long serialVersionUID = 1L;
  17. @Override
  18. public Person call(String s) throws Exception {
  19. Person p = new Person();
  20. p.setId(s.split(",")[0]);
  21. p.setName(s.split(",")[1]);
  22. p.setAge(Integer.valueOf(s.split(",")[2]));
  23. return p;
  24. }
  25. });
  26. /**
  27. * 传入进去Person.class的时候,sqlContext是通过反射的方式创建DataFrame
  28. * 在底层通过反射的方式获得Person的所有field,结合RDD本身,就生成了DataFrame
  29. */
  30. DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
  31. df.show();
  32. df.registerTempTable("person");
  33. sqlContext.sql("select name from person where id = 2").show();
  34. /**
  35. * 将DataFrame转成JavaRDD
  36. * 注意:
  37. * 1.可以使用row.getInt(0),row.getString(1)...通过下标获取返回Row类型的数据,但是要注意列顺序问题---不常用
  38. * 2.可以使用row.getAs("列名")来获取对应的列值。
  39. *
  40. */
  41. JavaRDD<Row> javaRDD = df.javaRDD();
  42. JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {
  43. /**
  44. *
  45. */
  46. private static final long serialVersionUID = 1L;
  47. @Override
  48. public Person call(Row row) throws Exception {
  49. Person p = new Person();
  50. //p.setId(row.getString(1));
  51. //p.setName(row.getString(2));
  52. //p.setAge(row.getInt(0));
  53. p.setId((String)row.getAs("id"));
  54. p.setName((String)row.getAs("name"));
  55. p.setAge((Integer)row.getAs("age"));
  56. return p;
  57. }
  58. });
  59. map.foreach(new VoidFunction<Person>() {
  60. /**
  61. *
  62. */
  63. private static final long serialVersionUID = 1L;
  64. @Override
  65. public void call(Person t) throws Exception {
  66. System.out.println(t);
  67. }
  68. });
  69. sc.stop();

scala:

  1. 1.case class MyPerson(id:Int,name:String,age:Int,score:Double)
  2. 2.
  3. 3.object Test {
  4. 4. def main(args: Array[String]): Unit = {
  5. 5. val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
  6. 6. val peopleInfo: RDD[String] = session.sparkContext.textFile("./data/people.txt")
  7. 7. val personRDD : RDD[MyPerson] = peopleInfo.map(info =>{
  8. 8.MyPerson(info.split(",")(0).toInt,info.split(",")(1),info.split(",")(2).toInt,info.split(",")(3).toDouble)
  9. 9. })
  10. 10. import session.implicits._
  11. 11. val ds = personRDD.toDS()
  12. 12. ds.createTempView("mytable")
  13. 13. session.sql("select * from mytable ").show()
  14. 14. }
  15. 15.}

2)、动态创建Schema将非json格式的RDD转换成DataFrame

java:

  1. SparkConf conf = new SparkConf();
  2. conf.setMaster("local").setAppName("rddStruct");
  3. JavaSparkContext sc = new JavaSparkContext(conf);
  4. SQLContext sqlContext = new SQLContext(sc);
  5. JavaRDD<String> lineRDD = sc.textFile("./sparksql/person.txt");
  6. /**
  7. * 转换成Row类型的RDD
  8. */
  9. JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
  10. /**
  11. *
  12. */
  13. private static final long serialVersionUID = 1L;
  14. @Override
  15. public Row call(String s) throws Exception {
  16. return RowFactory.create(
  17. String.valueOf(s.split(",")[0]),
  18. String.valueOf(s.split(",")[1]),
  19. Integer.valueOf(s.split(",")[2])
  20. );
  21. }
  22. });
  23. /**
  24. * 动态构建DataFrame中的元数据,一般来说这里的字段可以来源自字符串,也可以来源于外部数据库
  25. */
  26. List<StructField> asList =Arrays.asList(
  27. DataTypes.createStructField("id", DataTypes.StringType, true),
  28. DataTypes.createStructField("name", DataTypes.StringType, true),
  29. DataTypes.createStructField("age", DataTypes.IntegerType, true)
  30. );
  31. StructType schema = DataTypes.createStructType(asList);
  32. DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
  33. df.show();
  34. sc.stop();

scala:

  1. 1.val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
  2. 2.val peopleInfo: RDD[String] = session.sparkContext.textFile("./data/people.txt")
  3. 3.
  4. 4.val rowRDD: RDD[Row] = peopleInfo.map(info => {
  5. 5. val id = info.split(",")(0).toInt
  6. 6. val name = info.split(",")(1)
  7. 7. val age = info.split(",")(2).toInt
  8. 8. val score = info.split(",")(3).toDouble
  9. 9. Row(id, name, age, score)
  10. 10.})
  11. 11.val structType: StructType = StructType(Array[StructField](
  12. 12. StructField("id", IntegerType),
  13. 13. StructField("name", StringType),
  14. 14. StructField("age", IntegerType),
  15. 15. StructField("score", DoubleType)
  16. 16.))
  17. 17.val frame: DataFrame = session.createDataFrame(rowRDD,structType)
  18. 18.frame.createTempView("mytable")
  19. 19.session.sql("select * from mytable ").show()

4、读取parquet文件创建DataFrame

注意:

  • 可以将DataFrame存储成parquet文件。保存成parquet文件的方式有两种
  1. df.write().mode(SaveMode.Overwrite).format("parquet")
  2. .save("./sparksql/parquet");
  3. df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
  • SaveMode指定文件保存时的模式。

Overwrite:覆盖

Append:追加

ErrorIfExists:如果存在就报错

Ignore:如果存在就忽略

java:

  1. SparkConf conf = new SparkConf();
  2. conf.setMaster("local").setAppName("parquet");
  3. JavaSparkContext sc = new JavaSparkContext(conf);
  4. SQLContext sqlContext = new SQLContext(sc);
  5. JavaRDD<String> jsonRDD = sc.textFile("sparksql/json");
  6. DataFrame df = sqlContext.read().json(jsonRDD);
  7. /**
  8. * 将DataFrame保存成parquet文件,SaveMode指定存储文件时的保存模式
  9. * 保存成parquet文件有以下两种方式:
  10. */
  11. df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet");
  12. df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
  13. df.show();
  14. /**
  15. * 加载parquet文件成DataFrame
  16. * 加载parquet文件有以下两种方式:
  17. */
  18. DataFrame load = sqlContext.read().format("parquet").load("./sparksql/parquet");
  19. load = sqlContext.read().parquet("./sparksql/parquet");
  20. load.show();
  21. sc.stop();

scala:

  1. 1.val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
  2. 2.val frame: DataFrame = session.read.json("./data/json")
  3. 3.frame.show()
  4. 4.frame.write.mode(SaveMode.Overwrite).parquet("./data/parquet")
  5. 5.
  6. 6.val df: DataFrame = session.read.format("parquet").load("./data/parquet")
  7. 7.df.createTempView("mytable")
  8. 8.session.sql("select count(*) from mytable ").show()

5、读取JDBC中的数据创建DataFrame(MySql为例)

两种方式创建DataFrame

java:

  1. SparkConf conf = new SparkConf();
  2. conf.setMaster("local").setAppName("mysql");
  3. JavaSparkContext sc = new JavaSparkContext(conf);
  4. SQLContext sqlContext = new SQLContext(sc);
  5. /**
  6. * 第一种方式读取MySql数据库表,加载为DataFrame
  7. */
  8. Map<String, String> options = new HashMap<String,String>();
  9. options.put("url", "jdbc:mysql://192.168.179.4:3306/spark");
  10. options.put("driver", "com.mysql.jdbc.Driver");
  11. options.put("user", "root");
  12. options.put("password", "123456");
  13. options.put("dbtable", "person");
  14. DataFrame person = sqlContext.read().format("jdbc").options(options).load();
  15. person.show();
  16. person.registerTempTable("person");
  17. /**
  18. * 第二种方式读取MySql数据表加载为DataFrame
  19. */
  20. DataFrameReader reader = sqlContext.read().format("jdbc");
  21. reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark");
  22. reader.option("driver", "com.mysql.jdbc.Driver");
  23. reader.option("user", "root");
  24. reader.option("password", "123456");
  25. reader.option("dbtable", "score");
  26. DataFrame score = reader.load();
  27. score.show();
  28. score.registerTempTable("score");
  29. DataFrame result =
  30. sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name");
  31. result.show();
  32. /**
  33. * 将DataFrame结果保存到Mysql中
  34. */
  35. Properties properties = new Properties();
  36. properties.setProperty("user", "root");
  37. properties.setProperty("password", "123456");
  38. result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.4:3306/spark", "result", properties);
  39. sc.stop();

scala:

  1. 1.val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
  2. 2.
  3. 3.val prop = new Properties()
  4. 4.prop.setProperty("user","root")
  5. 5.prop.setProperty("password","123456")
  6. 6./**
  7. 7.* 第一种方式
  8. 8.*/
  9. 9.val df1 = session.read.jdbc("jdbc:mysql://192.168.179.14:3306/spark","person",prop)
  10. 10.df1.show()
  11. 11.df1.createTempView("person")
  12. 12.
  13. 13./**
  14. 14.* 第二种方式
  15. 15.*/
  16. 16.val map = Map[String,String](
  17. 17. "url" -> "jdbc:mysql://192.168.179.14:3306/spark",
  18. 18. "driver " -> "com.mysql.jdbc.Driver",
  19. 19. "user" -> "root",
  20. 20. "password" -> "123456",
  21. 21. "dbtable" -> "score"
  22. 22.)
  23. 23.val df2 = session.read.format("jdbc").options(map).load()
  24. 24.df2.show()
  25. 25.
  26. 26./**
  27. 27.* 第三种方式
  28. 28.*/
  29. 29.val df3 = session.read.format("jdbc")
  30. 30. .option("url", "jdbc:mysql://192.168.179.14:3306/spark")
  31. 31. .option("driver", "com.mysql.jdbc.Driver")
  32. 32. .option("user", "root")
  33. 33. .option("password", "123456")
  34. 34. .option("dbtable", "score")
  35. 35. .load()
  36. 36.df3.show()
  37. 37.df3.createTempView("score")
  38. 38.
  39. 39.val result = session.sql("select person.id,person.name,person.age,score.score from person ,score where person.id = score.id")
  40. 40.
  41. 41.result.show()
  42. 42.//将结果保存到mysql中
  43. 43.result.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.14:3306/spark","result",prop)
  44. 44.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/558596
推荐阅读
相关标签
  

闽ICP备14008679号