Data is commonly divided into unstructured data (plain text files), semi-structured data (CSV, XML, JSON), and structured data (MySQL, Oracle tables); Spark SQL is mainly used to process structured data.
Spark SQL can be used through two interfaces (contrasted in the sketch below):
1. The Dataset/DataFrame API
2. SQL
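As a rough illustration (a minimal sketch only; it reuses the spark session, the personDF DataFrame, and the people temporary view that are created further down in this article), the same filter can be expressed through either interface:
// 1. Dataset/DataFrame API: build the query with method calls on the Dataset
Dataset<Row> viaApi = personDF.filter(personDF.col("age").gt(21));
// 2. SQL: run a query string against a registered view
Dataset<Row> viaSql = spark.sql("SELECT * FROM people WHERE age > 21");
viaApi.show();
viaSql.show();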
1. Create a SparkSession
SparkSession spark = SparkSession.builder()
.appName("SparkSessionTest")
.master("local")
.enableHiveSupport()
.getOrCreate();
2. Set SparkSession runtime configuration
spark.conf().set("spark.sql.shuffle.partitions", 6);
spark.conf().set("spark.executor.memory", "2g");
Dataset<Long> numDS = spark.range(5, 100, 5);
// Sort numDS by id in descending order and show the first five rows
numDS.orderBy(numDS.col("id").desc()).show(5);
// Create a DataFrame from a List using spark.createDataFrame
Dataset<Row> personDF =
spark.createDataFrame(Arrays.asList(new Person("name", 12), new Person("name2", 13)), Person.class);
personDF.printSchema();
personDF.orderBy(personDF.col("name").desc()).show();
// Create a DataFrame by reading data from a json file
personDF = spark.read().json("spark-dataset-java/src/main/resources/people.json");
personDF.filter(personDF.col("age").gt(21)); // note: filter returns a new Dataset; the result is not kept here
personDF.createOrReplaceTempView("people");
personDF.cache();
Dataset<Row> resultsDF = spark.sql("SELECT * FROM people");
resultsDF.show(10);
//4: Access the catalog metadata
spark.catalog().listDatabases().show();
spark.catalog().listTables().show();
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
Dog dog1 = new Dog("jitty", "red");
Dog dog2 = new Dog("mytty", "yellow");
List<Dog> list = Arrays.asList(dog1, dog2);
JavaRDD<Dog> dogJavaRDD = javaSparkContext.parallelize(list);
//1: Create from a JavaRDD<T>
Dataset<Row> dogDf = spark.createDataFrame(dogJavaRDD, Dog.class);
dogDf.printSchema();
dogDf.show();
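//2: Create directly from a List<T>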
Dataset<Row> dogListDf = spark.createDataFrame(list, Dog.class);
dogListDf.printSchema();
dogListDf.show();
JavaRDD<String> peopleRDD = spark.sparkContext()
        .textFile("spark-dataset-java/src/main/resources/people.txt", 1)
        .toJavaRDD();

JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
    String[] attributes = record.split(",");
    return RowFactory.create(attributes[0], attributes[1].trim());
});

String schemaString = "name age";

// Generate the schema based on the schema string
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
    StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
    fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Apply the schema to the RDD of Rows
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
peopleDataFrame.printSchema();
peopleDataFrame.show();
List<Row> listRows = rowRDD.collect();
Dataset<Row> peopleListRowDataFrame = spark.createDataFrame(listRows, schema);
peopleListRowDataFrame.show();
Dataset<Row> personDataset =
spark.read().json("spark-dataset-java/src/main/resources/IoT_device_info.json");
personDataset.show();
RDD<Person> personRDD = javaSparkContext.parallelize(list).rdd(); // assumes "list" here is a List<Person>
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> rddDS = spark.createDataset(personRDD, personEncoder);
rddDS.show();
Dataset<Person> javaBeanDS = spark.createDataset(list, personEncoder);
javaBeanDS.show();
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
Dog dog1 = new Dog("jitty", "red");
Dog dog2 = new Dog("mytty", "yellow");
List<Dog> list = Arrays.asList(dog1, dog2);
JavaRDD<Dog> dogJavaRDD = javaSparkContext.parallelize(list);
//A JavaRDD cannot be converted to a Dataset<Row> directly; it has to go through one of the create methods
//1: Create from a JavaRDD<T>
Dataset<Row> dogDf = spark.createDataFrame(dogJavaRDD, Dog.class);
dogDf.show();
RDD<Row> dogRowRdd = dogDf.rdd();
dogRowRdd.collect();
Encoder<Dog> personEncoder = Encoders.bean(Dog.class);
Dataset<Dog> javaBeanDS = spark.createDataset(dogJavaRDD.rdd(), personEncoder);
javaBeanDS.show();
RDD<Dog> dogRdd = javaBeanDS.rdd();
dogRdd.collect();
Dataset<Dog> dsFromDf = dogDf.as(Encoders.bean(Dog.class));
dsFromDf.show();
Dataset<Row> dfFromDs = dsFromDf.toDF("first_name", "lovest_color");
dfFromDs.show();
Built-in data sources
Dataset<Row> sessionDF = spark.read().load(Utils.BASE_PATH + "/trackerSession");
sessionDF.show();
sessionDF.select("ip", "cookie").write().save(Utils.BASE_PATH + "/trackerSession_ip_cookie");
1.1 Reading data files from multiple directories
Dataset<Row> multiSessionDF = spark.read().load(Utils.BASE_PATH + "/trackerSession",
Utils.BASE_PATH + "/trackerSession_ip_cookie");
multiSessionDF.show();
1.2 Specifying the data source format
//Read a json file, then write the data back out as ORC
Dataset<Row> deviceInfoDF =
spark.read().format("json").load(Utils.BASE_PATH + "/IoT_device_info.json");
deviceInfoDF.show();
spark.read().json(Utils.BASE_PATH + "/IoT_device_info.json"); // shortcut form, equivalent to format("json").load(...)
deviceInfoDF.write().format("orc").save(Utils.BASE_PATH + "/iot");
deviceInfoDF.write().orc(Utils.BASE_PATH + "/iot2");
1.3 Passing options to change how a data source is read or written
spark.read().option("mergeSchema", "true").parquet(Utils.BASE_PATH + "/trackerSession");
deviceInfoDF.write().option("compression", "snappy").parquet(Utils.BASE_PATH + "/iot3");
1.4 Specifying the save mode with SaveMode
//SaveMode.ErrorIfExists (string "error"): throw an exception if data already exists in the target directory (this is the default)
//SaveMode.Append (string "append"): append the new data if data already exists in the target directory
//SaveMode.Overwrite (string "overwrite"): overwrite any existing data with the data being saved
//SaveMode.Ignore (string "ignore"): do nothing if data already exists in the target directory
deviceInfoDF.write().option("compression", "snappy")
.mode(SaveMode.Overwrite).parquet(Utils.BASE_PATH + "/iot3");
deviceInfoDF.write().option("compression", "snappy")
.mode("overwrite").parquet(Utils.BASE_PATH + "/iot3");
1. Reading and writing parquet files
Dataset<Row> df = spark.read().json(Utils.BASE_PATH + "/people.json");
df.show();

//1: Write the data out as snappy-compressed parquet
df.write().option("compression", "snappy")
        .mode(SaveMode.Overwrite).parquet(Utils.BASE_PATH + "/parquet");

//2: Read the parquet files back
Dataset<Row> parquetDF = spark.read().parquet(Utils.BASE_PATH + "/parquet");
parquetDF.show();

//3: Parquet schema merging
//(can also be enabled globally with spark.sql.parquet.mergeSchema = true)
df.toDF("age", "first_name").write()
        .mode(SaveMode.Overwrite).parquet(Utils.BASE_PATH + "/parquet_schema_change");
Dataset<Row> schemaMergeDF = spark.read().option("mergeSchema", "true")
        .parquet(Utils.BASE_PATH + "/parquet", Utils.BASE_PATH + "/parquet_schema_change");
schemaMergeDF.show();
//1: Convert the json file data into ORC files
Dataset<Row> df = spark.read().json(Utils.BASE_PATH + "/people.json");
df.show();
df.write().option("compression", "snappy")
.mode(SaveMode.Overwrite).orc(Utils.BASE_PATH + "/orc");
Dataset<Row> orcFileDF = spark.read().orc(Utils.BASE_PATH + "/orc");
orcFileDF.show();
Dataset<Row> sessionDf = spark.read().parquet(Utils.BASE_PATH + "/trackerSession");
sessionDf.show();
sessionDf.write().mode(SaveMode.Overwrite).json(Utils.BASE_PATH + "/json");

//Read the json file data
Dataset<Row> jsonDF = spark.read().json(Utils.BASE_PATH + "/json");
jsonDF.show();

//A DataFrame can also be created from a JSON Dataset (of type String)
List<String> jsonList = Arrays.asList("{\"name\":\"Yin\",\"address\":{\"is_old\":true,\"area\":23000.34}}");
Dataset<String> jsonDataset = spark.createDataset(jsonList, Encoders.STRING());
Dataset<Row> jsonDFFromDS = spark.read().json(jsonDataset);
jsonDFFromDS.show();

//Read options
Map<String, String> readOpts = new HashMap<>();
readOpts.put("primitivesAsString", "true");
readOpts.put("allowComments", "true");
jsonDFFromDS = spark.read().options(readOpts).json(jsonDataset);
jsonDFFromDS.show();

//Write options
Map<String, String> writeOpts = new HashMap<>();
writeOpts.put("compression", "gzip");
writeOpts.put("dateFormat", "yyyy/MM/dd");

List<StructField> fields = new ArrayList<>();
StructField name = DataTypes.createStructField("name", DataTypes.StringType, true);
StructField date = DataTypes.createStructField("date", DataTypes.DateType, true);
fields.add(name);
fields.add(date);
StructType customSchema = DataTypes.createStructType(fields);

List<String> dateJsonList = Arrays.asList("{'name':'Yin','date':'26/08/2015 18:00'}");
Dataset<String> dateJsonDataset = spark.createDataset(dateJsonList, Encoders.STRING());
Dataset<Row> dateJsonDFFromDS = spark.read().schema(customSchema)
        .option("dateFormat", "dd/MM/yyyy HH:mm").json(dateJsonDataset);
dateJsonDFFromDS.write().mode(SaveMode.Overwrite)
        .options(writeOpts).json(Utils.BASE_PATH + "/json_date");
spark.read().json(Utils.BASE_PATH + "/json_date").show();

spark.stop();
Dataset<Row> jsonDF = spark.read().json(Utils.BASE_PATH + "/people.json");

//Convert the json file data into csv files
jsonDF.write().mode(SaveMode.Overwrite).csv(Utils.BASE_PATH + "/csv");
Dataset<Row> csvDF = spark.read().csv(Utils.BASE_PATH + "/csv");
csvDF.show();

//Create a DataFrame from a Dataset of type String
List<String> csvStr = Arrays.asList("23,jeffy,26/08/2015 18:00", "34,katy,27/10/2014 18:30");
Dataset<String> csvDS = spark.createDataset(csvStr, Encoders.STRING());
Dataset<Row> csvDFFromDS = spark.read().csv(csvDS);
csvDFFromDS.show();

List<StructField> fields = new ArrayList<>();
StructField age = DataTypes.createStructField("age", DataTypes.IntegerType, true);
StructField name = DataTypes.createStructField("name", DataTypes.StringType, true);
StructField date = DataTypes.createStructField("date", DataTypes.DateType, true);
fields.add(age);
fields.add(name);
fields.add(date);
StructType customSchema = DataTypes.createStructType(fields);

//Read options
Map<String, String> readOpts = new HashMap<>();
readOpts.put("header", "false");
readOpts.put("comment", "~");
readOpts.put("dateFormat", "dd/MM/yyyy HH:mm");
Dataset<Row> data = spark.read().schema(customSchema).options(readOpts).csv(csvDS);

//Write options
Map<String, String> writeOpts = new HashMap<>();
writeOpts.put("comment", "~");
writeOpts.put("compression", "gzip");
writeOpts.put("dateFormat", "yyyy/MM/dd");
data.write().mode(SaveMode.Overwrite).options(writeOpts).csv(Utils.BASE_PATH + "/csv_options");
spark.read().csv(Utils.BASE_PATH + "/csv_options").show();
//Create a SparkSession
SparkSession spark = SparkSession.builder()
        .config("spark.driver.host", "localhost")
        .appName("JdbcTest")
        .master("local")
        .getOrCreate();

String mysqlUrl = "jdbc:mysql://127.0.0.1:3199/ajgl";

//1: Read the csv data
Dataset<Row> df = spark.read()
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(Utils.BASE_PATH + "/jdbc_demo_data.csv");
df.show();

Properties properties = new Properties();
properties.put("user", "root");
properties.put("password", "admin123");
properties.put("driver", "com.mysql.cj.jdbc.Driver");

//Options for writing to the database table
Map<String, String> writeOpts = new HashMap<>();
writeOpts.put("truncate", "false");
writeOpts.put("createTableColumnTypes", "age long");
writeOpts.put("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8");
writeOpts.put("batchsize", "100");
writeOpts.put("isolationLevel", "READ_UNCOMMITTED");

//Write to the database
df.write().mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties);

//Options for a partitioned read
Map<String, String> readOpts = new HashMap<>();
readOpts.put("numPartitions", "2");
readOpts.put("partitionColumn", "id");
readOpts.put("lowerBound", "3");
readOpts.put("upperBound", "5");
readOpts.put("fetchsize", "100");

//Partitioned read from the database
spark.read().options(readOpts).jdbc(mysqlUrl, "person", properties).show();
//Equivalent to the map-based options above
spark.read().jdbc(mysqlUrl, "person", "id", 3, 5, 2, properties).show();
//Also equivalent, using explicit partition predicates
spark.read().jdbc(mysqlUrl, "person", new String[]{"id>2 and id<5", "id>=5 and id<8"}, properties).show();

spark.stop();
//1: Convert the json file data into text files
//compression: `none`, `bzip2`, `gzip`
Dataset<Row> jsonDF = spark.read().json(Utils.BASE_PATH + "/people.json");
jsonDF.select("name")
        .write()
        .mode(SaveMode.Overwrite)
        .option("compression", "bzip2")
        .text(Utils.BASE_PATH + "/text");

//Read the text files back, returning a DataFrame
Dataset<Row> textDF = spark.read().text(Utils.BASE_PATH + "/text");
textDF.show();
//Read the text files back, returning a Dataset<String>
Dataset<String> textDS = spark.read().textFile(Utils.BASE_PATH + "/text");
textDS.show();

//1: Save the json file data into a Spark table
Dataset<Row> df = spark.read().json(Utils.BASE_PATH + "/people.json");
df.show();

spark.catalog().listTables().show();
df.write().saveAsTable("person");
spark.catalog().listTables().show();

Dataset<Row> tableDF = spark.read().table("person");
tableDF.show();

//Read from a temporary view
df.createOrReplaceTempView("person_2");
Dataset<Row> viewDF = spark.read().table("person_2");
viewDF.show();
Dataset<Row> sessions = spark.read().parquet(Utils.BASE_PATH + "/trackerSession_date");
sessions.show();
sessions.printSchema();

sessions.createOrReplaceTempView("non_partition_table");
spark.sql("select * from non_partition_table where day = 20170903").show();

//Partition the data by year, month and day
sessions.write()
        .mode(SaveMode.Overwrite)
        .partitionBy("year", "month", "day")
        .parquet(Utils.BASE_PATH + "/trackerSession_partition");

Dataset<Row> partitionDF = spark.read().parquet(Utils.BASE_PATH + "/trackerSession_partition");
partitionDF.show();
partitionDF.printSchema();

//Query the data of 20170903 with SQL
partitionDF.createOrReplaceTempView("partition_table");
spark.sql("select * from partition_table where day = 20170903").show();

//Read just the 20170903 partition directory
Dataset<Row> day03DF = spark.read()
        .parquet(Utils.BASE_PATH + "/trackerSession_partition/year=2017/month=201709/day=20170903");
day03DF.show();
day03DF.printSchema();

//bucketBy can only be used with Hive tables
sessions.write()
        .partitionBy("year")
        // .bucketBy(24, "cookie")
        .saveAsTable("partition_session");

spark.stop();
The metadata about the data Spark works with is managed by the catalog; there are two catalog implementations: in-memory and the Hive metastore.
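As a small aside (a sketch based on general Spark behavior, not taken from the original examples), which implementation backs the current session can be checked through the session configuration; calling enableHiveSupport() when building the SparkSession selects the Hive metastore, otherwise the in-memory catalog is used:
// Prints "in-memory" by default, or "hive" when the session was built with enableHiveSupport()
System.out.println(spark.conf().get("spark.sql.catalogImplementation", "in-memory"));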
spark.catalog().createTable("trackerSession_other",
Utils.BASE_PATH + "/trackerSession");
spark.sql("select * from trackerSession_other").show();
spark.sql("create table person(name string, age int) using parquet");
Dataset<Row> person_other = spark.read().json(Utils.BASE_PATH + "/people.json");
person_other.createOrReplaceTempView("person_other");
spark.sql("insert into table person select name, age from person_other");
try {
sessionDf.createTempView("trackerSession");
} catch (AnalysisException e) {
e.printStackTrace();
}
sessionDf.createOrReplaceTempView("trackerSession");
Dataset<Row> sessionRecords = spark.sql("select * from trackerSession");
sessionRecords.show();
//The second way to create a temporary view
spark.sql("CREATE TEMPORARY VIEW temp_cookie1 AS SELECT * FROM " +
"trackerSession WHERE cookie = 'cookie1'");
Dataset<Row> cookie1 = spark.sql("select * from temp_cookie1");
cookie1.show();
3.2 Global temporary views
personDF.createOrReplaceGlobalTempView("person_global");
//The global view has to be queried from Spark SQL's reserved database global_temp
spark.sql("select * from global_temp.person_global").show();
//spark.newSession().sql("select * from trackerSession").show(); //would throw an exception: ordinary temp views are not visible in a new session
spark.newSession().sql("select * from global_temp.person_global").show();
//Catalog operations on table-related metadata
spark.catalog().listTables().show();
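Beyond listTables(), a few related Catalog calls (a minimal sketch; it assumes the trackerSession, temp_cookie1, and person_global views registered above still exist):
// Columns of a registered table or view
spark.catalog().listColumns("trackerSession").show();
// Drop temporary views once they are no longer needed
spark.catalog().dropTempView("temp_cookie1");
spark.catalog().dropGlobalTempView("person_global");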
Dataset<Row> sessionDf = spark.read().parquet(Utils.BASE_PATH + "/trackerSession");
sessionDf.createOrReplaceTempView("trackerSession");

//Query all records
Dataset<Row> sessionRecords = spark.sql("select * from trackerSession");
sessionRecords.show();

//Filter
Dataset<Row> filterSession = spark.sql("select cookie from trackerSession where cookie_label = '固执'");
filterSession.show();

//Join first, then group
Dataset<Row> logDf = spark.read().parquet(Utils.BASE_PATH + "/trackerLog");
logDf.printSchema();
logDf.createOrReplaceTempView("trackerLog");

String sql = "select tl.url, count(*) from trackerLog tl join trackerSession ts on tl.cookie = ts.cookie"
        + " where ts.cookie_label = '固执' and tl.log_type='pageview'"
        + " group by tl.url"
        + " order by tl.url desc";
spark.sql(sql).show();
spark.udf().register("myUDF", (Integer arg1, String arg2) -> {
if (arg1 > 1 && arg2.equals("固执")) {
return arg2 + arg1;
} else {
return arg2 + "less";
}
}, DataTypes.StringType);
spark.sql("select myUDF(click_count, cookie_label) as c from trackerSession").show();