Reading data from txt/csv/Hive tables
- # enableHiveSupport() enables Hive support
- # getOrCreate() reuses an existing SparkSession if there is one, otherwise creates a new one
- spark = SparkSession.builder.appName("appName").enableHiveSupport().getOrCreate()
- spark.sparkContext.pythonExec = spark.conf.get('spark.yarn.appMasterEnv.PYSPARK_PYTHON')
- path="yourPath"
- # Read a file as a text RDD
- rdd = spark.sparkContext.textFile(path)
- # Read a file as CSV
- df = spark.read.csv(r"yourCsvFile.csv", encoding='utf-8', header=True, inferSchema=True)
- # A CSV file can also be read like this
- rdd=spark.sparkContext.textFile(your_path).map(lambda x:x.split(","))
-
- # Read a serialized (Parquet) file
- dt_ = spark.read.parquet(r'testfile.parquet')
- # Read data extracted with a SQL query
- sql="""
- select id
- from tableName
- where dt between "20200101" and "20200202"
- limit 100
- """
- df=spark.sql(sql)
- print(df.count())
- df.show(2)  # show() prints the rows itself and returns None, so no print() is needed
- # df is a DataFrame
- # rdd is an RDD
- # Converting between the two; mind the parentheses
- rdd=df.rdd
- df=rdd.toDF()
-
- from pyspark.sql.types import *
- def schema_parse():
- schema_out = StructType([\
- StructField("id", StringType(), False),\
- StructField("value", StringType(), True)
- ])
- return schema_out
-
-
- df=spark.createDataFrame(rdd,schema_parse())
- # Or, more simply
- df=spark.createDataFrame(rdd,["id","value"])
- # If you hit "Some of types cannot be determined by the first 100 rows, please try again with sampling", add the samplingRatio parameter
- df=spark.createDataFrame(rdd,["id","value"],samplingRatio=0.2)
Filtering rows: mainly done with filter
- #https://blog.csdn.net/xingxing1839381/article/details/81273424
- #select * from data where userid = 2 and rating > 3
- df.filter('userid == 2 and rating > 3').show()
- #select userid, rating from data where userid = 2 and rating > 3
- df.filter('userid == 2 and rating > 3').select('userid', 'rating').show()
- df.select("userID", "rating").filter("userID = 2 and rating > 3").show()
-
- # Rename a column
- df.selectExpr("userID as user_id", "rating")
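withColumnRenamed is another way to rename a column without listing every other column; a minimal sketch reusing the column names above.
- # Rename a single column while keeping all the others
- df = df.withColumnRenamed("userID", "user_id")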
- path="/user/yourCsvFileName.csv"
- # coalesce(1) writes the output into a single partition (one file)
- df.coalesce(1).write.option("header","true").csv(path)
- # Overwrite on save with mode("overwrite")
- spark.sql("SELECT id FROM USER LIMIT 10").coalesce(1).write.mode("overwrite").option("header", "true").option("escape", "\"").csv("s3://tmp/business/10554210609/")
- # Save an RDD as a text file
- df.rdd.saveAsTextFile("/user/yourTextFileName")
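The same writer can also save Parquet; a minimal sketch, with the output path as a placeholder.
- # Write as Parquet, overwriting any existing output at the path
- df.write.mode("overwrite").parquet("/user/yourParquetDir")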
-
Joining: both sides must be DataFrames
- #select * from df1 left join df2 on df1.order_id=df2.order_id
- df12=df1.join(df2,'order_id','left')
- # To join on multiple keys
- df12=df1.join(df2, ['order_id', 'dt'])
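When the key column is named differently on the two sides, the join condition can be written out explicitly; a minimal sketch, where oid is a hypothetical column name in df2.
- # Explicit join condition when the key column names differ (oid is hypothetical)
- df12 = df1.join(df2, df1.order_id == df2.oid, 'left')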
- data = [
- (1,"11","111"),
- (2,"22","222"),
- (3,"33","333"),
- (4,"44","444")
- ]
- # Create a DataFrame with column headers
- # Method 1: pass the column names directly
- header_list = ['id', "value1", 'value2']
- write_df = spark.createDataFrame(data, header_list)
-
- # Method 2: build the schema with StructField (see the sketch after the link)
- https://blog.csdn.net/MusicDancing/article/details/107958990
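A minimal sketch of Method 2 for the data above, following the same StructType/StructField pattern as schema_parse() earlier (types chosen to match the sample tuples).
- from pyspark.sql.types import StructType, StructField, IntegerType, StringType
- # Explicit schema: column name, type, nullable
- schema = StructType([
-     StructField("id", IntegerType(), False),
-     StructField("value1", StringType(), True),
-     StructField("value2", StringType(), True)
- ])
- write_df = spark.createDataFrame(data, schema)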
-
-
- # Register the DataFrame as a temporary table named test_hive
- # (registerTempTable is deprecated in Spark 2.x; createOrReplaceTempView below is preferred)
- write_df.registerTempTable('test_hive')
-
- # Valid until the SparkSession that created it stops
- df.createOrReplaceTempView("tempViewName")
-
- # Valid until the Spark application stops; query it as global_temp.tempViewName
- df.createOrReplaceGlobalTempView("tempViewName")
-
- # Load the data from the temp table into the target Hive table (test.table_02)
- spark.sql("drop table if exists test.table_02")
- spark.sql("create table test.table_02 as select * from test_hive")
- from pyspark.sql.types import *
- def pow1(m,n):
- return float(m)**float(n)
- udf = spark.udf
- udf.register('pow1',pow1,returnType=DoubleType())
- df = spark.range(0,10,2,3)
- df.createOrReplaceTempView('A')
- spark.sql('select pow1(id,2) from A').show()
- spark.sql('select pow1(id,3) from A').show()
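The same Python function can also be wrapped with pyspark.sql.functions.udf and applied to DataFrame columns; a minimal sketch.
- # Column-API version of the same UDF
- import pyspark.sql.functions as F
- pow1_col = F.udf(pow1, DoubleType())
- df.select(pow1_col(df.id, F.lit(2)).alias("id_pow2")).show()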
Stratified sampling: randomly take a fraction of 0.00001 of the rows for each date
Reference: 使用pyspark进行分层抽样 (stratified sampling with PySpark) - IT屋
- from pyspark.sql.functions import lit
- fractions = df.select("dt").distinct().withColumn("fraction", lit(0.00001)).rdd.collectAsMap()
- print(fractions)
- sampled_df = df.stat.sampleBy("dt", fractions, 12)
- sampled_df.groupBy("dt").count().show()
Select a few columns of the DataFrame and save them as JSON-formatted text
- import json
- rdd=df.select('userid','username').rdd.map(lambda x:json.dumps(x.asDict()))
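To actually persist the result, either save the JSON-string RDD as text or let the DataFrame writer emit JSON lines directly; a minimal sketch, with output paths as placeholders.
- # Option 1: save the JSON strings with the RDD API
- rdd.saveAsTextFile("/user/yourJsonTextDir")
- # Option 2: skip the RDD and let the DataFrame writer emit JSON lines
- df.select('userid','username').write.mode("overwrite").json("/user/yourJsonDir")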
Parsing a JSON string
- import json
- def getInfo(x):
- x=json.loads(x)
- x=x['value'][0][0][0]
- return x
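A usage sketch, assuming the JSON strings sit in a string column named value of df (an assumption here).
- # Apply the parser to each row's JSON string and look at a small sample
- parsed_rdd = df.select('value').rdd.map(lambda row: getInfo(row['value']))
- print(parsed_rdd.take(5))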
Dump a dict to a JSON string
- json.dumps(result, ensure_ascii=False)
Ship an extra Python file to the executors
- spark.sparkContext.addPyFile("hdfs://DClusterNmg3/user/your_space/your_file.py")
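After addPyFile, the shipped module is importable on the driver and inside executor-side functions; a sketch assuming your_file.py defines a function your_func and rdd is an existing RDD (both hypothetical).
- # your_file.py is now on the Python path of the driver and executors
- import your_file
- rdd.map(lambda x: your_file.your_func(x)).take(5)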
Group by multiple columns and aggregate
- df.groupBy('tag1','tag2').sum('cnt').show()
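For several aggregations on the same grouping, agg with pyspark.sql.functions can be used; a minimal sketch with the column names above.
- # Multiple aggregations in one pass, with explicit output names
- import pyspark.sql.functions as F
- df.groupBy('tag1','tag2').agg(F.sum('cnt').alias('cnt_sum'), F.count('*').alias('row_cnt')).show()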