当前位置:   article > 正文

工作中 pyspark的小知识点_pyspark na.fill

pyspark na.fill

1、df.na.fill({'字段名1':'default','字段名2':'default'})   对空值进行替换

2、df.dropDuplicaates()    去重根据字段名进行去重,空参为全部字段

3、df.subtract(df1)     返回在当前df中出现,并且不在df1中出现的元素,不去重。

4、print time.localtime([timestamp])    如下格式

time.struct_time(tm_year=2018, tm_mon=10, tm_mday=9, tm_hour=16, tm_min=52, tm_sec=10, tm_wday=1, tm_yday=282, tm_isdst=0)

5、print time.strptime('2018-12-14 15:45:12', '%Y-%m-%d %H:%M:%S')   如下格式

time.struct_time(tm_year=2018, tm_mon=12, tm_mday=14, tm_hour=15, tm_min=45, tm_sec=12, tm_wday=4, tm_yday=348, tm_isdst=-1)

6、时间戳转格式化时间

  1. time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
  2. # 格式化成2016-03-20 11:45:39形式
  3. print time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  4. # 格式化成Sat Mar 28 22:24:24 2016形式
  5. print time.strftime("%a %b %d %H:%M:%S %Y", time.localtime())

7、时间转时间戳

print time.mktime(time.strptime('2018-12-14 15:45:12', '%Y-%m-%d %H:%M:%S'))

8、取某一天的前后多少天

  1. def getPreDate(cp, days):
  2. #转换为时间
  3. cp_from = (datetime.strptime(str(cp), '%Y%m%d%H') + timedelta(days)).strftime('%Y%m%d%H')
  4. return cp_from

9、os.path.join(path_prefix,'parquet','path')     的使用方法

  1. print os.path.join('/user/bbd/dev/xxxx/xxxx/', 'parquet/', 'ccccc.txt')
  2. /user/bbd/dev/xxxx/xxxx/parquet/ccccc.txt

10、时间取差值的方法,计算两个时间相差多少天

  1. def get_history_date():
  2. start_date = datetime(2018,10,10)
  3. end_date = datetime(2018,10,31)
  4. print (end_date - start_date).days #返回相差的天
  5. print end_date - start_date #返回相差的
  6. for i in range(0,(end_date - start_date).days):
  7. cp_from = (start_date + timedelta(days=i)).strftime('%Y%m%d00')
  8. cp_end = (end_date + timedelta(days=i + 1)).strftime('%Y%m%d00')
  9. print str(cp_from) + '\t' + str(cp_end)
  10. return: 如下
  11. 21
  12. 21 days, 0:00:00
  13. 2018101000 2018110100
  14. 2018101100 2018110200
  15. 2018101200 2018110300
  16. 。。。。

11、not  就是取反的意思   字符串为空,none,数字为0, 对象为空, not后返回的都是真

  1. def not_ceshi():
  2. a = ''
  3. b='as'
  4. c= None
  5. d=0
  6. e=-1
  7. if not a:
  8. print 'a为假'
  9. if b:
  10. print 'b为真'
  11. if not c:
  12. print 'c为假'
  13. if not d:
  14. print 'd为0'
  15. if e:
  16. print 'e为-1'
  17. 结果:
  18. a为假
  19. b为真
  20. c为假
  21. d为0
  22. e为-1

12、dataframe的drop,可以先分区排序,再对排序进行drop,num为窗口后分区排序的row_number()字段

  1. df.where(num=='1').drop(num)
  2. 返回选择num为1的,并将num字段去除

13、sorted 高阶函数的使用    如下

  1. def cmp_ignore_case(s1, s2):
  2. if s1.lower()[:1] > s2.lower()[:1]:
  3. return 1
  4. elif s1.lower()[:1] < s2.lower()[:1]:
  5. return -1
  6. else:
  7. return 0
  8. #sorted 重新定义排序,可以只传入一个参数,给定 (key=比较的参数的自定义函数)
  9. cmp_ignore_case2 = lambda s: s[:1].lower()
  10. print sorted(['bob', 'about', 'Zoo', 'Credit'], key=cmp_ignore_case2)
  11. print sorted(['bob', 'about', 'Zoo', 'Credit'], cmp_ignore_case)
  12. 结果返回相同:
  13. ['about', 'bob', 'Credit', 'Zoo']
  14. ['about', 'bob', 'Credit', 'Zoo']

