当前位置:   article > 正文

Python解析SQL字段血缘:[开源]Hive数仓数据血缘分析工具——SQL解析

数据血缘分析开源工具

代码如下:LineParser 类(Hive SQL 解析类)

package com.xiaoju.products.parse;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.List;import java.util.Map;import java.util.Set;import java.util.Stack;import java.util.Map.Entry;import java.util.LinkedHashSet;import org.antlr.runtime.tree.Tree;import org.apache.hadoop.hive.ql.parse.ASTNode;import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;import org.apache.hadoop.hive.ql.parse.HiveParser;import org.apache.hadoop.hive.ql.parse.ParseDriver;import com.xiaoju.products.bean.Block;import com.xiaoju.products.bean.ColLine;import com.xiaoju.products.bean.QueryTree;import com.xiaoju.products.exception.SQLParseException;import com.xiaoju.products.exception.UnSupportedException;import com.xiaoju.products.util.Check;import com.xiaoju.products.util.MetaCache;import com.xiaoju.products.util.NumberUtil;import com.xiaoju.products.util.ParseUtil;import com.xiaoju.products.util.PropertyFileUtil;/** * hive sql解析类 * * 目的:实现HQL的语句解析,分析出输入输出表、字段和相应的处理条件。为字段级别的数据血缘提供基础。 * 重点:获取SELECT操作中的表和列的相关操作。其他操作这判断到字段级别。 * 实现思路:对AST深度优先遍历,遇到操作的token则判断当前的操作,遇到子句则压栈当前处理,处理子句。子句处理完,栈弹出。 * 处理字句的过程中,遇到子查询就保存当前子查询的信息,判断与其父查询的关系,最终形成树形结构; * 遇到字段或者条件处理则记录当前的字段和条件信息、组成Block,嵌套调用。 * 关键点解析 * 1、遇到TOK_TAB或TOK_TABREF则判断出当前操作的表 * 2、压栈判断是否是join,判断join条件 * 3、定义数据结构Block,遇到在where\select\join时获得其下相应的字段和条件,组成Block * 4、定义数据结构ColLine,遇到TOK_SUBQUERY保存当前的子查询信息,供父查询使用 * 5、定义数据结构ColLine,遇到TOK_UNION结束时,合并并截断当前的列信息 * 6、遇到select 或者未明确指出的字段,查询元数据进行辅助分析 * 7、解析结果进行相关校验 * 试用范围: * 1、支持标准SQL * 2、不支持transform using script * * @author yangyangthomas * /public class LineParser { private static final String SPLIT_DOT = "."; private static final String SPLIT_COMMA = ","; private static final String SPLIT_AND = "&"; private static final String TOK_EOF = ""; private static final String CON_WHERE = "WHERE:"; private static final String TOK_TMP_FILE = "TOK_TMP_FILE"; private Map> dbMap = new HashMap>(); private List queryTreeList = new ArrayList(); //子查询树形关系保存 private Stack> 
conditionsStack = new Stack>(); private Stack> colsStack = new Stack>(); private Map> resultQueryMap = new HashMap>(); private Set conditions = new HashSet(); //where or join 条件缓存 private List cols = new ArrayList(); //一个子查询内的列缓存 private Stack tableNameStack = new Stack(); private Stack joinStack = new Stack(); private Stack joinOnStack = new Stack(); private Map queryMap = new HashMap(); private boolean joinClause = false; private ASTNode joinOn = null; private String nowQueryDB = "default"; //hive的默认库 private boolean isCreateTable = false; //结果 private List colLines = new ArrayList(); private Set outputTables = new HashSet(); private Set inputTables = new HashSet(); private List tmpColLines = new ArrayList(); private Set tmpOutputTables = new HashSet(); private Set tmpInputTables = new HashSet(); public List getColLines() { return colLines; } public Set getOutputTables() { return outputTables; } public Set getInputTables() { return inputTables; } private void parseIteral(ASTNode ast) { prepareToParseCurrentNodeAndChilds(ast); parseChildNodes(ast); parseCurrentNode(ast); endParseCurrentNode(ast); } /** * 解析当前节点 * @param ast * @param set * @return / private void parseCurrentNode(ASTNode ast){ if (ast.getToken() != null) { switch (ast.getToken().getType()) { case HiveParser.TOK_CREATETABLE: //outputtable isCreateTable = true; String tableOut = fillDB(BaseSemanticAnalyzer.getUnescapedName((ASTNode) ast.getChild(0))); tmpOutputTables.add(tableOut); MetaCache.getInstance().init(tableOut); //初始化数据,供以后使用 break; case HiveParser.TOK_TAB:// outputTable String tableTab = BaseSemanticAnalyzer.getUnescapedName((ASTNode) ast.getChild(0)); String tableOut2 = fillDB(tableTab); tmpOutputTables.add(tableOut2); MetaCache.getInstance().init(tableOut2); //初始化数据,供以后使用 break; case HiveParser.TOK_TABREF:// inputTable ASTNode tabTree = (ASTNode) ast.getChild(0); String tableInFull = fillDB((tabTree.getChildCount() == 1) ? 
BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) : BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) + SPLIT_DOT + BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(1)) ); String tableIn = tableInFull.substring(tableInFull.indexOf(SPLIT_DOT) + 1); tmpInputTables.add(tableInFull); MetaCache.getInstance().init(tableInFull); //初始化数据,供以后使用 queryMap.clear(); String alia = null; if (ast.getChild(1) != null) { //(TOK_TABREF (TOK_TABNAME detail usersequence_client) c) alia = ast.getChild(1).getText().toLowerCase(); QueryTree qt = new QueryTree(); qt.setCurrent(alia); qt.getTableSet().add(tableInFull); QueryTree pTree = getSubQueryParent(ast); qt.setpId(pTree.getpId()); qt.setParent(pTree.getParent()); queryTreeList.add(qt); if (joinClause && ast.getParent() == joinOn) { // TOK_SUBQUERY join TOK_TABREF ,此处的TOK_SUBQUERY信息不应该清楚 for (QueryTree entry : queryTreeList) { //当前的查询范围 if (qt.getParent().equals(entry.getParent())) { queryMap.put(entry.getCurrent(), entry); } } } else { queryMap.put(qt.getCurrent(), qt); } } else { alia = tableIn.toLowerCase(); QueryTree qt = new QueryTree(); qt.setCurrent(alia); qt.getTableSet().add(tableInFull); QueryTree pTree = getSubQueryParent(ast); qt.setpId(pTree.getpId()); qt.setParent(pTree.getParent()); queryTreeList.add(qt); if (joinClause && ast.getParent() == joinOn) { for (QueryTree entry : queryTreeList) { if (qt.getParent().equals(entry.getParent())) { queryMap.put(entry.getCurrent(), entry); } } } else { queryMap.put(qt.getCurrent(), qt); //此处检查查询 select app.t1.c1,t1.c1 from t1 的情况 queryMap.put(tableInFull.toLowerCase(), qt); } } break; case HiveParser.TOK_SUBQUERY: if (ast.getChildCount() == 2) { String tableAlias = BaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(1).getText()); String aliaReal = ""; if(aliaReal.length() !=0){ aliaReal = aliaReal.substring(0, aliaReal.length()-1); } QueryTree qt = new QueryTree(); qt.setCurrent(tableAlias.toLowerCase()); 
qt.setColLineList(generateColLineList(cols, conditions)); QueryTree pTree = getSubQueryParent(ast); qt.setId(generateTreeId(ast)); qt.setpId(pTree.getpId()); qt.setParent(pTree.getParent()); qt.setChildList(getSubQueryChilds(qt.getId())); if (Check.notEmpty(qt.getChildList())) { for (QueryTree cqt : qt.getChildList()) { qt.getTableSet().addAll(cqt.getTableSet()); queryTreeList.remove(cqt); // 移除子节点信息 } } queryTreeList.add(qt); cols.clear(); queryMap.clear(); for (QueryTree _qt : queryTreeList) { if (qt.getParent().equals( _qt.getParent())) { //当前子查询才保存 queryMap.put(_qt.getCurrent(), _qt); } } } break; case HiveParser.TOK_SELEXPR: //输入输出字段的处理 /* * (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) * (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) * * (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) * (TOK_SELECT * (TOK_SELEXPR (. (TOK_TABLE_OR_COL p) datekey) datekey) * (TOK_SELEXPR (TOK_TABLE_OR_COL datekey)) * (TOK_SELEXPR (TOK_FUNCTIONDI count (. (TOK_TABLE_OR_COL base) userid)) buyer_count)) * (TOK_SELEXPR (TOK_FUNCTION when (> (. (TOK_TABLE_OR_COL base) userid) 5) (. (TOK_TABLE_OR_COL base) clienttype) (> (. (TOK_TABLE_OR_COL base) userid) 1) (+ (. (TOK_TABLE_OR_COL base) datekey) 5) (+ (. (TOK_TABLE_OR_COL base) clienttype) 1)) bbbaaa) / //解析需要插入的表 Tree tok_insert = ast.getParent().getParent(); Tree child = tok_insert.getChild(0).getChild(0); String tName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) child.getChild(0)); String destTable = TOK_TMP_FILE.equals(tName) ? TOK_TMP_FILE : fillDB(tName); //select a.,* from t1 和 select * from (select c1 as a,c2 from t1) t 的情况 if (ast.getChild(0).getType() == HiveParser.TOK_ALLCOLREF) { String tableOrAlias = ""; if (ast.getChild(0).getChild(0) != null) { tableOrAlias = ast.getChild(0).getChild(0).getChild(0).getText(); } String[] result = getTab

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/923750
推荐阅读
相关标签
  

闽ICP备14008679号