赞
踩
本文介绍如何通过spark sql对数据进行各种的合并操作,包括:列合并,行合并,相同key的合并等等。
在实际的数据处理场景中,数据的合并操作非常常用,这里介绍如何通过spark sql来完成常用的合并操作。
准备以下数据:
name,address,age,id,time david,shenzhen,31,1,201903 eason,shenzhen,27,2,201904 jarry,wuhan,35,3,201904 aarry2,wuhan1,34,4,201904 barry3,wuhan2,33,5,201904 carry4,wuhan3,32,6,201904 darry5,wuhan4,31,7,201903 earry6,wuhan9,30,8,201903 david,shenzhen,31,1,201903 eason,shenzhen,27,2,201904 jarry,wuhan,35,3,201904 aarry2,wuhan1,34,4,201904 barry3,wuhan2,33,5,201904 carry4,wuhan3,32,6,201904 darry5,wuhan4,31,7,201903 earry6,wuhan9,30,8,201903 david,shenzhen,31,1,201903 eason,shenzhen,27,2,201904 carry4,wuhan3,32,6,201904
把以上数据保存到文件:idtimedata.csv中,并把该文件保存到hdfs的/curdata/目录下。
然后先在代码前进行sparksession的初始化:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.enableHiveSupport().appName("spark sql merge data").getOrCreate()
# 可以把文件放在本地,也可以放到hdfs中,这里放到本地为例
df = spark.read.csv("/Users/hover/work/testdata/idtimedata.csv", header=True)
通过SparkSql的接口函数可以方便的对数据进行合并操作。
假设你有一个dataframe:df1,可以通过以下方法来合并该df1的字段。
>>df = spark.createDataFrame([('abcd', '123')], ['s', 'd'])
>>df.select(F.concat_ws('-', "s", "d").alias('s')).show()
# 合并多个列
>> merge_cols = [c for c in df.columns if c != 'id']
>> df.select(F.concat_ws(";", *merge_cols).alias("merged_data")).show(truncate=False)
+------------------------+
|merged_data |
+------------------------+
|david;shenzhen;31;201903|
|eason;shenzhen;27;201904|
|jarry;wuhan;35;201904 |
|aarry2;wuhan1;34;201904 |
|barry3;wuhan2;33;201904 |
|carry4;wuhan3;32;201904 |
|darry5;wuhan4;31;201903 |
|earry6;wuhan9;30;201903 |
...
该函数可以把一列的数据合并成一行,并按python的List方式保存,但注意:该函数不会去重。
>> df.select(F.collect_list("id").alias("id_merged")).show(truncate=False)
+---------------------------------------------------------+
|id_merged |
+---------------------------------------------------------+
|[1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 6]|
+---------------------------------------------------------+
可以通过sort_array对合并的lis值进行排序。
>> df.select(F.sort_array(F.collect_list("id"))).show(truncate=False)
+---------------------------------------------------------+
|sort_array(collect_list(id), true) |
+---------------------------------------------------------+
|[1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 6, 7, 7, 8, 8]|
+---------------------------------------------------------+
通过collect_set函数对合并的数据进行去重。
>> df.select(F.sort_array(F.collect_set("id"))).show(truncate=False)
+---------------------------------+
|sort_array(collect_set(id), true)|
+---------------------------------+
|[1, 2, 3, 4, 5, 6, 7, 8] |
+---------------------------------+
这种情况更加常用。先按某个列聚合,再合并其他列的数据。
# 把相同id的人的name聚合成一个list
>> df.groupby("id").agg(F.collect_list("name").alias("name_merge")).show(truncate=False)
+---+------------------------+
|id |name_merge |
+---+------------------------+
|7 |[darry5, darry5] |
|3 |[jarry, jarry] |
|8 |[earry6, earry6] |
|5 |[barry3, barry3] |
|6 |[carry4, carry4, carry4]|
|1 |[david, david, david] |
|4 |[aarry2, aarry2] |
|2 |[eason, eason, eason] |
+---+------------------------+
>> df.select(F.size(F.collect_list("id"))).show(truncate=False)
+----------------------+
|size(collect_list(id))|
+----------------------+
|19 |
+----------------------+
通过函数pyspark.sql.functions.create_map(*cols)可以把一个或多个key-value,其中key对应一列的值,而valu对应一列的值。
>> df.withColumn("finRes", F.create_map([df.id, df.name])).select("finRes").show(truncate=False) +----------------+ |finRes | +----------------+ |Map(1 -> david) | |Map(2 -> eason) | |Map(3 -> jarry) | |Map(4 -> aarry2)| |Map(5 -> barry3)| |Map(6 -> carry4)| |Map(7 -> darry5)| |Map(8 -> earry6)| |Map(1 -> david) | |Map(2 -> eason) | |Map(3 -> jarry) | |Map(4 -> aarry2)| ...
create_map函数的参数是多个列名,它们必须成对出现。作为key-value。
>> df3 = df.withColumn("finRes", F.create_map([df.id, df.name, df.name, df.age])).select("finRes") +------------------------------+ |finRes | +------------------------------+ |Map(1 -> david, david -> 31) | |Map(2 -> eason, eason -> 27) | |Map(3 -> jarry, jarry -> 35) | |Map(4 -> aarry2, aarry2 -> 34)| |Map(5 -> barry3, barry3 -> 33)| |Map(6 -> carry4, carry4 -> 32)| |Map(7 -> darry5, darry5 -> 31)| |Map(8 -> earry6, earry6 -> 30)| |Map(1 -> david, david -> 31) | |Map(2 -> eason, eason -> 27) | |Map(3 -> jarry, jarry -> 35) | |Map(4 -> aarry2, aarry2 -> 34)| |Map(5 -> barry3, barry3 -> 33)| |Map(6 -> carry4, carry4 -> 32)| |Map(7 -> darry5, darry5 -> 31)| |Map(8 -> earry6, earry6 -> 30)| |Map(1 -> david, david -> 31) | |Map(2 -> eason, eason -> 27) | |Map(6 -> carry4, carry4 -> 32)| +------------------------------+
需要注意的是:create_map不会对相同的key值进行合并。若想把相同key的值进行合并,需要自己来完成。比如先进行groupby,在进行create_map操作。
>> df.withColumn("finRes", F.create_map([df.id, F.concat_ws(',', df.name, df.age)])).select("finRes").show(truncate=False)
+-------------------+
|finRes |
+-------------------+
|Map(1 -> david,31) |
|Map(2 -> eason,27) |
|Map(3 -> jarry,35) |
|Map(4 -> aarry2,34)|
|Map(5 -> barry3,33)|
|Map(6 -> carry4,32)|
|Map(7 -> darry5,31)|
|Map(8 -> earry6,30)|
在spark-2.4中提供了多个对数组值进行操作的函数:
array_union函数可以把两个数组值的列合并在一起。
from pyspark.sql import Row
df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2=["c", "d", "a", "f"])])
df.select(F.array_union(df.c1, df.c2)).collect()
[Row(array_union(c1, c2)=['b', 'a', 'c', 'd', 'f'])]
注意:在使用该函数时,合并的值必须是[]类型。
pyspark.sql.functions.array_except(col1, col2)
array_except返回一个值的数组,该值出现在col1的数组中,但不在col2数组中,返回的值不会重复。
>>> df = spark.createDataFrame([Row(c1=["b", "a", "c", "e", "e"], c2=["c", "d", "a", "f"])])
>>> df.show()
+---------------+------------+
| c1| c2|
+---------------+------------+
|[b, a, c, e, e]|[c, d, a, f]|
+---------------+------------+
>>> df.select(F.array_except(df.c1, df.c2)).show()
+--------------------+
|array_except(c1, c2)|
+--------------------+
| [b, e]|
+--------------------+
说明:可以看到,该函数返回其值在col1数组中出现,而不在col2数组中出现过的值。并以数组的形式返回。
pyspark.sql.functions.array_intersect(col1, col2)
该函数求同时出现在col1和col2的数组中的值,并以数组返回。结果会进行去重。
>>> df = spark.createDataFrame([Row(c1=["b", "a", "c", "c", "e"], c2=["c", "d", "a", "f"])])
>>> df.select(F.array_intersect(df.c1, df.c2)).show()
+-----------------------+
|array_intersect(c1, c2)|
+-----------------------+
| [a, c]|
+-----------------------+
说明:可以看到a,c是同时出现在col1和col2数组值中的元素,结果也去重了。
pyspark.sql.functions.array_join(col, delimiter, null_replacement=None)
可以指定分隔符,若为None,则忽略。若为NULL则替换成NULL值。
>>> df = spark.createDataFrame([(["a", "b", "c"],), (["a", None],)], ['data'])
>>> df.select(F.array_join(df.data, "-").alias("joined")).show()
+------+
|joined|
+------+
| a-b-c|
| a|
+------+
>>> df.select(F.array_join(df.data, ",", "NULL").alias("joined")).show()
+------+
|joined|
+------+
| a,b,c|
|a,NULL|
+------+
array_min:返回每个数组值中的最小值。
array_max:返回每个数组值中的最大值。
pyspark.sql.functions.array_repeat(col, count)
其中count是重复的次数。
>>> df = spark.createDataFrame([('ab',)], ['data'])
>>> df.show()
+----+
|data|
+----+
| ab|
+----+
>>> df.select(F.array_repeat(df.data, 3).alias('r')).show()
+------------+
| r|
+------------+
|[ab, ab, ab]|
+------------+
pyspark.sql.functions.array_sort(col)
以升序对输入数组进行排序。输入数组的元素必须是可排序的。空元素将放置在返回数组的末尾。
>>> df = spark.createDataFrame([([2, 1, None, 3],),([1],),([],)], ['data']) >>> df.show() +----------+ | data| +----------+ |[2, 1,, 3]| | [1]| | []| +----------+ >>> df.select(F.array_sort(df.data).alias('r')).show() +----------+ | r| +----------+ |[1, 2, 3,]| | [1]| | []| +----------+
pyspark.sql.functions.array_distinct(col)
>>> df = spark.createDataFrame([([1, 2, 3, 2],), ([4, 5, 5, 4],)], ['data']) >>> df.show() +------------+ | data| +------------+ |[1, 2, 3, 2]| |[4, 5, 5, 4]| +------------+ >>> df.select(F.array_distinct(df.data)).show() +--------------------+ |array_distinct(data)| +--------------------+ | [1, 2, 3]| | [4, 5]| +--------------------+
pyspark.sql.functions.arrays_zip(*cols)
>>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2']) >>> df.show() +---------+---------+ | vals1| vals2| +---------+---------+ |[1, 2, 3]|[2, 3, 4]| +---------+---------+ >>> df.select(F.arrays_zip(df.vals1, df.vals2).alias('zipped')).show(truncate=False) +------------------------+ |zipped | +------------------------+ |[[1, 2], [2, 3], [3, 4]]| +------------------------+ # 若数组值中对应的位置没有值,则合并时设置为空 >>> df = spark.createDataFrame([(([1, 2], [2, 3, 4]))], ['vals1', 'vals2']) >>> df.select(F.arrays_zip(df.vals1, df.vals2).alias('zipped')).show(truncate=False) +-----------------------+ |zipped | +-----------------------+ |[[1, 2], [2, 3], [, 4]]| +-----------------------+
在使用该函数时需要注意:两个dataframe的列的个数必须相同,并且两个dataframe的列的类型也必须相同,否则会报错。
>>> d2 = [([1], 1), ([3], 2)] >>> df2 = spark.createDataFrame(d2, ['c1', 'c2']) >>> df2.show() +---+---+ | c1| c2| +---+---+ |[1]| 1| |[3]| 2| +---+---+ >>> d1 = [{'name': 'Alice', 'age': 1}] >>> df1 = spark.createDataFrame(d1, ['c1', 'c2']) >>> df1.show() +-----+---+ | c1| c2| +-----+---+ |Alice| 1| | bob| 2| +-----+---+ >>> df2.union(df1).show() 报错,错误信息如下: pyspark.sql.utils.AnalysisException: u"Union can only be performed on tables with the compatible column types. string <> array<bigint> at the first column of the second table;;\n'Union\n:- LogicalRDD [c1#684, c2#685L], false\n+- LogicalRDD [c1#609, c2#610L], false\n"
intersect(other)
intersect函数返回同时在两个dataframe中存在的行。要 注意:该函数要求两个dataframe的列和列的类型必须相同。否则会报错。
>>> d2 = [('name1', 1), ('name2', 2)] >>> df2 = spark.createDataFrame(d2, ['c1', 'c2']) >>> d1 = [('bob', 3)] >>> df1 = spark.createDataFrame(d1, ['c1', 'c2']) >>> df1.intersect(df2).show() +---+---+ | c1| c2| +---+---+ +---+---+ >>> d1 = [('bob', 3), ('name1', 1)] >>> df1 = spark.createDataFrame(d1, ['c1', 'c2']) >>> df1.intersect(df2).show() +-----+---+ | c1| c2| +-----+---+ |name1| 1| +-----+---+
本节讲述了如何通过spark-sql(python)来完成数据的合并。包括列和行的合并,对于日常的数据处理来说这些操作都非常有用,需要熟练掌握。但本节没有包括join的操作讲解,join操作情况比较多,放在另外一节进行讲解。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。