The difficulty and pain of data governance and analysis keep growing as data warehouse construction advances. For better data governance and impact analysis, lineage is the handle we cannot do without.
Purpose of this article: parse SQL with sqlparse to obtain lineage.
Data lineage, also called data provenance or pedigree, describes where data comes from and how it is derived. Put plainly: how a piece of data was produced, which processes or stages it went through, and which tables and fields it was computed from.
Grouped by their role in a lineage relationship, nodes fall into three main categories: outflow node -> intermediate node -> inflow node
Outflow node: the data provider, the source end of a lineage relationship.
Intermediate node: the most numerous kind of node in a lineage graph; it both receives inflowing data and flows data outward.
Inflow node: the terminal node of a lineage relationship, usually the application layer, e.g. visual reports, dashboards, or business systems.
Today, warehouse models are usually built with SQL, and a SQL statement that reads from tables and inserts into a table expresses exactly the relationship between outflow nodes (from) and inflow nodes (insert). With that in mind, let's start preparing to parse SQL.
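To make the mapping concrete, here is a minimal sketch (the table names are the made-up examples used throughout this article) of the lineage edge that a single insert ... select statement encodes:

# Minimal sketch: the lineage edge encoded by one "insert ... select" statement.
# The from table is the outflow side, the insert table is the inflow side.
sql = ("insert table dwd_table_name_prod_info_df "
       "select ftime ,prod_id ,prod_name from ods_table_name_prod_info_df")

lineage_edge = {
    "outflow_node": "ods_table_name_prod_info_df",  # from table (data source)
    "inflow_node": "dwd_table_name_prod_info_df",   # insert table (data target)
}
print(lineage_edge)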
Referenced content (author: fanstuck):
Python-sqlparse解析SQL工具库一文详解(一)
Python-sqlparse解析SQL工具库一文详解(二)
Python script:
import sqlparse

if __name__ == '__main__':
    sql = """ insert table dwd_table_name_prod_info_df ( ftime ,prod_id ,prod_name ) select ftime ,prod_id ,prod_name from ods_table_name_prod_info_df t1_1 where ftime = 20231223 """
    parsed = sqlparse.parse(sql)[0]
    count = 0
    for item in parsed.tokens:
        print(count, ':\t', type(item), '\t|', item.ttype, '\t|', item.value, '\t|', item.is_group)
        count += 1
Execution result (in the SQL above, line breaks and runs of spaces are reduced to a single space so the token output does not explode):
Conclusion: the from keyword lets us locate the from_ table name (the source table).
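Building on that observation, here is a minimal sketch of the idea (simplified: it assumes a flat query with no joins or subqueries, which are handled next; the function name is just for illustration):

import sqlparse
from sqlparse.sql import Identifier
from sqlparse.tokens import Keyword

def table_after_from(sql):
    """Return the first table name that follows the FROM keyword (simplified sketch)."""
    from_seen = False
    for token in sqlparse.parse(sql)[0].tokens:
        if from_seen and isinstance(token, Identifier):
            # "table_name alias" -> keep only the table name part
            return token.value.split(' ')[0]
        if token.ttype is Keyword and token.value.upper() == 'FROM':
            from_seen = True
    return None

sql = "insert table dwd_table_name_prod_info_df ( ftime ,prod_id ,prod_name ) select ftime ,prod_id ,prod_name from ods_table_name_prod_info_df t1_1 where ftime = 20231223"
print(table_after_from(sql))  # should print: ods_table_name_prod_info_df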
Python script:
import sqlparse

if __name__ == '__main__':
    sql = """ insert table dwd_table_name_prod_info_df ( ftime ,prod_id ,prod_name ,prod_price ) select t1.ftime ,t1.prod_id ,t1.prod_name ,t2.prod_price from ( select ftime ,prod_id ,prod_name from ods_table_name_prod_info_df t1_1 where ftime = 20231223 ) t1 left join ( select prod_id ,prod_price from ods_table_name_prod_price_df t2_1 where ftime = 20231223 ) t2 on t1.prod_id = t2.prod_id """
    parsed = sqlparse.parse(sql)[0]
    count = 0
    for item in parsed.tokens:
        print(count, ':\t', type(item), '\t|', item.ttype, '\t|', item.value, '\t|', item.is_group)
        count += 1
Execution result (in the SQL above, line breaks and runs of spaces are reduced to a single space so the token output does not explode):
Conclusion: we need to step into the subquery first and then parse it the same way as the plain FROM case above.
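The recursion can be sketched as follows (an assumption-laden simplification: any group token that contains both a parenthesis and a SELECT is treated as a subquery and re-scanned with the same logic; joins are ignored here and covered in the next case):

import sqlparse
from sqlparse.sql import Identifier
from sqlparse.tokens import Keyword

def source_tables(token_list):
    """Yield table names after FROM, descending into parenthesized subqueries (simplified sketch)."""
    from_seen = False
    for token in token_list:
        if token.is_group and '(' in token.value and 'SELECT' in token.value.upper():
            # looks like a subquery: recurse with the same logic
            yield from source_tables(token.tokens)
            from_seen = False
        elif from_seen and isinstance(token, Identifier):
            yield token.value.split(' ')[0]
            from_seen = False
        elif token.ttype is Keyword and token.value.upper() == 'FROM':
            from_seen = True

sql = "select t1.ftime ,t1.prod_id from ( select ftime ,prod_id from ods_table_name_prod_info_df t1_1 where ftime = 20231223 ) t1"
print(list(source_tables(sqlparse.parse(sql)[0].tokens)))  # should print: ['ods_table_name_prod_info_df']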
Python script:
import sqlparse

if __name__ == '__main__':
    sql = """ insert table dwd_table_name_prod_info_df ( ftime ,prod_id ,prod_name ,prod_price ) select t1.ftime ,t1.prod_id ,t1.prod_name ,t2.prod_price from ods_table_name_prod_info_df t1 left join ods_table_name_prod_price_df t2 on t1.prod_id = t2.prod_id where ftime = 20231223 """
    parsed = sqlparse.parse(sql)[0]
    count = 0
    for item in parsed.tokens:
        print(count, ':\t', type(item), '\t|', item.ttype, '\t|', item.value, '\t|', item.is_group)
        count += 1
Execution result (in the SQL above, line breaks and runs of spaces are reduced to a single space so the token output does not explode):
Conclusion: the join keyword can also be used to parse out the from_ table name directly.
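A sketch of the same walk with joins treated like FROM (the keyword tuple below is an assumption, to be extended as needed; as the token dump shows, sqlparse emits compound phrases such as left join as a single Keyword token, so matching the whole phrase works):

import sqlparse
from sqlparse.sql import Identifier
from sqlparse.tokens import Keyword

# Join phrases treated like FROM (assumed list, extend as needed)
JOIN_KEYWORDS = ('JOIN', 'INNER JOIN', 'LEFT JOIN', 'LEFT OUTER JOIN',
                 'RIGHT JOIN', 'RIGHT OUTER JOIN', 'FULL JOIN', 'FULL OUTER JOIN')

def tables_after_from_or_join(sql):
    """Collect table names that follow FROM or any JOIN keyword (flat SQL only)."""
    tables = []
    expect_table = False
    for token in sqlparse.parse(sql)[0].tokens:
        if expect_table and isinstance(token, Identifier):
            tables.append(token.value.split(' ')[0])
            expect_table = False
        elif token.ttype is Keyword and token.value.upper() in ('FROM',) + JOIN_KEYWORDS:
            expect_table = True
    return tables

sql = ("select t1.ftime ,t2.prod_price from ods_table_name_prod_info_df t1 "
       "left join ods_table_name_prod_price_df t2 on t1.prod_id = t2.prod_id "
       "where t1.ftime = 20231223")
print(tables_after_from_or_join(sql))  # should print both ods_* tables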
Putting the approach together (a minimal regex sketch for the insert side follows, and the full implementation comes right after):
Parse the insert_ table name via the insert table keyword.
Parse the from_ table names via the from and join keywords.
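Before the full script, a minimal regex sketch for picking out the insert target table (the pattern is a simplified assumption that only covers the insert [into|overwrite] table <name> forms used in this article):

import re

def insert_table_name(sql_str):
    """Return the table name after 'insert [into|overwrite] table', or None (simplified sketch)."""
    match = re.search(r'insert\s+(?:into\s+|overwrite\s+)?table\s+([^\s(,]+)', sql_str, flags=re.I)
    return match.group(1) if match else None

sql = "insert table dwd_table_name_prod_info_df ( ftime ,prod_id ) select ftime ,prod_id from ods_table_name_prod_info_df"
print(insert_table_name(sql))  # should print: dwd_table_name_prod_info_df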
import re
import sqlparse
from sqlparse.sql import IdentifierList, Identifier
from sqlparse.tokens import Keyword, Punctuation

# Supported join types
ALL_JOIN_TYPE = ('JOIN', 'INNER JOIN', 'LEFT JOIN', 'LEFT OUTER JOIN', 'RIGHT JOIN', 'FULL OUTER JOIN',
                 'FULL JOIN', 'FULL OUT JOIN', 'LEFT SEMI JOIN', 'RIGHT SEMI JOIN')


def format_sql(sql_str):
    """
    Normalize the SQL and strip comment text.
    :param sql_str:
    :return:
    """
    format_mid_sql = ""
    sql_line_list = sql_str.split('\n')
    for sql_line in sql_line_list:
        # print("=" * 100)
        # print(sql_line)
        while True:
            res_str = re_check(r'\"([^\"]*)\"|\'([^\']*)\'', sql_line)
            if res_str[0]:
                res_sub_sql = re_check(r'--', res_str[1])
                if res_sub_sql[0]:
                    format_mid_sql += " " + res_sub_sql[1]
                    break
                else:
                    format_mid_sql += " " + res_str[1] + res_str[2]
                    sql_line = res_str[3]
            else:
                res_sql = re_check(r'--', sql_line)
                if res_sql[0]:
                    format_mid_sql += " " + res_sql[1]
                else:
                    format_mid_sql += " " + sql_line
                break
    format_sql = re.sub(r'\s+', ' ', format_mid_sql.replace('\t', ' ').replace('\n', ' '))
    return format_sql


def re_check(rule, check_str):
    """
    Regex helper: return (matched?, text before, matched text, text after).
    :param rule:
    :param check_str:
    :return:
    """
    pattern = re.compile(rule, re.I)
    re_result = pattern.search(check_str)
    flag = False
    match_rule = None
    before_rule = None
    after_rule = None
    if re_result:
        match_rule = check_str[re_result.span()[0]:re_result.span()[1]]
        before_rule = check_str[:re_result.span()[0]]
        after_rule = check_str[re_result.span()[1]:]
        flag = True
    return flag, before_rule, match_rule, after_rule


class BloodSqlparseAnalysis(object):

    def format_subselect(self, parsed):
        """
        Normalize a subquery: keep only the content inside the outermost parentheses.
        """
        str_par = str(parsed).strip()
        first_index = str_par.find('(')
        last_index = str_par.rfind(')')
        if first_index != -1 and last_index != -1:
            str_par = str_par[first_index + 1:last_index]
        return str_par

    def is_subselect(self, parsed):
        """
        Is this a subquery? Criteria: the token contains parentheses and a SELECT.
        :param parsed:
        :return:
        """
        no_token_str = str(parsed)
        if no_token_str.find('(') != -1 and no_token_str.find(')') != -1 and no_token_str.upper().find('SELECT') != -1:
            return True
        return False

    def extract_table_identifiers(self, token_stream):
        """
        Yield table names from the recursion results.
        :param token_stream:
        :return:
        """
        for item in token_stream:
            if isinstance(item, IdentifierList):
                for identifier in item.get_identifiers():
                    result_tb_name = identifier.value.split(' ')[0]
                    yield result_tb_name
            elif isinstance(item, Identifier):
                result_tb_name = item.value.split(' ')[0]
                yield result_tb_name
            elif item.ttype is Keyword and item.value.upper() not in ALL_JOIN_TYPE:
                yield item.value

    def extract_tables(self, sql):
        """
        Extract the table names that follow FROM | JOIN in the SQL.
        :param sql:
        :return:
        """
        parsed = sqlparse.parse(sql)[0]
        from_seen = False
        count = 1
        for item in parsed.tokens:
            # Debug print for troubleshooting
            # print(from_seen, '|', count, ':\t', type(item), '\t|', item.ttype, '\t|', item.value, '\t|', item.is_group)
            if from_seen:
                if self.is_subselect(item):
                    from_seen = False
                    item = self.format_subselect(item)
                    for x in self.extract_tables(item):
                        yield x
                elif (str(item).upper().find('WHERE') != -1) or (item.ttype is Keyword and item.value.upper() not in ALL_JOIN_TYPE) or item.ttype is Punctuation:
                    from_seen = False
                    continue
                else:
                    yield item
            elif (item.ttype is Keyword and item.value.upper() == 'FROM') or (item.ttype is Keyword and item.value.upper() in ALL_JOIN_TYPE):
                from_seen = True
            count += 1

    def get_all_blood(self, sql):
        all_tb = self.extract_tables(sql)
        return list(self.extract_table_identifiers(all_tb))

    def analysis_sql_blood(self, sql_str):
        """
        Run different parsing logic depending on the type returned by estimate_sql_type
        and return all source tables after parsing (temporary tables excluded).
        :param sql_str: SQL string
        :return: [source tables]
        """
        sql_str = format_sql(sql_str)
        res_sql_flag = re_check(r'insert\s+table\s+[^\,]*?\s*?\(|insert\s+into\s+table\s+[^\,]*?\s*?\(|insert\s+overwrite\s+table\s+[^\,]*?\s*?\(|insert\s+table\s+[^\,]*?\s+?select|insert\s+into\s+table\s+[^\,]*?\s+?select|insert\s+overwrite\s+table\s+[^\,]*?\s+?select', sql_str)
        if res_sql_flag[0]:
            sql_flag = self.estimate_sql_type(sql_str)
            if sql_flag == 'no_insert':
                # TODO non-insert SQL: lineage is not parsed
                insert_table = ''
                son_tables = []
            elif sql_flag == 'no_with':
                insert_table = self.analysis_insert_tb_name(sql_str)
                son_tables = self.get_all_blood(sql_str)
            else:
                insert_table = self.analysis_insert_tb_name(sql_str)
                tmp_tb_name_list, split_sql_str_list = self.analysis_with_sql(sql_str)
                sub_tables = []
                for sql_sub_str in split_sql_str_list:
                    sub_son_tables = self.get_all_blood(sql_sub_str)
                    sub_tables += sub_son_tables
                son_tables = list(set(sub_tables).difference(set(tmp_tb_name_list)))
            son_tables = list(set(son_tables))
        else:
            insert_table = None
            son_tables = None
        return insert_table, son_tables

    def estimate_sql_type(self, sql_str):
        """
        Determine the concrete SQL type.
        :param sql_str: SQL string
        :return: one of three types: no_insert | with | no_with
        """
        flag = 'no_insert'
        # insert SQL whose lineage needs to be parsed
        re_res = re_check(r'insert', sql_str)
        if re_res[0]:
            re_res_with = re_check(r'with\s+.*\s+as\s+\(', sql_str)
            if re_res_with[0]:
                flag = 'with'  # the SQL contains temporary tables (WITH)
            else:
                flag = 'no_with'  # the SQL contains no temporary tables
        return flag

    def analysis_with_sql(self, sql_str):
        """
        Parse a SQL string that contains WITH temporary tables and return
        the temporary table names and the per-table SQL pieces.
        :param sql_str: SQL string
        :return: [temporary table names], [split SQL pieces]
        """
        pattern_tmp_tb_name = re.compile(r'with\s+[^\,]*?\s+?as\s*?\(|\)\s*\,[^\,]*?\s+?as\s*?\(|\)\s*?select\s+?[^\,]*?\s+ftime', flags=re.I)
        re_res_tb = pattern_tmp_tb_name.findall(sql_str)
        tmp_tb_name_list = []
        for have_tb_name_str in re_res_tb:
            re_res_tb = re_check(r'select', have_tb_name_str)
            if re_res_tb[0]:
                continue
            have_tb_name_list = have_tb_name_str.split(' ')
            for index in range(len(have_tb_name_list)):
                have_tb_name_list[index] = re.sub(r'\,|^with$|^as$|\(|\)|\s|\n|\t', '', have_tb_name_list[index], flags=re.I)
            tmp_tb_name = ''.join(have_tb_name_list)
            tmp_tb_name_list.append(tmp_tb_name)
        split_sql_str_list = re.split(r'with\s+[^\,]*?\s+?as\s*?\(|\)\s*\,[^\,]*?\s+?as\s*?\(|\)\s*?select\s+?[^\,]*?\s+ftime', sql_str, flags=re.I)
        split_sql_str_list[-1] = 'select ftime' + split_sql_str_list[-1]
        return tmp_tb_name_list, split_sql_str_list

    def analysis_insert_tb_name(self, sql_str):
        """
        Extract the table name after INSERT from the run SQL.
        :param sql_str: SQL string
        :return: insert target table name
        """
        pattern_tmp_tb_name = re.compile(
            r'insert\s+table\s+[^\,]*?\s*?\(|insert\s+into\s+table\s+[^\,]*?\s*?\(|insert\s+overwrite\s+table\s+[^\,]*?\s*?\(|insert\s+table\s+[^\,]*?\s+?select|insert\s+into\s+table\s+[^\,]*?\s+?select|insert\s+overwrite\s+table\s+[^\,]*?\s+?select',
            flags=re.I)
        re_res_tb_name = pattern_tmp_tb_name.findall(sql_str)
        table_name = None
        if len(re_res_tb_name) == 1:
            insert_sql = re_res_tb_name[0]
            insert_sql = insert_sql.replace(' :: ', "::")
            insert_list = insert_sql.split(' ')
            insert_table_name_flag = False
            for insert_table in insert_list:
                if insert_table_name_flag:
                    table_name = insert_table
                    break
                if insert_table.upper() == 'TABLE':
                    insert_table_name_flag = True
            if table_name[-1] == "(":
                table_name = table_name[:-1]
        return table_name


if __name__ == '__main__':
    sql = """ insert table dwd_table_name_prod_info_df ( ftime ,prod_id ,prod_name ,prod_price ) select t1.ftime ,t1.prod_id ,t1.prod_name ,t2.prod_price from ( select ftime ,prod_id ,prod_name from ods_table_name_prod_info_df t1_1 where ftime = 20231223 ) t1 left join ( select t2_1.prod_id ,t2_1.prod_price ,t2_2.prod_number from ods_table_name_prod_price_df t2_1 left join ods_table_name_prod_number_df t2_2 where ftime = 20231223 ) t2 on t1.prod_id = t2.prod_id """
    bsa = BloodSqlparseAnalysis()
    insert_table, from_table_list = bsa.analysis_sql_blood(sql)
    print(insert_table)
    print(from_table_list)
The output matches expectations.
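As a possible next step (not part of the original script), the returned (insert_table, from_table_list) pair can be turned into lineage edges for downstream storage, for example:

def build_edges(insert_table, from_table_list):
    """Return (source_table, target_table) pairs, one edge per upstream table (sketch)."""
    if not insert_table or not from_table_list:
        return []
    return [(src, insert_table) for src in from_table_list]

# Values below follow the example above
edges = build_edges('dwd_table_name_prod_info_df',
                    ['ods_table_name_prod_info_df', 'ods_table_name_prod_price_df'])
for src, dst in edges:
    print(src, '->', dst)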
That's all for today's share. This article is only a brief introduction to parsing SQL with python-sqlparse; everyone is welcome to discuss.