On macOS:
pip install pyspark
Reading JSON: if the JSON content spans multiple lines, enable the "multiLine" option, otherwise the read will fail.
data_path = "./test_file.json"  # local path
# data_path = "hdfs://..."
df = spark.read.json(data_path)
# for multi-line JSON:
df = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json(data_path)
data_path = "hdfs://..."
df = spark.read.parquet(data_path)
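CSV can be read in the same way; a minimal sketch (the file name and the header/inferSchema options here are assumptions, adjust them to your data):
data_path = "./test_file.csv"  # hypothetical local CSV
df = spark.read.option("header", True).option("inferSchema", True).csv(data_path)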
Before doing anything else, create a SparkSession object (the entry point for running Spark code; think of it as the interactive handle).
See: pyspark.sql module
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Word Count").config("spark.some.config.option", "some-value").getOrCreate()
# spark = SparkSession.builder.appName('mu').master('local').getOrCreate()
If, on macOS, creating the SparkSession fails with an error like:
Traceback (most recent call last):
  File "/Users/my_name/caogao/code_test_1/code_test_pyspark.py", line 5, in <module>
    spark = SparkSession.builder.master("local").getOrCreate()
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/sql/session.py", line 186, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 376, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 136, in __init__
    conf, jsc, profiler_cls)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 198, in _do_init
    self._jsc = jsc or self._initialize_context(self._conf._jconf)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 315, in _initialize_context
    return self._jvm.JavaSparkContext(jconf)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/py4j/java_gateway.py", line 1569, in __call__
    answer, self._gateway_client, None, self._fqn)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.
then add the following code at the top of the script:
import pyspark
conf = pyspark.SparkConf().set('spark.driver.host','127.0.0.1')
sc = pyspark.SparkContext(master='local', appName='myAppName',conf=conf)
Reference: solution
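Alternatively (a sketch, not from the original post), the same addresses can be set on the SparkSession builder itself, using the spark.driver.bindAddress setting that the error message suggests:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("myAppName") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()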
test = []
test.append((1, 'age', '30', 50, 40))
test.append((1, 'city', 'beijing', 50, 40))
test.append((1, 'gender', 'fale', 50, 40))
test.append((1, 'height', '172cm', 50, 40))
test.append((1, 'weight', '70kg', 50, 40))
test.append((2, 'age', '26', 100, 80))
test.append((2, 'city', 'beijing', 100, 80))
test.append((2, 'gender', 'fale', 100, 80))
test.append((2, 'height', '170cm', 100, 80))
test.append((2, 'weight', '65kg', 100, 80))
test.append((3, 'age', '35', 99, 99))
test.append((3, 'city', 'nanjing', 99, 99))
test.append((3, 'gender', 'female', 99, 99))
test.append((3, 'height', '161cm', 99, 99))
test.append((3, 'weight', '50kg', 99, 99))
df = spark.createDataFrame(test, ['user_id', 'attr_name', 'attr_value', 'income', 'expenses'])
Or directly:
df = spark.createDataFrame([('1', 'Joe', '70000', '1'), ('2', 'Henry', '80000', None)],
['Id', 'Name', 'Sallary', 'DepartmentId'])
1. Print data
df.show() prints the first 20 rows by default; you can also pass the number of rows to print.
If some values are very long, PySpark truncates them and the output is cut off; in that case use df.show(truncate=False).
>>> df.show()
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
| 1| age| 30| 50| 40|
| 1| city| beijing| 50| 40|
| 1| gender| fale| 50| 40|
| 1| height| 172cm| 50| 40|
| 1| weight| 70kg| 50| 40|
| 2| age| 26| 100| 80|
| 2| city| beijing| 100| 80|
| 2| gender| fale| 100| 80|
| 2| height| 170cm| 100| 80|
| 2| weight| 65kg| 100| 80|
| 3| age| 35| 99| 99|
| 3| city| nanjing| 99| 99|
| 3| gender| female| 99| 99|
| 3| height| 161cm| 99| 99|
| 3| weight| 50kg| 99| 99|
+-------+---------+----------+------+--------+
>>> df.show(3)
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
| 1| age| 30| 50| 40|
| 1| city| beijing| 50| 40|
| 1| gender| fale| 50| 40|
+-------+---------+----------+------+--------+
only showing top 3 rows
2. Print the schema
>>> df.printSchema()
root
|-- user_id: long (nullable = true)
|-- attr_name: string (nullable = true)
|-- attr_value: string (nullable = true)
|-- income: long (nullable = true)
|-- expenses: long (nullable = true)
3. Count the total number of rows
>>> df.count()
15
4. Fetch the first few rows to the driver
>>> list = df.head(3)
>>> df.head(3)
[Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40), Row(user_id=1, attr_name=u'city', attr_value=u'beijing', income=50, expenses=40), Row(user_id=1, attr_name=u'gender', attr_value=u'fale', income=50, expenses=40)]
>>> df.take(5)
[Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40), Row(user_id=1, attr_name=u'city', attr_value=u'beijing', income=50, expenses=40), Row(user_id=1, attr_name=u'gender', attr_value=u'fale', income=50, expenses=40), Row(user_id=1, attr_name=u'height', attr_value=u'172cm', income=50, expenses=40), Row(user_id=1, attr_name=u'weight', attr_value=u'70kg', income=50, expenses=40)]
5. Query rows where a column is null
>>> from pyspark.sql.functions import isnull
>>> df.filter(isnull("income")).show()
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
+-------+---------+----------+------+--------+
6. Return a list whose elements are Row objects:
>>> df.collect()
[Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40), Row(user_id=1, attr_name=u'city', attr_value=u'beijing', income=50, expenses=40), Row(user_id=1, attr_name=u'gender', attr_value=u'fale', income=50, expenses=40), Row(user_id=1, attr_name=u'height', attr_value=u'172cm', income=50, expenses=40), Row(user_id=1, attr_name=u'weight', attr_value=u'70kg', income=50, expenses=40), Row(user_id=2, attr_name=u'age', attr_value=u'26', income=100, expenses=80), Row(user_id=2, attr_name=u'city', attr_value=u'beijing', income=100, expenses=80), Row(user_id=2, attr_name=u'gender', attr_value=u'fale', income=100, expenses=80), Row(user_id=2, attr_name=u'height', attr_value=u'170cm', income=100, expenses=80), Row(user_id=2, attr_name=u'weight', attr_value=u'65kg', income=100, expenses=80), Row(user_id=3, attr_name=u'age', attr_value=u'35', income=99, expenses=99), Row(user_id=3, attr_name=u'city', attr_value=u'nanjing', income=99, expenses=99), Row(user_id=3, attr_name=u'gender', attr_value=u'female', income=99, expenses=99), Row(user_id=3, attr_name=u'height', attr_value=u'161cm', income=99, expenses=99), Row(user_id=3, attr_name=u'weight', attr_value=u'50kg', income=99, expenses=99)]
Note: this pulls all of the data to the driver and returns a list. You can index into the list to get a Row, and index into a Row to get a field value.
>>> list = df.collect()
>>> list[0]
Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40)
>>> list[0][1]
u'age'
7. Summary statistics (describe)
>>> df.describe().show()
+-------+------------------+---------+------------------+-----------------+------------------+
|summary| user_id|attr_name| attr_value| income| expenses|
+-------+------------------+---------+------------------+-----------------+------------------+
| count| 15| 15| 15| 15| 15|
| mean| 2.0| null|30.333333333333332| 83.0| 73.0|
| stddev|0.8451542547285166| null| 4.509249752822894|24.15722311383137|25.453037988757707|
| min| 1| age| 161cm| 50| 40|
| max| 3| weight| nanjing| 100| 99|
+-------+------------------+---------+------------------+-----------------+------------------+
8. Deduplication (set-style distinct)
>>> df.select("user_id").distinct().show()
+-------+
|user_id|
+-------+
| 1|
| 3|
| 2|
+-------+
from pyspark.sql import functions as F
df.groupBy("col1").agg(F.countDistinct("col2")).orderBy("col1", ascending=False).show()
# this is equivalent to counting each value of col1 separately, as below
df1 = req_df.filter("col1=1").select("col2").dropDuplicates(subset=["col2"])
df1.count()
...
...
dfn = req_df.filter("col1=n").select("col2").dropDuplicates(subset=["col2"])
dfn.count()
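If an approximate count is acceptable, a cheaper alternative is F.approx_count_distinct; a small sketch reusing the col1/col2 names above (not from the original post):
from pyspark.sql import functions as F
df.groupBy("col1").agg(F.approx_count_distinct("col2").alias("col2_distinct_approx")).show()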
A = [("A", 1, "AAA", "AAAAA"), ("A", 2, "AAA", "AAAAA")]
df = spark.createDataFrame(A, ['name', 'id', "name1", "name2"])
df.show()
+----+---+-----+-----+
|name| id|name1|name2|
+----+---+-----+-----+
| A| 1| AAA|AAAAA|
| A| 2| AAA|AAAAA|
+----+---+-----+-----+
# calling df.dropDuplicates() directly only drops rows whose entire row is identical
df.dropDuplicates().show()
+----+---+-----+-----+
|name| id|name1|name2|
+----+---+-----+-----+
| A| 2| AAA|AAAAA|
| A| 1| AAA|AAAAA|
+----+---+-----+-----+
# deduplicate on selected columns, option 1
df.dropDuplicates(subset=["name", "name1", "name2"]).show()
+----+---+-----+-----+
|name| id|name1|name2|
+----+---+-----+-----+
| A| 1| AAA|AAAAA|
+----+---+-----+-----+
# deduplicate on selected columns, option 2
df.dropDuplicates(subset=[c for c in df.columns if c != "id"]).show()
+----+---+-----+-----+
|name| id|name1|name2|
+----+---+-----+-----+
| A| 1| AAA|AAAAA|
+----+---+-----+-----+
1. Select one or more columns: select
Generally, select and selectExpr behave the same; for the differences see Spark—DataFrame学习(二)——select、selectExpr函数.
df.select("age").show()
df["age"]
df.age
df.select("name")
df.select(df['name'], df['age'] + 1)
df.select(df.a, df.b, df.c)           # select columns a, b, c
df.select(df["a"], df["b"], df["c"])  # select columns a, b, c
2. where: select rows by condition (filter and where are equivalent)
Syntax: where(conditionExpr: String). Pass a filter-condition expression; and and or can be used. The result is returned as a DataFrame.
Note: the string value b must be quoted.
>>> df.where("id = 1 or c1 = 'b'" ).show()
+-------+---------+----------+------+--------+
| id |attr_name|attr_value|income| c1 |
+-------+---------+----------+------+--------+
| 1| age| 30| 50| c|
| 2| city| beijing| 50| b|
| 2| gender| fale| 50| b|
| 3| height| 172cm| 50| b|
| 4| weight| 70kg| 50| b|
+-------+---------+----------+------+--------+
3. filter: select rows by condition (filter and where are equivalent)
Note: filter can be used in several ways; the first one below is recommended.
df.filter("id = 1 or c1 = 'b'" ).show()
df.filter((df.id =="1") & (df.c1=="b"))
df.filter((df.id =="1") | (df.c1=="b"))
df.filter('id=="1"').filter('c1=="b"')
df.filter("id == 1 or c1 == 'b'")
A = [('Pirate',True),('Monkey',False), ('Ninja',True),('Dodo',False), ('Spa',False)]
df = spark.createDataFrame(A,['name','is_boy'])
df.show()
+------+------+
| name|is_boy|
+------+------+
|Pirate| true|
|Monkey| false|
| Ninja| true|
| Dodo| false|
| Spa| false|
+------+------+
# uppercase True works
df.filter("is_boy=True").show()
+------+------+
| name|is_boy|
+------+------+
|Pirate| true|
| Ninja| true|
+------+------+
# lowercase true also works
df.filter("is_boy=true").show()
+------+------+
| name|is_boy|
+------+------+
|Pirate| true|
| Ninja| true|
+------+------+
# this also works (defaults to =True)
df.filter("is_boy").show()
+------+------+
| name|is_boy|
+------+------+
|Pirate| true|
| Ninja| true|
+------+------+
df.filter("is_boy=False").show()
+------+------+
| name|is_boy|
+------+------+
|Monkey| false|
| Dodo| false|
| Spa| false|
+------+------+
There are two equivalent ways to filter null values:
import pyspark.sql.functions as F
df_.show()
+----+-----+
|name|value|
+----+-----+
| a| null|
| b| 2|
| c| null|
+----+-----+
df_.filter("value is null").show()
df_.filter(F.col("value").isNull()).show()
+----+-----+
|name|value|
+----+-----+
| a| null|
| c| null|
+----+-----+
df_.filter("value is not null").show()
df_.filter(F.col("value").isNotNull()).show()
+----+-----+
|name|value|
+----+-----+
| b| 2|
+----+-----+
df_.show()
+----+-----+
|name|value|
+----+-----+
| a| |
| b| 2|
| c| |
+----+-----+
df_.filter("value=''").show()  # empty string
+----+-----+
|name|value|
+----+-----+
| a| |
| c| |
+----+-----+
1. orderBy: sort by the given column(s), ascending by default
>>> df.orderBy(df.income.desc()).show()
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
| 2| gender| fale| 100| 80|
| 2| weight| 65kg| 100| 80|
| 2| height| 170cm| 100| 80|
| 2| age| 26| 100| 80|
| 2| city| beijing| 100| 80|
| 3| gender| female| 99| 99|
| 3| age| 35| 99| 99|
| 3| height| 161cm| 99| 99|
| 3| weight| 50kg| 99| 99|
| 3| city| nanjing| 99| 99|
| 1| age| 30| 50| 40|
| 1| height| 172cm| 50| 40|
| 1| city| beijing| 50| 40|
| 1| weight| 70kg| 50| 40|
| 1| gender| fale| 50| 40|
+-------+---------+----------+------+--------+
sample draws a random sample. withReplacement = True or False controls whether sampling is done with replacement; 42 is the random seed.
t1 = train.sample(False, 0.2, 42)
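For stratified sampling there is also DataFrame.sampleBy; a sketch assuming a hypothetical key column named "label" whose values include 0 and 1 (not from the original post):
# keep 10% of rows with label=0 and 50% with label=1 (assumed column and values)
t2 = train.sampleBy("label", fractions={0: 0.1, 1: 0.5}, seed=42)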
Add a constant column with the withColumn method:
from pyspark.sql.functions import lit
df.withColumn('newCol', lit(0)).show()
## output
+---+-----+-------+------------+------+
| Id| Name|Sallary|DepartmentId|newCol|
+---+-----+-------+------------+------+
| 1| Joe| 70000| 1| 0|
| 2|Henry| 80000| null| 0|
+---+-----+-------+------------+------+
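withColumn can also derive a new column from existing ones; a small sketch using the earlier user_id/income/expenses DataFrame (the column names are taken from that example):
from pyspark.sql import functions as F
df.withColumn("balance", F.col("income") - F.col("expenses")).show()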
# spark-1: rename columns when creating the DataFrame
data = spark.createDataFrame(data=[("Alberto", 2), ("Dakota", 2)], schema=['name','length'])
data.show()
data.printSchema()
# spark-2: use selectExpr
#   original_column as new_column
#   cast changes the type of the whole column
color_df2 = color_df.selectExpr('cast(color as long) as color2', 'length as length2')
color_df2.show()
# spark-3: withColumnRenamed
color_df2 = color_df.withColumnRenamed('color','color2')\
    .withColumnRenamed('length','length2')
color_df2.show()
# spark-4: alias
color_df.select(color_df.color.alias('color2')).show()
# groupBy + count (this output comes from a larger example table with an app_category2 column)
In [63]: df.groupby('app_category2').count().show()
+-------------+-----+
|app_category2|count|
+-------------+-----+
| null| 231|
| 77| 215|
| 81| 378|
| 84| 14|
+-------------+-----+
Note! The correct keyword is ascending. If you misspell it (for example asending), no error is raised because orderBy silently ignores unknown keyword arguments, but the result will not be sorted correctly. Be careful!
valuesA = [('Pirate','boy',1),('Monkey','girl',2),('Monkey','boy',3),('Ninja','girl',3),('Spa','boy',4), ('Spa','boy',5), ('Spa','girl',7)]
df = spark.createDataFrame(valuesA,['name','sex','value'])
In [8]: df.show()
+------+----+-----+
| name| sex|value|
+------+----+-----+
|Pirate| boy| 1|
|Monkey|girl| 2|
|Monkey| boy| 3|
| Ninja|girl| 3|
| Spa| boy| 4|
| Spa| boy| 5|
| Spa|girl| 7|
+------+----+-----+
# sort descending
df.groupBy("name", "sex").count().orderBy("count", ascending=False).show()
+------+----+-----+
| name| sex|count|
+------+----+-----+
| Spa| boy| 2|
|Monkey| boy| 1|
| Spa|girl| 1|
|Monkey|girl| 1|
|Pirate| boy| 1|
| Ninja|girl| 1|
+------+----+-----+
from pyspark.sql import functions as F
df.show()
+---+-----+----+
| id|value|name|
+---+-----+----+
| a| null| Leo|
| a| 11|null|
| a| 11|Mike|
| a| 22| Leo|
+---+-----+----+
# collect_list gathers values into a list; collect_set gathers them into a list and deduplicates
df.groupBy("id").agg(F.sum("value").alias("value_sum"), F.collect_set("value").alias("value_collect_set"), F.collect_list("name").alias("name_collect_list")).show()
+---+---------+-----------------+-----------------+
| id|value_sum|value_collect_set|name_collect_list|
+---+---------+-----------------+-----------------+
| a| 44| [22, 11]| [Leo, Mike, Leo]|
+---+---------+-----------------+-----------------+
df_.show()
+----+--------------------+
|name| ad_list|
+----+--------------------+
| a| [1, 2]|
| b|[11, 22, 33, 44, 55]|
+----+--------------------+
df_.withColumn("new", F.explode("ad_list")).show()
+----+--------------------+---+
|name| ad_list|new|
+----+--------------------+---+
| a| [1, 2]| 1|
| a| [1, 2]| 2|
| b|[11, 22, 33, 44, 55]| 11|
| b|[11, 22, 33, 44, 55]| 22|
| b|[11, 22, 33, 44, 55]| 33|
| b|[11, 22, 33, 44, 55]| 44|
| b|[11, 22, 33, 44, 55]| 55|
+----+--------------------+---+
When the grouping key pk_key is a list, it can be unpacked with a star, *pk_key (it also seems to work without the star; I have not yet seen a case where omitting the star fails).
See the note below on the single star in Python (unpacking an argument list).
pk = ["name", "value"]
df1.show()
+----+-----+
|name|value|
+----+-----+
| a| 1|
| a| 2|
| a| 2|
+----+-----+
df1.groupBy(*pk).agg(F.sum("value")).show()
+----+-----+----------+
|name|value|sum(value)|
+----+-----+----------+
| a| 1| 1|
| a| 2| 4|
+----+-----+----------+
# here it makes no difference whether pk has the star or not
df1.groupBy(pk).agg(F.sum("value")).show()
+----+-----+----------+
|name|value|sum(value)|
+----+-----+----------+
| a| 1| 1|
| a| 2| 4|
+----+-----+----------+
One use of the single star in Python is unpacking an argument list:
def func(a, b):
print a, b
param = [1, 2]
func(*param)
1 2
Use a left join when you need to keep every row of the (larger) left table; otherwise a plain join is enough. With a left join, left-table rows that find no match show null in the right-table columns, and you usually want to replace those nulls with another value (for example 0) so later computation is easier.
# use this when a column would otherwise come back as null and you want some other value (e.g. 0) instead
df = df.join(df1,'t_id','left').withColumn('is_name',F.coalesce('my_col',F.lit(0))).drop('my_col')
# i.e. replace the NULLs in the my_col column with 0
df = spark.createDataFrame([('1', 'Joe'), ('4', 'Henry'), ('1', 'Nan'), ('4', 'Hesssnry')], ['ad_id', 'Name'])
df2 = spark.createDataFrame([('1', 'A'), ('4', 'B'), ('5', 'C')], ['ad_id', 'ocpc_type'])
df3 = df2.join(df, on='ad_id', how='left')
df3.show()
+-----+---------+--------+
|ad_id|ocpc_type| Name|
+-----+---------+--------+
| 5| C| null|
| 1| A| Joe|
| 1| A| Nan|
| 4| B| Henry|
| 4| B|Hesssnry|
+-----+---------+--------+
df3.filter('ocpc_type == "A"').show()
+-----+---------+----+
|ad_id|ocpc_type|Name|
+-----+---------+----+
| 1| A| Joe|
| 1| A| Nan|
+-----+---------+----+
# if we swap the join order
df3 = df.join(df2, on='ad_id', how='left')
df3.show()
+-----+--------+---------+
|ad_id| Name|ocpc_type|
+-----+--------+---------+
| 1| Joe| A|
| 1| Nan| A|
| 4| Henry| B|
| 4|Hesssnry| B|
+-----+--------+---------+
In other words, whichever table comes first in the join is the primary one; the other table only supplements it.
left_semi: keep the rows of df1 that have a match in df2 (df1's columns only).
left_anti: keep the rows of df1 that have no match in df2 (the rest of df1).
df1.show()
+---+---+
| id|num|
+---+---+
| A| 1|
| B| 2|
| C| 3|
+---+---+
df2.show()
+---+---+
| id|num|
+---+---+
| C| 33|
| D| 4|
| E| 5|
+---+---+
# 1. ordinary left join: right-table rows that do not match are filled with null
# left / left_outer / leftouter are all equivalent
df1.join(df2, "id", "left").show()
+---+---+----+
| id|num| num|
+---+---+----+
| B| 2|null|
| C| 3| 33|
| A| 1|null|
+---+---+----+
# 2. rows of df1 that also appear in df2, keeping df1's data (note that num is 3, not 33)
# semi / leftsemi / left_semi are all equivalent
df1.join(df2, "id", "left_semi").show()
+---+---+
| id|num|
+---+---+
| C| 3|
+---+---+
# 3. rows of df1 that do not appear in df2 (remove from the left table whatever exists in the right table)
# anti / leftanti / left_anti are all equivalent
df1.join(df2, "id", "left_anti").show()
+---+---+
| id|num|
+---+---+
| B| 2|
| A| 1|
+---+---+
If the join key (pk_key) contains duplicates, the join produces a combinatorial explosion; make sure both sides of the join are deduplicated on the key.
valuesA = [('Pirate',1),('Monkey',2),('Monkey',3),('Ninja',3),('Spaghetti',4)]
TableA = spark.createDataFrame(valuesA,['name','id'])
valuesB = [('Rutabaga',11) ,('Monkey',22) ,('Monkey',222),('Ninja',33),('Darth Vader',44)]
TableB = spark.createDataFrame(valuesB,['name','id2'])
TableA.join(TableB,on='name').show(50,False)
+------+---+---+
|name |id |id2|
+------+---+---+
|Ninja |3 |33 |
|Monkey|2 |222|
|Monkey|2 |22 |
|Monkey|3 |222|
|Monkey|3 |22 |
+------+---+---+
# a left join keeps every row of the left table; unmatched rows get null from the right table
TableA.join(TableB,on='name',how='left').show(50,False)
+---------+---+----+
|name |id |id2 |
+---------+---+----+
|Spaghetti|4 |null|
|Ninja |3 |33 |
|Pirate |1 |null|
|Monkey |2 |22 |
|Monkey |2 |222 |
|Monkey |3 |22 |
|Monkey |3 |222 |
+---------+---+----+
# TableA and TableB both have duplicate join keys, but the other values differ, so dropDuplicates cannot remove them
TableA.dropDuplicates().join(TableB.dropDuplicates(),on='name').show(50,False)
+------+---+---+
|name |id |id2|
+------+---+---+
|Ninja |3 |33 |
|Monkey|3 |22 |
|Monkey|3 |222|
|Monkey|2 |22 |
|Monkey|2 |222|
+------+---+---+
In [25]: valuesC = [('Pirate',1),('Monkey',222),('Monkey',111),('Ninja',3),('Spaghetti',4)]
In [26]: TableC = spark.createDataFrame(valuesC,['name','id'])
In [27]: TableC.show()
+---------+---+
| name| id|
+---------+---+
| Pirate| 1|
| Monkey|222|
| Monkey|111|
| Ninja| 3|
|Spaghetti| 4|
+---------+---+
In [28]: TableC.dropDuplicates().show()
+---------+---+
| name| id|
+---------+---+
| Pirate| 1|
| Ninja| 3|
| Monkey|111|
| Monkey|222|
|Spaghetti| 4|
+---------+---+
----------------------------------------------------------------------------------------------
In [23]: valuesC = [('Pirate',1),('Monkey',222),('Monkey',222),('Ninja',3),('Spaghetti',4)]
In [24]: TableC = spark.createDataFrame(valuesC,['name','id'])
In [25]: TableC.show()
+---------+---+
| name| id|
+---------+---+
| Pirate| 1|
| Monkey|222|
| Monkey|222|
| Ninja| 3|
|Spaghetti| 4|
+---------+---+
# deduplicate: both tables must be deduplicated before joining
In [26]: TableC.dropDuplicates().show()
+---------+---+
| name| id|
+---------+---+
| Pirate| 1|
| Ninja| 3|
| Monkey|222|
|Spaghetti| 4|
+---------+---+
Note: if both tables contain a column with the same name (other than the join key), the joined result will contain duplicate columns.
df1 = spark.createDataFrame([("A", 1), ("B", 2)], ["name", "num"])
df1.show()
+----+---+
|name|num|
+----+---+
| A| 1|
| B| 2|
+----+---+
df2 = spark.createDataFrame([("A", 1), ("B", 2), ("C", 3)], ["name", "num"])
df2.show()
+----+---+
|name|num|
+----+---+
| A| 1|
| B| 2|
| C| 3|
+----+---+
df3 = df1.join(df2, "name")
df3.show()
+----+---+---+
|name|num|num|
+----+---+---+
| B| 2| 2|
| A| 1| 1|
+----+---+---+
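One way to avoid the duplicated column (a sketch, not from the original post) is to rename the clashing column on one side before joining:
df3 = df1.join(df2.withColumnRenamed("num", "num2"), "name")
df3.show()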
from pyspark.sql import functions as F
MySQL
SELECT COALESCE(NULL, NULL, 1);
Return 1
# if all of the arguments are NULL, COALESCE returns NULL, for example
SELECT COALESCE(NULL, NULL, NULL, NULL);
Return NULL
# semantics: if a is null, take b; if b is also null, take c; if a is not null, take a; if a, b and c are all null, the result is null (meaningless)
select coalesce(a,b,c);
Spark
# use this when a column would otherwise come back as null and you want some other value (e.g. 0) instead
df = df.join(df1,'t_id','left').withColumn('is_name',F.coalesce('my_col',F.lit(0))).drop('my_col')
# i.e. replace the NULLs in the my_col column with 0
Method 1 (recommended)
# timestamp to date (the timestamp here is in milliseconds, hence the division by 1000) [recommended]
df.select("time_stamp").withColumn("time_date", F.from_unixtime(F.col("time_stamp")/1000, 'yyyy-MM-dd HH:mm:ss.SS')).filter("time_date > '2022-10-19 21:00:00'")
# timestamp to date (server_time is also a millisecond timestamp, hence the division by 1000) [more verbose]
from pyspark.sql.types import LongType
df = df.withColumn("server_time_ts", (F.col("server_time").cast(LongType()) / 1000.).cast(LongType())).withColumn("server_time_date", F.from_unixtime("server_time_ts"))
# date to timestamp
df.select("time_date").withColumn("time_stamp", F.unix_timestamp("time_date", "yyyy-MM-dd HH:mm:ss"))
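A few related date helpers that often come up; a sketch reusing the time_date column name above (not from the original post):
from pyspark.sql import functions as F
df = df.withColumn("time_date_only", F.to_date("time_date"))                             # keep only the date part
df = df.withColumn("time_date_str", F.date_format("time_date", "yyyyMMdd"))              # format as a string
df = df.withColumn("days_since", F.datediff(F.current_date(), F.to_date("time_date")))   # days between two dates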
Method 2
Note! This uses Spark SQL expression syntax rather than Python syntax; see the Spark SQL documentation.
valuesA = [('Pirate',1609785094),('Monkey',1609785094),('Monkey',1609785094),('Ninja',1609785094),('Spaghetti',0)]
TableA = spark.createDataFrame(valuesA,['name','time'])
new_time = F.expr("FROM_UNIXTIME(`time`, 'yyyy-MM-dd')")
# print(new_time) to inspect the column expression
df2 = TableA.where(new_time == "2021-01-01")
df2.show()
In plain Python (rather than Spark SQL), use the following instead:
#coding:UTF-8
import time
dt = "2016-05-05 20:28:54"
# parse into a time struct
timeArray = time.strptime(dt, "%Y-%m-%d %H:%M:%S")
# reformat into a new layout (20160505-20:28:54)
dt_new = time.strftime("%Y%m%d-%H:%M:%S", timeArray)
print(dt_new)
Reference: Spark sql实战——如何比较两个dataframe是否相等 (how to check whether two DataFrames are equal)
a = [('Pirate',1),('Monkey',2)]
A = spark.createDataFrame(a,['name','id'])
In [3]: A.show()
+------+---+
| name| id|
+------+---+
|Pirate| 1|
|Monkey| 2|
+------+---+
b = [('Monkey',2),('Pirate',1)]
B = spark.createDataFrame(b,['name','id'])
In [6]: B.show()
+------+---+
| name| id|
+------+---+
|Monkey| 2|
|Pirate| 1|
+------+---+
def match_df(df1, df2):
    count1 = len(df1.subtract(df2).take(1))
    count2 = len(df2.subtract(df1).take(1))
    return True if count1 == count2 and count1 == 0 else False

print(match_df(A, B))
True
# the part of df1 that is not in df2, i.e. df1 minus (df1 ∩ df2)
df1.subtract(df2)
In [31]: df1.show()
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
In [32]: df2.show()
+-----+
|value|
+-----+
| 2|
| 3|
| 4|
+-----+
In [33]: df1.subtract(df2).show()
+-----+
|value|
+-----+
| 1|
+-----+
df1.intersect(df2)  # intersection
df1.union(df2)      # union (duplicates kept)
# union and deduplicate
df1.union(df2).distinct()
df1 = spark.createDataFrame([("A", 1, 0), ("B", 1, 0)], ["id", "is_girl", "is_boy"])
+---+-------+------+
| id|is_girl|is_boy|
+---+-------+------+
| A| 1| 0|
| B| 1| 0|
+---+-------+------+
df2 = spark.createDataFrame([("C", 1, 0), ("D", 1, 0)], ["id", "is_boy", "is_girl"])
+---+------+-------+
| id|is_boy|is_girl|
+---+------+-------+
| C| 1| 0|
| D| 1| 0|
+---+------+-------+
# union concatenates by position: because the column orders differ, the tables are glued together mechanically
# and the columns are NOT reordered automatically -- this concatenation is wrong!!!
df1.union(df2).show()
+---+-------+------+
| id|is_girl|is_boy|
+---+-------+------+
| A| 1| 0|
| B| 1| 0|
| C| 1| 0|
| D| 1| 0|
+---+-------+------+
# manually fix the column order so both sides match
df1.selectExpr("id", "is_girl", "is_boy").union(df2.selectExpr("id", "is_girl", "is_boy")).show()
+---+-------+------+
| id|is_girl|is_boy|
+---+-------+------+
| A| 1| 0|
| B| 1| 0|
| C| 0| 1|
| D| 0| 1|
+---+-------+------+
df1 = spark.createDataFrame([("A", 1), ("A", 11), ("B", 2), ("B", 3)], ["name", "num"]).select("name")
df2 = spark.createDataFrame([("A", 1), ("B", 2), ("B", 3)], ["name", "num"]).select("name")
df1.show()
+----+
|name|
+----+
| A|
| A|
| B|
| B|
+----+
df2.show()
+----+
|name|
+----+
| A|
| B|
| B|
+----+
# intersect deduplicates both sides by itself
In [28]: df1.intersect(df2).show()
+----+
|name|
+----+
| B|
| A|
+----+
# with duplicates present, a join makes the duplication worse
In [29]: df1.join(df2, "name").show()
+----+
|name|
+----+
| B|
| B|
| B|
| B|
| A|
| A|
+----+
# deduplicate both sides manually (same result as intersect)
In [30]: df1.dropDuplicates().join(df2.dropDuplicates(), "name").show()
+----+
|name|
+----+
| B|
| A|
+----+
valuesA = [('Pirate',1),('Monkey',2),('Monkey',3),('Ninja',3),('Spaghetti',4)]
A = spark.createDataFrame(valuesA,['name','id'])
########## Method 1 ##########
A.agg({'id': 'avg'}).show()
+-------+
|avg(id)|
+-------+
| 2.6|
+-------+
A.agg({'id': 'sum'}).show()
+-------+
|sum(id)|
+-------+
| 13|
+-------+
########## Method 2 ##########
from pyspark.sql import functions as F
A.agg(F.avg('id').alias('id_avg')).show()
+------+
|id_avg|
+------+
| 2.6|
+------+
A.agg(F.sum('id').alias('id_sum')).show()
+------+
|id_sum|
+------+
| 13|
+------+
collect can be used to pull element values out of a column:
import pyspark.sql.functions as F
A = [[1,'CAT1',10], [2, 'CAT2', 20], [3, 'CAT3', 70]]
df = spark.createDataFrame(A, ['id', 'cate', 'value'])
df.show()
+---+----+-----+
| id|cate|value|
+---+----+-----+
| 1|CAT1| 10|
| 2|CAT2| 20|
| 3|CAT3| 70|
+---+----+-----+
# column sum, method 1
df.agg(F.sum("value")).show()
+----------+
|sum(value)|
+----------+
| 100|
+----------+
# column sum, method 2
df.groupBy("cate").sum("value").show()
+----+----------+
|cate|sum(value)|
+----+----------+
|CAT2| 20|
|CAT1| 10|
|CAT3| 70|
+----+----------+
# column sum, method 3
df.groupBy("value").sum().collect()
Out[36]: [Row(value=10, sum(id)=1, sum(value)=10), Row(value=20, sum(id)=2, sum(value)=20), Row(value=70, sum(id)=3, sum(value)=70)]
df.groupBy("value").sum().collect()[0][1]
Out[37]: 1
df.groupBy("value").sum().collect()[0][2]
Out[38]: 10
# column sum, method 4
df.agg({"value":"sum"}).collect()
Out[39]: Row(sum(value)=100)
df.agg({"value":"sum"}).collect()[0][0]
Out[41]: 100
# column sum, method 5 (recommended)
df.agg(F.sum("value")).collect()[0][0]
Out[47]: 100
Now compute each row's share of the column total:
# get the column total
value_sum = df.agg(F.sum("value")).collect()[0][0]
# add it as a new column
df2 = df.withColumn("sum", F.lit(value_sum))
df2.show()
+---+----+-----+---+
| id|cate|value|sum|
+---+----+-----+---+
| 1|CAT1| 10|100|
| 2|CAT2| 20|100|
| 3|CAT3| 70|100|
+---+----+-----+---+
df2 = df2.withColumn("ratio", F.round(F.col("value") / F.col("sum"), 3))
df2.show()
+---+----+-----+---+-----+
| id|cate|value|sum|ratio|
+---+----+-----+---+-----+
| 1|CAT1| 10|100| 0.1|
| 2|CAT2| 20|100| 0.2|
| 3|CAT3| 70|100| 0.7|
+---+----+-----+---+-----+
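The same ratio can be computed without collecting to the driver, by summing over a window that spans the whole table (a sketch, not from the original post; it triggers a "No Partition Defined" warning because all rows end up in one partition):
from pyspark.sql import Window
w = Window.partitionBy()  # empty partitioning: the window covers the whole table
df.withColumn("ratio", F.round(F.col("value") / F.sum("value").over(w), 3)).show()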
import pyspark.sql.functions as F
df1.show()
+----+-----+
|name|value|
+----+-----+
| a| 1|
| b| 2|
+----+-----+
# join the two columns with an underscore "_"
df1.select(F.concat_ws("_", F.col("name"), F.col("value").alias("name_value")), "name").show()
# or
df1.withColumn("new_col", F.concat_ws('_', 'col1', 'col2'))
+-----------------------------------------+----+
|concat_ws(_, name, value AS `name_value`)|name|
+-----------------------------------------+----+
| a_1| a|
| b_2| b|
+-----------------------------------------+----+
Reference: 【Pyspark】UDF函数的使用、UDF传入多个参数、UDF传出多个参数、传入特殊数据类型 (using UDFs: multiple inputs, multiple outputs, special data types)
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql import functions as F

A = [("a", [1,2,3], [10, 20, 30]), ("b", [4, 5, 6], [100, 200, 300])]
df1 = spark.createDataFrame(A, ["name", "value1", "value2"])
df1.show()
+----+---------+---------------+
|name| value1| value2|
+----+---------+---------------+
| a|[1, 2, 3]| [10, 20, 30]|
| b|[4, 5, 6]|[100, 200, 300]|
+----+---------+---------------+

# custom function
def func(list1, list2):
    """
    list1 and list2 are the values of the two columns for each row
    """
    list3 = []
    for i, j in zip(list1, list2):
        list3.append(i * j)
    return list3

# udf needs the function's output type, here a list of integers
func_udf = F.udf(func, ArrayType(IntegerType()))
df2 = df1.withColumn("new_col", func_udf("value1", "value2"))
df2.show()
+----+---------+---------------+-----------------+
|name| value1| value2| new_col|
+----+---------+---------------+-----------------+
| a|[1, 2, 3]| [10, 20, 30]| [10, 40, 90]|
| b|[4, 5, 6]|[100, 200, 300]|[400, 1000, 1800]|
+----+---------+---------------+-----------------+
The goal: group the rows of the table, then rank within each group.
from pyspark.sql import Window
from pyspark.sql import functions as F

df = spark.createDataFrame((
    ["A", 1, "Science", 20],
    ["B", 1, "Science", 80],
    ["C", 2, "Science", 90],
    ["D", 2, "Science", 40],
    ["E", 3, "Science", 60],
    ["F", 4, "Art", 60],
    ["G", 4, "Art", 50],
    ["H", 5, "Art", 90],
    ["I", 5, "Art", 100],
    ["J", 6, "Art", 20],
), ["name", "class", "subject", "score"])

# partition by subject, then sort by score descending
# descending: F.desc("score"); ascending: F.asc("score")
window = Window.partitionBy("subject").orderBy(F.desc("score"))
df = df.withColumn("rank", F.row_number().over(window))
+----+-----+-------+-----+----+
|name|class|subject|score|rank|
+----+-----+-------+-----+----+
| C| 2|Science| 90| 1|
| B| 1|Science| 80| 2|
| E| 3|Science| 60| 3|
| D| 2|Science| 40| 4|
| A| 1|Science| 20| 5|
| I| 5| Art| 100| 1|
| H| 5| Art| 90| 2|
| F| 4| Art| 60| 3|
| G| 4| Art| 50| 4|
| J| 6| Art| 20| 5|
+----+-----+-------+-----+----+
# keep only the top row of each group
df.filter("rank=1").show()
+----+-----+-------+-----+----+
|name|class|subject|score|rank|
+----+-----+-------+-----+----+
| C| 2|Science| 90| 1|
| I| 5| Art| 100| 1|
+----+-----+-------+-----+----+
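Besides row_number, rank and dense_rank can be applied over the same window; they differ only in how ties are numbered. A small sketch reusing the window defined above:
df.withColumn("rank_with_ties", F.rank().over(window)) \
  .withColumn("dense_rank", F.dense_rank().over(window)) \
  .show()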
In pandas, cumsum() computes the cumulative sum of a column. Example:
import pandas as pd
data = [1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0]
data = pd.DataFrame(data, columns=['diff'])
data['cumsum_num'] = data['diff'].cumsum()
print(data)
Output:
diff cumsum_num
0 1 1
1 0 1
2 0 1
3 0 1
4 1 2
5 0 2
6 0 2
7 1 3
8 0 3
9 0 3
10 0 3
PySpark has no cumsum() function for computing a cumulative sum directly. To get one, order the rows by some column, build a Window over that ordering, and sum over it, as shown below.
Create the DataFrame:
import pyspark
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pandas as pd

conf = pyspark.SparkConf().setAll([])
spark_session = SparkSession.builder.appName('test_app').config(conf=conf).getOrCreate()
sc = spark_session.sparkContext
sc.setLogLevel('WARN')

data = [1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0]
data = pd.DataFrame(data, columns=['diff'])
data['number'] = range(len(data))
df = spark_session.createDataFrame(data, schema=['diff', 'number'])
df.show()
The original DataFrame:
+----+------+
|diff|number|
+----+------+
| 1| 0|
| 0| 1|
| 0| 2|
| 0| 3|
| 1| 4|
| 0| 5|
| 0| 6|
| 1| 7|
| 0| 8|
| 0| 9|
| 0| 10|
+----+------+
Cumulative sum based on ordering by number:
win = Window.orderBy('number')
df.withColumn('cumsum_num', F.sum(df['diff']).over(win)).show()
Result:
+----+------+----------+
|diff|number|cumsum_num|
+----+------+----------+
| 1| 0| 1|
| 0| 1| 1|
| 0| 2| 1|
| 0| 3| 1|
| 1| 4| 2|
| 0| 5| 2|
| 0| 6| 2|
| 1| 7| 3|
| 0| 8| 3|
| 0| 9| 3|
| 0| 10| 3|
+----+------+----------+
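For a cumulative sum within groups rather than over the whole table, add partitionBy to the window; a sketch with a hypothetical grouping column named 'group' (not present in the DataFrame above):
win_grouped = Window.partitionBy('group').orderBy('number') \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn('cumsum_in_group', F.sum('diff').over(win_grouped))  # assumes df has a 'group' column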
If a DataFrame contains Chinese text, write the string literals with a u prefix, i.e. u"中文", and they will display with the correct alignment:
In [61]: A = [("a", "快手"), ("b", "抖音")]
In [62]: df__ = spark.createDataFrame(A, ["id", "name"])
In [63]: df__.show()
+---+------+
| id| name|
+---+------+
| a|快手|
| b|抖音|
+---+------+
In [58]: A = [("a", u"快手"), ("b", u"抖音")]
In [59]: df__ = spark.createDataFrame(A, ["id", "name"])
In [60]: df__.show()
+---+----+
| id|name|
+---+----+
| a| 快手|
| b| 抖音|
+---+----+
from pyspark.sql.functions import split
from pyspark.sql import functions as F

A = [("A", "20%"), ("B", "18%")]
df_ = spark.createDataFrame(A, ["name", "ratio1"])
# convert a percentage string like "20%" into a float
df2_ = df_.withColumn('ratio1_new', split(F.col("ratio1"), "%").getItem(0) * F.lit(0.01))
df2_.show()
+----+------+----------+
|name|ratio1|ratio1_new|
+----+------+----------+
| A| 20%| 0.2|
| B| 18%| 0.18|
+----+------+----------+
If a column holds a list and you want to take a particular element from it, use the getItem(index) function. Taking the last element with getItem(-1) does not work, so you need to write a UDF:
from pyspark.sql.types import StringType

def func(col_list):
    try:
        res = col_list[-1]
    except:
        res = -1
    return res
func_udf = F.udf(func, StringType())  # note: this is the UDF's declared output type; if the type is wrong, the output will be null
new_df = df.withColumn("col1_item_list", F.split("col1", ':')).withColumn("new_col", func_udf("col1_item_list")).cache()
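On Spark 2.4+ the UDF is not strictly needed: F.element_at accepts negative indices (indexing is 1-based; -1 means the last element). A sketch reusing the columns above:
new_df2 = df.withColumn("col1_item_list", F.split("col1", ":")) \
            .withColumn("new_col", F.element_at("col1_item_list", -1))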
Compute the quantiles of a column
df_.show()
+----+-----+
|name|value|
+----+-----+
| a| 1|
| b| 2|
| c| 3|
| d| 4|
| e| 5|
| f| 6|
| g| 7|
| h| 8|
| i| 9|
| j| 10|
+----+-----+
# approxQuantile: the first argument is the column, the second the list of quantiles,
# the third the relative error (relativeError); setting it to 0 is exact but extremely expensive
df_.approxQuantile("value", [0.1, 0.5, 0.99], 0.1)
# the output is the value at each requested quantile
Out[12]: [1.0, 4.0, 10.0]
A = [("a", "aa_aaa"), ("b", "bb_bbb")]
df = spark.createDataFrame(A, ["name", "value"])
df.show()
+----+------+
|name| value|
+----+------+
| a|aa_aaa|
| b|bb_bbb|
+----+------+
# F.instr(F.col("value"), "aa") returns the position of "aa" inside the "value" column, or 0 if it is absent
# i.e. test whether the substring "aa" occurs in column "value"
df.withColumn("is_instr", F.when((F.instr(F.col("value"), "aa"))==0, F.lit(0)).otherwise(F.lit(1))).show()
+----+------+--------+
|name| value|is_instr|
+----+------+--------+
| a|aa_aaa| 1|
| b|bb_bbb| 0|
+----+------+--------+
A = [("a", 1, None), ("b", None, 2), ("c", None, None)]
df_ = spark.createDataFrame(A, ["name", "value1", "value2"])
df_.show()
+----+------+------+
|name|value1|value2|
+----+------+------+
| a| 1| null|
| b| null| 2|
| c| null| null|
+----+------+------+
df_.fillna({"value1": 0.0, "value2": 11.0}).show()
+----+------+------+
|name|value1|value2|
+----+------+------+
| a| 1| 11|
| b| 0| 2|
| c| 0| 11|
+----+------+------+
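Instead of filling nulls, rows containing nulls can also be dropped with dropna (a sketch, not from the original post):
df_.dropna().show()                                         # drop rows with a null in any column
df_.dropna(subset=["value1"]).show()                        # only look at value1
df_.dropna(how="all", subset=["value1", "value2"]).show()   # drop only if both columns are null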
This method (F.when / otherwise) supports more complex value replacement:
from pyspark.sql import functions as F

A = [("a", 1, None), ("b", None, 2), ("c", None, None)]
df_ = spark.createDataFrame(A, ["name", "value1", "value2"])
df_.show()
+----+------+------+
|name|value1|value2|
+----+------+------+
| a| 1| null|
| b| null| 2|
| c| null| null|
+----+------+------+

# note! any value not explicitly covered becomes null, so every case has to be handled
df_.withColumn("value3", F.when(F.col("value1")<10, F.lit(10)).otherwise(F.lit(-10))).show()
+----+------+------+------+
|name|value1|value2|value3|
+----+------+------+------+
| a| 1| null| 10|
| b| null| 2| -10|
| c| null| null| -10|
+----+------+------+------+

# a column created by withColumn with the same name as an existing column overwrites it automatically
df_.withColumn("value1",F.when(F.col("value1").isNull(),F.lit(0.0)).otherwise(F.lit(F.col("value1"))))\
    .withColumn("value2", F.when(F.col("value2").isNull(),F.lit(11)).otherwise(F.lit(F.col("value2")))).show()
+----+------+------+
|name|value1|value2|
+----+------+------+
| a| 1.0| 11|
| b| 0.0| 2|
| c| 0.0| 11|
+----+------+------+

# complex bucketing of an existing column's values (1)
group1 = ["a"]
group2 = ["b"]
df = df.withColumn("group", F.when(F.col("name").isin(group1), F.lit("goup_1")).when(F.col("name").isin(group2), F.lit("goup_2")).otherwise(F.lit("group_other")))
+----+------+------+-----------+
|name|value1|value2| group|
+----+------+------+-----------+
| a| 1| null| goup_1|
| b| null| 2| goup_2|
| c| null| null|group_other|
+----+------+------+-----------+

# complex bucketing of an existing column's values (2)
# note that the comparison uses a double equals sign "=="
df = df.withColumn("value", F.when(F.col("value1")==1.0, F.lit("value_is_1")).otherwise(F.lit("value_is_other")))
+----+------+------+--------------+
|name|value1|value2| value|
+----+------+------+--------------+
| a| 1| null| value_is_1|
| b| null| 2|value_is_other|
| c| null| null|value_is_other|
+----+------+------+--------------+

# several conditions at once; col_pair combines two columns into the form "col1_col2"
df = df\
    .withColumn("col_pair", F.concat_ws('_', 'col1', 'col2'))\
    .withColumn("new_col", F.when((F.col("valid") == '1') & (F.col('col_pair') != '4_9'), F.lit(1)).otherwise(F.lit(0)))

########## [most recommended!!!] ##########
# for more complex expressions, use F.expr()
df = df.withColumn("new_col", F.when(F.expr("value1>0.5 and value1<1.5"), F.lit("111")).otherwise(F.lit("value_is_other")))
When comparing against a boolean column, use the double equals sign "==":
df.show()
+----+-----+
|name|value|
+----+-----+
| a| true|
| b|false|
| c| true|
+----+-----+
df.withColumn("value_new", F.when(F.col("value")==True, F.lit(1)).otherwise(F.lit(0))).show()
+----+-----+---------+
|name|value|value_new|
+----+-----+---------+
| a| true| 1|
| b|false| 0|
| c| true| 1|
+----+-----+---------+
df.write.parquet("/path")  # write out as Parquet
df.toPandas().to_csv("stat.csv", encoding='utf-8')  # for small results: convert to pandas and write a local CSV
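When writing, the save mode and partitioning are usually worth setting explicitly; a sketch with hypothetical paths and a hypothetical partition column dt (not from the original post):
df.write.mode("overwrite").parquet("/path")                        # overwrite instead of failing if the path exists
df.write.mode("overwrite").partitionBy("dt").parquet("/path_out")  # assumes a "dt" column to partition by
df.write.mode("overwrite").option("header", True).csv("/path_csv")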
#!/usr/bin/env python
# coding=utf-8
import sys
import os

cmd = ''
cmd += ' /opt/path1/hadoop fs -test -e /path2/20220101 '
res = os.system(cmd)
print("res: ", res)
if res == 0:
    print("the data exists")
else:
    print("the data does not exist")
If the path exists, res == 0; otherwise it is non-zero.
Section [2.8 intersection & union & merge] above already mentions the union pitfall: union concatenates positionally and does not align columns by name (there is a unionByName function, but I still prefer to assert beforehand that the two tables have the same columns, to avoid mistakes).
# Method 1: force both tables into the same column order
join_cols_list = ["col1", "col2"]
df1 = df1.selectExpr(*join_cols_list)
df2 = df2.selectExpr(*join_cols_list)
# Method 2: assert that the columns already match
# assert df1.columns == df2.columns
df = df1.union(df2)
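For completeness, the unionByName mentioned above matches columns by name rather than by position (a sketch; available in Spark 2.3+):
df = df1.unionByName(df2)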
This does not actually lead to a wrong conclusion, but it is still something to watch out for:
df.show()
+----+-----+
|name|value|
+----+-----+
| a| 1|
| b| 2|
+----+-----+
df_1 = df.select("name")
+----+
|name|
+----+
| a|
| b|
+----+
# although df_1 only carries the column name, value exists in the original table df,
# so it can still be used in filter (explicitly checking is nevertheless recommended),
# e.g. assert before filtering that the column is actually present:
# assert "value" in df_1.columns
df_1.filter("value=1").show()
+----+
|name|
+----+
| a|
+----+