from pyspark.sql.types import StructField, StructType, StringType

# Define the schema for the Spark DataFrame
schema = StructType([
    StructField('ip', StringType(), True),
    StructField('city', StringType(), True)
])

# `job` is assumed to be a base path defined earlier in the script
ip_city_path = job + '/abcdefg'
ip_city_df = spark.read.csv(ip_city_path, header=True, schema=schema, encoding='utf-8', sep=',')
Or, equivalently, setting the options individually:

ip_city_df = (spark.read
    .option("charset", "utf-8")
    .option("header", "true")
    .option("quote", "\"")
    .option("delimiter", ",")
    .csv(ip_city_path))
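To confirm that the header, schema, and encoding options took effect, a quick sanity check on ip_city_df (standard DataFrame methods, nothing assumed beyond the code above):

ip_city_df.printSchema()             # expect two string columns: ip, city
ip_city_df.show(5, truncate=False)   # preview the first five rows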
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import spark.implicits._ // enables the $"col" syntax and toDF()

// Method 1: use the csv method directly
val sales4: DataFrame = spark.read
  .option("header", false) // no header row, so columns arrive as _c0, _c1, _c2
  .csv("file:///D:\\Software\\idea_space\\spark_streaming\\src\\data\\exam\\sales.csv")
  .withColumnRenamed("_c0", "time")
  .withColumnRenamed("_c1", "id")
  .withColumnRenamed("_c2", "salary")
  .withColumn("salary", $"salary".cast("Double"))

// Method 2: use format (com.databricks.spark.csv is the legacy alias for the built-in csv source)
val salesDF3 = spark.read.format("com.databricks.spark.csv")
  // header defaults to false (first row is data); set it to true to use the first row as column names
  // .option("header", "true")
  // infer column types automatically
  // .option("inferSchema", true)
  .load("D:\\Software\\idea_space\\spark_streaming\\src\\data\\exam\\sales.csv")
  // rename the positional columns
  .withColumnRenamed("_c0", "time")
  .withColumnRenamed("_c1", "id")
  .withColumnRenamed("_c2", "salary")
  // cast the renamed column to the proper type
  .withColumn("salary", $"salary".cast("Double"))

// Method 3: split each line, map it onto a case class, then convert to a DataFrame
case class Sales(time: String, id: String, salary: Double)

private val salesDF: DataFrame = sc.textFile("file:///D:\\Software\\idea_space\\spark_streaming\\src\\data\\exam\\sales.csv")
  .map(_.split(","))
  .map(x => Sales(x(0), x(1), x(2).toDouble))
  .toDF()

// Method 4: build a Row RDD and attach an explicit schema
val userRDD: RDD[Row] = sc.textFile("file:///D:\\Software\\idea_space\\spark_streaming\\src\\data\\exam\\demo02.txt")
  .map(_.split(","))
  .map(x => Row(x(0), x(1), x(2).toInt))

val schema = StructType(Array(
  StructField("username", StringType, true),
  StructField("month", StringType, true),
  StructField("visitNum", IntegerType, true)
))

val userDF: DataFrame = spark.createDataFrame(userRDD, schema)
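For readers on the Python API, a minimal PySpark sketch of method 4 under the same assumptions (three comma-separated columns in demo02.txt, an active SparkSession `spark` and SparkContext `sc`):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

user_schema = StructType([
    StructField("username", StringType(), True),
    StructField("month", StringType(), True),
    StructField("visitNum", IntegerType(), True),
])

# Parse each line into a tuple that matches the schema above
user_rdd = (sc.textFile("file:///D:/Software/idea_space/spark_streaming/src/data/exam/demo02.txt")
              .map(lambda line: line.split(","))
              .map(lambda x: (x[0], x[1], int(x[2]))))

user_df = spark.createDataFrame(user_rdd, user_schema)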
Reading the same 60+ GB CSV file, DF1 took 42 seconds while DF2 took only 10 seconds:
DF1 = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("hdfs://bda-ns/user/project/xxx.csv")
DF2 = spark.read.option("header", "true").csv("hdfs://bda-ns/user/project/xxx.csv")
Source: https://www.it1352.com/1847378.html
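The gap comes from inferSchema=true: to infer column types, Spark makes an extra pass over the 60+ GB file before the load proper. When the column types are known up front, an explicit schema gives typed columns without that pass. A minimal sketch, assuming a hypothetical three-column layout (replace with the file's real columns):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

xxx_schema = StructType([
    StructField("time", StringType(), True),
    StructField("id", StringType(), True),
    StructField("salary", DoubleType(), True),
])

DF3 = (spark.read.option("header", "true")
            .schema(xxx_schema)
            .csv("hdfs://bda-ns/user/project/xxx.csv"))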
// Method 1: the json shortcut
private val userJsonDF: DataFrame = spark.read.json("file:///D:\\Software\\idea_space\\spark_streaming\\src\\data\\exam\\users.json")
// Method 2: the generic format/load pair
private val userJsonDF2: DataFrame = spark.read.format("json").load("D:\\Software\\idea_space\\spark_streaming\\src\\data\\exam\\users.json")
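Both reads assume Spark's default JSON layout of one object per line. If users.json were instead a single pretty-printed JSON array, the multiLine option would be needed; a PySpark sketch under that assumption:

user_json_df = (spark.read
    .option("multiLine", "true")  # parse a file-spanning JSON document instead of JSON-per-line
    .json("file:///D:/Software/idea_space/spark_streaming/src/data/exam/users.json"))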
Reading from MySQL via JDBC
import java.util.Properties

val url = "jdbc:mysql://hadoop1:3306/test"
val tableName = "sales"
private val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "ok")
prop.setProperty("driver", "com.mysql.jdbc.Driver")
// Load the whole table into a DataFrame over a single connection
private val salesDF2: DataFrame = spark.read.jdbc(url, tableName, prop)
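A plain jdbc read like the one above pulls the whole table through a single connection. If the table has a roughly evenly distributed numeric column, the read can be split across executors. A PySpark sketch with an assumed numeric id column and made-up bounds:

sales_df = spark.read.jdbc(
    url="jdbc:mysql://hadoop1:3306/test",
    table="sales",
    column="id",          # assumed numeric partition column
    lowerBound=1,         # hypothetical minimum id
    upperBound=100000,    # hypothetical maximum id
    numPartitions=4,      # four parallel connections
    properties={"user": "root", "password": "ok",
                "driver": "com.mysql.jdbc.Driver"},
)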
Writing to MySQL via JDBC
import java.util.Properties

val url = "jdbc:mysql://hadoop1:3306/test"
val tableName = "sales"
private val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "ok")
prop.setProperty("driver", "com.mysql.jdbc.Driver")
// Save modes: "overwrite" replaces the table's contents, "append" adds rows to it
salesDF.write.mode("overwrite").jdbc(url, tableName, prop)
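Note that mode("overwrite") drops and recreates the target table by default (the truncate option keeps the existing table definition). The same write can also go through the generic jdbc format, sketched here in PySpark with sales_df standing in for the DataFrame above and batchsize as a hypothetical tuning value:

(sales_df.write.format("jdbc")
    .option("url", "jdbc:mysql://hadoop1:3306/test")
    .option("dbtable", "sales")
    .option("user", "root")
    .option("password", "ok")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("batchsize", 10000)  # rows per JDBC batch; hypothetical value
    .mode("append")
    .save())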