
PySpark Basic Operations (Creating, Reading, and Using RDDs and DataFrames)


1. Reading data

From text files, CSV files, or Hive tables.

    from pyspark.sql import SparkSession

    # enableHiveSupport() enables Hive operations
    # getOrCreate() reuses an existing session, or creates one if none exists
    spark = SparkSession.builder.appName("appName").enableHiveSupport().getOrCreate()
    spark.sparkContext.pythonExec = spark.conf.get('spark.yarn.appMasterEnv.PYSPARK_PYTHON')
    path = "yourPath"
    # read a text file as an RDD
    rdd = spark.sparkContext.textFile(path)
    # read a csv file as a DataFrame
    df = spark.read.csv(r"yourCsvFile.csv", encoding='utf-8', header=True, inferSchema=True)
    # a csv file can also be read as an RDD and split manually
    rdd = spark.sparkContext.textFile(your_path).map(lambda x: x.split(","))
    # read a serialized (parquet) file
    dt_ = spark.read.parquet(r'testfile.parquet')
    # read data extracted with SQL
    sql = """
    select id
    from tableName
    where dt between "20200101" and "20200202"
    limit 100
    """
    df = spark.sql(sql)
    print(df.count())
    df.show(2)
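
When Hive support is enabled, a table can also be loaded without writing SQL via spark.table; a minimal sketch, using a hypothetical table test.user_log partitioned by dt:

    # load a Hive table straight into a DataFrame
    df = spark.table("test.user_log")
    # restrict to a single partition before heavier processing
    df.where("dt = '20200101'").show(5)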

2. Converting between DataFrame and RDD

    # df is a DataFrame, rdd is an RDD
    # mind the parentheses when converting
    rdd = df.rdd
    df = rdd.toDF()
    from pyspark.sql.types import *
    def schema_parse():
        schema_out = StructType([
            StructField("id", StringType(), False),
            StructField("value", StringType(), True)
        ])
        return schema_out
    df = spark.createDataFrame(rdd, schema_parse())
    # or, more simply
    df = spark.createDataFrame(rdd, ["id", "value"])
    # if you see "Some of types cannot be determined by the first 100 rows,
    # please try again with sampling", add the samplingRatio parameter
    df = spark.createDataFrame(rdd, ["id", "value"], samplingRatio=0.2)

3. Selecting data

Mainly done with filter; a column-expression alternative is sketched after the examples.

    # https://blog.csdn.net/xingxing1839381/article/details/81273424
    # select * from data where userid = 2 and rating > 3
    df.filter('userid == 2 and rating > 3').show()
    # select userid, rating from data where userid = 2 and rating > 3
    df.filter('userid == 2 and rating > 3').select('userid', 'rating').show()
    df.select("userid", "rating").filter("userid = 2 and rating > 3").show()
    # rename a column while selecting
    df.selectExpr("userid as user_id", "rating")
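
The same predicates can be written with Column expressions instead of SQL strings; a minimal sketch assuming the same userid and rating columns:

    from pyspark.sql import functions as F

    # boolean Column expressions are combined with & / | and must be parenthesized
    df.filter((F.col("userid") == 2) & (F.col("rating") > 3)) \
      .select("userid", "rating") \
      .show()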

4. Saving data

  1. path="/user/yourCsvFileName.csv"
  2. #coalesce(1) 表示分一个区
  3. df.coalesce(1).write.option("header","true").csv(path)
  4. #覆盖保存--mode("overwrite")
  5. spark.sql("SELECT id FROM USER LIMIT 10").coalesce(1).write.mode("overwrite").option("header", "true").option("escape", "\"").csv("s3://tmp/business/10554210609/")
  6. #rdd保存成文本文件
  7. df.rdd.saveAsTextFile("/user/yourTextFileName")
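
CSV is not the only sink; a sketch of writing Parquet and writing directly into a Hive table (the output path and table name below are placeholders):

    # columnar output, usually the better default for downstream Spark jobs
    df.write.mode("overwrite").parquet("/user/your_space/your_output.parquet")
    # write into a metastore-managed Hive table
    df.write.mode("overwrite").saveAsTable("test.table_01")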

5. Joining tables

This requires that both sides are DataFrames.

    # select * from df1 left join df2 on df1.order_id = df2.order_id
    df12 = df1.join(df2, 'order_id', 'left')
    # join on multiple keys
    df12 = df1.join(df2, ['order_id', 'dt'])
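
If the key columns are named differently on the two sides, the condition can be written out explicitly; a sketch assuming a hypothetical right-hand key column oid:

    # explicit join condition; drop the redundant right-hand key afterwards
    df12 = df1.join(df2, df1.order_id == df2.oid, 'left').drop(df2.oid)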

6. Creating temporary tables

    data = [
        (1, "11", "111"),
        (2, "22", "222"),
        (3, "33", "333"),
        (4, "44", "444")
    ]
    # create a DataFrame with column names
    # method 1: pass the column names directly
    header_list = ['id', "value1", 'value2']
    write_df = spark.createDataFrame(data, header_list)
    # method 2: build the header with StructField (see the sketch after this block)
    # https://blog.csdn.net/MusicDancing/article/details/107958990
    # register the DataFrame as a temporary table named test_hive
    # (registerTempTable is the legacy API; createOrReplaceTempView is preferred)
    write_df.registerTempTable('test_hive')
    # visible until the SparkSession that created it stops
    df.createOrReplaceTempView("tempViewName")
    # visible until the whole Spark application stops
    df.createOrReplaceGlobalTempView("tempViewName")
    # copy the data from the temporary table into the target Hive table (test.table_02)
    spark.sql("drop table if exists test.table_02")
    spark.sql("create table test.table_02 as select * from test_hive")

7. UDF

    from pyspark.sql.types import *
    def pow1(m, n):
        return float(m) ** float(n)
    udf = spark.udf
    udf.register('pow1', pow1, returnType=DoubleType())
    df = spark.range(0, 10, 2, 3)
    df.createOrReplaceTempView('A')
    spark.sql('select pow1(id,2) from A').show()
    spark.sql('select pow1(id,3) from A').show()
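
To call the same function from the DataFrame API rather than SQL, it can be wrapped with pyspark.sql.functions.udf; a short sketch under that assumption:

    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType

    # wrap the Python function so it can be applied to Column objects
    pow1_udf = F.udf(pow1, DoubleType())
    df.select(pow1_udf(F.col("id"), F.lit(2)).alias("id_squared")).show()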

8. Stratified random sampling

Randomly sample a 0.00001 fraction of the rows for each date.

Reference: "使用pyspark进行分层抽样" (stratified sampling with PySpark), IT屋 developer community.

    from pyspark.sql.functions import lit
    # build a {dt: fraction} map with one entry per distinct date
    fractions = df.select("dt").distinct().withColumn("fraction", lit(0.00001)).rdd.collectAsMap()
    print(fractions)
    # sampleBy(column, fractions, seed)
    sampled_df = df.stat.sampleBy("dt", fractions, 12)
    sampled_df.groupBy("dt").count().show()

9. JSON and dict

Select a few columns from a DataFrame and save them as JSON-formatted text.

    import json
    rdd = df.select('userid', 'username').rdd.map(lambda x: json.dumps(x.asDict()))

Parsing JSON

    import json
    def getInfo(x):
        x = json.loads(x)
        x = x['value'][0][0][0]
        return x
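
A usage sketch, assuming a hypothetical json_rdd of JSON strings whose records contain the nested 'value' field expected by getInfo:

    # apply the parser to every JSON string in the RDD
    parsed_rdd = json_rdd.map(getInfo)
    print(parsed_rdd.take(5))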

Dumping a dict to a JSON string

json.dumps(result, ensure_ascii=False)

10. Adding a Python file

spark.sparkContext.addPyFile("hdfs://DClusterNmg3/user/your_space/your_file.py")
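
Once the file has been shipped with addPyFile, it can be imported by module name on the driver and inside executor-side functions; a sketch assuming your_file.py defines a hypothetical parse_line function:

    import your_file  # module name matches the file shipped via addPyFile

    # the import also resolves inside transformations running on executors
    parsed = rdd.map(lambda line: your_file.parse_line(line))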

11. groupBy aggregation

df.groupBy('tag1','tag2').sum('cnt').show()
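
groupBy can also feed agg to compute several aggregations at once; a sketch assuming the same tag1, tag2 and cnt columns:

    from pyspark.sql import functions as F

    df.groupBy('tag1', 'tag2') \
      .agg(F.sum('cnt').alias('total_cnt'), F.count('*').alias('n_rows')) \
      .show()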
