1. df.na.fill({'col1': 'default', 'col2': 'default'}) replaces null values with the given defaults.
2. df.dropDuplicates(['col1', 'col2']) deduplicates rows on the given columns; with no argument it deduplicates on all columns.
3. df.subtract(df1) returns the rows that appear in the current df but not in df1 (equivalent to EXCEPT DISTINCT in SQL, so the result is itself deduplicated). See the sketch below for items 1-3.
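A minimal runnable sketch of items 1-3 (the column names and the local SparkSession setup are illustrative assumptions, not from the original):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master('local[*]').appName('demo').getOrCreate()
    df  = spark.createDataFrame([('a', None), ('a', None), ('b', 2)], ['k', 'v'])
    df1 = spark.createDataFrame([('b', 2)], ['k', 'v'])

    df.na.fill({'v': 0}).show()          # null values in v become 0
    df.dropDuplicates(['k']).show()      # one row per distinct k
    df.subtract(df1).show()              # rows of df not in df1, deduplicated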
4. print time.localtime([timestamp]) produces a struct like:

    time.struct_time(tm_year=2018, tm_mon=10, tm_mday=9, tm_hour=16, tm_min=52, tm_sec=10, tm_wday=1, tm_yday=282, tm_isdst=0)
5. print time.strptime('2018-12-14 15:45:12', '%Y-%m-%d %H:%M:%S') produces:

    time.struct_time(tm_year=2018, tm_mon=12, tm_mday=14, tm_hour=15, tm_min=45, tm_sec=12, tm_wday=4, tm_yday=348, tm_isdst=-1)
6. Timestamp to formatted time string:

    time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))

    # format as "2016-03-20 11:45:39"
    print time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    # format as "Sat Mar 28 22:24:24 2016"
    print time.strftime("%a %b %d %H:%M:%S %Y", time.localtime())
7. Formatted time string to timestamp:

    print time.mktime(time.strptime('2018-12-14 15:45:12', '%Y-%m-%d %H:%M:%S'))
8. Get the date some number of days before or after a given date (pass a negative days value for earlier dates):

    from datetime import datetime, timedelta

    def getPreDate(cp, days):
        # parse the 'YYYYMMDDHH' string, shift it by the given days, format it back
        cp_from = (datetime.strptime(str(cp), '%Y%m%d%H') + timedelta(days)).strftime('%Y%m%d%H')
        return cp_from
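For example (the cp value is illustrative):

    print getPreDate('2018100900', -1)   # 2018100800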
9. Usage of os.path.join(path_prefix, 'parquet', 'path'):

    print os.path.join('/user/bbd/dev/xxxx/xxxx/', 'parquet/', 'ccccc.txt')

    /user/bbd/dev/xxxx/xxxx/parquet/ccccc.txt
10. Computing the difference between two dates (how many days apart):

    def get_history_date():
        start_date = datetime(2018, 10, 10)
        end_date = datetime(2018, 10, 31)
        print (end_date - start_date).days   # the difference in days
        print end_date - start_date          # the difference as a timedelta
        for i in range(0, (end_date - start_date).days):
            cp_from = (start_date + timedelta(days=i)).strftime('%Y%m%d00')
            cp_end = (end_date + timedelta(days=i + 1)).strftime('%Y%m%d00')

            print str(cp_from) + '\t' + str(cp_end)

    Output:
    21
    21 days, 0:00:00
    2018101000 2018110100
    2018101100 2018110200
    2018101200 2018110300
    ...
11. not is logical negation: an empty string, None, the number 0, and an empty object are all falsy, so applying not to them returns True:

    def not_ceshi():
        a = ''
        b = 'as'
        c = None
        d = 0
        e = -1
        if not a:
            print 'a is falsy'
        if b:
            print 'b is truthy'
        if not c:
            print 'c is falsy'
        if not d:
            print 'd is 0'
        if e:
            print 'e is -1'

    Output:
    a is falsy
    b is truthy
    c is falsy
    d is 0
    e is -1
12. DataFrame drop: you can first rank rows within partitions, then filter on the rank and drop it. Here num is the row_number() column produced by the window:

    df.where("num = 1").drop("num")

    This keeps the rows where num is 1 and removes the num column. A pure DataFrame-API version is sketched below.
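A minimal sketch of the same pattern with the DataFrame API instead of SQL, assuming a DataFrame df with columns name and hight as in item 17 (the window spec itself is illustrative):

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    w = Window.partitionBy('name').orderBy('hight')
    deduped = (df.withColumn('num', F.row_number().over(w))
                 .where('num = 1')
                 .drop('num'))
    deduped.show()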
13. Using sorted as a higher-order function (Python 2, where sorted still accepts a cmp function):

    def cmp_ignore_case(s1, s2):
        if s1.lower()[:1] > s2.lower()[:1]:
            return 1
        elif s1.lower()[:1] < s2.lower()[:1]:
            return -1
        else:
            return 0

    # sorted can instead take a one-argument key function (key=...)
    # that returns the value to compare on
    cmp_ignore_case2 = lambda s: s[:1].lower()

    print sorted(['bob', 'about', 'Zoo', 'Credit'], key=cmp_ignore_case2)
    print sorted(['bob', 'about', 'Zoo', 'Credit'], cmp_ignore_case)

    Both calls return the same result:
    ['about', 'bob', 'Credit', 'Zoo']
    ['about', 'bob', 'Credit', 'Zoo']
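In Python 3 the cmp parameter is gone; a comparison function has to be wrapped with functools.cmp_to_key. A minimal sketch, reusing cmp_ignore_case from above:

    from functools import cmp_to_key

    print(sorted(['bob', 'about', 'Zoo', 'Credit'], key=cmp_to_key(cmp_ignore_case)))
    # ['about', 'bob', 'Credit', 'Zoo']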
14. When combining DataFrames with unionAll and the column counts differ, you can pad the missing columns with empty string literals so both sides have the same schema:

    sql = "SELECT '' AS missing_col_1, '' AS missing_col_2, ..."

    This effectively fills in the missing columns; see the sketch below.
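A minimal sketch, assuming df_a has string columns (name, age) while df_b only has (name); the padded column and its empty-string default are illustrative:

    from pyspark.sql import functions as F

    df_b_padded = df_b.withColumn('age', F.lit(''))   # add the missing column, all empty
    combined = df_a.select('name', 'age').unionAll(df_b_padded.select('name', 'age'))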
15. Validate whether a string contains a personal email address:

    import re

    email_regx = '([a-zA-Z0-9]{1}[a-zA-Z0-9\.\+\-.]{0,63}@{1}[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+){1,4})'

    def udf_format_email_string(data):
        try:
            data = data.strip()
            email = re.findall(email_regx, data)
            print email
            if email:
                res = email[0][0]
                # addresses with these prefixes are treated as non-personal and rejected
                except_email = ['np-reply@','noreply@','service@','support@','info@','Postmaster@'
                    ,'custmer_service@','admin@','VOICE=','notification@','10086@','10000sx@','10086cq@']
                for item in except_email:
                    if res.startswith(item):
                        return ''
                return res
        except:
            pass
        return ''

    If it is a personal email, the address is returned; otherwise an empty string.
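For example (the input strings are illustrative):

    udf_format_email_string('contact: someone@example.com')   # returns 'someone@example.com'
    udf_format_email_string('noreply@example.com')             # returns ''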
A related point about re.findall(regx, data): the result depends on the groups in regx -- nested groups, a single group, or no group at all:

    import re

    string = "abcdefg acbdgef abcdgfe cadbgfe"

    # the difference between grouped and ungrouped patterns
    # nested groups: an outer group containing an inner group
    regex = re.compile("((\w+)\s+\w+)")
    print(regex.findall(string))
    # output: [('abcdefg acbdgef', 'abcdefg'), ('abcdgfe cadbgfe', 'abcdgfe')]

    # a single group
    regex1 = re.compile("(\w+)\s+\w+")
    print(regex1.findall(string))
    # output: ['abcdefg', 'abcdgfe']

    # no group
    regex2 = re.compile("\w+\s+\w+")
    print(regex2.findall(string))
    # output: ['abcdefg acbdgfe', 'abcdgfe cadbgfe']

    Summary:
    regex:  returns tuples of (outer group match, inner group match)
    regex1: returns only what the single group matches
    regex2: returns the whole match
16. In pyspark, you can apply max(col) to a text column after grouping to pick a single value per group -- for example deduplicating names so that each email maps to exactly one name:

    select email, format_data(max(name)) from email_detal group by email

    This groups by email and deduplicates the names, although the chosen name may differ from the other names in the group. A sketch of running this with a UDF follows.
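A minimal sketch, assuming an active SparkSession named spark and that format_data is a Python UDF that trims the name (its real body is not shown above, so this implementation is hypothetical):

    from pyspark.sql.types import StringType

    def format_data(name):
        return name.strip() if name else ''

    spark.udf.register('format_data', format_data, StringType())
    spark.sql('select email, format_data(max(name)) as name '
              'from email_detal group by email').show()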
17. Some DataFrame API examples:

    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, LongType

    def get_df():
        conf = SparkConf()

        spark = SparkSession.builder \
            .master("local[*]") \
            .appName("get_df") \
            .config(conf=conf) \
            .enableHiveSupport() \
            .getOrCreate()

        rdd1 = spark.sparkContext.parallelize([
            ('make', 24, 198),
            ('make', 23, 198),
            ('tubu', 24, 198),
            ('tubu', 23, 198),
            ('mark', 24, 198),
            ('mark', 23, 198),
            ('uzi', 24, 198),
            ('uzi', 23, 197)
        ])
        rdd2 = spark.sparkContext.parallelize([
            ('make', 24, 198),
            ('tubu', 24, 198)
        ])

        schema = StructType([
            StructField('name', StringType(), True),
            StructField('age', LongType(), True),
            StructField('hight', LongType(), True)
        ])
        df1 = spark.createDataFrame(rdd1, schema)
        df2 = spark.createDataFrame(rdd2, schema)

        df1.createTempView('tmp')

        sql = 'select *, row_number() over (partition by name,age order by hight) as num from tmp'
        sql2 = 'select `name`, age, case when age=24 then age else age+2 end re_age, hight, "table_name" tablename from tmp'
        spark.sql(sql).where('num=1').drop("num").show()     # keep rows with num=1, then drop the num column
        df1.subtract(df2).show()                             # remove from df1 the rows that also appear in df2
        df1.dropDuplicates(['name','age']).show()            # dedupe on the two given columns; no argument means all columns
        df1.drop_duplicates(['name','age']).show()           # same as above
        df1.join(df2, df1.name == df2.name, 'left').show()   # the various join types
        spark.sql(sql2).drop('age').show()                   # derive one column's value from other columns
18. Setting up logging in a py file:

    import logging, os
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)

    Used to check that the relevant log messages are emitted.
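A quick usage check (the message text is illustrative):

    logger.info('job started')
    # 2018-12-14 15:45:12,345 - INFO - job started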
19. Clean the fields we need from multiple source tables into one parquet file, so downstream dimensions can simply read it and apply their own filters. First understand the base tables, then map each source field into the corresponding target column according to its meaning. For example, to write data from several source tables into one combined table:

    from collections import OrderedDict

    def creat_dataframe(source):
        '''collect the needed fields of one source table into the combined-table layout'''
        df = spark.sql("select * from {tablename}".format(tablename=source["table_name"]))
        if source.get("before_where"):
            df = df.where(source["before_where"])
        column_source = OrderedDict(zip(source["column"], source["source"]))
        columns = ["%s as %s" % (table_file, table_name) for table_name, table_file in column_source.items()]
        df = df.selectExpr(*columns)
        if source.get("after_where"):
            df = df.where(source["after_where"])
        return df

    def etl_school():
        columns = ["NAME", "SSQX", "XYMC", "TABLENAME", "table_order"]
        where = "verify_school(NAME) = 1"

        data_source = [
            {
                "table_name": "tb_gaw_dzxxyryxsjfmxx",
                "column": columns,
                "source": ["DWMC", "SSQX", "''", "'tb_gaw_dzxxyryxsjfmxx'", 5],
                "after_where": where
            },
            {
                "table_name": "tb_gaw_jw_xxjzygxx",
                "column": columns,
                "source": ["DWMC", "SSQX", "''", "'tb_gaw_jw_xxjzygxx'", 1],
                "after_where": where
            },
            {
                "table_name": "tb_gaw_jw_zxyjs_new",
                "column": columns,
                "source": ["XXMC", "''", "FYMC", "'tb_gaw_jw_zxyjs_new'", 2],
                "after_where": where
            },
            {
                "table_name": "tb_gaw_jw_zxdxs_new",
                "column": columns,
                "source": ["XXMC", "''", "FYMC", "'tb_gaw_jw_zxdxs_new'", 3],
                "after_where": where
            }
        ]

        dfs = map(creat_dataframe, data_source)
        df_union = reduce(lambda x, y: x.union(y), dfs)
        df = df_union.selectExpr("*", "row_number() over (partition by NAME,XYMC order by table_order asc) as num")
        df_tmp = df.where("num=1").drop("num")
        df_tmp = df_tmp.where("NAME != XYMC")

        write_parquet(df_tmp, "tmp/etl_school")
        logger.info("save to /tmp/etl_school success!!")
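write_parquet (like verify_school) is a helper not shown above; a minimal sketch of what it might look like, with the HDFS path prefix as an assumption:

    def write_parquet(df, path):
        # hypothetical helper: overwrite the parquet output under a fixed HDFS prefix
        df.write.mode('overwrite').parquet(os.path.join('/user/bbd/dev/', path))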
20. For the jg vertex tables, if you need to build an index (a globally unique jid per row), you can do it as follows:

    import random

    index_list = ['jid']

    def deal_vertex(tablename):
        logger.info('dealing %s' % tablename)

        # vertex_table_info and get_table_index are defined elsewhere
        source_table_info = vertex_table_info[tablename]

        zd = ','.join(source_table_info)

        table_index = get_table_index(tablename)
        sql = 'select %s from bbd.%s' % (zd, tablename)
        df = spark.sql(sql)

        def map_rdd(data):
            ret = []
            row, index = data
            # the data eventually goes into HBase, so a random digit is prepended to avoid hot-spotting
            jid = int(str(random.randint(1, 9)) + str(index + table_index))
            ret.append(jid)

            row = list(row)
            for item in row:
                ret.append(item)
            return tuple(ret)

        rdd = df.rdd.zipWithIndex().map(map_rdd)

        new_schema = index_list + source_table_info

        res = spark.createDataFrame(rdd, new_schema)
        res.write.mode('overwrite').saveAsTable('bbd.%s_jg' % tablename)
        logger.info('bbd.%s_jg done' % tablename)
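A minimal usage sketch, assuming vertex_table_info is a dict mapping each vertex table name to its column list (it is not defined above):

    for tablename in vertex_table_info:
        deal_vertex(tablename)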