Broadcast variables (broadcast)
Broadcast variables let a program cache a read-only variable on every machine in the cluster, instead of shipping a copy with each task. They provide a more efficient way to share data such as a global configuration.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("RDD Demo").getOrCreate()
sc = spark.sparkContext

conf = {"ip": "192.168.1.1", "key": "cumt"}
# Create a broadcast variable
brVar = sc.broadcast(conf)
a = brVar.value            # read the broadcast value
print(a)                   # {'ip': '192.168.1.1', 'key': 'cumt'}
print(a["key"])            # cumt

# Update the broadcast variable: unpersist, modify, then broadcast again
brVar.unpersist()
conf["key"] = "jackwang"
brVar = sc.broadcast(conf) # broadcast again
a = brVar.value            # read the new broadcast value
print(a)                   # {'ip': '192.168.1.1', 'key': 'jackwang'}

# destroy() removes both the data and the metadata of the broadcast variable;
# it can no longer be used afterwards
brVar.destroy()
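The example above only reads the broadcast value back on the driver; in practice the value is consumed inside tasks running on the executors. A minimal sketch of that usage (the RDD contents are illustrative, and the broadcast is re-created here because the one above was destroyed):

# Hypothetical example: look up a broadcast config inside a map task
conf = {"ip": "192.168.1.1", "key": "cumt"}
brVar = sc.broadcast(conf)

# Each task reads brVar.value locally instead of shipping conf with every task
result = sc.parallelize(["a", "b", "c"]) \
           .map(lambda x: x + "@" + brVar.value["ip"]) \
           .collect()
print(result)  # ['a@192.168.1.1', 'b@192.168.1.1', 'c@192.168.1.1']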
Accumulators (accumulator)
An accumulator is a variable that can only be added to through an associative operation, which keeps updates fast. It is typically used to count events during job execution, for example while debugging. Tasks on different nodes can all add to it via the add method (or +=). To keep the value accurate, the accumulator should be driven by a single action; if the RDD is evaluated more than once, call cache or persist on it to cut the lineage so the updates are not re-applied.
rdd = sc.range(1, 101)

# Create an accumulator with initial value 0
acc = sc.accumulator(0)

def fcounter(x):
    global acc
    if x % 2 == 0:
        acc += 1
        # acc -= 1  # unsupported operand type(s) for -=

rdd_counter = rdd.map(fcounter)
print(acc.value)            # 0, fcounter has not run yet (map is lazy)

# Persist the RDD so the accumulator value stays correct across repeated actions
rdd_counter.persist()
print(rdd_counter.count())  # 100
print(acc.value)            # 50
print(rdd_counter.count())  # 100
print(acc.value)            # 50
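As a variation not in the original text: updating the accumulator from an action such as foreach sidesteps the laziness issue, because the function runs exactly once per call to the action. A minimal sketch:

# Hypothetical variation: update the accumulator from an action instead of a transformation
acc2 = sc.accumulator(0)

def fcounter2(x):
    global acc2
    if x % 2 == 0:
        acc2 += 1

sc.range(1, 101).foreach(fcounter2)  # foreach is an action, so the updates run once
print(acc2.value)                    # 50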
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]').appName('DataFrames').getOrCreate()
sc = spark.sparkContext

# Create a DataFrame from a list
a = [('Jack', 32), ('Smith', 33)]
df = spark.createDataFrame(a)
print(df.collect())
# [Row(_1='Jack', _2=32), Row(_1='Smith', _2=33)]
df.show()
# +-----+---+
# |   _1| _2|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# +-----+---+

# Add column names
df2 = spark.createDataFrame(a, ['name', 'age'])
print(df2.collect())
# [Row(name='Jack', age=32), Row(name='Smith', age=33)]
df2.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# +-----+---+

# A DataFrame with an explicit schema
from pyspark.sql.types import *

a = [('Jack', 32), ('Smith', 33)]
rdd = sc.parallelize(a)
# StructField(name, type, nullable)
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])
df = spark.createDataFrame(rdd, schema)
# Simplified form using a DDL string
df2 = spark.createDataFrame(rdd, "name:string,age:int")
df.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)
spark.sql() — run SQL queries
spark.udf.register() — register a user-defined function
df.select().where() — filter rows
df.agg({column name: aggregate function}).show() — aggregate
df.describe() — summary statistics for columns
spark.read.format('json').load() — read a file
df.join(deptDf, peopleDf.deptId == deptDf.id, 'inner') — join two DataFrames
df.na.fill({'name': 'unknown', 'salary': 0}).show() — fill null values
df.withColumn("Timestamp", df.Id.cast("timestamp")) — change a column's type
# Queries
a = [('Jack', 32), ('Smith', 33), ('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataFrame(rdd, "name: string, age: int")
df.createOrReplaceTempView("user")   # create a temporary view

df2 = spark.sql("select count(*) as counter from user")
df2.show()
# +-------+
# |counter|
# +-------+
# |      3|
# +-------+

df2 = spark.sql("select *,age+1 as next from user where age < 36")
df2.show()
# +-----+---+----+
# | name|age|next|
# +-----+---+----+
# | Jack| 32|  33|
# |Smith| 33|  34|
# +-----+---+----+

# User-defined function
strlen = spark.udf.register("strLen", lambda x: len(x))  # register a custom function
a = [('Jack', 32), ('Smith', 33), ('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataFrame(rdd, "name: string, age: int")
df.createOrReplaceTempView("user")
df2 = spark.sql("select *,strLen(name) as len from user")
df2.show()
# +-----+---+---+
# | name|age|len|
# +-----+---+---+
# | Jack| 32|  4|
# |Smith| 33|  5|
# | 李四| 36|  2|
# +-----+---+---+

# Select specific columns
df.select("name").show()

# Select and filter
df.select("name").where(strlen("name") > 2).show()
# +-----+
# | name|
# +-----+
# | Jack|
# |Smith|
# +-----+
df.filter(df.age > 32).show()

# Aggregation: agg({column name: aggregate function})
df.agg({"age": "max"}).show()
# +--------+
# |max(age)|
# +--------+
# |      36|
# +--------+

# Count, mean, stddev, min and max of a column
df.describe(['age']).show()
# +-------+------------------+
# |summary|               age|
# +-------+------------------+
# |  count|                 3|
# |   mean|33.666666666666664|
# | stddev| 2.081665999466133|
# |    min|                32|
# |    max|                36|
# +-------+------------------+

# Read and write parquet files
df.write.parquet("myuser.parquet")
spark.read.parquet("myuser.parquet").show()
df.write.csv('user.csv', 'append')
spark.read.csv('user.csv').show()

# Read a json file
df = spark.read.format('json').load('hdfs://localhost:9000/user.json')
df.show()
# +---+------+------+-----+------+
# |age|deptId|gender| name|salary|
# +---+------+------+-----+------+
# | 32|    01|    男| 张三|  5000|
# | 33|    01|    男| 李四|  6000|
# | 38|    01|    女| 王五|  5500|
# | 42|    02|    男| Jack|  7000|
# | 27|    02|    女|Smith|  6500|
# | 45|    02|    女| Lily|  9500|
# +---+------+------+-----+------+

# Print the column types
print(df.dtypes)
# [('age', 'bigint'), ('deptId', 'string'), ('gender', 'string'), ('name', 'string'), ('salary', 'bigint')]

# Pivot
df2 = df.groupBy("deptId").pivot("gender").sum("salary")
df2.show()
# +------+-----+-----+
# |deptId|   女|   男|
# +------+-----+-----+
# |    01| 5500|11000|
# |    02|16000| 7000|
# +------+-----+-----+

# Conditional selection
df.select("name", df.salary.between(6000, 9500)).show()
df.select("name", "age").where(df.name.like("Smi%")).show()

# Join query: average salary and max age per department and gender
# People table
a = [
    ('01', '张三', '男', 32, 5000),
    ('01', '李四', '男', 33, 6000),
    ('01', '王五', '女', 38, 5500),
    ('02', 'Jack', '男', 42, 7000),
    ('02', 'Smith', '女', 27, 6500),
    ('02', 'Lily', '女', 45, 9500)
]
rdd = sc.parallelize(a)
peopleDf = spark.createDataFrame(rdd,
    "deptId:string,name:string,gender:string,age:int,salary:int")

# Department table
b = [
    ('01', '销售部'),
    ('02', '研发部')
]
rdd2 = sc.parallelize(b)
deptDf = spark.createDataFrame(rdd2, "id:string,name:string")

# The third argument of join defaults to inner; other options are:
# inner, cross, outer, full, full_outer, left, left_outer,
# right, right_outer, left_semi, and left_anti.
peopleDf.join(deptDf, peopleDf.deptId == deptDf.id, 'inner') \
    .groupBy(deptDf.name, peopleDf.gender) \
    .agg({"salary": "avg", "age": "max"}) \
    .sort(deptDf.name, peopleDf.gender) \
    .show()
# +------+------+-----------+--------+
# |  name|gender|avg(salary)|max(age)|
# +------+------+-----------+--------+
# |研发部|    男|     7000.0|      42|
# |销售部|    男|     5500.0|      33|
# |销售部|    女|     5500.0|      38|
# |研发部|    女|     8000.0|      45|
# +------+------+-----------+--------+

# Get all column names
peopleDf.columns

# Remove duplicate rows
peopleDf.distinct().show()

# Drop a column
peopleDf.drop("gender").show()

# Subtract one DataFrame from another
df1 = spark.createDataFrame(
    [("a", 1), ("a", 1), ("a", 1), ("a", 2), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["C1", "C2"])
df1.exceptAll(df2).show()
# +---+---+
# | C1| C2|
# +---+---+
# |  a|  1|
# |  a|  1|
# |  a|  2|
# |  c|  4|
# +---+---+

# Intersection
df1.intersectAll(df2).show()

# Replace null values
a = [
    ('01', '张三', '男', 32, 5000),
    ('01', None, '男', 33, 6000),
    ('01', '王五', '女', 36, None),
    ('02', 'Jack', '男', 42, 7000),
    ('02', 'Smith', '女', 27, 6500),
    ('02', 'Lily', '女', 45, None),
]
rdd = sc.parallelize(a)
peopleDf = spark.createDataFrame(rdd,
    "deptId:string,name:string,gender:string,age:int,salary:int")

# Fill the null values
peopleDf.na.fill({'name': 'unknown', 'salary': 0}).show()
# +------+-------+------+---+------+
# |deptId|   name|gender|age|salary|
# +------+-------+------+---+------+
# |    01|   张三|    男| 32|  5000|
# |    01|unknown|    男| 33|  6000|
# |    01|   王五|    女| 36|     0|
# |    02|   Jack|    男| 42|  7000|
# |    02|  Smith|    女| 27|  6500|
# |    02|   Lily|    女| 45|     0|
# +------+-------+------+---+------+

# Convert to JSON
peopleDf.toJSON().collect()
# ['{"deptId":"01","name":"张三","gender":"男","age":32,"salary":5000}',
#  '{"deptId":"01","gender":"男","age":33,"salary":6000}',
#  '{"deptId":"01","name":"王五","gender":"女","age":36}',
#  '{"deptId":"02","name":"Jack","gender":"男","age":42,"salary":7000}',
#  '{"deptId":"02","name":"Smith","gender":"女","age":27,"salary":6500}',
#  '{"deptId":"02","name":"Lily","gender":"女","age":45}']

# Add a computed column and rename a column
peopleDf.withColumn("age2", peopleDf.age + 1) \
    .withColumnRenamed("name", "姓名") \
    .show()

# Date handling
df = spark.createDataFrame(sc.parallelize([("2016-08-26",)]), "Id:string")
df.show()
# +----------+
# |        Id|
# +----------+
# |2016-08-26|
# +----------+

df2 = df.withColumn("Timestamp", df.Id.cast("timestamp"))
df3 = df2.withColumn("Date", df.Id.cast("date"))
df3.show()
# +----------+--------------------+----------+
# |        Id|           Timestamp|      Date|
# +----------+--------------------+----------+
# |2016-08-26|2016-08-26 00:00:...|2016-08-26|
# +----------+--------------------+----------+

df = spark.createDataFrame([('2020-05-10',), ('2020-05-09',)], ['date'])
from pyspark.sql.functions import add_months
df.select(add_months(df.date, 1).alias('next_month')).show()
# +----------+
# |next_month|
# +----------+
# |2020-06-10|
# |2020-06-09|
# +----------+
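The date examples above can be extended with other functions from pyspark.sql.functions in the same style; a minimal sketch (the column names d1/d2 and the sample dates are illustrative assumptions):

# Hypothetical extension: a few more date functions used the same way as add_months
from pyspark.sql.functions import datediff, date_format, year

df = spark.createDataFrame([('2020-05-10', '2020-06-01')], ['d1', 'd2'])
df.select(
    datediff(df.d2, df.d1).alias('days_between'),        # 22
    date_format(df.d1, 'yyyy/MM/dd').alias('formatted'), # 2020/05/10
    year(df.d1).alias('year')                            # 2020
).show()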
This estimates Pi with a Monte Carlo simulation. Draw a circle inscribed in a square with side length 2: the square's area is 4, and the circle has radius 1, so its area is Pi*R^2 = Pi. Taking the circle's center as the origin, pick n random points inside the square; the fraction of points that lands inside the circle approximates Pi/4, so Pi ≈ 4 * count / n.
from pyspark.sql import SparkSession
from random import random
from operator import add

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Pi Demo") \
    .getOrCreate()
sc = spark.sparkContext

#############################################
if __name__ == "__main__":
    n = 100000 * 20

    def f(_):
        # Pick a random point in the square [-1, 1] x [-1, 1];
        # return 1 if it falls inside the unit circle, else 0
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = sc.parallelize(range(1, n + 1), 20).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()
Important spark-submit parameters
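A sketch of the most commonly used flags, with a typical submission command; the script name, cluster manager and resource sizes below are placeholder assumptions:

--master — cluster manager address, e.g. local[*], yarn, or spark://host:port
--deploy-mode — client or cluster
--name — application name
--driver-memory — memory for the driver, e.g. 2g
--executor-memory — memory per executor, e.g. 4g
--executor-cores — CPU cores per executor
--num-executors — number of executors (YARN)
--py-files — extra .py/.zip/.egg files shipped to the executors
--conf — arbitrary Spark configuration as key=value

spark-submit --master yarn --deploy-mode cluster --name "Pi Demo" \
    --driver-memory 2g --executor-memory 4g --executor-cores 2 --num-executors 10 \
    pi_demo.py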