赞
踩
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Load raw employee records; each line is expected to look like "id,name,age".
// NOTE(review): path is machine-specific — confirm it exists on the target host.
val peopleRDD = spark.sparkContext.textFile("file:///root/wyqWork/Demo6/employee.txt")

// Build the schema programmatically: three nullable StringType columns
// derived from a space-separated field list.
val schemaString = "id name age"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert each CSV line into a Row, trimming stray whitespace around
// the name and age fields (the file may use ", " as a separator).
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))

val peopleDF = spark.createDataFrame(rowRDD, schema)

// Register a temporary view so the data can be queried with SQL.
peopleDF.createOrReplaceTempView("people")

val results = spark.sql("SELECT id,name,age FROM people")

// Render each result row as a single string and show it.
// Idiom fix: use string interpolation instead of "+" concatenation;
// the rendered output is character-for-character identical.
results.map(attributes => s"id: ${attributes(0)},name:${attributes(1)},age:${attributes(2)}").show()
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, sum}

val spark = SparkSession.builder().appName("TestMySQL").master("local").getOrCreate()
import spark.implicits._

// Two new employee records, each a whitespace-separated "id name gender age".
val employeeRDD = spark.sparkContext.parallelize(Array("3 Mary F 26", "4 Tom M 23")).map(_.split(" "))

// Typed schema matching the MySQL table: (id INT, name, gender, age INT).
val schema = StructType(List(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)))

// Convert each token array into a typed Row (toInt will throw on bad input).
val rowRDD = employeeRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
val employeeDF = spark.createDataFrame(rowRDD, schema)

// JDBC connection properties.
// SECURITY NOTE(review): credentials are hard-coded in plain text; load them
// from configuration or environment variables before using this outside a demo.
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "10086CYc#")
prop.put("driver", "com.mysql.cj.jdbc.Driver")

// Append the two new rows to sparktest.employee.
employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest", "sparktest.employee", prop)

// Read the table back through JDBC.
val jdbcDF = spark.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/sparktest").
  option("driver", "com.mysql.cj.jdbc.Driver").
  option("dbtable", "employee").
  option("user", "root").
  option("password", "10086CYc#").
  load()

// BUG FIX: the original agg("age" -> "max", "age" -> "sum") resolves to the
// agg(Map[String, String]) overload, so the duplicate "age" key made the
// second pair overwrite the first and only sum(age) was computed.
// Column expressions report BOTH aggregates, as the exercise intends.
jdbcDF.agg(max("age"), sum("age")).show()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。