Part 1: Prepare the data source
Create a student.txt file in the project directory with the following content:
1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18
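
If you would rather generate the file from code than create it by hand, here is a minimal sketch (the class name CreateSampleData is just an illustration, not part of the original):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;

public class CreateSampleData {
    public static void main(String[] args) throws IOException {
        // Write the four sample rows into student.txt in the working directory
        Files.write(Paths.get("student.txt"), Arrays.asList(
                "1,zhangsan,20",
                "2,lisi,21",
                "3,wanger,19",
                "4,fangliu,18"));
    }
}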
Part 2: Implementation
Java version:
1. First create a Student bean that implements Serializable and overrides toString(); the code is as follows:
package com.cxd.sql;

import java.io.Serializable;

@SuppressWarnings("serial")
public class Student implements Serializable {

    String sid;
    String sname;
    int sage;

    public String getSid() {
        return sid;
    }
    public void setSid(String sid) {
        this.sid = sid;
    }
    public String getSname() {
        return sname;
    }
    public void setSname(String sname) {
        this.sname = sname;
    }
    public int getSage() {
        return sage;
    }
    public void setSage(int sage) {
        this.sage = sage;
    }

    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }
}
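
As an aside, this same bean can also back a typed Dataset<Student> directly through a bean encoder, skipping the untyped Row stage; a minimal sketch (the class name BeanEncoderDemo is illustrative, not part of the original):

package com.cxd.sql;

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class BeanEncoderDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("BeanEncoderDemo").master("local").getOrCreate();

        Student stu = new Student();
        stu.setSid("1");
        stu.setSname("zhangsan");
        stu.setSage(20);

        // Encoders.bean derives the schema from Student's getters/setters,
        // yielding a typed Dataset<Student> rather than an untyped Dataset<Row>
        Dataset<Student> ds = spark.createDataset(Arrays.asList(stu), Encoders.bean(Student.class));
        ds.printSchema();
        ds.show();

        spark.stop();
    }
}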

2. Then do the conversion; the code is as follows:
package com.cxd.sql;

import java.util.ArrayList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TxtToParquetDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        reflectTransform(spark);   // via Java reflection
        dynamicTransform(spark);   // via a dynamically built schema
    }

    /**
     * Conversion via Java reflection on the Student bean.
     * @param spark
     */
    private static void reflectTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("student.txt").javaRDD();

        JavaRDD<Student> rowRDD = source.map(line -> {
            String[] parts = line.split(",");

            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.valueOf(parts[2]));
            return stu;
        });

        // The schema is inferred from the Student bean's getters/setters
        Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
        df.select("sid", "sname", "sage")
          .coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
    }

    /**
     * Conversion via a dynamically constructed schema.
     * @param spark
     */
    private static void dynamicTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("student.txt").javaRDD();

        JavaRDD<Row> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);

            return RowFactory.create(sid, sname, sage);
        });

        // Build the schema by hand: one StructField per column
        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("sid", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("sname", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("sage", DataTypes.IntegerType, true));

        StructType schema = DataTypes.createStructType(fields);

        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
    }
}
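
To check the result, the Parquet output can be read straight back, since Parquet files carry their own schema; a quick sketch (ReadParquetDemo is an illustrative name):

package com.cxd.sql;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadParquetDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("ReadParquetDemo").master("local").getOrCreate();

        // Parquet embeds its schema, so no StructType is needed on read
        Dataset<Row> df = spark.read().parquet("parquet.res");
        df.printSchema();
        df.show();

        spark.stop();
    }
}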

Scala version:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row

object RDD2Dataset {

  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    reflectCreate(spark)
    dynamicCreate(spark)
  }

  /**
   * Conversion via reflection on the Student case class
   * @param spark
   */
  private def reflectCreate(spark: SparkSession): Unit = {
    import spark.implicits._
    val stuRDD = spark.sparkContext.textFile("student.txt")
    // toDF() is the implicit conversion brought in by the import above
    val stuDf = stuRDD.map(_.split(","))
      .map(parts => Student(parts(0).trim.toInt, parts(1), parts(2).trim.toInt))
      .toDF()
    //stuDf.select("id", "name", "age").write.text("result") // write out with explicit column names
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age < 20")
    //nameDf.write.text("result") // write the query result to a file
    nameDf.show()
  }

  /**
   * Conversion via a dynamically constructed schema
   * @param spark
   */
  private def dynamicCreate(spark: SparkSession): Unit = {
    val stuRDD = spark.sparkContext.textFile("student.txt")
    val schemaString = "id,name,age"
    // Every field is created as a nullable StringType here
    val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2)))
    val stuDf = spark.createDataFrame(rowRDD, schema)
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age < 20")
    //nameDf.write.text("result") // write the query result to a file
    nameDf.show()
  }
}
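
For readers following the Java version, the temp-view-plus-SQL pattern used in the Scala code above maps over directly; a minimal sketch (SqlQueryDemo and queryByAge are illustrative names, and df is assumed to be the DataFrame built in reflectTransform earlier):

package com.cxd.sql;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SqlQueryDemo {
    // df is assumed to be the DataFrame built from student.txt in reflectTransform
    static void queryByAge(SparkSession spark, Dataset<Row> df) {
        // Register the DataFrame as a temporary view, then query it with SQL
        df.createOrReplaceTempView("student");
        Dataset<Row> names = spark.sql("SELECT sname FROM student WHERE sage < 20");
        names.show(); // prints wanger and fangliu for the sample data
    }
}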

Note: 1. All of the code above has been tested and passes, under Spark 2.1.0 and JDK 1.8.
2. This code does not work with Spark versions earlier than 2.0.