赞
踩
近期致力于总结科研或者工作中用到的主要技术栈,从技术原理到常用语法,这次查缺补漏当作我的小百科。主要技术包括:
以下整理错误或者缺少的部分欢迎指正!!!
Pyspark | Pandas | |
---|---|---|
运行环境 | 分布式计算集群(Hadoop/Apache Spark集群) | 单个计算机 |
数据规模 | 亿级大规模 | 百万级小规模 |
优势 | 分布式计算->并行处理,处理速度快 | API简单->数据处理简单 |
延迟机制 | lazy execution, 执行动作之前不执行任务 | eager execution, 任务立即被执行 |
内存缓存 | persist()/cache()将转换的RDDs保存在内存 | 单机缓存 |
DataFrame可变性 | 不可变,修改则返回一个新的DataFrame | 可变 |
可扩展性 | 好 | 差 |
列名允许重复 | ✓ | × |
# 头文件 from pyspark.sql import SparkSession import pyspark.sql.functions as F from pyspark.sql.types import StructType, StructField, LongType, StringType, ArrayType # 或者直接导入* import pandas as pd # 创建SparkSession对象 spark = SparkSession.builder \ .appName("username") \ .getOrCreate() # 创建空表 schema = StructType([ StructField('id', LongType()), StructField('type', StringType()), ]) # spark需要指定列名和类型 spark_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema=schema) pandas_df = pd.DataFrame(columns=['id', 'type'], index=[0, 1, 2]) # 根据现有数据创建 data = [(1, "Alice", 2000), (2, "Bob", 2001), (3, "Charlie", 2002)] schema = StructType([ StructField("id", IntegerType(), True), StructField("name", StringType(), True), StructField("birth_year", IntegerType(), True) ]) spark_df = spark.createDataFrame(data, ["id", "name", "birth_year"]) spark_df = spark.createDataFrame(data, schema) pandas_df = pd.DataFrame(data=data, columns=["id", "name", "birth_year"]) # 读取csv文件 spark_df = spark.read.csv("data.csv", header=True, inferSchema=True) pandas_df = pd.read_csv("data.csv", sep="\t") # read_excel # 保存数据到csv spark_df.write.csv('data.csv', header=True) pandas_df.to_csv("data.csv", index=False) # 读取hive表数据 spark_df = spark.sql('select * from tab') # 保存数据到hive表 spark_df.write.mode('overwrite').saveAsTable('db_name.tab_name') # 相互转换 spark_df = SQLContext.createDataFrame(pandas_df) pandas_df = spark_df.toPandas() # 转换数据类型 spark_df = spark_df.withColumn("A", col("age").cast(StringType)) pandas_df["A"] = pandas_df['A'].astype("int") # 重置索引 spark_df = spark_df.withColumn("id", monotonically_increasing_id()) # 生成一个增长的id列 pandas_df.reset_index() # 切片 pandas_df['a':'c'] # a-c三行 pandas_df.iloc[1:3, 0:2] # 1-2行,0-1列。左闭右开 pandas_df.iloc[[0, 2], [1, 2]] # 第0,2行第0,2列 pandas_df.loc['a':'c', ['A', 'B']] # 第a-c行A,B列 # 选择列 spark_df.select('A', 'B') pandas_df[['A', 'B']] # 删除列 spark_df.drop('A', 'B') pandas_df.drop(['A', 'B'], axis=1, inplace=True) # inplace表示是否创建新对象 # 新增列,设置列值 spark_df = spark_df.withColumn('name', F.lit(0)) pandas_df['name'] = 0 # 修改列值 spark_df.withColumn('name', 1) pandas_df['name'] = 1 # 使用函数修改列值 spark_df = spark_df.withColumn('code', F.when(F.isnull(spark_df.code), 0).otherwise(spark_df.code)) # 修改列名 spark_df.withColumnRenamed('old_name', 'new_name') pandas_df.rename(columns={'old_name1': 'new_name1', 'old_name1': 'new_name2'}, inplace=True) # 显示数据 spark_df.limit(10) # 前10行 spark_df.show/take(10) # collect()返回全部数据 spark_df/pandas_df.first/head/tail(10) # 表格遍历 saprk_df.collect()[:10] spark_df.foreach(lambda row: print(row['c1'], row['c2'])) for i, row in pandas_df.iterrows(): print(row["c1"], row["c2"]) # 排序 spark/pandas_df.sort() # 按列值排序 pandas_df.sort_index() # 按轴排序 pandas_df.sort_values(by=["A", "B"], axis=0, ascending=[True, False], inplace=True) # 指定列升序/降序排序 # 过滤 spark_df.filter(df['col_name'] > 1) # spark_df.where(df['col_name'] > 1) pandas_df[pandas_df['col_name'] > 1] pandas_df_new = pandas_df[pandas_df["code"].apply(lambda x: len(x) == 11)] # 去重 spark_df.select('col_name').distinct() spark_df_filter = spark_df.drop_duplicates(["col_name"]) pandas_df.drop_duplicates(["col_name"], keep='first', inplace=True) # 缺失数据处理 spark_df.na.fill() spark_df.na.drop(subset=['A', "B"]) # 同dropna pandas_df.fillna() pandas_df.dropna(subset=['A', "B"], how="any", inplace=True) # 空值过滤 filter=choose spark_df.filter(~(F.isnull(spark_df.d))) spark_df.filter(~(spark_df['A'].isNull() | spark_df['B'].isNull())) # 选出列值不为空的行 isnan()=isNull()<->isNOtnan() pandas_df[pandas_df['A'].isna()] # 选出列值为空的行 pandas_df[pandas_df['A'].notna()] # 选出列值不为空的行 # 统计 spark/pandas_df.count() # spark返回总行数,pandas返回列非空总数 spark/pandas_df.describe() # 描述列的count, mean, min, max... # 计算某一列均值 average_value = spark_df.select("col_name").agg({"col_name": "avg"}).collect()[0][0] average_value = pandas_df["col_name"].mean() # 表合并 # 按行合并,相当于追加 spark_df = spark_df.unionAll(spark_df1) pandas_df = pd.concat([df_up, df_down], axis=0) # 按列合并 spark_df = spark_df.join(df1, df1.id==spark_df.id, 'inner').drop(df1.id) # df1.id==spark_df.id也可写成['id](当且仅当列名相同) pd.merge(df_left, df_right, left_on="a", right_on="b", how="left|right|inner|outer") # 聚合函数 spark_df_collect = spark_df.groupBy('number').agg( F.collect_set('province').alias('set_province'), F.first('city').alias('set_city'), F.collect_list('district').alias('set_district'), F.max('report_user').alias('set_report_user'), F.min('first_type').alias('set_first_type')) # 分组聚合 spark_df.groupBy('A').agg(F.avg('B'), F.min('B')) spark/pandas_df.groupby('A').avg('B') # 根据函数分组聚合 def func(x): return pd.DataFrame({ "A": x["A"].tolist()[0], "B": sum(x["B"])}, index=[0]) pandas_df_result = pandas_df.groupby(["A"]).apply(func) # spark udf函数和pandas apply函数 def func1(a, b): return a + b spark_df.withColumn("col_name", F.udf(func1, IntegerType())(spark_df.a, spark_df.b)) # spark_df['a']或F.col("a"))) def func2(x,y): return 1 if x > np.mean(y) else 0 pandas_df['A'].apply(func2, args=(pandas_df['B'],)) pandas_df['C'] = pandas_df.apply(lambda x: 1 if x['A'] > (x['B']*0.5) else 0, axis=1) # spark创建临时表 spark_df.createOrReplaceTempView('tmp_table') # 用sql API res1 = spark.sql('select * from tmp_table') spark_df.registerTempTable('tmp_table') # 用dataframe API res2 = spark.table('tmp_table')
其他常用设置
class SparkUtils: def __init__(self): self.spark = None def get_spark(self): if self.spark is None: self.spark = SparkSession.builder.appName("username") \ .enableHiveSupport().config("spark.sql.shuffle.partitions", "500") \ .config("spark.sql.broadcastTimeout", "3600") \ .config("spark.driver.memory", "200g") \ .config("spark.executor.memory", "40g") \ .config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \ .config("spark.executorEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \ .config("spark.yarn.appMasterEnv.yarn.nodemanager.docker-container-executor.image-name", "bdp-docker.jd.com:5000/wise_mart_bag:latest") \ .config("spark.executorEnv.yarn.nodemanager.docker-container-executor.image-name", "bdp-docker.jd.com:5000/wise_mart_bag:latest") \ .getOrCreate() self.spark.sql('SET hive.exec.dynamic.partition=true') self.spark.sql('SET hive.exec.dynamic.partition.mode=nonstrict') return self.spark spark = SparkUtils() # 生成dataframe spark_data = spark.sql(""" select id, username from tab1 where status in (1, 2, 3) and dt = '{}' """.format(date)) # pandas常用显示设置 pd.set_option('display.max_rows', 100) pd.set_option('display.max_columns', None) pd.set_option('display.width',1000) pd.set_option('display.max_colwidth',1000)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。