Merge commit 'ea4edd6a76bb0cb176dbc8e35851c783d7210d04' into kap-2.4.x

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/91c08e8a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/91c08e8a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/91c08e8a

Branch: refs/heads/2.1.x
Commit: 91c08e8a8336284c6ed92f03e17c5eed9f2fe8d0
Parents: e164b4a ea4edd6
Author: Hongbin Ma <mahong...@apache.org>
Authored: Tue Aug 1 10:32:25 2017 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Tue Aug 1 10:32:25 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/metadata/model/TableDesc.java    |  3 ---
 .../kylin/engine/mr/common/HadoopCmdOutput.java   | 17 +++++++++--------
 .../org/apache/kylin/query/util/PushDownUtil.java |  2 +-
 webapp/app/js/controllers/cubeMeasures.js         | 18 +++++++++++-------
 webapp/app/partials/cubeDesigner/measures.html    |  5 ++---
 webapp/app/partials/cubes/cube_detail.html        |  4 ++--
 6 files changed, 25 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/91c08e8a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
----------------------------------------------------------------------
diff --cc 
core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index a2a2c24,d3543dd..989f47b
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@@ -63,18 -63,18 +63,15 @@@ public class TableDesc extends RootPers
      public TableDesc(TableDesc other) {
          this.uuid = other.uuid;
          this.lastModified = other.lastModified;
- 
 -        
          this.name = other.name;
          this.sourceType = other.sourceType;
          this.tableType = other.tableType;
          this.dataGen = other.dataGen;
- 
 -        
          this.columns = new ColumnDesc[other.columns.length];
          for (int i = 0; i < other.columns.length; i++) {
              this.columns[i] = new ColumnDesc(other.columns[i]);
              this.columns[i].init(this);
          }
- 
 -        
          this.database.setName(other.getDatabase());
          this.identity = other.identity;
      }