14、在我们进行dataframe的互操作时,需要用到unionall的时候,如果字段数量不一致,可为其填充空白字段,都为空

  1. sql = 'SELECT '' 相同的字段名, '' 相同的字段名 。。。'
  2. 相当于补全字段

15、验证是否个人邮箱,

  1. email_regx = '([a-zA-Z0-9]{1}[a-zA-Z0-9\.\+\-.]{0,63}@{1}[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+){1,4})'
  2. def udf_format_email_string(data):
  3. try:
  4. data = data.strip()
  5. email = re.findall(email_regx, data)
  6. print email
  7. if email:
  8. res = email[0][0]
  9. except_email = ['np-reply@','noreply@','service@','support@','info@','Postmaster@'
  10. ,'custmer_service@','admin@','VOICE=','notification@','10086@','10000sx@','10086cq@']
  11. for item in except_email:
  12. if res.startswith(item):
  13. return ''
  14. return res
  15. except:
  16. pass
  17. return ''
  18. 是的话,返回邮箱,不是返回空

这里有一个知识点,就是re.findall(regx,data),就是这里的regx,有两种情况,带大括号和不带括号,还有小括号

  1. import re
  2. string="abcdefg acbdgef abcdgfe cadbgfe"
  3. #带括号与不带括号的区别
  4. #带大括号和小括号
  5. regex=re.compile("((\w+)\s+\w+)")
  6. print(regex.findall(string))
  7. #输出:[('abcdefg acbdgef', 'abcdefg'), ('abcdgfe cadbgfe', 'abcdgfe')]
  8. #只带小括号
  9. regex1=re.compile("(\w+)\s+\w+")
  10. print(regex1.findall(string))
  11. #输出:['abcdefg', 'abcdgfe']
  12. #不带括号
  13. regex2=re.compile("\w+\s+\w+")
  14. print(regex2.findall(string))
  15. #输出:['abcdefg acbdgef', 'abcdgfe cadbgfe']
  16. return
  17. regex:返回的是tuple (大括号匹配的,小括号匹配的)
  18. regex1:返回的是小括号里匹配的
  19. regex2:返回的是全部匹配的

16、在pyspark中,对dataframe进行操作,对某个中文字段进行max(col)操作,分组后取出现的第一个,类似于根据姓名去重,一个邮箱只能对应一个姓名

  1. select email format_data(max(name)) from email_detal group by email
  2. 根据email进行分组,对姓名进行去重,但是有可能姓名有差异

17、一些dataframe的api示例

  1. def get_df():
  2. conf = SparkConf()
  3. spark = SparkSession.builder \
  4. .master("local[*]") \
  5. .appName("get_df") \
  6. .config(conf=conf) \
  7. .enableHiveSupport() \
  8. .getOrCreate()
  9. rdd1 = spark.sparkContext.parallelize([
  10. ('make',24,198),
  11. ('make',23,198),
  12. ('tubu',24,198),
  13. ('tubu',23,198),
  14. ('mark',24,198),
  15. ('mark',23,198),
  16. ('uzi',24,198),
  17. ('uzi',23,197)
  18. ])
  19. rdd2 = spark.sparkContext.parallelize([
  20. ('make',24,198),
  21. ('tubu',24,198)
  22. ])
  23. schema = StructType([
  24. StructField('name',StringType(), True),
  25. StructField('age',LongType(), True),
  26. StructField('hight',LongType(), True)
  27. ])
  28. df1 = spark.createDataFrame(rdd1,schema)
  29. df2 = spark.createDataFrame(rdd2,schema)
  30. df1.createTempView('tmp')
  31. sql = 'select *, row_number() over (partition by name,age order by hight) as num from tmp'
  32. sql2 = 'select `name`,age, case when age=24 then age else age+2 end re_age, hight,"table_name" tablename from tmp'
  33. spark.sql(sql).where('num=1').drop("num").show() #删除某列字段
  34. df1.subtract(df2).show() #df1中去掉df2中的完全相同的数据
  35. df1.dropDuplicates(['name','age']).show() #去掉指定两列的重复数据,不加参,为全部列
  36. df1.drop_duplicates(['name','age']).show() #同上
  37. df1.join(df2,df1.name == df2.name,'left').show() #各种join
  38. spark.sql(sql2).drop('age').show() #根据某些的字段来确定另一个字段的取值

