package sqlText

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{StructType, IntegerType, StringType, StructField}

/**
 * Created by xiaoxu
 */
object SparkSQLAgg {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "E:\\winutils-hadoop-2.6.4\\hadoop-2.6.4")
    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getName)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val userData = Array(
      "2016-04-15,1001,http://spark.apache.org,1000",
      "2016-04-15,1001,http://hadoop.apache.org,1001",
      "2016-04-15,1002,http://fink.apache.org,1002",
      "2016-04-16,1003,http://kafka.apache.org,1020",
      "2016-04-16,1004,http://spark.apache.org,1010",
      "2016-04-16,1002,http://hive.apache.org,1200",
      "2016-04-16,1001,http://parquet.apache.org,1500",
      "2016-04-16,1001,http://spark.apache.org,1800"
    )

    val parallelize: RDD[String] = sc.parallelize(userData)
    val userDataRDDRow = parallelize.map(row => {
      val splitted = row.split(",")
      Row(splitted(0).replaceAll("-", ""), splitted(1).toInt, splitted(2), splitted(3).toInt)
    })

    // Define a schema that matches the data, so the fields can be queried by name later
    val structTypes = StructType(Array(
      StructField("date", StringType, true),
      StructField("id", IntegerType, true),
      StructField("url", StringType, true),
      StructField("amount", IntegerType, true)
    ))
    val createDataFrame = sqlContext.createDataFrame(userDataRDDRow, structTypes)

    // Sum the amount per date and save the result as JSON. Note: a real output
    // path must be supplied here; the empty string would fail at runtime.
    createDataFrame.groupBy("date").agg("amount" -> "sum").write.json("")

    // Sum the amount per date and print the result directly. When the result set
    // is large, do not collect() to the driver; persist it with write.json(...)
    // as above instead.
    createDataFrame.groupBy("date").agg("amount" -> "sum")
      .map(row => Row(row(0), row(1)))
      .collect.foreach(println)

    // Stop the program
    sc.stop()
  }
}
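
For reference, the same per-date aggregation can also be expressed through the SQL interface rather than the DataFrame API. Below is a minimal sketch, assuming the Spark 1.x APIs used above (registerTempTable is the 1.x method name; Spark 2.x renamed it createOrReplaceTempView); the table name userLogs is chosen here purely for illustration.

// Register the DataFrame as a temporary table so it can be queried with SQL;
// "userLogs" is an arbitrary name used only for this sketch.
createDataFrame.registerTempTable("userLogs")

// Equivalent to the groupBy("date").agg("amount" -> "sum") calls above.
val perDateTotals = sqlContext.sql(
  "SELECT date, SUM(amount) AS totalAmount FROM userLogs GROUP BY date")

perDateTotals.show()
// For the sample data above, the per-date totals are:
//   20160415 -> 3003 (1000 + 1001 + 1002)
//   20160416 -> 6530 (1020 + 1010 + 1200 + 1500 + 1800)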