EmmyMiao87 commented on a change in pull request #2821: The new materialized view selector URL: https://github.com/apache/incubator-doris/pull/2821#discussion_r375163071
########## File path: fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java ########## @@ -0,0 +1,453 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.CastExpr; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TableRef; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +/** + * The new materialized view selector supports SPJ<->SPJG. + * At the same time, it is compatible with all the features of the old version. + * The SPJ query is "Select Projection and Join" such as: + * select t1.c1 from t1, t2 where t1.c2=t2.c2 and t1.c3=1; + * The SPJG query is "Select Projection Join and Group-by" such as: + * select t1.c1, sum(t2.c1) from t1, t2 where t1.c2=t2.c2 and t1.c3=1 group by t1.c1; + */ +public class MaterializedViewSelector { + private static final Logger LOG = LogManager.getLogger(MaterializedViewSelector.class); + + private final SelectStmt selectStmt; + private final Analyzer analyzer; + + private Map<String, Set<String>> columnNamesInPredicates = Maps.newHashMap(); + private boolean isSPJQuery; + private Map<String, Set<String>> columnNamesInGrouping = Maps.newHashMap(); + private Map<String, Set<AggregatedColumn>> aggregateColumnsInQuery = Maps.newHashMap(); + private Map<String, Set<String>> columnNamesInQueryOutput = Maps.newHashMap(); + + private boolean disableSPJGView; + private String reasonOfDisable; + private boolean isPreAggregation = true; + + public MaterializedViewSelector(SelectStmt selectStmt, Analyzer analyzer) { + this.selectStmt = selectStmt; + this.analyzer = analyzer; + init(); + } + + /** + * There are two stages to choosing the best MV. + * Phase 1: Predicates + * According to aggregation and column information in the select stmt, + * the candidate MVs that meets the query conditions are selected. + * Phase 2: Priorities + * According to prefix index and row count in candidate MVs, + * the best MV is selected. + * + * @param scanNode + * @return + */ + public void selectBestMV(ScanNode scanNode) throws UserException { + long start = System.currentTimeMillis(); + Preconditions.checkState(scanNode instanceof OlapScanNode); + OlapScanNode olapScanNode = (OlapScanNode) scanNode; + Map<Long, List<Column>> candidateIndexIdToSchema = predicates(olapScanNode); + long bestIndexId = priorities(olapScanNode, candidateIndexIdToSchema); + LOG.info("The best materialized view is {} for scan node {} in query {}, cost {}", + bestIndexId, scanNode.getId(), selectStmt.toSql(), (System.currentTimeMillis() - start)); + olapScanNode.updateScanRangeInfo(bestIndexId, isPreAggregation, reasonOfDisable); + } + + private Map<Long, List<Column>> predicates(OlapScanNode scanNode) { + // Step1: all of predicates is compensating predicates + Map<Long, List<Column>> candidateIndexIdToSchema = scanNode.getOlapTable().getVisibleIndexes(); + OlapTable table = scanNode.getOlapTable(); + Preconditions.checkState(table != null); + String tableName = table.getName(); + // Step2: check all columns in compensating predicates are available in the view output + checkCompensatingPredicates(columnNamesInPredicates.get(tableName), candidateIndexIdToSchema); + // Step3: group by list in query is the subset of group by list in view or view contains no aggregation + checkGrouping(columnNamesInGrouping.get(tableName), candidateIndexIdToSchema); + // Step4: aggregation functions are available in the view output + checkAggregationFunction(aggregateColumnsInQuery.get(tableName), candidateIndexIdToSchema); + // Step5: columns required to compute output expr are available in the view output + checkOutputColumns(columnNamesInQueryOutput.get(tableName), candidateIndexIdToSchema); + // Step6: if table type is aggregate and the candidateIndexIdToSchema is empty, + if (table.getKeysType() == KeysType.AGG_KEYS && candidateIndexIdToSchema.size() == 0) { + // the base index will be added in the candidateIndexIdToSchema. + compensateIndex(candidateIndexIdToSchema, scanNode.getOlapTable().getVisibleIndexes(), + table.getSchemaByIndexId(table.getBaseIndexId()).size()); + } + return candidateIndexIdToSchema; + } + + private long priorities(OlapScanNode scanNode, Map<Long, List<Column>> candidateIndexIdToSchema) { + // Step1: the candidate indexes that satisfies the most prefix index + final Set<String> equivalenceColumns = Sets.newHashSet(); + final Set<String> unequivalenceColumns = Sets.newHashSet(); + scanNode.collectColumns(analyzer, equivalenceColumns, unequivalenceColumns); + Set<Long> indexesMatchingBestPrefixIndex = + matchBestPrefixIndex(candidateIndexIdToSchema, equivalenceColumns, unequivalenceColumns); + if (indexesMatchingBestPrefixIndex.isEmpty()) { + indexesMatchingBestPrefixIndex = candidateIndexIdToSchema.keySet(); + } + + // Step2: the best index that satisfies the least number of rows + return selectBestRowCountIndex(indexesMatchingBestPrefixIndex, scanNode.getOlapTable(), scanNode + .getSelectedPartitionIds()); + } + + private Set<Long> matchBestPrefixIndex(Map<Long, List<Column>> candidateIndexIdToSchema, + Set<String> equivalenceColumns, + Set<String> unequivalenceColumns) { + if (equivalenceColumns.size() == 0 && unequivalenceColumns.size() == 0) { + return candidateIndexIdToSchema.keySet(); + } + Set<Long> indexesMatchingBestPrefixIndex = Sets.newHashSet(); + int maxPrefixMatchCount = 0; + for (Map.Entry<Long, List<Column>> entry : candidateIndexIdToSchema.entrySet()) { + int prefixMatchCount = 0; + long indexId = entry.getKey(); + List<Column> indexSchema = entry.getValue(); + for (Column col : indexSchema) { + if (equivalenceColumns.contains(col.getName())) { + prefixMatchCount++; + } else if (unequivalenceColumns.contains(col.getName())) { + // Unequivalence predicate's columns can match only first column in rollup. + prefixMatchCount++; + break; + } else { + break; + } + } + + if (prefixMatchCount == maxPrefixMatchCount) { + LOG.debug("find a equal prefix match index {}. match count: {}", indexId, prefixMatchCount); + indexesMatchingBestPrefixIndex.add(indexId); + } else if (prefixMatchCount > maxPrefixMatchCount) { + LOG.debug("find a better prefix match index {}. match count: {}", indexId, prefixMatchCount); + maxPrefixMatchCount = prefixMatchCount; + indexesMatchingBestPrefixIndex.clear(); + indexesMatchingBestPrefixIndex.add(indexId); + } + } + LOG.debug("Those mv match the best prefix index:" + Joiner.on(",").join(indexesMatchingBestPrefixIndex)); + return indexesMatchingBestPrefixIndex; + } + + private long selectBestRowCountIndex(Set<Long> indexesMatchingBestPrefixIndex, OlapTable olapTable, + Collection<Long> partitionIds) { + long minRowCount = Long.MAX_VALUE; + long selectedIndexId = 0; + for (Long indexId : indexesMatchingBestPrefixIndex) { + long rowCount = 0; + for (Long partitionId : partitionIds) { + rowCount += olapTable.getPartition(partitionId).getIndex(indexId).getRowCount(); + } + LOG.debug("rowCount={} for table={}", rowCount, indexId); + if (rowCount < minRowCount) { + minRowCount = rowCount; + selectedIndexId = indexId; + } else if (rowCount == minRowCount) { + // check column number, select one minimum column number + int selectedColumnSize = olapTable.getIndexIdToSchema().get(selectedIndexId).size(); + int currColumnSize = olapTable.getIndexIdToSchema().get(indexId).size(); + if (currColumnSize < selectedColumnSize) { + selectedIndexId = indexId; + } + } + } + String tableName = olapTable.getName(); + String v2RollupIndexName = "__v2_" + tableName; + Long v2RollupIndex = olapTable.getIndexIdByName(v2RollupIndexName); + long baseIndexId = olapTable.getBaseIndexId(); + ConnectContext connectContext = ConnectContext.get(); + boolean useV2Rollup = false; + if (connectContext != null) { + useV2Rollup = connectContext.getSessionVariable().getUseV2Rollup(); + } + if (baseIndexId == selectedIndexId && v2RollupIndex != null && useV2Rollup) { + // if the selectedIndexId is baseIndexId + // check whether there is a V2 rollup index and useV2Rollup flag is true, + // if both true, use v2 rollup index + selectedIndexId = v2RollupIndex; + } + if (!useV2Rollup && v2RollupIndex != null && v2RollupIndex == selectedIndexId) { + // if the selectedIndexId is v2RollupIndex + // but useV2Rollup is false, use baseIndexId as selectedIndexId + // just make sure to use baseIndex instead of v2RollupIndex if the useV2Rollup is false + selectedIndexId = baseIndexId; + } + return selectedIndexId; + } + + private void checkCompensatingPredicates(Set<String> columnsInPredicates, + Map<Long, List<Column>> candidateIndexIdToSchema) { + // When the query statement does not contain any columns in predicates, all candidate index can pass this check + if (columnsInPredicates == null) { + return; + } + Iterator<Map.Entry<Long, List<Column>>> iterator = candidateIndexIdToSchema.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Long, List<Column>> entry = iterator.next(); + Set<String> indexNonAggregatedColumnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + entry.getValue().stream().filter(column -> !column.isAggregated()) + .forEach(column -> indexNonAggregatedColumnNames.add(column.getName())); + if (!indexNonAggregatedColumnNames.containsAll(columnsInPredicates)) { + iterator.remove(); + } + } + LOG.debug("Those mv pass the test of compensating predicates:" + + Joiner.on(",").join(candidateIndexIdToSchema.keySet())); + } + + /** + * View Query result + * SPJ SPJG OR SPJ pass + * SPJG SPJ fail + * SPJG SPJG pass + * 1. grouping columns in query is subset of grouping columns in view + * 2. the empty grouping columns in query is subset of all of views + * + * @param columnsInGrouping + * @param candidateIndexIdToSchema + */ + + private void checkGrouping(Set<String> columnsInGrouping, Map<Long, List<Column>> candidateIndexIdToSchema) { + Iterator<Map.Entry<Long, List<Column>>> iterator = candidateIndexIdToSchema.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Long, List<Column>> entry = iterator.next(); + Set<String> indexNonAggregatedColumnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + List<Column> candidateIndexSchema = entry.getValue(); + candidateIndexSchema.stream().filter(column -> !column.isAggregated()) + .forEach(column -> indexNonAggregatedColumnNames.add(column.getName())); + // When the candidate index is SPJ type, it passes the verification directly + if (indexNonAggregatedColumnNames.size() == candidateIndexSchema.size()) { + continue; + } + // When the query is SPJ type but the candidate index is SPJG type, it will not pass directly. + if (isSPJQuery || disableSPJGView) { + iterator.remove(); + continue; + } + // The query is SPJG. The candidate index is SPJG too. + // The grouping columns in query is empty. For example: select sum(A) from T + if (columnsInGrouping == null) { + continue; + } + // The grouping columns in query must be subset of the grouping columns in view + if (!indexNonAggregatedColumnNames.containsAll(columnsInGrouping)) { + iterator.remove(); + } + } + LOG.debug("Those mv pass the test of grouping:" + + Joiner.on(",").join(candidateIndexIdToSchema.keySet())); + } + + private void checkAggregationFunction(Set<AggregatedColumn> aggregatedColumnsInQueryOutput, + Map<Long, List<Column>> candidateIndexIdToSchema) { Review comment: No, it can't. If the aggregatedColumnInQueryOutput is null or empty, the query will be a SPJ. But the candidate index such as SPJG should be filter in the following code. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org