18、注册 py文件  logging   如下

  1. import logging,os
  2. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  3. logger = logging.getLogger(__name__)
  4. 用来测试是否有相关信息

19、将多张表我们需要的字段清洗进parquet文件,后面的维度需要哪些自己读取进行相关过滤就可以了,首先对我们需要对基础表的信息了解,然后根据字段含义,对应清洗进对应字段,比如总共五张表的数据,要写进总表,可按如下方式

  1. def creat_dataframe(source):
  2. '''获取每个源表的所有信息到源信息总表'''
  3. df = spark.sql("select * from {tablename}".format(tablename=source["table_name"]))
  4. if source.get("before_where"):
  5. df = df.where(source["before_where"])
  6. column_source = OrderedDict(zip(source["column"],source["source"]))
  7. columns = ["%s as %s"%(table_file,table_name) for table_name,table_file in column_source.items()]
  8. df = df.selectExpr(*columns)
  9. if source.get("after_where"):
  10. df = df.where(source["after_where"])
  11. return df
  12. def etl_school():
  13. columns = ["NAME", "SSQX", "XYMC", "TABLENAME", "table_order"]
  14. where = "verify_school(NAME) = 1"
  15. data_source = [
  16. {
  17. "table_name":"tb_gaw_dzxxyryxsjfmxx",
  18. "column":columns,
  19. "source":["DWMC", "SSQX", "''", "'tb_gaw_dzxxyryxsjfmxx'", 5],
  20. "after_where":where
  21. },
  22. {
  23. "table_name":"tb_gaw_jw_xxjzygxx",
  24. "column":columns,
  25. "source":["DWMC", "SSQX", "''", "'tb_gaw_jw_xxjzygxx'", 1],
  26. "after_where":where
  27. },
  28. {
  29. "table_name":"tb_gaw_jw_zxyjs_new",
  30. "column":columns,
  31. "source":["XXMC", "''", "FYMC", "'tb_gaw_jw_zxyjs_new'", 2],
  32. "after_where":where
  33. },
  34. {
  35. "table_name":"tb_gaw_jw_zxdxs_new",
  36. "column":columns,
  37. "source":["XXMC", "''", "FYMC", "'tb_gaw_jw_zxdxs_new'", 3],
  38. "after_where":where
  39. }
  40. ]
  41. dfs = map(creat_dataframe, data_source)
  42. df_union = reduce(lambda x, y: x.union(y), dfs)
  43. df = df_union.selectExpr("*", "row_number() over (partition by NAME,XYMC order by table_order asc) as num")
  44. df_tmp = df.where("num=1").drop("num")
  45. df.tmp = df_tmp.where("NAME != XYMC")
  46. write_parquet(df_tmp, "tmp/etl_school")
  47. logger.info("save to /tmp/etl_school success!!")

20、jg的节点表,需要建索引的话,可按如下方式

  1. index_list = ['jid']
  2. def deal_vertex(tablename):
  3. logger.info('dealing %s'%tablename)
  4. source_table_info = vertex_table_info[tablename]
  5. zd = ','.join(source_table_info)
  6. table_index = get_table_index(tablename)
  7. sql = 'select %s from bbd.%s'%(zd,tablename)
  8. df = spark.sql(sql)
  9. def map_rdd(data):
  10. ret = []
  11. row,index = data
  12. #因为最后要入hbase库,这里前面加了随机数,预防热点
  13. jid = int(str(random.randint(1,9)) + str(index + table_index))
  14. ret.append(jid)
  15. row = list(row)
  16. for item in row:
  17. ret.append(item)
  18. return tuple(ret)
  19. rdd = df.rdd.zipWithIndex().map(map_rdd)
  20. new_schema = index_list + source_table_info
  21. res = spark.createDataFrame(rdd,new_schema)
  22. res.write.mode('overwrite').saveAsTable('bbd.%s_jg'%tablename)
  23. logger.info('bbd.%s_jg down'%tablename)

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/66473
推荐阅读
  

闽ICP备14008679号