赞
踩
表 6-2 employee 表原有数据
表 6-3 employee 表新增数据
新建数据库sparktest,新建数据表employee,并输入数据:
-- Create the sparktest database and the employee table, then seed the two
-- original rows (Alice, John) shown in table 6-2.
create database sparktest;
use sparktest;  -- fixed typo: original said "spraktest", which would fail since the db created above is "sparktest"
create table employee(id int(4),name char(50), gender char(20), age int(10));
insert into employee values(1,'Alice','F',22);
insert into employee values(2,'John','M',25);
(如果你前面已经采用下面方式启动了spark-shell,就不需要重复启动了)
启动 spark-shell 时附加以下命令行参数(前提是已经把所需的 MySQL 连接器 jar 包解压到 spark 的 jars 目录下)
# Launch spark-shell with the MySQL Connector/J 5.1.40 jar on both the
# executor classpath (--jars) and the driver classpath (--driver-class-path);
# both are needed for DataFrame read/write over JDBC in local mode.
./bin/spark-shell \
--jars /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \
--driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar
开启 Mysql 服务器
service mysql start
如果后面的操作出现连接失败,原因可能是没有开启 MySQL 服务器,或者用户名/密码错误
import java.util.Properties import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession val spark=SparkSession.builder().appName("TestMySQL").master("local").getOrCreate() import spark.implicits._ val employeeRDD=spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" ")) val schema=StructType(List(StructField("id",IntegerType, true),StructField("name",StringType,true),StructField("gender",StringType,true), StructField("age",IntegerType,true))) val rowRDD=employeeRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt)) val employeeDF=spark.createDataFrame(rowRDD,schema) val prop=new Properties() prop.put("user","root") prop.put("password","210374520") prop.put("driver","com.mysql.jdbc.Driver") employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest","sparktest.employee",prop) val jdbcDF = spark.read.format("jdbc"). option("url","jdbc:mysql://localhost:3306/sparktest"). option("driver","com.mysql.jdbc.Driver"). option("dbtable", "employee"). option("user", "root"). option("password", "210374520"). load() jdbcDF.agg("age" -> "max", "age" -> "sum").show()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。