当前位置:   article > 正文

pyspark操作 rdd dataframe,pyspark.sql.functions详解 行列变换_pyspark udf 每一行修改返回每一行值

pyspark udf 每一行修改返回每一行值

官网文档可以参考:https://spark.apache.org/docs/latest/api/python/index.html

dataframe读写

生成以逗号分隔的数据

stringCSVRDD = spark.sparkContext.parallelize([
    (123, "Katie", 19, "brown"),
    (234, "Michael", 22, "green"),
    (345, "Simone", 23, "blue")
])
  • 1
  • 2
  • 3
  • 4
  • 5

指定模式, StructField(name,dataType,nullable)

其中:

name: 该字段的名字,

dataType:该字段的数据类型,

nullable: 指示该字段的值是否为空

from pyspark.sql.types import StructType, StructField, LongType, StringType  # 导入类型

schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

对RDD应用该模式并且创建DataFrame

swimmers = spark.createDataFrame(stringCSVRDD,schema)

利用DataFrame创建一个临时视图

swimmers.registerTempTable(“swimmers”)

查看DataFrame的行数

swimmers.count()
2.2. 从变量创建

使用自动类型推断的方式创建dataframe

data = [(123, “Katie”, 19, “brown”),
(234, “Michael”, 22, “green”),
(345, “Simone”, 23, “blue”)]
df = spark.createDataFrame(data, schema=[‘id’, ‘name’, ‘age’, ‘eyccolor’])
df.show()
df.count()
2.3. 读取json

读取spark下面的示例数据

file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\people.json"
df = spark.read.json(file)
df.show()
2.4. 读取csv

先创建csv文件

import pandas as pd
import numpy as np
df=pd.DataFrame(np.random.rand(5,5),columns=[‘a’,‘b’,‘c’,‘d’,‘e’]).
applymap(lambda x: int(x*10))
file=r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\random.csv"
df.to_csv(file,index=False)

再读取csv文件

monthlySales = spark.read.csv(file, header=True, inferSchema=True)
monthlySales.show()
2.5. 读取MySQL

此时需要将mysql-jar驱动放到spark-2.2.0-bin-hadoop2.7\jars下面

单机环境可行,集群环境不行

重新执行

df = spark.read.format(‘jdbc’).options(
url=‘jdbc:mysql://127.0.0.1’,
dbtable=‘mysql.db’,
user=‘root’,
password=‘123456’
).load()
df.show()

也可以传入SQL语句

sql=“(select * from mysql.db where db=‘wp230’) t”
df = spark.read.format(‘jdbc’).options(
url=‘jdbc:mysql://127.0.0.1’,
dbtable=sql,
user=‘root’,
password=‘123456’
).load()
df.show()
2.6. 从pandas.dataframe创建

如果不指定schema则用pandas的列名

df = pd.DataFrame(np.random.random((4,4)))
spark_df = spark.createDataFrame (df,schema=[‘a’,‘b’,‘c’,‘d’])
2.7. 从列式存储的parquet读取

读取example下面的parquet文件

file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\users.parquet"
df=spark.read.parquet(file)
df.show()
2.8. 从hive读取

如果已经配置spark连接hive的参数,可以直接读取hive数据

spark = SparkSession
.builder
.enableHiveSupport() \
.master(“172.31.100.170:7077”)
.appName(“my_first_app_name”)
.getOrCreate()

df=spark.sql(“select * from hive_tb_name”)
df.show()
2.9.从hdfs读取
直接使用read.csv的方法即可。

直接读取,不需要指定ip和port