http://git-wip-us.apache.org/repos/asf/kylin/blob/91c08e8a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
----------------------------------------------------------------------
diff --cc query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index f496626,0000000..3edd899
mode 100644,000000..100644
--- a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@@ -1,294 -1,0 +1,294 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 +*/
 +
 +package org.apache.kylin.query.util;
 +
 +import java.sql.SQLException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +
 +import org.apache.calcite.sql.SqlBasicCall;
 +import org.apache.calcite.sql.SqlCall;
 +import org.apache.calcite.sql.SqlDataTypeSpec;
 +import org.apache.calcite.sql.SqlDynamicParam;
 +import org.apache.calcite.sql.SqlIdentifier;
 +import org.apache.calcite.sql.SqlIntervalQualifier;
 +import org.apache.calcite.sql.SqlJoin;
 +import org.apache.calcite.sql.SqlLiteral;
 +import org.apache.calcite.sql.SqlNode;
 +import org.apache.calcite.sql.SqlNodeList;
 +import org.apache.calcite.sql.util.SqlVisitor;
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.commons.lang.text.StrBuilder;
 +import org.apache.commons.lang3.exception.ExceptionUtils;
 +import org.apache.commons.lang3.tuple.Pair;
 +import org.apache.commons.lang3.tuple.Triple;
 +import org.apache.kylin.common.KylinConfig;
 +import org.apache.kylin.common.util.ClassUtil;
 +import org.apache.kylin.metadata.MetadataManager;
 +import org.apache.kylin.metadata.model.ComputedColumnDesc;
 +import org.apache.kylin.metadata.model.DataModelDesc;
 +import org.apache.kylin.metadata.model.tool.CalciteParser;
 +import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 +import org.apache.kylin.query.routing.NoRealizationFoundException;
 +import org.apache.kylin.source.adhocquery.IPushDownConverter;
 +import org.apache.kylin.source.adhocquery.IPushDownRunner;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Lists;
 +
 +public class PushDownUtil {
 +    private static final Logger logger = 
LoggerFactory.getLogger(PushDownUtil.class);
 +
 +    public static boolean doPushDownQuery(String project, String sql, String 
schema, List<List<String>> results,
 +            List<SelectedColumnMeta> columnMetas, SQLException sqlException) 
throws Exception {
 +
 +        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
 +        if (!kylinConfig.isPushDownEnabled()) {
 +            return false;
 +        }
 +
 +        Throwable rootCause = ExceptionUtils.getRootCause(sqlException);
 +        boolean isExpectedCause = rootCause != null && 
(rootCause.getClass().equals(NoRealizationFoundException.class));
 +
 +        if (isExpectedCause) {
 +            logger.info("Query failed to utilize pre-calculation, routing to 
other engines", sqlException);
 +            IPushDownRunner runner = (IPushDownRunner) 
ClassUtil.newInstance(kylinConfig.getPushDownRunnerClassName());
 +            IPushDownConverter converter = (IPushDownConverter) ClassUtil
 +                    .newInstance(kylinConfig.getPushDownConverterClassName());
 +
 +            runner.init(kylinConfig);
 +
 +            logger.debug("Query pushdown runner {}", runner);
 +
 +            String expandCC = restoreComputedColumnToExpr(sql, project);
 +            if (!StringUtils.equals(expandCC, sql)) {
 +                logger.info("computed column in sql is expanded to:  " + 
expandCC);
 +            }
 +            if (schema != null && !schema.equals("DEFAULT")) {
 +                expandCC = schemaCompletion(expandCC, schema);
 +            }
 +            String adhocSql = converter.convert(expandCC);
 +            if (!adhocSql.equals(expandCC)) {
 +                logger.info("the query is converted to {} according to 
kylin.query.pushdown.converter-class-name",
 +                        adhocSql);
 +            }
 +
 +            runner.executeQuery(adhocSql, results, columnMetas);
 +            return true;
 +        } else {
 +            return false;
 +        }
 +    }
 +
 +    static String schemaCompletion(String inputSql, String schema) {
 +        if (inputSql == null || inputSql.equals("")) {
 +            return "";
 +        }
 +        SqlNode fromNode = CalciteParser.getFromNode(inputSql);
 +
 +        // get all table node that don't have schema by visitor pattern
 +        FromTablesVisitor ftv = new FromTablesVisitor();
 +        fromNode.accept(ftv);
 +        List<SqlNode> tablesWithoutSchema = ftv.getTablesWithoutSchema();
 +
 +        List<Pair<Integer, Integer>> tablesPos = new ArrayList<>();
 +        for (SqlNode tables : tablesWithoutSchema) {
 +            tablesPos.add(CalciteParser.getReplacePos(tables, inputSql));
 +        }
 +
 +        // make the behind position in the front of the list, so that the 
front position will not be affected when replaced
 +        Collections.sort(tablesPos);
 +        Collections.reverse(tablesPos);
 +
 +        StrBuilder afterConvert = new StrBuilder(inputSql);
 +        for (Pair<Integer, Integer> pos : tablesPos) {
 +            String tableWithSchema = schema + "." + 
inputSql.substring(pos.getLeft(), pos.getRight());
 +            afterConvert.replace(pos.getLeft(), pos.getRight(), 
tableWithSchema);
 +        }
 +        return afterConvert.toString();
 +    }
 +
 +    private final static Pattern identifierInSqlPattern = Pattern.compile(
 +            //find pattern like "table"."column" or "column"
 +            
"((?<![\\p{L}_0-9\\.\\\"])(\\\"[\\p{L}_0-9]+\\\"\\.)?(\\\"[\\p{L}_0-9]+\\\")(?![\\p{L}_0-9\\.\\\"]))"
 + "|"
 +            //find pattern like table.column or column
 +                    + 
"((?<![\\p{L}_0-9\\.\\\"])([\\p{L}_0-9]+\\.)?([\\p{L}_0-9]+)(?![\\p{L}_0-9\\.\\\"]))");
 +
 +    private final static Pattern identifierInExprPattern = Pattern.compile(
 +            // a.b.c
 +            
"((?<![\\p{L}_0-9\\.\\\"])([\\p{L}_0-9]+\\.)([\\p{L}_0-9]+\\.)([\\p{L}_0-9]+)(?![\\p{L}_0-9\\.\\\"]))");
 +
 +    private final static Pattern endWithAsPattern = 
Pattern.compile("\\s+as\\s+$", Pattern.CASE_INSENSITIVE);
 +
 +    public static String restoreComputedColumnToExpr(String beforeSql, String 
project) {
 +        final MetadataManager metadataManager = 
MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
 +        List<DataModelDesc> dataModelDescs = 
metadataManager.getModels(project);
 +
 +        String afterSql = beforeSql;
 +        for (DataModelDesc dataModelDesc : dataModelDescs) {
 +            for (ComputedColumnDesc computedColumnDesc : 
dataModelDesc.getComputedColumnDescs()) {
 +                afterSql = restoreComputedColumnToExpr(afterSql, 
computedColumnDesc);
 +            }
 +        }
 +        return afterSql;
 +    }
 +
 +    static String restoreComputedColumnToExpr(String sql, ComputedColumnDesc 
computedColumnDesc) {
 +
 +        String ccName = computedColumnDesc.getColumnName();
 +        List<Triple<Integer, Integer, String>> replacements = 
Lists.newArrayList();
 +        Matcher matcher = identifierInSqlPattern.matcher(sql);
 +
 +        while (matcher.find()) {
 +            if (matcher.group(1) != null) { //with quote case: 
"TABLE"."COLUMN"
 +
 +                String quotedColumnName = matcher.group(3);
 +                Preconditions.checkNotNull(quotedColumnName);
 +                String columnName = StringUtils.strip(quotedColumnName, "\"");
 +                if (!columnName.equalsIgnoreCase(ccName)) {
 +                    continue;
 +                }
 +
 +                if (matcher.group(2) != null) { // table name exist 
 +                    String quotedTableAlias = 
StringUtils.strip(matcher.group(2), ".");
 +                    String tableAlias = StringUtils.strip(quotedTableAlias, 
"\"");
 +                    replacements.add(Triple.of(matcher.start(1), 
matcher.end(1),
 +                            
replaceIdentifierInExpr(computedColumnDesc.getExpression(), tableAlias, true)));
 +                } else { //only column
 +                    if (endWithAsPattern.matcher(sql.substring(0, 
matcher.start(1))).find()) {
 +                        //select DEAL_AMOUNT as "deal_amount" case
 +                        continue;
 +                    }
 +                    replacements.add(Triple.of(matcher.start(1), 
matcher.end(1),
 +                            
replaceIdentifierInExpr(computedColumnDesc.getExpression(), null, true)));
 +                }
 +            } else if (matcher.group(4) != null) { //without quote case: 
table.column or simply column
 +                String columnName = matcher.group(6);
 +                Preconditions.checkNotNull(columnName);
 +                if (!columnName.equalsIgnoreCase(ccName)) {
 +                    continue;
 +                }
 +
 +                if (matcher.group(5) != null) { //table name exist
 +                    String tableAlias = StringUtils.strip(matcher.group(5), 
".");
 +                    replacements.add(Triple.of(matcher.start(4), 
matcher.end(4),
 +                            
replaceIdentifierInExpr(computedColumnDesc.getExpression(), tableAlias, 
false)));
 +
 +                } else { //only column 
 +                    if (endWithAsPattern.matcher(sql.substring(0, 
matcher.start(4))).find()) {
 +                        //select DEAL_AMOUNT as deal_amount case
 +                        continue;
 +                    }
 +                    replacements.add(Triple.of(matcher.start(4), 
matcher.end(4),
 +                            
replaceIdentifierInExpr(computedColumnDesc.getExpression(), null, false)));
 +                }
 +            }
 +        }
 +
 +        Collections.reverse(replacements);
 +        for (Triple<Integer, Integer, String> triple : replacements) {
 +            sql = sql.substring(0, triple.getLeft()) + "(" + 
triple.getRight() + ")"
 +                    + sql.substring(triple.getMiddle());
 +        }
 +        return sql;
 +    }
 +
 +    static String replaceIdentifierInExpr(String expr, String tableAlias, 
boolean quoted) {
 +        if (tableAlias == null) {
 +            return expr;
 +        }
 +
 +        return CalciteParser.insertAliasInExpr(expr, tableAlias);
 +    }
 +}
 +
 +/**
 + * Created by jiatao.tao
 + * Get all the tables from "FROM" clause that without schema
 + */
 +class FromTablesVisitor implements SqlVisitor<SqlNode> {
 +    private List<SqlNode> tables;
 +
 +    FromTablesVisitor() {
 +        this.tables = new ArrayList<>();
 +    }
 +
 +    List<SqlNode> getTablesWithoutSchema() {
 +        return tables;
 +    }
 +
 +    @Override
 +    public SqlNode visit(SqlNodeList nodeList) {
 +        return null;
 +    }
 +
 +    @Override
 +    public SqlNode visit(SqlLiteral literal) {
 +        return null;
 +    }
 +
 +    @Override
 +    public SqlNode visit(SqlCall call) {
 +        if (call instanceof SqlBasicCall) {
 +            SqlBasicCall node = (SqlBasicCall) call;
 +            node.getOperands()[0].accept(this);
 +            return null;
 +        }
 +        if (call instanceof SqlJoin) {
 +            SqlJoin node = (SqlJoin) call;
 +            node.getLeft().accept(this);
 +            node.getRight().accept(this);
 +            return null;
 +        }
 +        for (SqlNode operand : call.getOperandList()) {
 +            if (operand != null) {
 +                operand.accept(this);
 +            }
 +        }
 +        return null;
 +    }
 +
 +    @Override
 +    public SqlNode visit(SqlIdentifier id) {
 +        if (id.names.size() == 1) {
 +            tables.add(id);
 +        }
 +        return null;
 +    }
 +
 +    @Override
 +    public SqlNode visit(SqlDataTypeSpec type) {
 +        return null;
 +    }
 +
 +    @Override
 +    public SqlNode visit(SqlDynamicParam param) {
 +        return null;
 +    }
 +
 +    @Override
 +    public SqlNode visit(SqlIntervalQualifier intervalQualifier) {
 +        return null;
 +    }
- }
++}

Reply via email to