当前位置:   article > 正文

spark SQL 表解析

spark SQL 表解析

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

最近在基于 DolphinScheduler 做二次开发,其中任务实例血缘的展示有这样的需求:要解析 Spark SQL 任务中的所有输入表与输出表,因此专门研究了如何从 SQL 中提取它们。


提示:以下是本篇文章正文内容,下面案例可供参考

一、spark sql的执行过程

可以参看如下的文章:
https://www.cnblogs.com/ulysses-you/p/9762133.html
https://blog.csdn.net/lisenyeahyeah/article/details/83539105

二、提取sparksql的输入输出表

1.引入依赖

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.2</version>
<!--            <exclusions>-->
<!--                <exclusion>-->
<!--                    <artifactId>log4j-slf4j-impl</artifactId>-->
<!--                    <groupId>org.apache.logging.log4j</groupId>-->
<!--                </exclusion>-->
<!--                <exclusion>-->
<!--                    <groupId>org.slf4j</groupId>-->
<!--                    <artifactId>slf4j-log4j12</artifactId>-->
<!--                </exclusion>-->
<!--            </exclusions>-->
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

2.提取输入输出表

以下是解析 SQL、提取输入输出表的代码:

package com.q_j_c.common.util;

import org.apache.commons.collections.CollectionUtils;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.expressions.*;
import org.apache.spark.sql.catalyst.plans.logical.*;
import org.apache.spark.sql.execution.SparkSqlParser;
import org.apache.spark.sql.execution.command.CacheTableCommand;
import org.apache.spark.sql.internal.SQLConf;
import scala.Option;
import scala.collection.JavaConversions;
import scala.collection.Seq;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Extracts the input, output and cached (temporary) table names referenced by a Spark SQL
 * statement by walking the unresolved logical plan produced by {@link SparkSqlParser}.
 *
 * <p>Results are accumulated into a map created by {@link #getTableMaps()} under the keys
 * {@link #INPUT_TABLE}, {@link #OUTPUT_TABLE} and {@link #CACHE_TABLE};
 * {@link #getRealIOTableMap(Map)} then removes cached/temporary names from the inputs.
 *
 * <p>Only the bare table name ({@link TableIdentifier#table()}) is recorded; any database
 * qualifier is discarded.
 *
 * @author q_j_c
 * @version 1.0
 * @date 2021/1/29 17:58
 */
public class SparkSqlParserUtils {
    /** Map key under which tables read by the statement are collected. */
    public static final String INPUT_TABLE = "inputTable";
    /** Map key under which INSERT target tables are collected. */
    public static final String OUTPUT_TABLE = "outputTable";
    /** Map key under which table names created by {@code CACHE TABLE} are collected. */
    public static final String CACHE_TABLE = "cacheTable";

    private static final SparkSqlParser sparkSqlParser = new SparkSqlParser(new SQLConf());

    private SparkSqlParserUtils() {
    }

    /**
     * Recursively walks {@code curLogicalPlan} and records every table it references into
     * {@code tableMaps}.
     *
     * <p>Besides child plans, the expressions owned by every visited node are walked too, so
     * tables used inside {@code IN}/{@code NOT IN}/{@code EXISTS}/scalar subqueries are found
     * wherever they appear (WHERE clauses, join conditions, select lists, ...).
     *
     * @param tableMaps         accumulator holding the {@link #INPUT_TABLE},
     *                          {@link #OUTPUT_TABLE} and {@link #CACHE_TABLE} sets (see
     *                          {@link #getTableMaps()}); names for a missing key are skipped
     * @param curLogicalPlan    the plan node to visit; {@code null} is ignored
     * @param parentLogicalPlan the node that contains {@code curLogicalPlan}; used to decide
     *                          whether a relation is the target of an INSERT
     * @author q_j_c
     * @date 2021/2/9 18:30
     */
    public static void visitedLogicalPlan(ConcurrentHashMap<String, Set<String>> tableMaps,
                                          LogicalPlan curLogicalPlan,
                                          LogicalPlan parentLogicalPlan) {
        if (curLogicalPlan == null) {
            return;
        }
        // INSERT: the target table and the query are both plain fields, not generic children.
        if (curLogicalPlan instanceof InsertIntoTable) {
            InsertIntoTable insert = (InsertIntoTable) curLogicalPlan;
            visitedLogicalPlan(tableMaps, insert.table(), curLogicalPlan);
            visitedLogicalPlan(tableMaps, insert.query(), curLogicalPlan);
        }
        // A table reference: classify it as input or output.
        if (curLogicalPlan instanceof UnresolvedRelation) {
            saveTableName(tableMaps, curLogicalPlan, parentLogicalPlan);
        }
        // UNION is n-ary: visit every branch.
        if (curLogicalPlan instanceof Union) {
            List<LogicalPlan> branches = JavaConversions.seqAsJavaList(curLogicalPlan.children());
            for (LogicalPlan branch : branches) {
                visitedLogicalPlan(tableMaps, branch, curLogicalPlan);
            }
        }
        // View
        if (curLogicalPlan instanceof View) {
            visitedLogicalPlan(tableMaps, ((View) curLogicalPlan).child(), curLogicalPlan);
        }
        // Binary nodes (joins, set operations, ...).
        if (curLogicalPlan instanceof BinaryNode) {
            BinaryNode binary = (BinaryNode) curLogicalPlan;
            visitedLogicalPlan(tableMaps, binary.left(), curLogicalPlan);
            visitedLogicalPlan(tableMaps, binary.right(), curLogicalPlan);
        }
        // Unary nodes (project, filter, aggregate, ...).
        if (curLogicalPlan instanceof UnaryNode) {
            visitedLogicalPlan(tableMaps, ((UnaryNode) curLogicalPlan).child(), curLogicalPlan);
        }
        // CACHE TABLE: record the temporary name, then walk its defining query if there is one.
        if (curLogicalPlan instanceof CacheTableCommand) {
            saveTableName(tableMaps, curLogicalPlan, curLogicalPlan);
            Option<LogicalPlan> plan = ((CacheTableCommand) curLogicalPlan).plan();
            // "CACHE TABLE t" without "AS SELECT ..." carries no plan; calling get() would throw.
            if (plan.isDefined()) {
                visitedLogicalPlan(tableMaps, plan.get(), curLogicalPlan);
            }
        }
        // Expressions owned by this node (join condition, filter condition, select list, ...)
        // may embed subqueries that reference further tables. The join condition is an Option,
        // so a join without ON simply contributes no expression here instead of failing.
        handleExpressions(tableMaps, curLogicalPlan.expressions());
    }

    /**
     * Records the table referenced by {@code logicalPlan} under the matching key.
     *
     * <ul>
     *   <li>{@link CacheTableCommand}: its table identifier goes to {@link #CACHE_TABLE}.</li>
     *   <li>{@link UnresolvedRelation} that is the target table of the parent
     *       {@link InsertIntoTable}: goes to {@link #OUTPUT_TABLE}.</li>
     *   <li>Any other {@link UnresolvedRelation}: goes to {@link #INPUT_TABLE}.</li>
     * </ul>
     * Nothing is stored for other node types or when the key is missing from {@code tableMaps}.
     */
    private static void saveTableName(ConcurrentHashMap<String, Set<String>> tableMaps,
                                      LogicalPlan logicalPlan,
                                      LogicalPlan parentLogicalPlan) {
        Set<String> tableNames = null;
        TableIdentifier tableIdentifier = null;
        if (logicalPlan instanceof CacheTableCommand) {
            tableNames = tableMaps.get(CACHE_TABLE);
            tableIdentifier = ((CacheTableCommand) logicalPlan).tableIdent();
        } else if (logicalPlan instanceof UnresolvedRelation) {
            tableIdentifier = ((UnresolvedRelation) logicalPlan).tableIdentifier();
            // Reference identity on purpose: only the INSERT's own target node is an output.
            // The query side may itself be a bare relation (e.g. "INSERT INTO t TABLE src"),
            // possibly even an equal relation of the same table, and must count as an input.
            boolean isInsertTarget = parentLogicalPlan instanceof InsertIntoTable
                    && ((InsertIntoTable) parentLogicalPlan).table() == logicalPlan;
            tableNames = tableMaps.get(isInsertTarget ? OUTPUT_TABLE : INPUT_TABLE);
        }
        if (tableIdentifier != null && tableNames != null) {
            tableNames.add(tableIdentifier.table());
        }
    }

    /**
     * Removes cached/temporary table names from the input set, then drops the
     * {@link #CACHE_TABLE} entry.
     *
     * <p>The given map is modified in place and also returned.
     *
     * @param tableMaps map filled by {@link #visitedLogicalPlan}
     * @return the same {@code tableMaps} instance, without the {@link #CACHE_TABLE} key
     */
    public static Map<String, Set<String>> getRealIOTableMap(Map<String, Set<String>> tableMaps) {
        Set<String> cacheTable = tableMaps.get(CACHE_TABLE);
        Set<String> inputTable = tableMaps.get(INPUT_TABLE);
        if (inputTable != null && CollectionUtils.isNotEmpty(cacheTable)) {
            inputTable.removeAll(cacheTable);
        }
        tableMaps.remove(CACHE_TABLE);
        return tableMaps;
    }

    /**
     * Walks an expression tree and visits the plan of every subquery found in it.
     *
     * <p>All child expressions are traversed generically, so subqueries wrapped in any
     * expression (e.g. {@code NOT IN (SELECT ...)}, {@code NOT EXISTS (...)}, {@code CASE},
     * function arguments) are reached, not just those under AND/OR/comparisons.
     *
     * @param tableMaps     accumulator passed through to {@link #visitedLogicalPlan}
     * @param curExpression expression to inspect; {@code null} is ignored
     * @author q_j_c
     * @date 2021/2/9 16:02
     */
    private static void visitedExpression(ConcurrentHashMap<String, Set<String>> tableMaps,
                                          Expression curExpression) {
        if (curExpression == null) {
            return;
        }
        if (curExpression instanceof SubqueryExpression) {
            LogicalPlan plan = getLogicalPlanOnExpression(curExpression);
            if (plan != null) {
                visitedLogicalPlan(tableMaps, plan, plan);
            }
        }
        handleExpressions(tableMaps, curExpression.children());
    }

    /**
     * Returns the logical plan wrapped by an {@code IN (subquery)}, scalar subquery or
     * {@code EXISTS} expression, or {@code null} for any other expression.
     */
    private static LogicalPlan getLogicalPlanOnExpression(Expression curExpression) {
        if (curExpression instanceof ListQuery) {
            return ((ListQuery) curExpression).plan();
        }
        if (curExpression instanceof ScalarSubquery) {
            return ((ScalarSubquery) curExpression).plan();
        }
        if (curExpression instanceof Exists) {
            return ((Exists) curExpression).plan();
        }
        return null;
    }

    /** Applies {@link #visitedExpression} to every element of a Scala sequence (null-safe). */
    private static void handleExpressions(ConcurrentHashMap<String, Set<String>> tableMaps,
                                          Seq<Expression> seqExpression) {
        if (seqExpression == null) {
            return;
        }
        Collection<Expression> expressions = JavaConversions.asJavaCollection(seqExpression);
        for (Expression expression : expressions) {
            visitedExpression(tableMaps, expression);
        }
    }

    /** Returns the shared parser instance, built once with a default {@link SQLConf}. */
    public static SparkSqlParser getSparkSqlParserInstance() {
        return sparkSqlParser;
    }

    /**
     * Creates a fresh accumulator with empty sets for {@link #OUTPUT_TABLE},
     * {@link #INPUT_TABLE} and {@link #CACHE_TABLE}.
     */
    public static ConcurrentHashMap<String, Set<String>> getTableMaps() {
        ConcurrentHashMap<String, Set<String>> tableMap = new ConcurrentHashMap<>();
        tableMap.put(OUTPUT_TABLE, new HashSet<>());
        tableMap.put(INPUT_TABLE, new HashSet<>());
        tableMap.put(CACHE_TABLE, new HashSet<>());
        return tableMap;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262

有需要的就拿去呀

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/422664
推荐阅读
相关标签
  

闽ICP备14008679号