data= spark.read.csv(‘hdfs:///tmp/_da_exdata_path/data.csv’, header=True)
data.show()

有些情况下是需要指定ip和端口的

data= spark.read.csv(‘hdfs://localhost:9000/tmp/_da_exdata_path/data.csv’, header=True)
data.show()
3. 保存数据
3.1. 写到csv

创建dataframe

import numpy as np
df = pd.DataFrame(np.random.random((4, 4)),columns=[‘a’, ‘b’, ‘c’, ‘d’])
spark_df = spark.createDataFrame(df)

写到csv

file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.csv"
spark_df.write.csv(path=file, header=True, sep=“,”, mode=‘overwrite’)
3.2. 保存到parquet

创建dataframe

import numpy as np
df = pd.DataFrame(np.random.random((4, 4)),columns=[‘a’, ‘b’, ‘c’, ‘d’])
spark_df = spark.createDataFrame(df)

写到parquet

file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.parquet"
spark_df.write.parquet(path=file,mode=‘overwrite’)
3.3. 写到hive

打开动态分区

spark.sql(“set hive.exec.dynamic.partition.mode = nonstrict”)
spark.sql(“set hive.exec.dynamic.partition=true”)

使用普通的hive-sql写入分区表

spark.sql(“”"
insert overwrite table ai.da_aipurchase_dailysale_hive
partition (saledate)
select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate
from szy_aipurchase_tmp_szy_dailysale distribute by saledate
“”")

或者使用每次重建分区表的方式

jdbcDF.write.mode(“overwrite”).partitionBy(“saledate”).insertInto(“ai.da_aipurchase_dailysale_hive”)
jdbcDF.write.saveAsTable(“ai.da_aipurchase_dailysale_hive”, None, “append”, partitionBy=‘saledate’)

不写分区表,只是简单的导入到hive表

jdbcDF.write.saveAsTable(“ai.da_aipurchase_dailysale_for_ema_predict”, None, “overwrite”, None)
3.4. 写到hdfs

数据写到hdfs,而且以csv格式保存

jdbcDF.write.mode(“overwrite”).options(header=“true”).csv(“/home/ai/da/da_aipurchase_dailysale_for_ema_predict.csv”)
3.5. 写到mysql

会自动对齐字段,也就是说,spark_df 的列不一定要全部包含MySQL的表的全部列才行

overwrite 清空表再导入

spark_df.write.mode(“overwrite”).format(“jdbc”).options(
url=‘jdbc:mysql://127.0.0.1’,
user=‘root’,
password=‘123456’,
dbtable=“test.test”,
batchsize=“1000”,
).save()

append 追加方式

spark_df.write.mode(“append”).format(“jdbc”).options(
url=‘jdbc:mysql://127.0.0.1’,
user=‘root’,
password=‘123456’,
dbtable=“test.test”,
batchsize=“1000”,
).save()

我们采用本地实验的方式,来学习下语法

face.csv文件内容如下

image_id,device_id,date_str,age,gender,glass,hat,feat
2019-03-09_8_0007f1a502433ee0d80c7f14c3bf7bc0face.jpg,8,2019-03-09,11.0,female,noglass,nohat,11111111111
2019-03-09_8_000e791eb5978a9fad084f8ad012c780face.jpg,8,2019-03-09,49.0,female,Glass.TYPE1,nohat,2222222222
2019-03-09_8_0041cad3b76d6b30103ad5dd1396276cface.jpg,8,2019-03-09,24.0,male,noglass,nohat,333333

  • 1
  • 2
  • 3
  • 4
  • 5

python的示例demo


# 并行计算文件
from pyspark import SparkConf
from pyspark import SparkContext

conf =SparkConf().setAppName("file_test")   # 本地4核启动
sparkContext = SparkContext.getOrCreate(conf)    # 创建context
# sparkContext.setLogLevel("info")     # 设置打印日志等级



rdd=sparkContext.textFile("face.csv")  # 每行一个item
print(rdd.first())   # 读取第一行
rdd=rdd.distinct()   # 先去除重复数据
rdd=rdd.map(lambda x: x.split(','))  # 对每个item进行并行操作  flatMap会把所有item平展开合并成一个list
rdd=rdd.filter(lambda x: x[4]=='male' or x[4]=='female')  # 筛选出满足条件的item。现在每个item是个列表了
print(rdd.count())
# rdd.foreach(lambda x: print(x))   # 并行执行某些函数,返回为空   action函数
gender_group_rdd=rdd.groupBy(lambda x:'female' if x[4]=='female' else 'male') # 按性别分组,[(key,results),(key,results),]
for (key,value) in gender_group_rdd.collect():
    print(key, type(value))
# print(gender_group_rdd[0])


print('==============key-value===================')

# key-value
# rdd转换
device_rdd=rdd.map(lambda x:(x[1],1))     # 将每个item转化为(key,value),这样可以进行group。rdd中的key和value都是以元素(key,value)的形式存在的
print((device_rdd.keys().collect()))   # 获取所有的key
print((device_rdd.values().collect()))  # 获取所有的value
print(device_rdd.lookup('8'))   # 根据key,查找value,action行为,返回list

# 排序函数
count_rdd=device_rdd.sortByKey(ascending=True)  # 按key排序
count_rdd=device_rdd.sortBy(lambda x: x[1],ascending=True)  # 自定义排序规则
print(count_rdd.collect())

# 变换计算函数
count_rdd=device_rdd.mapValues(lambda y:y+1-1)  # 将所有value进行操作
count_rdd=count_rdd.reduceByKey(lambda x,y:x+y)  # 对key相同的value进行求和,并行后只存在不重复的key
print(count_rdd.collectAsMap())  # 以字典的形式返回数据
print(count_rdd.take(30))  # 读取前n行列表
print(count_rdd.takeOrdered(3))
print(count_rdd.takeOrdered(3, key=lambda x:-x[1]))  # 自定义排序规则

# 直接计算函数
device_count = device_rdd.countByKey()  # 按key进行组内求个数,直接是action函数
print(device_count.items())

# 没有很方便的groupby后的egg函数。必须要用dataframe
#

#

# int rdd
print('====================int rdd=================')
int_rdd = count_rdd.map(lambda x: x[1])  # 取出内个摄像头的人脸数目
print(int_rdd.stats())
print(int_rdd.min(),int_rdd.max(),int_rdd.stdev(),int_rdd.count(),int_rdd.sum(),int_rdd.mean())
count_dif = int_rdd.countByValue()
print(count_dif.items())

# print(rdd.collect())  # 打印全部数据

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

dataframe语法

DataFrame 的函数

Action 操作

collect() ,返回值是一个数组,返回dataframe集合所有的行
collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的行
count() 返回一个number类型的,返回dataframe集合的行数
describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以传多个参数,中间用逗号分隔,如果有字段为空,那么不参与运算,只这对数值类型的字段。例如df.describe("age", "height").show()
first() 返回第一行 ,类型是row类型
head() 返回第一行 ,类型是row类型
head(n:Int)返回n行  ,类型是row 类型
show()返回dataframe集合的值 默认是20行,返回类型是unit
show(n:Int)返回n行,,返回值类型是unit
table(n:Int) 返回n行  ,类型是row 类型
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

dataframe的基本操作

cache()同步数据的内存
columns 返回一个string类型的数组,返回值是所有列的名字
dtypes返回一个string类型的二维数组,返回值是所有列的名字以及类型
explan()打印执行计划  物理的
explain(n:Boolean) 输入值为 false 或者true ,返回值是unit  默认是false ,如果输入true 将会打印 逻辑的和物理的
isLocal 返回值是Boolean类型,如果允许模式是local返回true 否则返回false
persist(newlevel:StorageLevel) 返回一个dataframe.this.type 输入存储模型类型
printSchema() 打印出字段名称和类型 按照树状结构来打印
registerTempTable(tablename:String) 返回Unit ,将df的对象只放在一张表里面,这个表随着对象的删除而删除了
schema 返回structType 类型,将字段名称和类型按照结构体类型返回
toDF()返回一个新的dataframe类型的
toDF(colnames:String*)将参数中的几个字段返回一个新的dataframe类型的,
unpersist() 返回dataframe.this.type 类型,去除模式中的数据
unpersist(blocking:Boolean)返回dataframe.this.type类型 true 和unpersist是一样的作用false 是去除RDD
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

集成查询:

1、 agg(expers:column*) 返回dataframe类型 ,同数学计算求值
	df.agg(max("age"), avg("salary"))
	df.groupBy().agg(max("age"), avg("salary"))
2、 agg(exprs: Map[String, String])  返回dataframe类型 ,同数学计算求值 map类型的
	df.agg(Map("age" -> "max", "salary" -> "avg"))
	df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
3、 agg(aggExpr: (String, String), aggExprs: (String, String)*)  返回dataframe类型 ,同数学计算求值
	df.agg(Map("age" -> "max", "salary" -> "avg"))
	df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
4、 apply(colName: String) 返回column类型,捕获输入进去列的对象
5、 as(alias: String) 返回一个新的dataframe类型,就是原来的一个别名
6、 col(colName: String)  返回column类型,捕获输入进去列的对象
7、 cube(col1: String, cols: String*) 返回一个GroupedData类型,根据某些字段来汇总
8、 distinct 去重 返回一个dataframe类型
9、 drop(col: Column) 删除某列 返回dataframe类型
10、 dropDuplicates(colNames: Array[String]) 删除相同的列 返回一个dataframe
11、 except(other: DataFrame) 返回一个dataframe,返回在当前集合存在的在其他集合不存在的
12、 explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]) 返回值是dataframe类型,这个 将一个字段进行更多行的拆分
df.explode("name","names") {name :String=> name.split(" ")}.show();
将name字段根据空格来拆分,拆分的字段放在names里面
13、 filter(conditionExpr: String): 刷选部分数据,返回dataframe类型 df.filter("age>10").show();  df.filter(df("age")>10).show();   df.where(df("age")>10).show(); 都可以
14、 groupBy(col1: String, cols: String*) 根据某写字段来汇总返回groupedate类型   df.groupBy("age").agg(Map("age" ->"count")).show();df.groupBy("age").avg().show();都可以
15、 intersect(other: DataFrame) 返回一个dataframe,在2个dataframe都存在的元素
16、 join(right: DataFrame, joinExprs: Column, joinType: String)
一个是关联的dataframe,第二个关联的条件,第三个关联的类型:inner, outer, left_outer, right_outer, leftsemi
df.join(ds,df("name")===ds("name") and  df("age")===ds("age"),"outer").show();
17、 limit(n: Int) 返回dataframe类型  去n 条数据出来
18、 na: DataFrameNaFunctions ,可以调用dataframenafunctions的功能区做过滤 df.na.drop().show(); 删除为空的行
19、 orderBy(sortExprs: Column*) 做alise排序
20、 select(cols:string*) dataframe 做字段的刷选 df.select($"colA", $"colB" + 1)
21、 selectExpr(exprs: String*) 做字段的刷选 df.selectExpr("name","name as names","upper(name)","age+1").show();
22、 sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 默认是asc
23、 unionAll(other:Dataframe) 合并 df.unionAll(ds).show();
24、 withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show();
25、 withColumn(colName: String, col: Column) 增加一列 df.withColumn("aa",df("name")).show();

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

数据类型转换

BinaryType: binary
BooleanType: boolean
ByteType: tinyint
DateType: date
DecimalType: decimal(10,0)
DoubleType: double
FloatType: float
IntegerType: int
LongType: bigint
ShortType: smallint
StringType: string
TimestampType: timestamp
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

两种方式

from pyspark.sql.types import DoubleType,IntegerType
changedTypedf = dataframe.withColumn("label", dataframe["show"].cast(DoubleType()))


或者
changedTypedf = dataframe.withColumn("label", dataframe["show"].cast("double"))

如果改变原有列的类型
toDoublefunc = UserDefinedFunction(lambda x: float(x),DoubleType())

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

更复杂的类型变换

types.ArrayType(types.IntegerType()).simpleString()   
'array<int>'
types.MapType(types.StringType(), types.IntegerType()).simpleString()
'map<string,int>'
  • 1
  • 2
  • 3
  • 4

dataframe python示例


# 并行计算文件
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import DataFrame,SQLContext,HiveContext,SparkSession
from pyspark.sql.functions import isnull,isnan,udf
from pyspark.sql import functions
from pyspark.sql import types
from pyspark.sql.types import DoubleType,IntegerType,StringType,DateType
import datetime,time




# 创建
print('=============读取保存==================')
conf =SparkConf().setAppName("file_test")   # 本地4核启动
sparkContext = SparkContext.getOrCreate(conf)    # 创建context
# sparkContext.setLogLevel("info")     # 设置打印日志等级
sqlContext = SQLContext(sparkContext)
df = sqlContext.read.csv("face.csv",header="true")  # header设置为true将设置文件中的第一行作为表头
print(df.first())
print(df.head(2))
df.write.csv('age_gender.csv',header=True,mode='overwrite')

# 应用
print('===========遍历================')
def apply1(x):
    pass
    # print(x['image_id'])
df.foreach(apply1)



# 变换
print('===========变换================')
df = df.withColumn("age", df["age"].cast("Int"))  # 修改列的类型
print(df.show(3))
new_df = df.withColumn('userid',df['age'].cast('int')%10)    # 新增一列,cast 可用于列类型变换df.select(col.cast('int'))
print(new_df.show(3))

# new_df = df.withColumn('image_id', '')   # 修改列的值
# print(new_df.show(3))
df = df.withColumnRenamed( "date_str","date")   # 修改列名,方便join
print(df.show(3))

# dataframe中的apply函数,可以遍历每一行进行变换
# 定义一个 udf 函数
def today(day):
    if day==None:
        return datetime.datetime.now()
    else:
        return datetime.datetime.strptime(day,"%y-%m-%d")

# 返回类型为字符串类型
udfday = udf(today, DateType())
df.withColumn('date', udfday(df.date))  # 对每行的指定列进行变换
print(df.show(3))

# 填充缺失值
df=df.fillna('')
print(df.show(3))

# 替换值
df = df.replace('male','male1')  # 直接替换值

# 删除列
new_df = new_df.drop('userid')   # 删除列
# 删除行
df = df.na.drop()  # 扔掉任何列包含na的行
df = df.dropna(subset=['image_id', 'feat'])  # 扔掉image_id或feat中任一一列包含na的行




# 筛选过滤
print('============过滤================')
df = df.filter(~isnull("device_id"))  # 把a列里面数据为null的筛选出来(代表python的None类型)  dataframe里面取逻辑运算为&  | ~ 因为dataframe重写了符号运算
df = df.filter(~isnan("device_id"))  # 把a列里面数据为nan的筛选出来(Not a Number,非数字数据)

# df=df.where("gender=='female'" )   # 过滤where和filter都支持直接python表达式的方式   表达式内可以使用and or not
# df=df.where(df['gender']=='female')  # 过滤where和filter都支持boolean矩阵, &  | ~
print(df.show(3))
device_dif=df.select('device_id').distinct()   # 去除重复
device_dif=df.select('device_id','age').dropDuplicates(['age'])   # 按指定字段去重
print('摄像头id列表',)
device_dif.show()     # show 是action动作
print('摄像头数目',device_dif.count())  # count 是action动作





# 统计
print('============统计=================')
df.stat.freqItems(['device_id','gender'], 0.3).show()  # 显示列的取值出现频率超过一定百分比的列取值。有多列是就分别计算每列的高频率列取值(注意不是列组合)
df.groupby('gender').count().show()   # 分组统计数量
df.crosstab('gender', 'age').show()   # 交叉统计,统计不能性别不同年龄的人脸数目
df.groupBy('gender').agg({'device_id':'count',"age":"avg"}).show()  # 分组计算,对自己想要的列进行想要的计算
df.groupBy('gender').agg(functions.avg('age'), functions.min('age'), functions.max('age')).show()  # 每个完成多种统计计算



# 数据集和pandas的转化
print('============数据集类型转化==============')
pandas_df = df.toPandas()
spark_df = sqlContext.createDataFrame(pandas_df)

# 与Spark RDD的相互转换:
rdd_df = df.rdd
df = rdd_df.toDF()

# SQL操作
print('===============sql操作===================')

df.createOrReplaceTempView("face")   # DataFrame注册成SQL的表
conf = SparkConf()
ss = SparkSession.builder.appName("APP_NAME").config(conf=conf).getOrCreate()
df = ss.sql("SELECT age, gender FROM face WHERE age >= 13 AND age <= 19")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121

参考:https://blog.csdn.net/sinat_26917383/article/details/80500349

pyspark dataframe 将json字符串列转为多列

对于json对象中包含不同的key值,需要先获取所有key, 将json字符串转为struct对象, 然后再转为多列


from pyspark import SparkConf,SparkContext,SparkContext,SQLContext
from pyspark.sql import SparkSession,SQLContext,functions,types,DataFrame,SQLContext,HiveContext,SparkSession

from pyspark.sql.functions import isnull,isnan,udf,from_json, col
from pyspark.sql.types import DoubleType,IntegerType,StringType,DateType,StructType,StructField
import datetime,time
import json
import os

# 创建spark本地运行,日志目录
try:
   os.mkdir('/tmp/spark-events')
except Exception as e:
   print(e)

# 创建
print('=============读取保存==================')
conf =SparkConf().setAppName("test")   # 本地4核启动
sparkContext = SparkContext.getOrCreate(conf)    # 创建context
sparkContext.setLogLevel("warn")     # 设置打印日志等级
sqlContext = SQLContext(sparkContext)



dslist=[{'r':1,'data':'{"key1":"value1","key2":"value2"}'},{'r':2,'data':'{"key3":"value11","key1":"value3"}'}]
df = sqlContext.createDataFrame(dslist)
df.show(truncate=False)
df.printSchema()
print('=====================')

# 获取所有keys,方法1
rdd_data = df.rdd.map(lambda row: list(json.loads(row.data).keys()))
all_keys = rdd_data.collect()
row_keys = []
for row_key in all_keys:
    row_keys = row_keys+row_key
all_keys = list(set(row_keys))   # key去重
print(all_keys)

field = [StructField(key, StringType()) for key in all_keys]

json_schema = StructType(field)
print('=====================')

# 获取所有keys,方法2,没测试  不成功
# print('=====================')
# json_schema = sqlContext.read.json(df.rdd.map(lambda row: row.data)).schema
# new_df = sqlContext.read.json(df.rdd.map(lambda r: r.data))
# print(json_schema)


new_df = df.withColumn('json', from_json(col('data'), json_schema))
new_df.printSchema()
new_df.show(truncate=False)


# def flatten_struct(schema, prefix=""):
#     result = []
#     for elem in schema:
#         if isinstance(elem.dataType, StructType):
#             result += flatten_struct(elem.dataType, prefix + elem.name + ".")
#         else:
#             result.append(col(prefix + elem.name).alias(prefix + elem.name))
#     return result
# new_df = new_df.select(new_df.schema)

new_col=[col('r')]+[col('json.'+key).alias(key) for key in all_keys]
new_df = new_df.select(new_col)
new_df.show(truncate=False)



# python json转为df
# dslist=[{"key1":"value1","key2":"value2","r":1},{"key1":"value11","key3":"value3","r":2}]
# df1 = sqlContext.createDataFrame(dslist)
# df1.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

pyspark dataframe列的合并与拆分

使用Spark SQL在对数据进行处理的过程中,可能会遇到对一列数据拆分为多列,或者把多列数据合并为一列。这里记录一下目前想到的对DataFrame列数据进行合并和拆分的几种方法。

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("dataframe_split") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext
df = spark.read.csv('hdfs://master:9000/dataset/dataframe_split.csv', inferSchema=True, header=True)
df.show(3)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

原始数据如下所示
这里写图片描述

dataframe列数据的分割

from pyspark.sql.functions import split, explode, concat, concat_ws
df_split = df.withColumn("s", split(df['score'], " "))
df_split.show()
  • 1
  • 2
  • 3

这里写图片描述

dataframe列数据的拆分

zipWithIndex:给每个元素生成一个索引

排序首先基于分区索引,然后是每个分区内的项目顺序.因此,第一个分区中的第一个item索引为0,最后一个分区中的最后一个item的索引最大.当RDD包含多个分区时此方法需要触发spark作业.
这里写图片描述

first_row = df.first()
numAttrs = len(first_row['score'].split(" "))
print("新增列的个数", numAttrs)
attrs = sc.parallelize(["score_" + str(i) for i in range(numAttrs)]).zipWithIndex().collect()
print("列名:", attrs)
for name, index in attrs:
    df_split = df_split.withColumn(name, df_split['s'].getItem(index))
df_split.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

这里写图片描述

dataframe将一行分成多行

df_explode = df.withColumn("e", explode(split(df['score'], " ")))
df_explode.show()
  • 1
  • 2

这里写图片描述

dataframe列数据的合并

列的合并有两个函数:一个不添加分隔符concat(),一个添加分隔符concat_ws()

concat

df_concat = df_split.withColumn("score_concat", concat(df_split['score_0'], \
                                                       df_split['score_1'], df_split['score_2'], df_split['score_3']))
df_concat.show()
  • 1
  • 2
  • 3

这里写图片描述

caoncat_ws

df_ws = df_split.withColumn("score_concat", concat_ws('-', df_split['score_0'], \
                                                       df_split['score_1'], df_split['score_2'], df_split['score_3']))
df_ws.show()
  • 1
  • 2
  • 3

这里写图片描述

dataframe多行转多列

pivot: 旋转当前[[dataframe]]列并执行指定的聚合

#DataFrame 数据格式:每个用户对每部电影的评分 userID 用户ID,movieID 电影ID,rating评分
df=spark.sparkContext.parallelize([[15,399,2], \
                                   [15,1401,5], \
                                   [15,1608,4], \
                                   [15,20,4], \
                                   [18,100,3], \
                                   [18,1401,3], \
                                   [18,399,1]])\
                    .toDF(["userID","movieID","rating"])
#pivot 多行转多列
resultDF = df.groupBy("userID").pivot("movieID").sum("rating").na.fill(-1)
#结果
resultDF.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

参考文献:

Spark DataFrame 列的合并与拆分

Spark DataFrame 多行转多列

参考:https://blog.csdn.net/intersting/article/details/84500978

pyspark 自定义聚合函数 UDAF

自定义聚合函数 UDAF 目前有点麻烦,PandasUDFType.GROUPED_AGG 在2.3.2的版本中不知怎么回事,不能使用!

这样的话只能曲线救国了!

PySpark有一组很好的聚合函数(例如,count,countDistinct,min,max,avg,sum),但这些并不适用于所有情况(特别是如果你试图避免代价高昂的Shuffle操作)。

PySpark目前有pandas_udfs,它可以创建自定义聚合器,但是你一次只能“应用”一个pandas_udf。如果你想使用多个,你必须预先形成多个groupBys …并且避免那些改组。

在这篇文章中,我描述了一个小黑客,它使您能够创建简单的python UDF,它们对聚合数据起作用(此功能只应存在于Scala中!)。

from pyspark.sql import functions as F
from pyspark.sql import types as T

a = sc.parallelize([[1, 'a'],
                    [1, 'b'],
                    [1, 'b'],
                    [2, 'c']]).toDF(['id', 'value'])
a.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

我使用collect_list将给定组中的所有数据放入一行。我打印下面这个操作的输出。

a.groupBy('id').agg(F.collect_list('value').alias('value_list')).show()
  • 1

然后我创建一个UDF,它将计算这些列表中字母’a’的所有出现(这可以很容易地在没有UDF的情况下完成,但是你明白了)。此UDF包含collect_list,因此它作用于collect_list的输出。

def find_a(x):
  """Count 'a's in list."""
  output_count = 0
  for i in x:
    if i == 'a':
      output_count += 1
  return output_count

find_a_udf = F.udf(find_a, T.IntegerType())

a.groupBy('id').agg(find_a_udf(F.collect_list('value')).alias('a_count')).show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

我们去!作用于聚合数据的UDF!接下来,我展示了这种方法的强大功能,结合何时让我们控制哪些数据进入F.collect_list。

首先,让我们创建一个带有额外列的数据框。

from pyspark.sql import functions as F
from pyspark.sql import types as T

a = sc.parallelize([[1, 1, 'a'],
                    [1, 2, 'a'],
                    [1, 1, 'b'],
                    [1, 2, 'b'],
                    [2, 1, 'c']]).toDF(['id', 'value1', 'value2'])
a.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

请注意,我如何在collect_list中包含一个when。请注意,UDF仍然包含collect_list。

a.groupBy('id').agg(find_a_udf( F.collect_list(F.when(F.col('value1') == 1, F.col('value2')))).alias('a_count')).show()
  • 1

还有一种做法就是用pandas_udf, series 添加一列分组变量然后去重。

还有就是使用输入输出都是dataframe 的 pandas_udf

参考:https://www.cnblogs.com/wdmx/p/10156500.html

pyspark.sql.functions详解

pyspark.sql.functions包含了很多内置函数。

1.pyspark.sql.functions.abs(col)
计算绝对值。

2.pyspark.sql.functions.acos(col)
计算给定值的反余弦值; 返回的角度在0到π的范围内。

3.pyspark.sql.functions.add_months(start, months)
返回start后months个月的日期

4.pyspark.sql.functions.array_contains(col, value)
集合函数:如果数组包含给定值,则返回True。 收集元素和值必须是相同的类型。

5.pyspark.sql.functions.ascii(col)
计算字符串列的第一个字符的数值。

6.pyspark.sql.functions.avg(col)
聚合函数:返回组中的值的平均值。

7.pyspark.sql.functions.cbrt(col)
计算给定值的立方根。

8.pyspark.sql.functions.ceil(col)
计算给定值的上限。

9.pyspark.sql.functions.coalesce(*cols)
返回不为空的第一列。

10.pyspark.sql.functions.col(col)
根据给定的列名返回一个列。

11.pyspark.sql.functions.collect_list(col)
聚合函数:返回重复对象的列表。

12.pyspark.sql.functions.collect_set(col)
聚合函数:返回一组消除重复元素的对象。

13.pyspark.sql.functions.concat(*cols)
将多个输入字符串列连接成一个字符串列。

14.pyspark.sql.functions.concat_ws(sep, *cols)
使用给定的分隔符将多个输入字符串列连接到一个字符串列中。

15.pyspark.sql.functions.corr(col1, col2)
返回col1和col2的皮尔森相关系数的新列。

16.pyspark.sql.functions.cos(col)
计算给定值的余弦。

17.pyspark.sql.functions.cosh(col)
计算给定值的双曲余弦。

18.pyspark.sql.functions.count(col)
聚合函数:返回组中的项数量。

19.pyspark.sql.functions.countDistinct(col, *cols)
返回一列或多列的去重计数的新列。

20.pyspark.sql.functions.current_date()
以日期列的形式返回当前日期。

21.pyspark.sql.functions.current_timestamp()
将当前时间戳作为时间戳列返回。

22.pyspark.sql.functions.date_add(start, days)
返回start后days天的日期

23.pyspark.sql.functions.date_format(date, format)
将日期/时间戳/字符串转换为由第二个参数给定日期格式指定格式的字符串值。
一个模式可能是例如dd.MM.yyyy,可能会返回一个字符串,如“18 .03.1993”。 可以使用Java类java.text.SimpleDateFormat的所有模式字母。
注意:尽可能使用像年份这样的专业功能。 这些受益于专门的实施。

24.pyspark.sql.functions.date_sub(start, days)
返回start前days天的日期

25.pyspark.sql.functions.datediff(end, start)
返回从start到end的天数。

26.pyspark.sql.functions.dayofmonth(col)
将给定日期的月份的天解压为整数。

27.pyspark.sql.functions.dayofyear(col)
将给定日期的年份中的某一天提取为整数。

28.pyspark.sql.functions.desc(col)
基于给定列名称的降序返回一个排序表达式。

29.pyspark.sql.functions.exp(col)
计算给定值的指数。

30.pyspark.sql.functions.expm1(col)
计算给定值的指数减1。

31.pyspark.sql.functions.factorial(col)
计算给定值的阶乘。

32.pyspark.sql.functions.floor(col)
计算给定值的最小。

33.pyspark.sql.functions.format_number(col, d)
将数字X格式化为像’#, - #, - #.-'这样的格式,四舍五入到小数点后的位置,并以字符串形式返回结果。
参数:● col – 要格式化的数值的列名称
● d – N小数位

34.pyspark.sql.functions.format_string(format, *cols)
以printf样式格式化参数,并将结果作为字符串列返回。
参数:● format – 要格式化的格式
● cols - 要格式化的列

35.pyspark.sql.functions.hex(col)
计算给定列的十六进制值,可以是StringType,BinaryType,IntegerType或LongType

36.pyspark.sql.functions.hour(col)
将给定日期的小时数提取为整数。

37.pyspark.sql.functions.hypot(col1, col2)
计算sqrt(a ^ 2 ^ + b ^ 2 ^),无中间上溢或下溢。

38.pyspark.sql.functions.initcap(col)
在句子中将每个单词的第一个字母翻译成大写。

39.pyspark.sql.functions.isnan(col)
如果列是NaN,则返回true的表达式。

40.pyspark.sql.functions.kurtosis(col)
聚合函数:返回组中的值的峰度。

41.pyspark.sql.functions.last(col)
聚合函数:返回组中的最后一个值。

42.pyspark.sql.functions.last_day(date)
返回给定日期所属月份的最后一天。

43.pyspark.sql.functions.lit(col)
创建一个文字值的列

44.pyspark.sql.functions.log(arg1, arg2=None)
返回第二个参数的第一个基于参数的对数。
如果只有一个参数,那么这个参数就是自然对数。

45.pyspark.sql.functions.log1p(col)
计算给定值的自然对数加1。

46.pyspark.sql.functions.log2(col)
返回参数的基数为2的对数。

47.pyspark.sql.functions.lower(col)
将字符串列转换为小写。

48.pyspark.sql.functions.ltrim(col)
从左端修剪指定字符串值的空格。

49.pyspark.sql.functions.minute(col)
提取给定日期的分钟数为整数

50.pyspark.sql.functions.monotonically_increasing_id()
生成单调递增的64位整数的列。

生成的ID保证是单调递增和唯一的,但不是连续的。 当前的实现将分区ID放在高31位,并将每个分区内的记录号放在低33位。 假设
数据帧的分区少于10亿个,每个分区少于80亿条记录

例如,考虑一个DataFrame有两个分区,每个分区有三个记录。 该表达式将返回以下ID:0,1,2,8589934592(1L << 33),
8589934593,8589934594

51.pyspark.sql.functions.month(col)
将给定日期的月份提取为整数

52.pyspark.sql.functions.months_between(date1, date2)
返回date1和date2之间的月数。

53.pyspark.sql.functions.rand(seed=None)
用i.i.d生成一个随机列 来自样本[0.0,1.0]。

54.pyspark.sql.functions.randn(seed=None)
用i.i.d生成一列 来自标准正态分布的样本。

55.pyspark.sql.functions.reverse(col)
反转字符串列并将其作为新的字符串列返回

56.pyspark.sql.functions.rtrim(col)
从右端修剪指定字符串值的空格

57.pyspark.sql.functions.skewness(col)
聚合函数:返回组中值的偏度

58.pyspark.sql.functions.sort_array(col, asc=True)
集合函数:按升序对给定列的输入数组进行排序。
参数:col – 列或表达式名称

59.pyspark.sql.functions.split(str, pattern)
将模式分割(模式是正则表达式)。
注:pattern是一个字符串表示正则表达式。

60.pyspark.sql.functions.sqrt(col)
计算指定浮点值的平方根

61.pyspark.sql.functions.stddev(col)
聚合函数:返回组中表达式的无偏样本标准差

62.pyspark.sql.functions.sumDistinct(col)
聚合函数:返回表达式中不同值的总和

63.pyspark.sql.functions.to_date(col)
将StringType或TimestampType的列转换为DateType

64.pyspark.sql.functions.trim(col)
修剪指定字符串列的两端空格。

65.pyspark.sql.functions.trunc(date, format)
返回截断到格式指定单位的日期

参数: format – ‘year’, ‘YYYY’, ‘yy’ or ‘month’, ‘mon’, ‘mm’

66.pyspark.sql.functions.var_samp(col)
聚合函数:返回组中值的无偏差

67.pyspark.sql.functions.variance(col)
聚合函数:返回组中值的总体方差

参考:原文:https://blog.csdn.net/htbeker/article/details/86233819

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/66463
推荐阅读
相关标签
  

闽ICP备14008679号