This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4bda83a3388a0ac79a8989160068a790919d43fe Author: Pengfei Zhan <dethr...@gmail.com> AuthorDate: Sat Apr 8 23:21:35 2023 +0800 KYLIN-5633 optimize the pruning process of heterogeneous segments --- .../org/apache/kylin/common/KylinConfigBase.java | 32 ++- .../java/org/apache/kylin/common/QueryContext.java | 3 +- .../kylin/metadata/cube/cuboid/ChooserContext.java | 11 +- .../metadata/cube/cuboid/NLayoutCandidate.java | 129 ++++----- .../metadata/cube/cuboid/NLookupCandidate.java | 4 +- .../metadata/cube/cuboid/NQueryLayoutChooser.java | 184 +++++++++--- .../cube/model/NDataflowCapabilityChecker.java | 10 +- .../metadata/realization/CapabilityResult.java | 17 +- .../metadata/realization/HybridRealization.java | 6 +- .../routing/HeterogeneousSegmentPruningTest.java | 143 ++++++++- .../routing/NDataflowCapabilityCheckerTest.java | 2 +- .../query/routing/VacantIndexPruningRuleTest.java | 212 ++++++++++++++ .../org/apache/kylin/query/routing/Candidate.java | 6 +- .../apache/kylin/query/routing/QueryRouter.java | 17 +- .../kylin/query/routing/RealizationChooser.java | 11 +- .../query/routing/VacantIndexPruningRule.java | 142 +++++++++ .../apache/kylin/rest/response/SQLResponse.java | 15 +- .../apache/kylin/rest/service/QueryService.java | 21 +- .../kylin/query/util/QueryContextCutter.java | 9 +- .../kylin/query/routing/CandidateSortTest.java | 320 +++++---------------- .../kylin/query/routing/CandidateTestUtils.java | 81 ++++++ .../query/routing/LayoutCandidateSortTest.java | 320 +++++++++++++++++++++ .../kylin/query/routing/QueryRouterTest.java | 167 +++++++++++ .../sql/execution/datasource/FilePruner.scala | 8 +- 24 files changed, 1430 insertions(+), 440 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 9557ede4d5..c0797ebc18 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2308,6 +2308,34 @@ public abstract class KylinConfigBase implements Serializable { "org.apache.kylin.query.util.SparkSQLFunctionConverter" }); } + public static final String USE_VACANT_INDEXES = "use-vacant-indexes"; + public static final String USE_TABLE_INDEX_ANSWER_SELECT_STAR = "use-table-index-answer-select-star"; + + public String getQueryIndexMatchRules() { + return getOptional("kylin.query.index-match-rules", ""); + } + + private Set<String> getPruningRules() { + String queryIndexMatchRules = getQueryIndexMatchRules(); + String[] splitRules = queryIndexMatchRules.split(","); + Set<String> configRules = Sets.newHashSet(); + for (String splitRule : splitRules) { + if (StringUtils.isNotBlank(splitRule)) { + configRules.add(StringUtils.lowerCase(splitRule)); + } + } + return configRules; + } + + public boolean isVacantIndexPruningEnabled() { + return getPruningRules().contains(KylinConfigBase.USE_VACANT_INDEXES); + } + + public boolean useTableIndexAnswerSelectStarEnabled() { + return getPruningRules().contains(KylinConfigBase.USE_TABLE_INDEX_ANSWER_SELECT_STAR) + || Boolean.parseBoolean(getOptional("kylin.query.use-tableindex-answer-select-star.enabled", FALSE)); + } + @ThirdPartyDependencies({ @ThirdPartyDependencies.ThirdPartyDependent(repository = "static-user-manager", classes = { "StaticAuthenticationProvider", "StaticUserGroupService", "StaticUserService" }) }) @@ -3761,10 +3789,6 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.build.resource.skip-resource-check", FALSE)); } - public boolean useTableIndexAnswerSelectStarEnabled() { - return Boolean.parseBoolean(getOptional("kylin.query.use-tableindex-answer-select-star.enabled", FALSE)); - } - public int getSecondStorageSkippingIndexGranularity() { int granularity = Integer.parseInt(getOptional("kylin.second-storage.skipping-index.granularity", "3")); return 
granularity <= 0 ? 3 : granularity; diff --git a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 4f766fb736..cda5ec2c33 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -30,9 +30,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.kylin.common.util.RandomUtil; +import org.apache.kylin.guava30.shaded.common.collect.Lists; import com.alibaba.ttl.TransmittableThreadLocal; -import org.apache.kylin.guava30.shaded.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Getter; @@ -361,6 +361,7 @@ public class QueryContext implements Closeable { private String separator; private boolean isRefused; private boolean includeHeader; + private boolean isVacant; } @Getter diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/ChooserContext.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/ChooserContext.java index 454cc3bc2f..bf6e8efd74 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/ChooserContext.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/ChooserContext.java @@ -56,7 +56,7 @@ public class ChooserContext { final Map<Integer, TableExtDesc.ColumnStats> columnStatMap = Maps.newHashMap(); final KylinConfig kylinConfig; - + SQLDigest sqlDigest; AggIndexMatcher aggIndexMatcher; TableIndexMatcher tableIndexMatcher; @@ -79,6 +79,7 @@ public class ChooserContext { public ChooserContext(SQLDigest sqlDigest, NDataflow dataflow) { this(dataflow.getModel()); + this.sqlDigest = sqlDigest; prepareIndexMatchers(sqlDigest, dataflow); } @@ -106,7 +107,13 @@ public class ChooserContext { * see org.apache.kylin.query.DynamicQueryTest.testDynamicParamOnAgg. 
*/ public boolean isIndexMatchersInvalid() { - return !getAggIndexMatcher().isValid() && !getTableIndexMatcher().isValid(); + boolean invalid = !getAggIndexMatcher().isValid() && !getTableIndexMatcher().isValid(); + if (invalid) { + log.warn("Unfortunately, the fast check has failed. " + + "It's possible that the queried columns contain null values, " + + "which could be due to the computed column not being present in the model."); + } + return invalid; } public TableExtDesc.ColumnStats getColumnStats(TblColRef ref) { diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java index 2ab1afccc0..19b1ba592a 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java @@ -17,50 +17,44 @@ */ package org.apache.kylin.metadata.cube.cuboid; -import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import javax.annotation.Nonnull; - +import org.apache.kylin.guava30.shaded.common.base.Preconditions; import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.metadata.cube.model.LayoutEntity; import org.apache.kylin.metadata.model.DeriveInfo; -import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.realization.CapabilityResult; import org.apache.kylin.metadata.realization.IRealizationCandidate; import lombok.Getter; import lombok.Setter; +@Getter +@Setter public class NLayoutCandidate implements IRealizationCandidate { - private @Nonnull LayoutEntity layoutEntity; - @Setter - private double cost; - - @Setter - @Getter - private CapabilityResult capabilityResult; 
public static final NLayoutCandidate EMPTY = new NLayoutCandidate(new LayoutEntity(), Double.MAX_VALUE, new CapabilityResult()); - // derived - private @Nonnull Map<Integer, DeriveInfo> derivedToHostMap = Maps.newHashMap(); - - @Getter - @Setter - Set<String> derivedTableSnapshots = new HashSet<>(); + private LayoutEntity layoutEntity; + private double cost; + private CapabilityResult capabilityResult; + private long range; + private long maxSegEnd; + private Map<Integer, DeriveInfo> derivedToHostMap = Maps.newHashMap(); + Set<String> derivedTableSnapshots = Sets.newHashSet(); - public NLayoutCandidate(@Nonnull LayoutEntity layoutEntity) { + public NLayoutCandidate(LayoutEntity layoutEntity) { + Preconditions.checkNotNull(layoutEntity); this.layoutEntity = layoutEntity; } - public NLayoutCandidate(@Nonnull LayoutEntity layoutEntity, double cost, CapabilityResult result) { - this.layoutEntity = layoutEntity; + public NLayoutCandidate(LayoutEntity layoutEntity, double cost, CapabilityResult result) { + this(layoutEntity); this.cost = cost; this.capabilityResult = result; } @@ -69,66 +63,57 @@ public class NLayoutCandidate implements IRealizationCandidate { return this.getLayoutEntity().getIndex() == null; } - @Nonnull - public LayoutEntity getLayoutEntity() { - return layoutEntity; - } - - public void setLayoutEntity(@Nonnull LayoutEntity cuboidLayout) { - this.layoutEntity = cuboidLayout; - } - - @Nonnull - public Map<Integer, DeriveInfo> getDerivedToHostMap() { - return derivedToHostMap; - } - - public void setDerivedToHostMap(@Nonnull Map<Integer, DeriveInfo> derivedToHostMap) { - this.derivedToHostMap = derivedToHostMap; - } - public Map<List<Integer>, List<DeriveInfo>> makeHostToDerivedMap() { Map<List<Integer>, List<DeriveInfo>> hostToDerivedMap = Maps.newHashMap(); - - for (Map.Entry<Integer, DeriveInfo> entry : derivedToHostMap.entrySet()) { - - Integer derCol = entry.getKey(); - List<Integer> hostCols = entry.getValue().columns; - DeriveInfo.DeriveType type 
= entry.getValue().type; - JoinDesc join = entry.getValue().join; - - List<DeriveInfo> infoList = hostToDerivedMap.computeIfAbsent(hostCols, k -> new ArrayList<>()); - - // Merged duplicated derived column - boolean merged = false; - for (DeriveInfo existing : infoList) { - if (existing.type == type && existing.join.getPKSide().equals(join.getPKSide())) { - if (existing.columns.contains(derCol)) { - merged = true; - break; - } - if (type == DeriveInfo.DeriveType.LOOKUP || type == DeriveInfo.DeriveType.LOOKUP_NON_EQUI) { - existing.columns.add(derCol); - merged = true; - break; - } - } + derivedToHostMap.forEach((derivedColId, deriveInfo) -> { + DeriveInfo.DeriveType type = deriveInfo.type; + List<Integer> columns = deriveInfo.columns; + List<DeriveInfo> infoList = hostToDerivedMap.computeIfAbsent(columns, k -> Lists.newArrayList()); + if (!isMerged(derivedColId, deriveInfo, infoList)) { + infoList.add(new DeriveInfo(type, deriveInfo.join, Lists.newArrayList(derivedColId), false)); } - if (!merged) - infoList.add(new DeriveInfo(type, join, Lists.newArrayList(derCol), false)); - } - + }); return hostToDerivedMap; } - @Override - public double getCost() { - return this.cost; + // Merged duplicated derived column + private static boolean isMerged(Integer derCol, DeriveInfo deriveInfo, List<DeriveInfo> infoList) { + DeriveInfo.DeriveType type = deriveInfo.type; + boolean merged = false; + for (DeriveInfo existing : infoList) { + if (existing.type == type && existing.join.getPKSide().equals(deriveInfo.join.getPKSide())) { + if (existing.columns.contains(derCol)) { + merged = true; + } + if (type == DeriveInfo.DeriveType.LOOKUP || type == DeriveInfo.DeriveType.LOOKUP_NON_EQUI) { + existing.columns.add(derCol); + merged = true; + } + } + if (merged) { + break; + } + } + return merged; } @Override public String toString() { - return "LayoutCandidate{" + "cuboidLayout=" + layoutEntity + ", indexEntity=" + layoutEntity.getIndex() - + ", cost=" + cost + '}'; + String type = 
""; + if (layoutEntity.isManual()) { + type += "manual"; + } else if (layoutEntity.isAuto()) { + type += "auto"; + } + if (layoutEntity.isBase()) { + type += type.isEmpty() ? "base" : ",base"; + } + if (type.isEmpty()) { + type = "unknown"; + } + return "LayoutCandidate{" + "layout=" + layoutEntity // + + ", type=" + type // + + ", cost=" + cost // + + "}"; } } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLookupCandidate.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLookupCandidate.java index a0901d4cb9..c65f1e1032 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLookupCandidate.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLookupCandidate.java @@ -24,10 +24,10 @@ import lombok.Getter; public class NLookupCandidate implements IRealizationCandidate { @Getter - private String tableRef; + private final String tableRef; @Getter - private boolean isUsingSnapShot; + private final boolean isUsingSnapShot; public NLookupCandidate(String tableRef, boolean isUsingSnapShot) { this.tableRef = tableRef; diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NQueryLayoutChooser.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NQueryLayoutChooser.java index 31572941e5..4b28ad7c7a 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NQueryLayoutChooser.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NQueryLayoutChooser.java @@ -95,23 +95,13 @@ public class NQueryLayoutChooser { Map<Long, List<NDataLayout>> commonLayoutsMap = commonLayouts.stream() .collect(Collectors.toMap(NDataLayout::getLayoutId, Lists::newArrayList)); List<NLayoutCandidate> candidates = collectAllLayoutCandidates(dataflow, chooserContext, commonLayoutsMap); - - QueryInterruptChecker.checkThreadInterrupted("Interrupted exception occurs.", - "Current step 
involves gathering all the layouts that " - + "can potentially provide a response to this query."); - - if (candidates.isEmpty()) { - return null; - } - log.info("Matched candidates num : {}", candidates.size()); - sortCandidates(candidates, chooserContext, sqlDigest); - return candidates.get(0); + return chooseBestLayoutCandidate(dataflow, sqlDigest, chooserContext, candidates, "selectLayoutCandidate"); } public static List<NLayoutCandidate> collectAllLayoutCandidates(NDataflow dataflow, ChooserContext chooserContext, - Map<Long, List<NDataLayout>> commonLayoutsMap) { + Map<Long, List<NDataLayout>> dataLayoutMap) { List<NLayoutCandidate> candidates = Lists.newArrayList(); - for (Map.Entry<Long, List<NDataLayout>> entry : commonLayoutsMap.entrySet()) { + for (Map.Entry<Long, List<NDataLayout>> entry : dataLayoutMap.entrySet()) { LayoutEntity layout = dataflow.getIndexPlan().getLayoutEntity(entry.getKey()); log.trace("Matching index: id = {}", entry.getKey()); IndexMatcher.MatchResult matchResult = chooserContext.getTableIndexMatcher().match(layout); @@ -120,7 +110,7 @@ public class NQueryLayoutChooser { } if (!matchResult.isMatched()) { - log.trace("Matching failed"); + log.trace("The [{}] cannot match with the {}", chooserContext.sqlDigest.toString(), layout); continue; } @@ -131,14 +121,84 @@ public class NQueryLayoutChooser { candidate.setDerivedTableSnapshots(candidate.getDerivedToHostMap().keySet().stream() .map(i -> chooserContext.convertToRef(i).getTable()).collect(Collectors.toSet())); } - long allRows = entry.getValue().stream().mapToLong(NDataLayout::getRows).sum(); + List<NDataLayout> dataLayouts = entry.getValue(); + long allRows = dataLayouts.stream().mapToLong(NDataLayout::getRows).sum(); candidate.setCost(allRows * (tempResult.influences.size() + matchResult.getInfluenceFactor())); candidate.setCapabilityResult(tempResult); + + long[] rangeAndLatest = calcSegRangeAndMaxEnd(chooserContext, dataflow, dataLayouts); + 
candidate.setRange(rangeAndLatest[0]); + candidate.setMaxSegEnd(rangeAndLatest[1]); candidates.add(candidate); } return candidates; } + private static long[] calcSegRangeAndMaxEnd(ChooserContext chooserContext, NDataflow df, + List<NDataLayout> dataLayouts) { + long[] rangeAndLatest = new long[2]; + if (!chooserContext.getKylinConfig().isVacantIndexPruningEnabled()) { + return rangeAndLatest; + } + List<String> segmentNameList = Lists.newArrayList(); + for (NDataLayout dataLayout : dataLayouts) { + NDataSegment segment = df.getSegment(dataLayout.getSegDetails().getId()); + Long end = (Long) segment.getSegRange().getEnd(); + Long start = (Long) segment.getSegRange().getStart(); + rangeAndLatest[0] += (end - start); + rangeAndLatest[1] = Math.max(rangeAndLatest[1], end); + segmentNameList.add(segment.getName()); + } + log.trace("All available segments are: {}", segmentNameList); + return rangeAndLatest; + } + + public static NLayoutCandidate selectHighIntegrityCandidate(NDataflow dataflow, List<NDataSegment> prunedSegments, + SQLDigest digest) { + if (!NProjectManager.getProjectConfig(dataflow.getProject()).isVacantIndexPruningEnabled()) { + return null; + } + if (CollectionUtils.isEmpty(prunedSegments)) { + log.info("There is no segment to answer sql"); + return NLayoutCandidate.EMPTY; + } + + ChooserContext chooserContext = new ChooserContext(digest, dataflow); + if (chooserContext.isIndexMatchersInvalid()) { + return null; + } + + Map<Long, List<NDataLayout>> idToDataLayoutsMap = Maps.newHashMap(); + for (NDataSegment segment : prunedSegments) { + segment.getLayoutsMap().forEach((id, dataLayout) -> { + idToDataLayoutsMap.putIfAbsent(id, Lists.newArrayList()); + idToDataLayoutsMap.get(id).add(dataLayout); + }); + } + + List<NLayoutCandidate> allLayoutCandidates = NQueryLayoutChooser.collectAllLayoutCandidates(dataflow, + chooserContext, idToDataLayoutsMap); + return chooseBestLayoutCandidate(dataflow, digest, chooserContext, allLayoutCandidates, + 
"selectHighIntegrityCandidate"); + } + + private static NLayoutCandidate chooseBestLayoutCandidate(NDataflow dataflow, SQLDigest digest, + ChooserContext chooserContext, List<NLayoutCandidate> allLayoutCandidates, String invokedByMethod) { + QueryInterruptChecker.checkThreadInterrupted("Interrupted exception occurs.", + "Current step involves gathering all the layouts that " + + "can potentially provide a response to this query."); + + if (allLayoutCandidates.isEmpty()) { + log.info("There is no layouts can match with the [{}]", digest.toString()); + return null; + } + sortCandidates(allLayoutCandidates, chooserContext, digest); + log.debug("Invoked by method {}. Successfully matched {} candidates within the model ({}/{}), " // + + "and {} has been selected.", invokedByMethod, allLayoutCandidates.size(), dataflow.getProject(), + dataflow.getId(), allLayoutCandidates.get(0).toString()); + return allLayoutCandidates.get(0); + } + private static Collection<NDataLayout> getCommonLayouts(List<NDataSegment> segments, NDataflow dataflow, Map<String, Set<Long>> chSegmentToLayoutsMap) { KylinConfig projectConfig = NProjectManager.getProjectConfig(dataflow.getProject()); @@ -174,35 +234,75 @@ public class NQueryLayoutChooser { return commonLayouts.values(); } - private static void sortCandidates(List<NLayoutCandidate> candidates, ChooserContext chooserContext, + public static void sortCandidates(List<NLayoutCandidate> candidates, ChooserContext chooserContext, SQLDigest sqlDigest) { - final Set<TblColRef> filterColSet = ImmutableSet.copyOf(sqlDigest.filterColumns); - final List<TblColRef> filterCols = Lists.newArrayList(filterColSet); - val filterColIds = filterCols.stream().sorted(ComparatorUtils.filterColComparator(chooserContext)) - .map(col -> chooserContext.getTblColMap().get(col)).collect(Collectors.toList()); + List<Integer> filterColIds = getFilterColIds(chooserContext, sqlDigest); + List<Integer> nonFilterColIds = getNonFilterColIds(chooserContext, sqlDigest); + 
Ordering<NLayoutCandidate> ordering = chooserContext.getKylinConfig().isVacantIndexPruningEnabled() + ? getEnhancedSorter(filterColIds, nonFilterColIds) + : getDefaultSorter(filterColIds, nonFilterColIds); + candidates.sort(ordering); + } - final Set<TblColRef> nonFilterColSet = sqlDigest.isRawQuery ? sqlDigest.allColumns.stream() - .filter(colRef -> colRef.getFilterLevel() == TblColRef.FilterColEnum.NONE).collect(Collectors.toSet()) - : sqlDigest.groupbyColumns.stream() - .filter(colRef -> colRef.getFilterLevel() == TblColRef.FilterColEnum.NONE) - .collect(Collectors.toSet()); - final List<TblColRef> nonFilterColumns = Lists.newArrayList(nonFilterColSet); - nonFilterColumns.sort(ComparatorUtils.nonFilterColComparator()); - val nonFilterColIds = nonFilterColumns.stream().map(col -> chooserContext.getTblColMap().get(col)) - .collect(Collectors.toList()); + private static Ordering<NLayoutCandidate> getEnhancedSorter(List<Integer> filterColIds, + List<Integer> nonFilterColIds) { + return Ordering.from(segmentRangeComparator()) // high data integrity + .compound(preferAggComparator()) // + .compound(derivedLayoutComparator()) // + .compound(rowSizeComparator()) // lower cost + .compound(filterColumnComparator(filterColIds)) // + .compound(dimensionSizeComparator()) // + .compound(measureSizeComparator()) // + .compound(nonFilterColumnComparator(nonFilterColIds)) // + .compound(segmentEffectivenessComparator()); // the latest segment + } - Ordering<NLayoutCandidate> ordering = Ordering // - .from(priorityLayoutComparator()) // + private static Ordering<NLayoutCandidate> getDefaultSorter(List<Integer> filterColIds, + List<Integer> nonFilterColIds) { + return Ordering // + .from(preferAggComparator()) // .compound(derivedLayoutComparator()) // .compound(rowSizeComparator()) // L1 comparator, compare cuboid rows .compound(filterColumnComparator(filterColIds)) // L2 comparator, order filter columns .compound(dimensionSizeComparator()) // the lower dimension the best 
.compound(measureSizeComparator()) // L3 comparator, order size of cuboid columns - .compound(nonFilterColumnComparator(nonFilterColIds)); // L4 comparator, order non-filter columns - candidates.sort(ordering); + .compound(nonFilterColumnComparator(nonFilterColIds)); + } + + private static List<Integer> getFilterColIds(ChooserContext chooserContext, SQLDigest sqlDigest) { + Set<TblColRef> filterColSet = ImmutableSet.copyOf(sqlDigest.filterColumns); + List<TblColRef> filterCols = Lists.newArrayList(filterColSet); + return filterCols.stream().sorted(ComparatorUtils.filterColComparator(chooserContext)) + .map(col -> chooserContext.getTblColMap().get(col)).collect(Collectors.toList()); + } + + private static List<Integer> getNonFilterColIds(ChooserContext chooserContext, SQLDigest sqlDigest) { + + Set<TblColRef> nonFilterColSet; + if (sqlDigest.isRawQuery) { + nonFilterColSet = sqlDigest.allColumns.stream() + .filter(colRef -> colRef.getFilterLevel() == TblColRef.FilterColEnum.NONE) + .collect(Collectors.toSet()); + } else { + nonFilterColSet = sqlDigest.groupbyColumns.stream() + .filter(colRef -> colRef.getFilterLevel() == TblColRef.FilterColEnum.NONE) + .collect(Collectors.toSet()); + } + List<TblColRef> nonFilterColumns = Lists.newArrayList(nonFilterColSet); + nonFilterColumns.sort(ComparatorUtils.nonFilterColComparator()); + return nonFilterColumns.stream().map(col -> chooserContext.getTblColMap().get(col)) + .collect(Collectors.toList()); + } + + public static Comparator<NLayoutCandidate> segmentRangeComparator() { + return (c1, c2) -> Long.compare(c2.getRange(), c1.getRange()); + } + + public static Comparator<NLayoutCandidate> segmentEffectivenessComparator() { + return (c1, c2) -> Long.compare(c2.getMaxSegEnd(), c1.getMaxSegEnd()); } - private static Comparator<NLayoutCandidate> priorityLayoutComparator() { + public static Comparator<NLayoutCandidate> preferAggComparator() { return (layoutCandidate1, layoutCandidate2) -> { if 
(!KylinConfig.getInstanceFromEnv().isPreferAggIndex()) { return 0; @@ -218,7 +318,7 @@ public class NQueryLayoutChooser { }; } - private static Comparator<NLayoutCandidate> derivedLayoutComparator() { + public static Comparator<NLayoutCandidate> derivedLayoutComparator() { return (candidate1, candidate2) -> { int result = 0; if (candidate1.getDerivedToHostMap().isEmpty() && !candidate2.getDerivedToHostMap().isEmpty()) { @@ -236,15 +336,15 @@ public class NQueryLayoutChooser { }; } - private static Comparator<NLayoutCandidate> rowSizeComparator() { + public static Comparator<NLayoutCandidate> rowSizeComparator() { return Comparator.comparingDouble(NLayoutCandidate::getCost); } - private static Comparator<NLayoutCandidate> dimensionSizeComparator() { + public static Comparator<NLayoutCandidate> dimensionSizeComparator() { return Comparator.comparingInt(candidate -> candidate.getLayoutEntity().getOrderedDimensions().size()); } - private static Comparator<NLayoutCandidate> measureSizeComparator() { + public static Comparator<NLayoutCandidate> measureSizeComparator() { return Comparator.comparingInt(candidate -> candidate.getLayoutEntity().getOrderedMeasures().size()); } @@ -253,18 +353,18 @@ public class NQueryLayoutChooser { * 1. choose the layout if its shardby column is found in filters * 2. 
otherwise, compare position of filter columns appear in the layout dims */ - private static Comparator<NLayoutCandidate> filterColumnComparator(List<Integer> sortedFilters) { + public static Comparator<NLayoutCandidate> filterColumnComparator(List<Integer> sortedFilters) { return Ordering.from(shardByComparator(sortedFilters)).compound(colComparator(sortedFilters)); } - private static Comparator<NLayoutCandidate> nonFilterColumnComparator(List<Integer> sortedNonFilters) { + public static Comparator<NLayoutCandidate> nonFilterColumnComparator(List<Integer> sortedNonFilters) { return colComparator(sortedNonFilters); } /** * compare filters with dim pos in layout, filter columns are sorted by filter type and selectivity (cardinality) */ - private static Comparator<NLayoutCandidate> colComparator(List<Integer> sortedCols) { + public static Comparator<NLayoutCandidate> colComparator(List<Integer> sortedCols) { return (layoutCandidate1, layoutCandidate2) -> { List<Integer> position1 = getColumnsPos(layoutCandidate1, sortedCols); List<Integer> position2 = getColumnsPos(layoutCandidate2, sortedCols); @@ -289,7 +389,7 @@ public class NQueryLayoutChooser { * 1. check if shardby columns appears in filters * 2. 
if both layout has shardy columns in filters, compare the filter type and selectivity (cardinality) */ - private static Comparator<NLayoutCandidate> shardByComparator(List<Integer> columns) { + public static Comparator<NLayoutCandidate> shardByComparator(List<Integer> columns) { return (candidate1, candidate2) -> { int shardByCol1Idx = getShardByColIndex(candidate1, columns); int shardByCol2Idx = getShardByColIndex(candidate2, columns); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowCapabilityChecker.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowCapabilityChecker.java index c99cf27412..e066bbed45 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowCapabilityChecker.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowCapabilityChecker.java @@ -73,13 +73,17 @@ public class NDataflowCapabilityChecker { } else { // for query-on-fact-table logger.trace("Normal dataflow matching"); - boolean partialMatchIndex = QueryContext.current().isPartialMatchIndex(); NLayoutCandidate candidateAndInfluence = NQueryLayoutChooser.selectLayoutCandidate(dataflow, prunedSegments, digest, secondStorageSegmentLayoutMap); - if (partialMatchIndex && candidateAndInfluence == null) { + if (candidateAndInfluence == null && QueryContext.current().isPartialMatchIndex()) { + // This branch is customized requirements logger.trace("Partial dataflow matching"); candidateAndInfluence = NQueryLayoutChooser.selectPartialLayoutCandidate(dataflow, prunedSegments, digest, secondStorageSegmentLayoutMap); + } else if (candidateAndInfluence == null) { + logger.debug("select the layout candidate with high data integrity."); + candidateAndInfluence = NQueryLayoutChooser.selectHighIntegrityCandidate(dataflow, prunedSegments, + digest); } if (candidateAndInfluence != null) { chosenCandidate = candidateAndInfluence; @@ -94,7 +98,7 @@ public class 
NDataflowCapabilityChecker { } else { result.setSelectedCandidate(chosenCandidate); } - result.cost = (int) chosenCandidate.getCost(); + result.setCost(chosenCandidate.getCost()); } else { result.setCapable(false); } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java index 42c1648507..ddd2f01d67 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java @@ -31,6 +31,8 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +@Getter +@Setter @NoArgsConstructor public class CapabilityResult { @@ -43,35 +45,32 @@ public class CapabilityResult { /** * Is capable or not */ - @Getter - @Setter private boolean capable = false; /** * selected capable candidate, like Lookup or layout */ - @Getter - @Setter private IRealizationCandidate selectedCandidate; - @Getter - @Setter private IRealizationCandidate selectedStreamingCandidate; - @Getter - @Setter private int layoutUnmatchedColsSize; /** * The smaller the cost, the more capable the realization */ - public int cost = Integer.MAX_VALUE; + private double cost = Integer.MAX_VALUE; /** * reason of incapable */ public IncapableCause incapableCause; + /** + * The layout has not yet been built in any segment or is currently under building. + */ + private boolean isVacant; + /** * Marker objects to indicate all special features * (dimension-as-measure, topN etc.) that have influenced the capability check. 
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/HybridRealization.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/HybridRealization.java index fbd01d6c78..b70691de1a 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/HybridRealization.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/HybridRealization.java @@ -138,14 +138,14 @@ public class HybridRealization implements IRealization { child = realization.isCapable(digest, queryableSeg.getStreamingSegments(), Maps.newHashMap()); result.setSelectedStreamingCandidate(child.getSelectedStreamingCandidate()); if (child.isCapable()) { - result.cost = Math.min(result.cost, (int) child.getSelectedStreamingCandidate().getCost()); + result.setCost(Math.min(result.getCost(), child.getSelectedStreamingCandidate().getCost())); } } else { child = realization.isCapable(digest, queryableSeg.getBatchSegments(), queryableSeg.getChSegToLayoutsMap()); result.setSelectedCandidate(child.getSelectedCandidate()); if (child.isCapable()) { - result.cost = Math.min(result.cost, (int) child.getSelectedCandidate().getCost()); + result.setCost(Math.min(result.getCost(), child.getSelectedCandidate().getCost())); } } if (child.isCapable()) { @@ -156,7 +156,7 @@ public class HybridRealization implements IRealization { } } - result.cost--; // let hybrid win its children + result.setCost(result.getCost() - 1); // let hybrid win its children return result; } diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/HeterogeneousSegmentPruningTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/HeterogeneousSegmentPruningTest.java index f568c1b994..4d46270c35 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/HeterogeneousSegmentPruningTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/HeterogeneousSegmentPruningTest.java @@ -26,18 +26,28 @@ import 
java.util.stream.Collectors; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.KylinConfigBase; +import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.Pair; import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest; import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.guava30.shaded.common.collect.Maps; import org.apache.kylin.metadata.cube.model.LayoutPartition; +import org.apache.kylin.metadata.cube.model.NDataLayout; +import org.apache.kylin.metadata.cube.model.NDataSegDetails; +import org.apache.kylin.metadata.cube.model.NDataSegment; +import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.model.NDataflowUpdate; import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.metadata.model.Segments; +import org.apache.kylin.metadata.project.EnhancedUnitOfWork; import org.apache.kylin.metadata.realization.NoRealizationFoundException; import org.apache.kylin.query.engine.QueryExec; import org.apache.kylin.query.relnode.OLAPContext; +import org.apache.kylin.storage.StorageContext; +import org.apache.kylin.util.MetadataTestUtils; import org.apache.kylin.util.OlapContextUtil; import org.junit.Assert; import org.junit.Test; @@ -389,7 +399,7 @@ public class HeterogeneousSegmentPruningTest extends NLocalWithSparkSessionTest { // composite filters val sqlFilter = sql + "select * from T1\n" + "where (cal_dt = DATE'2012-01-01' or (cast(cal_dt as date) = '2012-01-02' or cal_dt = '2012-01-03')) and (cal_dt is not null or cal_dt in ('2012-01-01', '2012-01-02'))"; - val context = getMatchedContexts(project, sqlFilter).get(0); + val context = OlapContextUtil.getOlapContexts(project, sqlFilter).get(0); 
assertFiltersAndLayout(context, null, ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03)," + "OR(=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01), =(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-02), =(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03))," @@ -399,7 +409,7 @@ public class HeterogeneousSegmentPruningTest extends NLocalWithSparkSessionTest { // invalid filters with or val sqlFilter = sql + "select * from T1\n" + "where trans_id = 123 or trans_id + 1 = 123 or (trans_id + 2 = 234 and trans_id = 345)"; - val context = getMatchedContexts(project, sqlFilter).get(0); + val context = OlapContextUtil.getOlapContexts(project, sqlFilter).get(0); assertFiltersAndLayout(context, null, ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03)"); } @@ -407,7 +417,7 @@ public class HeterogeneousSegmentPruningTest extends NLocalWithSparkSessionTest { // invalid filters with and val sqlFilter = sql + "select * from T1\n" + "where trans_id = 123 and (trans_id + 2 = 234 or trans_id = 345)"; - val context = getMatchedContexts(project, sqlFilter).get(0); + val context = OlapContextUtil.getOlapContexts(project, sqlFilter).get(0); assertFiltersAndLayout(context, null, ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03)," + "=(DEFAULT.TEST_KYLIN_FACT.TRANS_ID, 123)"); @@ -416,7 +426,7 @@ public class HeterogeneousSegmentPruningTest extends NLocalWithSparkSessionTest { // invalid filters with not val sqlFilter = sql + "select * from T1\n" + "where trans_id = 123 and (not(trans_id = 234) or trans_id = 345) and (not(trans_id + 1 = 132))"; - val context = getMatchedContexts(project, sqlFilter).get(0); + val context = OlapContextUtil.getOlapContexts(project, sqlFilter).get(0); assertFiltersAndLayout(context, null, ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03)," + "=(DEFAULT.TEST_KYLIN_FACT.TRANS_ID, 
123),OR(<>(DEFAULT.TEST_KYLIN_FACT.TRANS_ID, 234), =(DEFAULT.TEST_KYLIN_FACT.TRANS_ID, 345))"); @@ -455,7 +465,7 @@ public class HeterogeneousSegmentPruningTest extends NLocalWithSparkSessionTest { // inner join val sqlInnerJoin = sql + " select * from T1 inner join T2 on T1.cal_dt = T2.cal_dt \n" + " where T1.cal_dt = '2012-01-01' and T2.cal_dt = DATE '2012-01-02'"; - val contexts = getMatchedContexts(project, sqlInnerJoin); + val contexts = OlapContextUtil.getOlapContexts(project, sqlInnerJoin); assertFiltersAndLayout(contexts.get(0), seg1Id, ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03),=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01)"); assertFiltersAndLayout(contexts.get(1), seg2Id, @@ -465,7 +475,7 @@ public class HeterogeneousSegmentPruningTest extends NLocalWithSparkSessionTest { // left join val sqlLeftJoin = sql + " select * from T1 left join T2 on T1.cal_dt = T2.cal_dt \n" + " where T1.cal_dt = DATE '2012-01-01' and T2.cal_dt = DATE '2012-01-02'"; - val contexts = getMatchedContexts(project, sqlLeftJoin); + val contexts = OlapContextUtil.getOlapContexts(project, sqlLeftJoin); assertFiltersAndLayout(contexts.get(0), seg1Id, ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03),=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01)"); assertFiltersAndLayout(contexts.get(1), null, @@ -475,7 +485,7 @@ public class HeterogeneousSegmentPruningTest extends NLocalWithSparkSessionTest { // right join val sqlRightJoin = sql + " select * from T1 right join T2 on T1.cal_dt = T2.cal_dt \n" + " where T1.cal_dt = DATE '2012-01-01' and T2.cal_dt = DATE '2012-01-02'"; - val contexts = getMatchedContexts(project, sqlRightJoin); + val contexts = OlapContextUtil.getOlapContexts(project, sqlRightJoin); assertFiltersAndLayout(contexts.get(0), seg2Id, ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-02),=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-02)"); 
assertFiltersAndLayout(contexts.get(1), null, @@ -485,7 +495,7 @@ public class HeterogeneousSegmentPruningTest extends NLocalWithSparkSessionTest { // full join val sqlFullJoin = sql + " select * from T1 full join T2 on T1.cal_dt = T2.cal_dt \n" + " where T1.cal_dt = DATE '2012-01-01' and T2.cal_dt = DATE '2012-01-02'"; - val contexts = getMatchedContexts(project, sqlFullJoin); + val contexts = OlapContextUtil.getOlapContexts(project, sqlFullJoin); assertFiltersAndLayout(contexts.get(0), null, ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03)"); assertFiltersAndLayout(contexts.get(1), null, @@ -529,7 +539,7 @@ public class HeterogeneousSegmentPruningTest extends NLocalWithSparkSessionTest + "(cal_dt='2012-01-01' and trans_id = 15) or\n" + "(cal_dt='2012-01-01' and trans_id = 16) or\n" + "(cal_dt='2012-01-01' and trans_id = 17) or\n" + "(cal_dt='2012-01-01' and trans_id = 18) or\n" + "(cal_dt='2012-01-01' and trans_id = 19) or\n" + "(cal_dt='2012-01-01' and trans_id = 20)"; - val contexts = getMatchedContexts(project, sqlWithTooManyOrs); + val contexts = OlapContextUtil.getOlapContexts(project, sqlWithTooManyOrs); Assert.assertEquals( ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03)", contexts.get(0).getExpandedFilterConditions().stream().map(RexNode::toString) @@ -543,7 +553,7 @@ public class HeterogeneousSegmentPruningTest extends NLocalWithSparkSessionTest + "(cal_dt='2012-01-01' and trans_id = 6) or\n" + "(cal_dt='2012-01-01' and trans_id = 7) or\n" + "(cal_dt='2012-01-01' and trans_id = 8) or\n" + "(cal_dt='2012-01-01' and trans_id = 9) or\n" + "(cal_dt='2012-01-01' and trans_id = 10)"; - val contexts = getMatchedContexts(project, sqlWithFilter); + val contexts = OlapContextUtil.getOlapContexts(project, sqlWithFilter); Assert.assertNotEquals( ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03)", 
contexts.get(0).getExpandedFilterConditions().stream().map(RexNode::toString) @@ -552,8 +562,113 @@ public class HeterogeneousSegmentPruningTest extends NLocalWithSparkSessionTest } - private List<OLAPContext> getMatchedContexts(String project, String sql) throws SqlParseException { - return OlapContextUtil.getOlapContexts(project, sql); + @Test + public void testQueryWithEmptySegment() throws SqlParseException { + // layout 20000000001, tableindex + // layout 20001, cal_dt & trans_id + // layout 10001, cal_dt + // layout 1, trans_id + + // segment1 [2012-01-01, 2012-01-02] layout 20000000001, 20001 + // segment2 [2012-01-02, 2012-01-03] layout 20000000001, 20001, 10001 + // segment3 [2012-01-03, 2012-01-04] layout 20001, 10001, 1 + // segment4 [2012-01-04, 2012-01-05] layout 10001, 1 + // segment5 [2012-01-05, 2012-01-06] layout 20000000001, 20001, 10001, 1 + + val project = "heterogeneous_segment"; + val dfId = "747f864b-9721-4b97-acde-0aa8e8656cba"; + // val seg1Id = "8892fa3f-f607-4eec-8159-7c5ae2f16942" [20120101000000_20120102000000] + // val seg2Id = "d75a822c-788a-4592-a500-cf20186dded1" [20120102000000_20120103000000] + val seg3Id = "54eaf96d-6146-45d2-b94e-d5d187f89919"; // [20120103000000_20120104000000] + // val seg4Id = "411f40b9-a80a-4453-90a9-409aac6f7632" [20120104000000_20120105000000] + // val seg5Id = "a8318597-cb75-416f-8eb8-96ea285dd2b4" [20120105000000_20120106000000] + + EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { + NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), project); + NDataflow dataflow = dfMgr.getDataflow(dfId); + NDataSegment latestReadySegment = dataflow.getSegment(seg3Id); + if (latestReadySegment != null) { + NDataSegDetails segDetails = latestReadySegment.getSegDetails(); + List<NDataLayout> allLayouts = segDetails.getAllLayouts(); + + // update + NDataflowUpdate dataflowUpdate = new NDataflowUpdate(dfId); + NDataLayout[] toRemoveLayouts = allLayouts.stream() + .filter(dataLayout -> 
dataLayout.getLayoutId() == 20001).toArray(NDataLayout[]::new); + dataflowUpdate.setToRemoveLayouts(toRemoveLayouts); + dfMgr.updateDataflow(dataflowUpdate); + } + return null; + }, project); + + val sql = "select cal_dt, sum(price), count(*) from test_kylin_fact inner join test_account \n" + + "on test_kylin_fact.seller_id = test_account.account_id \n" + + "where cal_dt between date'2012-01-01' and date'2012-01-03'\n" // + + "group by cal_dt\n"; + // can not query + { + OLAPContext olapContext = OlapContextUtil.getOlapContexts(project, sql).get(0); + StorageContext storageContext = olapContext.storageContext; + Assert.assertEquals(-1L, storageContext.getLayoutId().longValue()); + } + + { + MetadataTestUtils.updateProjectConfig(project, "kylin.query.index-match-rules", + KylinConfigBase.USE_VACANT_INDEXES); + try (QueryContext queryContext = QueryContext.current()) { + OLAPContext olapContext = OlapContextUtil.getOlapContexts(project, sql).get(0); + StorageContext storageContext = olapContext.storageContext; + Assert.assertEquals(10001L, storageContext.getLayoutId().longValue()); + Assert.assertFalse(queryContext.getQueryTagInfo().isVacant()); + } + } + } + + @Test + public void testQueryWithAllSegmentsAreEmpty() throws SqlParseException { + // layout 20000000001, tableindex + // layout 20001, cal_dt & trans_id + // layout 10001, cal_dt + // layout 1, trans_id + + // segment1 [2012-01-01, 2012-01-02] layout 20000000001, 20001 + // segment2 [2012-01-02, 2012-01-03] layout 20000000001, 20001, 10001 + // segment3 [2012-01-03, 2012-01-04] layout 20001, 10001, 1 + // segment4 [2012-01-04, 2012-01-05] layout 10001, 1 + // segment5 [2012-01-05, 2012-01-06] layout 20000000001, 20001, 10001, 1 + + val project = "heterogeneous_segment"; + val dfId = "747f864b-9721-4b97-acde-0aa8e8656cba"; + // val seg1Id = "8892fa3f-f607-4eec-8159-7c5ae2f16942" [20120101000000_20120102000000] + // val seg2Id = "d75a822c-788a-4592-a500-cf20186dded1" [20120102000000_20120103000000] + // val 
seg3Id = "54eaf96d-6146-45d2-b94e-d5d187f89919" [20120103000000_20120104000000] + // val seg4Id = "411f40b9-a80a-4453-90a9-409aac6f7632" [20120104000000_20120105000000] + // val seg5Id = "a8318597-cb75-416f-8eb8-96ea285dd2b4" [20120105000000_20120106000000] + + EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { + NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), project); + NDataflow dataflow = dfMgr.getDataflow(dfId); + Segments<NDataSegment> segments = dataflow.getSegments(); + // update + NDataflowUpdate dataflowUpdate = new NDataflowUpdate(dfId); + dataflowUpdate.setToRemoveSegs(segments.toArray(new NDataSegment[0])); + dfMgr.updateDataflow(dataflowUpdate); + return null; + }, project); + + val sql = "select cal_dt, sum(price), count(*) from test_kylin_fact inner join test_account \n" + + "on test_kylin_fact.seller_id = test_account.account_id \n" + + "where cal_dt between date'2012-01-01' and date'2012-01-03'\n" // + + "group by cal_dt\n"; + + MetadataTestUtils.updateProjectConfig(project, "kylin.query.index-match-rules", + KylinConfigBase.USE_VACANT_INDEXES); + try (QueryContext queryContext = QueryContext.current()) { + OLAPContext olapContext = OlapContextUtil.getOlapContexts(project, sql).get(0); + StorageContext storageContext = olapContext.storageContext; + Assert.assertEquals(-1L, storageContext.getLayoutId().longValue()); + Assert.assertFalse(queryContext.getQueryTagInfo().isVacant()); + } } private void assertFiltersAndLayout(OLAPContext context, String segId, String expectedFilterCond) { @@ -567,7 +682,7 @@ public class HeterogeneousSegmentPruningTest extends NLocalWithSparkSessionTest } private void assertNoRealizationFound(String project, String sql) throws SqlParseException { - val context = getMatchedContexts(project, sql).get(0); + val context = OlapContextUtil.getOlapContexts(project, sql).get(0); Assert.assertNull(context.realization); } @@ -580,7 +695,7 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest private void assertPrunedSegmentsRange(String project, String sql, String dfId, List<Pair<String, String>> expectedRanges, long expectedLayoutId, Map<String, List<Long>> expectedPartitions, String expectedFilterCond) throws SqlParseException { - val context = getMatchedContexts(project, sql).get(0); + val context = OlapContextUtil.getOlapContexts(project, sql).get(0); if (expectedLayoutId == -1L) { Assert.assertTrue(context.storageContext.isEmptyLayout()); diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/NDataflowCapabilityCheckerTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/NDataflowCapabilityCheckerTest.java index b5782823c2..5199633912 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/NDataflowCapabilityCheckerTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/NDataflowCapabilityCheckerTest.java @@ -49,7 +49,7 @@ public class NDataflowCapabilityCheckerTest extends NLocalWithSparkSessionTest { CapabilityResult result = NDataflowCapabilityChecker.check(dataflow, dataflow.getQueryableSegments(), olapContext.getSQLDigest(), null); Assert.assertNotNull(result); - Assert.assertEquals((int) result.getSelectedCandidate().getCost(), result.cost); + Assert.assertEquals(result.getSelectedCandidate().getCost(), result.getCost(), 0.001); } @Test diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/VacantIndexPruningRuleTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/VacantIndexPruningRuleTest.java new file mode 100644 index 0000000000..75ab026e9b --- /dev/null +++ b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/VacantIndexPruningRuleTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.query.routing; + +import java.util.List; +import java.util.Map; + +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.kylin.common.KylinConfigBase; +import org.apache.kylin.common.QueryContext; +import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest; +import org.apache.kylin.junit.annotation.MetadataInfo; +import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate; +import org.apache.kylin.metadata.cube.model.NDataLayout; +import org.apache.kylin.metadata.cube.model.NDataSegDetails; +import org.apache.kylin.metadata.cube.model.NDataSegment; +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.cube.model.NDataflowManager; +import org.apache.kylin.metadata.cube.model.NDataflowUpdate; +import org.apache.kylin.metadata.model.NDataModel; +import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.metadata.project.EnhancedUnitOfWork; +import org.apache.kylin.metadata.realization.CapabilityResult; +import org.apache.kylin.query.relnode.OLAPContext; +import org.apache.kylin.storage.StorageContext; +import org.apache.kylin.util.MetadataTestUtils; +import org.apache.kylin.util.OlapContextUtil; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +@MetadataInfo 
+class VacantIndexPruningRuleTest extends NLocalWithSparkSessionTest { + + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + } + + @AfterEach + public void teardown() throws Exception { + super.tearDown(); + } + + @Override + public String getProject() { + return "default"; + } + + @Test + void testWithIndexMatchRulesUseVacantIndexes() throws SqlParseException { + NDataModelManager modelMgr = NDataModelManager.getInstance(getTestConfig(), getProject()); + modelMgr.listAllModels().stream().filter(model -> !model.isBroken()) + .forEach(model -> cleanAlreadyExistingLayoutsInSegments(model.getId())); + + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.query.index-match-rules", + KylinConfigBase.USE_VACANT_INDEXES); + try (QueryContext queryContext = QueryContext.current()) { + String sql = "select max(LO_ORDERDATE) from ssb.lineorder"; + List<OLAPContext> olapContexts = OlapContextUtil.getOlapContexts(getProject(), sql); + OLAPContext olapContext = olapContexts.get(0); + StorageContext storageContext = olapContext.storageContext; + Assertions.assertTrue(storageContext.isEmptyLayout()); + Assertions.assertTrue(queryContext.getQueryTagInfo().isVacant()); + } + } + + @Test + void testUnmatchedWithNullResult() throws SqlParseException { + String modelId = "d67bf0e4-30f4-9248-2528-52daa80be91a"; + String sql = "select max(LO_ORDERPRIOTITY) from ssb.lineorder"; + List<OLAPContext> olapContexts = OlapContextUtil.getOlapContexts(getProject(), sql); + OLAPContext olapContext = olapContexts.get(0); + NDataflow df = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + Map<String, String> matchedJoinGraphAliasMap = OlapContextUtil.matchJoins(df.getModel(), olapContext); + olapContext.fixModel(df.getModel(), matchedJoinGraphAliasMap); + + Candidate candidate = new Candidate(df, olapContext, matchedJoinGraphAliasMap); + VacantIndexPruningRule vacantIndexPruningRule = new VacantIndexPruningRule(); + 
vacantIndexPruningRule.apply(candidate); + Assertions.assertNull(candidate.getCapability()); + } + + @Test + void testUnmatchedWithRealizationIsStreaming() throws SqlParseException { + String modelId = "d67bf0e4-30f4-9248-2528-52daa80be91a"; + String sql = "select max(LO_ORDERDATE) from ssb.lineorder"; + NDataModelManager modelMgr = NDataModelManager.getInstance(getTestConfig(), getProject()); + modelMgr.listAllModels().stream().filter(model -> !model.isBroken()) + .forEach(model -> cleanAlreadyExistingLayoutsInSegments(model.getId())); + List<OLAPContext> olapContexts = OlapContextUtil.getOlapContexts(getProject(), sql); + OLAPContext olapContext = olapContexts.get(0); + NDataflow df = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + Map<String, String> matchedJoinGraphAliasMap = OlapContextUtil.matchJoins(df.getModel(), olapContext); + olapContext.fixModel(df.getModel(), matchedJoinGraphAliasMap); + + Candidate candidate = new Candidate(df, olapContext, matchedJoinGraphAliasMap); + { + df.getModel().setModelType(NDataModel.ModelType.STREAMING); + candidate.setCapability(new CapabilityResult()); + VacantIndexPruningRule vacantIndexPruningRule = new VacantIndexPruningRule(); + vacantIndexPruningRule.apply(candidate); + Assertions.assertFalse(candidate.getCapability().isCapable()); + } + + { + df.getModel().setModelType(NDataModel.ModelType.HYBRID); + candidate.setCapability(new CapabilityResult()); + VacantIndexPruningRule vacantIndexPruningRule = new VacantIndexPruningRule(); + vacantIndexPruningRule.apply(candidate); + Assertions.assertFalse(candidate.getCapability().isCapable()); + } + } + + @Test + void testUnmatchedAggIndex() throws SqlParseException { + String modelId = "d67bf0e4-30f4-9248-2528-52daa80be91a"; + String sql = "select max(LO_ORDERPRIOTITY) from ssb.lineorder"; + List<OLAPContext> olapContexts = OlapContextUtil.getOlapContexts(getProject(), sql); + OLAPContext olapContext = olapContexts.get(0); + NDataflow df = 
NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + Map<String, String> matchedJoinGraphAliasMap = OlapContextUtil.matchJoins(df.getModel(), olapContext); + olapContext.fixModel(df.getModel(), matchedJoinGraphAliasMap); + + Candidate candidate = new Candidate(df, olapContext, matchedJoinGraphAliasMap); + candidate.setCapability(new CapabilityResult()); + VacantIndexPruningRule vacantIndexPruningRule = new VacantIndexPruningRule(); + vacantIndexPruningRule.apply(candidate); + Assertions.assertFalse(candidate.getCapability().isCapable()); + } + + @Test + void testMatchedTableIndex() throws SqlParseException { + String modelId = "d67bf0e4-30f4-9248-2528-52daa80be91a"; + String sql = "select LO_QUANTITY from ssb.lineorder"; + NDataModelManager modelMgr = NDataModelManager.getInstance(getTestConfig(), getProject()); + modelMgr.listAllModels().stream().filter(model -> !model.isBroken()) + .forEach(model -> cleanAlreadyExistingLayoutsInSegments(model.getId())); + List<OLAPContext> olapContexts = OlapContextUtil.getOlapContexts(getProject(), sql); + OLAPContext olapContext = olapContexts.get(0); + NDataflow df = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + Map<String, String> matchedJoinGraphAliasMap = OlapContextUtil.matchJoins(df.getModel(), olapContext); + olapContext.fixModel(df.getModel(), matchedJoinGraphAliasMap); + + Candidate candidate = new Candidate(df, olapContext, matchedJoinGraphAliasMap); + candidate.setCapability(new CapabilityResult()); + VacantIndexPruningRule vacantIndexPruningRule = new VacantIndexPruningRule(); + vacantIndexPruningRule.apply(candidate); + Assertions.assertTrue(candidate.getCapability().isCapable()); + Assertions.assertTrue(candidate.getCapability().isVacant()); + NLayoutCandidate selectedCandidate = (NLayoutCandidate) candidate.getCapability().getSelectedCandidate(); + Assertions.assertEquals(20000000001L, selectedCandidate.getLayoutEntity().getId()); + } + + @Test 
+ void testMatchedAggIndex() throws SqlParseException { + String modelId = "d67bf0e4-30f4-9248-2528-52daa80be91a"; + String sql = "select max(LO_ORDERDATE) from ssb.lineorder"; + NDataModelManager modelMgr = NDataModelManager.getInstance(getTestConfig(), getProject()); + modelMgr.listAllModels().stream().filter(model -> !model.isBroken()) + .forEach(model -> cleanAlreadyExistingLayoutsInSegments(model.getId())); + List<OLAPContext> olapContexts = OlapContextUtil.getOlapContexts(getProject(), sql); + OLAPContext olapContext = olapContexts.get(0); + NDataflow df = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + Map<String, String> matchedJoinGraphAliasMap = OlapContextUtil.matchJoins(df.getModel(), olapContext); + olapContext.fixModel(df.getModel(), matchedJoinGraphAliasMap); + + Candidate candidate = new Candidate(df, olapContext, matchedJoinGraphAliasMap); + candidate.setCapability(new CapabilityResult()); + VacantIndexPruningRule vacantIndexPruningRule = new VacantIndexPruningRule(); + vacantIndexPruningRule.apply(candidate); + Assertions.assertTrue(candidate.getCapability().isCapable()); + Assertions.assertTrue(candidate.getCapability().isVacant()); + NLayoutCandidate selectedCandidate = (NLayoutCandidate) candidate.getCapability().getSelectedCandidate(); + Assertions.assertEquals(1L, selectedCandidate.getLayoutEntity().getId()); + } + + private void cleanAlreadyExistingLayoutsInSegments(String modelId) { + EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { + NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), getProject()); + NDataflow dataflow = dfMgr.getDataflow(modelId); + NDataSegment latestReadySegment = dataflow.getLatestReadySegment(); + if (latestReadySegment != null) { + NDataSegDetails segDetails = latestReadySegment.getSegDetails(); + List<NDataLayout> allLayouts = segDetails.getAllLayouts(); + + // update + NDataflowUpdate dataflowUpdate = new NDataflowUpdate(modelId); + 
dataflowUpdate.setToRemoveLayouts(allLayouts.toArray(new NDataLayout[0])); + dfMgr.updateDataflow(dataflowUpdate); + } + return null; + }, getProject()); + } +} diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/Candidate.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/Candidate.java index 5fe45a9425..3db016ffbf 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/routing/Candidate.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/Candidate.java @@ -90,10 +90,6 @@ public class Candidate { this.matchedJoinsGraphAliasMap = matchedJoinsGraphAliasMap; } - // for testing only - Candidate() { - } - @Override public String toString() { return realization.toString(); @@ -125,7 +121,7 @@ public class Candidate { } public static Comparator<Candidate> realizationCapabilityCostSorter() { - return Comparator.comparingDouble(c -> c.getCapability().cost); + return Comparator.comparingDouble(c -> c.getCapability().getCost()); } public static Comparator<Candidate> modelUuidSorter() { diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryRouter.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryRouter.java index 1105ee4e9c..3222c8dec6 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryRouter.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryRouter.java @@ -25,7 +25,6 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.guava30.shaded.common.collect.Ordering; import org.apache.kylin.metadata.project.NProjectManager; -import org.apache.kylin.query.relnode.OLAPContext; import lombok.Getter; @@ -35,27 +34,26 @@ public class QueryRouter { } public static void applyRules(Candidate candidate) { - Strategy pruningStrategy = getStrategy(candidate.getCtx()); + Strategy pruningStrategy = 
getStrategy(candidate.getCtx().olapSchema.getProjectName()); for (PruningRule r : pruningStrategy.getRules()) { r.apply(candidate); } } - public static void sortCandidates(OLAPContext context, List<Candidate> candidates) { - Strategy strategy = getStrategy(context); + public static void sortCandidates(String project, List<Candidate> candidates) { + Strategy strategy = getStrategy(project); candidates.sort(strategy.getSorter()); } - private static Strategy getStrategy(OLAPContext context) { - String project = context.olapSchema.getProjectName(); - KylinConfig projectConfig = NProjectManager.getProjectConfig(project); - return new Strategy(projectConfig); + private static Strategy getStrategy(String project) { + return new Strategy(NProjectManager.getProjectConfig(project)); } public static class Strategy { private static final PruningRule SEGMENT_PRUNING = new SegmentPruningRule(); private static final PruningRule PARTITION_PRUNING = new PartitionPruningRule(); private static final PruningRule REMOVE_INCAPABLE_REALIZATIONS = new RemoveIncapableRealizationsRule(); + private static final PruningRule VACANT_INDEX_PRUNING = new VacantIndexPruningRule(); @Getter List<PruningRule> rules = Lists.newArrayList(); @@ -72,6 +70,9 @@ public class QueryRouter { rules.add(SEGMENT_PRUNING); rules.add(PARTITION_PRUNING); rules.add(REMOVE_INCAPABLE_REALIZATIONS); + if (config.isVacantIndexPruningEnabled()) { + rules.add(VACANT_INDEX_PRUNING); + } // add all sorters if (config.useTableIndexAnswerSelectStarEnabled()) { diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java index 13720f8932..0613f62d72 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java @@ -227,7 +227,7 @@ public class RealizationChooser { } // Step 3. 
find the lowest-cost candidate - QueryRouter.sortCandidates(context, candidates); + QueryRouter.sortCandidates(context.olapSchema.getProjectName(), candidates); logger.trace("Cost Sorted Realizations {}", candidates); Candidate candidate = candidates.get(0); restoreOLAPContextProps(context, candidate.getRewrittenCtx()); @@ -235,6 +235,15 @@ public class RealizationChooser { adjustForCapabilityInfluence(candidate, context); context.realization = candidate.realization; + if (candidate.getCapability().isVacant()) { + QueryContext.current().getQueryTagInfo().setVacant(true); + NLayoutCandidate layoutCandidate = (NLayoutCandidate) candidate.capability.getSelectedCandidate(); + context.storageContext.setCandidate(layoutCandidate); + context.storageContext.setLayoutId(layoutCandidate.getLayoutEntity().getId()); + context.storageContext.setEmptyLayout(true); + return; + } + if (candidate.capability.getSelectedCandidate() instanceof NLookupCandidate) { boolean useSnapshot = context.isFirstTableLookupTableInModel(context.realization.getModel()); context.storageContext.setUseSnapshot(useSnapshot); diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/VacantIndexPruningRule.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/VacantIndexPruningRule.java new file mode 100644 index 0000000000..ac5ef26d1c --- /dev/null +++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/VacantIndexPruningRule.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.query.routing; + +import java.util.List; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metadata.cube.cuboid.AggIndexMatcher; +import org.apache.kylin.metadata.cube.cuboid.ChooserContext; +import org.apache.kylin.metadata.cube.cuboid.IndexMatcher; +import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate; +import org.apache.kylin.metadata.cube.cuboid.TableIndexMatcher; +import org.apache.kylin.metadata.cube.model.LayoutEntity; +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.model.AntiFlatChecker; +import org.apache.kylin.metadata.model.ColExcludedChecker; +import org.apache.kylin.metadata.model.NDataModel; +import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.realization.CapabilityResult; +import org.apache.kylin.metadata.realization.HybridRealization; +import org.apache.kylin.metadata.realization.IRealization; +import org.apache.kylin.metadata.realization.SQLDigest; +import org.apache.kylin.query.util.QueryInterruptChecker; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class VacantIndexPruningRule extends PruningRule { + @Override + public void apply(Candidate candidate) { + if (nonBatchRealizationSkipEmptySegments(candidate)) { + log.info("{}({}/{}): only batch model support this feature, but the type of this model is {}", + this.getClass().getName(), candidate.getRealization().getProject(), + candidate.getRealization().getCanonicalName(), + 
candidate.getRealization().getModel().getModelType()); + return; + } + + if (candidate.getCapability() == null || candidate.getCapability().isCapable()) { + log.debug("skip the rule of {}.", this.getClass().getName()); + return; + } + + List<IRealization> realizations = candidate.getRealization().getRealizations(); + if (CollectionUtils.isEmpty(realizations)) { + log.warn("It seems that unlikely things happened when matching indexes haven't built. " + + "Expected size of realizations(models) is 1."); + return; + } + NDataflow dataflow = (NDataflow) realizations.get(0); + CapabilityResult capabilityResult = match(dataflow, candidate.getCtx().getSQLDigest()); + candidate.setCapability(capabilityResult); + } + + private CapabilityResult match(NDataflow dataflow, SQLDigest digest) { + CapabilityResult result = new CapabilityResult(); + log.info("Try matching no built indexes from model."); + + NLayoutCandidate layoutCandidate = selectLayoutCandidate(dataflow, digest); + if (layoutCandidate != null) { + result.influences.addAll(layoutCandidate.getCapabilityResult().influences); + result.setLayoutUnmatchedColsSize(layoutCandidate.getCapabilityResult().getLayoutUnmatchedColsSize()); + result.setSelectedCandidate(layoutCandidate); + log.info("Matched layout {} snapshot in dataflow {} ", layoutCandidate, dataflow); + result.setCapable(true); + result.setVacant(true); + } else { + result.setCapable(false); + } + + return result; + } + + private NLayoutCandidate selectLayoutCandidate(NDataflow dataflow, SQLDigest sqlDigest) { + + // This section is same as NQueryLayoutChooser#selectLayoutCandidate. 
+ String project = dataflow.getProject(); + NDataModel model = dataflow.getModel(); + KylinConfig projectConfig = NProjectManager.getProjectConfig(project); + ChooserContext chooserContext = new ChooserContext(sqlDigest, dataflow); + ColExcludedChecker excludedChecker = new ColExcludedChecker(projectConfig, project, model); + AntiFlatChecker antiFlatChecker = new AntiFlatChecker(model.getJoinTables(), model); + AggIndexMatcher aggIndexMatcher = new AggIndexMatcher(sqlDigest, chooserContext, dataflow, excludedChecker, + antiFlatChecker); + TableIndexMatcher tableIndexMatcher = new TableIndexMatcher(sqlDigest, chooserContext, dataflow, + excludedChecker, antiFlatChecker); + + if (chooserContext.isIndexMatchersInvalid()) { + return null; + } + + // Find a layout that can match the sqlDigest, even if it hasn't been built yet. + NLayoutCandidate candidate = findCandidate(dataflow, aggIndexMatcher, tableIndexMatcher); + QueryInterruptChecker.checkThreadInterrupted("Interrupted exception occurs.", + "Current step were matching indexes haven't built "); + return candidate; + } + + private static NLayoutCandidate findCandidate(NDataflow dataflow, AggIndexMatcher aggIndexMatcher, + TableIndexMatcher tableIndexMatcher) { + List<LayoutEntity> allLayouts = dataflow.getIndexPlan().getAllLayouts(); + for (LayoutEntity layout : allLayouts) { + NLayoutCandidate candidate = new NLayoutCandidate(layout); + IndexMatcher.MatchResult matchResult = tableIndexMatcher.match(layout); + if (!matchResult.isMatched()) { + matchResult = aggIndexMatcher.match(layout); + } + + if (!matchResult.isMatched()) { + continue; + } + + CapabilityResult tempResult = new CapabilityResult(); + tempResult.setSelectedCandidate(candidate); + candidate.setCapabilityResult(tempResult); + return candidate; + } + return null; + } + + private boolean nonBatchRealizationSkipEmptySegments(Candidate candidate) { + IRealization realization = candidate.getRealization(); + return realization instanceof HybridRealization 
|| realization.isStreaming(); + } +} diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java index 455cf305c7..bbee32c7dc 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java @@ -25,6 +25,8 @@ import java.util.List; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.debug.BackdoorToggles; +import org.apache.kylin.guava30.shaded.common.base.Preconditions; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; import org.apache.kylin.metadata.query.NativeQueryRealization; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.slf4j.Logger; @@ -32,8 +34,6 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kylin.guava30.shaded.common.base.Preconditions; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; import lombok.Getter; import lombok.Setter; @@ -68,6 +68,9 @@ public class SQLResponse implements Serializable { protected boolean isPartial = false; + @JsonProperty("vacant") + private boolean isVacant; + private List<Long> scanRows; private List<Long> scanBytes; @@ -148,9 +151,8 @@ public class SQLResponse implements Serializable { } } - public SQLResponse(List<SelectedColumnMeta> columnMetas, List<List<String>> results, - int affectedRowCount, boolean isException, String exceptionMessage, - boolean isPartial, boolean isPushDown) { + public SQLResponse(List<SelectedColumnMeta> columnMetas, List<List<String>> results, int affectedRowCount, + boolean isException, String exceptionMessage, boolean isPartial, boolean isPushDown) { this.columnMetas = columnMetas; this.results = results; this.affectedRowCount = affectedRowCount; @@ -165,8 +167,7 @@ 
public class SQLResponse implements Serializable { } public SQLResponse(List<SelectedColumnMeta> columnMetas, Iterable<List<String>> results, int resultSize, - int affectedRowCount, boolean isException, String exceptionMessage, - boolean isPartial, boolean isPushDown) { + int affectedRowCount, boolean isException, String exceptionMessage, boolean isPartial, boolean isPushDown) { this.columnMetas = columnMetas; this.results = results; this.affectedRowCount = affectedRowCount; diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java index 3a821568d4..3896737392 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -81,6 +81,14 @@ import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.engine.spark.filter.BloomFilterSkipCollector; +import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; +import org.apache.kylin.guava30.shaded.common.base.Joiner; +import org.apache.kylin.guava30.shaded.common.collect.Collections2; +import org.apache.kylin.guava30.shaded.common.collect.HashMultimap; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.SetMultimap; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.acl.AclTCR; @@ -162,14 +170,6 @@ import org.springframework.stereotype.Component; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; -import 
org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; -import org.apache.kylin.guava30.shaded.common.base.Joiner; -import org.apache.kylin.guava30.shaded.common.collect.Collections2; -import org.apache.kylin.guava30.shaded.common.collect.HashMultimap; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.guava30.shaded.common.collect.SetMultimap; import com.google.gson.Gson; import lombok.AllArgsConstructor; @@ -1364,7 +1364,9 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup response.setNativeRealizations(OLAPContext.getNativeRealizations()); - setAppMaterURL(response); + if (!queryContext.getQueryTagInfo().isVacant()) { + setAppMaterURL(response); + } if (isPushDown) { response.setNativeRealizations(Lists.newArrayList()); @@ -1381,6 +1383,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup response.setEngineType(QueryHistory.EngineType.NATIVE.name()); response.setSignature(QueryCacheSignatureUtil.createCacheSignature(response, project)); + response.setVacant(QueryContext.current().getQueryTagInfo().isVacant()); if (QueryContext.current().getMetrics().getQueryExecutedPlan() != null) { response.setExecutedPlan(QueryContext.current().getMetrics().getQueryExecutedPlan()); diff --git a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java index 0355fb60bd..bd41205514 100644 --- a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java +++ b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java @@ -82,11 +82,10 @@ public class QueryContextCutter { } } - ContextUtil.dumpCalcitePlan( - "cannot find proper realizations After re-cut " + MAX_RETRY_TIMES_OF_CONTEXT_CUT + " times", root, log); 
- log.error("too many unmatched join in this query, please check it or create correspond realization"); - throw new NoRealizationFoundException( - "too many unmatched join in this query, please check it or create correspond realization"); + String errorMsg = "too many unmatched joins in this query, please check it or create corresponding realization."; + ContextUtil.dumpCalcitePlan("cannot find proper realizations After re-cut " + MAX_RETRY_TIMES_OF_CONTEXT_CUT + + " times. \nError: " + errorMsg, root, log); + throw new NoRealizationFoundException(errorMsg); } private static List<OLAPContext> collectContextInfoAndSelectRealization(RelNode queryRoot) { diff --git a/src/query/src/test/java/org/apache/kylin/query/routing/CandidateSortTest.java b/src/query/src/test/java/org/apache/kylin/query/routing/CandidateSortTest.java index f9799f42ff..530bafff85 100644 --- a/src/query/src/test/java/org/apache/kylin/query/routing/CandidateSortTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/routing/CandidateSortTest.java @@ -18,287 +18,107 @@ package org.apache.kylin.query.routing; -import java.util.Arrays; import java.util.Comparator; import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Ordering; -import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate; -import org.apache.kylin.metadata.cube.model.NDataSegment; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.NDataModel; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.realization.CapabilityResult; -import org.apache.kylin.metadata.realization.IRealization; -import org.apache.kylin.metadata.realization.QueryableSeg; -import 
org.apache.kylin.metadata.realization.SQLDigest; -import org.junit.Assert; -import org.junit.Test; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.query.relnode.OLAPContext; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; -import lombok.val; - -public class CandidateSortTest { +class CandidateSortTest { @Test - public void testModelHintCandidateSort() { + void testModelPrioritySorter() { try (QueryContext queryContext = QueryContext.current()) { + Comparator<Candidate> sorter = Candidate.modelPrioritySorter(); + + // assert that c1 is more prioritary than c2 { - queryContext.setModelPriorities(new String[] {}); - val model1 = mockCandidate("model0001", "modelA", 1, 1); - val model2 = mockCandidate("model0002", "modelB", 2, 2); - sort(model1, model2).assertFirst(model1); + String[] modelPriorities = CandidateTestUtils + .mockModelPriorityValues(new String[] { "modelA", "modelB" }); + queryContext.setModelPriorities(modelPriorities); + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 1, 1); + Candidate c2 = CandidateTestUtils.mockCandidate("model0001", "modelB", 1, 1); + + Assertions.assertEquals(-1, sorter.compare(c1, c2)); + Assertions.assertEquals(1, sorter.compare(c2, c1)); + assertSortResult(c1, sorter, Lists.newArrayList(c1, c2)); } + // with empty model priorities, c1 and c2 has the same priority { - queryContext.setModelPriorities(new String[] { "MODELB" }); - val model1 = mockCandidate("model0001", "modelA", 1, 1); - val model2 = mockCandidate("model0002", "modelB", 2, 2); - sort(model1, model2).assertFirst(model2); + queryContext.setModelPriorities(new String[] {}); + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 1, 1); + Candidate c2 = CandidateTestUtils.mockCandidate("model0002", "modelB", 2, 2); + assertSortResult(c1, sorter, Lists.newArrayList(c1, c2)); } { 
- queryContext.setModelPriorities(new String[] { "MODELB", "MODELA" }); - val model1 = mockCandidate("model0001", "modelA", 1, 1); - val model2 = mockCandidate("model0002", "modelB", 2, 2); - sort(model1, model2).assertFirst(model2); + queryContext.setModelPriorities(CandidateTestUtils.mockModelPriorityValues(new String[] { "modelB" })); + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 1, 1); + Candidate c2 = CandidateTestUtils.mockCandidate("model0002", "modelB", 2, 2); + Assertions.assertEquals(Integer.MAX_VALUE, sorter.compare(c1, c2)); + assertSortResult(c2, sorter, Lists.newArrayList(c1, c2)); } { - queryContext.setModelPriorities(new String[] { "MODELC", "MODELA" }); - val model1 = mockCandidate("model0001", "modelA", 1, 1); - val model2 = mockCandidate("model0002", "modelB", 2, 2); - val model3 = mockCandidate("model0003", "modelC", 4, 4); - sort(model1, model2, model3).assertFirst(model3); + queryContext.setModelPriorities(new String[] { "MODELB", "MODELA" }); + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 1, 1); + Candidate c2 = CandidateTestUtils.mockCandidate("model0002", "modelB", 2, 2); + Assertions.assertEquals(1, sorter.compare(c1, c2)); + assertSortResult(c2, sorter, Lists.newArrayList(c1, c2)); } } } @Test - public void testSort() { - { - val model1 = mockCandidate("model0001", "modelA", 1, 1); - val model2 = mockCandidate("model0002", "modelB", 2, 2); - sort(model1, model2).assertFirst(model1); - } - - { - val model1 = mockCandidate("model0001", "modelA", 2, 1); - val model2 = mockCandidate("model0002", "modelB", 2, 2); - sort(model1, model2).assertFirst(model1); - } - - { - val model1 = mockCandidate("model0001", "modelA", 2, 2); - val model2 = mockCandidate("model0002", "modelB", 2, 2); - sort(model1, model2).assertFirst(model1); - } - - { - val model1 = mockCandidate("model0001", "modelA", 1, 1); - val model2 = mockCandidate("model0002", "modelB", 2, 2); - val model3 = mockCandidate("model0003", 
"modelC", 4, 4); - sort(model1, model2, model3).assertFirst(model1); - } - - { - val model1 = mockCandidate("model0001", "modelA", 1, 1); - val model2 = mockEmptyCandidate("model0002", "modelB", 1); - sort(model1, model2).assertFirst(model1); - } - - { - val model1 = mockStreamingCandidate("model0001", "modelA", 1, 1); - val model2 = mockEmptyCandidate("model0002", "modelB", 1); - sort(model1, model2).assertFirst(model1); - } - - { - val model1 = mockHybridCandidate("model0001", "modelA", 1, 1, 2); - val model2 = mockEmptyCandidate("model0002", "modelB", 1); - sort(model1, model2).assertFirst(model1); - } - - { - val model1 = mockCandidate("model0001", "modelA", 1, 3); - val model2 = mockStreamingCandidate("model0002", "modelB", 1, 2); - val model3 = mockHybridCandidate("model0003", "modelC", 1, 4, 2); - sort(model1, model2, model3).assertFirst(model2); - } - } - - private interface SortedCandidate { - - void assertFirst(Candidate candidate); - } - - private SortedCandidate sort(Candidate... 
candidates) { - List<Comparator<Candidate>> sorters = Lists.newArrayList(); - sorters.add(Candidate.modelPrioritySorter()); - sorters.add(Candidate.realizationCostSorter()); - sorters.add(Candidate.realizationCapabilityCostSorter()); - sorters.add(Candidate.modelUuidSorter()); - - return candidate -> { - Arrays.sort(candidates, Ordering.compound(sorters)); - Assert.assertEquals(candidate.getRealization().getModel().getAlias(), - candidates[0].getRealization().getModel().getAlias()); - }; + void realizationCostSorterTest() { + Comparator<Candidate> comparator = Candidate.realizationCostSorter(); + NDataflow df1 = Mockito.mock(NDataflow.class); + NDataflow df2 = Mockito.mock(NDataflow.class); + OLAPContext olapContext = Mockito.mock(OLAPContext.class); + Candidate c1 = new Candidate(df1, olapContext, Maps.newHashMap()); + Candidate c2 = new Candidate(df2, olapContext, Maps.newHashMap()); + Mockito.when(c1.getRealization().getCost()).thenReturn(1); + Mockito.when(c2.getRealization().getCost()).thenReturn(2); + + // Assert that the Comparator sorts the Candidates correctly + assertSortResult(c1, comparator, Lists.newArrayList(c1, c2)); } - private Candidate mockCandidate(String modelId, String modelName, int modelCost, double candidateCost) { - val candidate = new Candidate(); - candidate.realization = mockRealization(modelId, modelName, modelCost); - val cap = new CapabilityResult(); - cap.setSelectedCandidate(() -> candidateCost); - cap.cost = (int) cap.getSelectedCandidate().getCost(); - candidate.setCapability(cap); - return candidate; + @Test + void realizationCapabilityCostSorter() { + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 1, 1); + Candidate c2 = CandidateTestUtils.mockCandidate("model0001", "modelA", 1, 2); + Candidate c3 = CandidateTestUtils.mockCandidate("model0001", "modelA", 1, 2); + Comparator<Candidate> comparator = Candidate.realizationCapabilityCostSorter(); + assertSortResult(c1, comparator, Lists.newArrayList(c1, c2)); 
+ assertSortResult(c2, comparator, Lists.newArrayList(c2, c3)); } - private Candidate mockStreamingCandidate(String modelId, String modelName, int modelCost, double candidateCost) { - val candidate = new Candidate(); - candidate.realization = mockRealization(modelId, modelName, modelCost); - val cap = new CapabilityResult(); - cap.setSelectedStreamingCandidate(() -> candidateCost); - cap.cost = (int) cap.getSelectedStreamingCandidate().getCost(); - candidate.setCapability(cap); - return candidate; + @Test + void testModelUuidSorter() { + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 1, 1); + Candidate c2 = CandidateTestUtils.mockCandidate("model0002", "modelB", 1, 1); + Comparator<Candidate> comparator = Candidate.modelUuidSorter(); + assertSortResult(c1, comparator, Lists.newArrayList(c1, c2)); } - private Candidate mockHybridCandidate(String modelId, String modelName, int modelCost, double candidateCost, - double streamingCandidateCost) { - val candidate = new Candidate(); - candidate.realization = mockRealization(modelId, modelName, modelCost); - val cap = new CapabilityResult(); - cap.setSelectedCandidate(() -> candidateCost); - cap.setSelectedStreamingCandidate(() -> streamingCandidateCost); - cap.cost = (int) Math.min(cap.getSelectedCandidate().getCost(), cap.getSelectedStreamingCandidate().getCost()); - candidate.setCapability(cap); - return candidate; + @Test + void testTableIndexUnmatchedColSizeComparator() { + Comparator<Candidate> comparator = Candidate.tableIndexUnmatchedColSizeSorter(); + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 1, 1, 1); + Candidate c2 = CandidateTestUtils.mockCandidate("model0002", "modelB", 1, 2, 2); + assertSortResult(c1, comparator, Lists.newArrayList(c2, c1)); } - private Candidate mockEmptyCandidate(String modelId, String modelName, int modelCost) { - val candidate = new Candidate(); - candidate.realization = mockRealization(modelId, modelName, modelCost); - val cap = new 
CapabilityResult(); - cap.setSelectedCandidate(NLayoutCandidate.EMPTY); - cap.setSelectedStreamingCandidate(NLayoutCandidate.EMPTY); - candidate.setCapability(cap); - return candidate; + private void assertSortResult(Candidate expected, Comparator<Candidate> comparator, List<Candidate> candidates) { + candidates.sort(comparator); + Assertions.assertEquals(expected, candidates.get(0)); } - - private IRealization mockRealization(String modelId, String modelName, int cost) { - return new IRealization() { - @Override - public CapabilityResult isCapable(SQLDigest digest, List<NDataSegment> prunedSegments, - Map<String, Set<Long>> chSegToLayoutsMap) { - return null; - } - - @Override - public CapabilityResult isCapable(SQLDigest digest, QueryableSeg queryableSeg) { - return null; - } - - @Override - public String getType() { - return null; - } - - @Override - public KylinConfig getConfig() { - return null; - } - - @Override - public NDataModel getModel() { - val model = new NDataModel(); - model.setAlias(modelName); - model.setUuid(modelId); - return model; - } - - @Override - public Set<TblColRef> getAllColumns() { - return null; - } - - @Override - public List<TblColRef> getAllDimensions() { - return null; - } - - @Override - public List<MeasureDesc> getMeasures() { - return null; - } - - @Override - public List<IRealization> getRealizations() { - return null; - } - - @Override - public FunctionDesc findAggrFunc(FunctionDesc aggrFunc) { - return null; - } - - @Override - public boolean isOnline() { - return true; - } - - @Override - public String getUuid() { - return null; - } - - @Override - public String getCanonicalName() { - return null; - } - - @Override - public long getDateRangeStart() { - return 0; - } - - @Override - public long getDateRangeEnd() { - return 0; - } - - @Override - public int getCost() { - return cost; - } - - @Override - public boolean hasPrecalculatedFields() { - return false; - } - - @Override - public int getStorageType() { - return 0; - } - 
- @Override - public boolean isStreaming() { - return false; - } - - @Override - public String getProject() { - return null; - } - }; - } - } diff --git a/src/query/src/test/java/org/apache/kylin/query/routing/CandidateTestUtils.java b/src/query/src/test/java/org/apache/kylin/query/routing/CandidateTestUtils.java new file mode 100644 index 0000000000..341a5a8686 --- /dev/null +++ b/src/query/src/test/java/org/apache/kylin/query/routing/CandidateTestUtils.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.query.routing; + +import java.util.Arrays; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.model.NDataModel; +import org.apache.kylin.metadata.realization.CapabilityResult; +import org.apache.kylin.metadata.realization.IRealization; +import org.apache.kylin.query.relnode.OLAPContext; + +import lombok.val; + +public class CandidateTestUtils { + + static Candidate mockCandidate(String modelId, String modelName, int realizationCost, double candidateCost) { + return mockCandidate(modelId, modelName, realizationCost, candidateCost, 0); + } + + static Candidate mockCandidate(String modelId, String modelName, int realizationCost, double candidateCost, + int unmatchedColSize) { + IRealization realization = mockRealization(modelId, modelName, realizationCost); + OLAPContext olapContext = mockOlapContext(); + val candidate = new Candidate(realization, olapContext, Maps.newHashMap()); + val cap = new CapabilityResult(); + cap.setSelectedCandidate(() -> candidateCost); + cap.setCost(cap.getSelectedCandidate().getCost()); + cap.setLayoutUnmatchedColsSize(unmatchedColSize); + candidate.setCapability(cap); + return candidate; + } + + static OLAPContext mockOlapContext() { + return new OLAPContext(-1); + } + + static IRealization mockRealization(String modelId, String modelName, int cost) { + return new NDataflow() { + @Override + public NDataModel getModel() { + val model = new NDataModel(); + model.setAlias(modelName); + model.setUuid(modelId); + return model; + } + + @Override + public boolean isOnline() { + return true; + } + + @Override + public int getCost() { + return cost; + } + }; + } + + static String[] mockModelPriorityValues(String[] arr) { + return Arrays.stream(arr).map(StringUtils::upperCase).toArray(String[]::new); + } +} diff --git 
a/src/query/src/test/java/org/apache/kylin/query/routing/LayoutCandidateSortTest.java b/src/query/src/test/java/org/apache/kylin/query/routing/LayoutCandidateSortTest.java new file mode 100644 index 0000000000..0666ef3a72 --- /dev/null +++ b/src/query/src/test/java/org/apache/kylin/query/routing/LayoutCandidateSortTest.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.query.routing; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.junit.annotation.MetadataInfo; +import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate; +import org.apache.kylin.metadata.cube.cuboid.NQueryLayoutChooser; +import org.apache.kylin.metadata.cube.model.IndexEntity; +import org.apache.kylin.metadata.cube.model.LayoutEntity; +import org.apache.kylin.metadata.cube.model.NIndexPlanManager; +import org.apache.kylin.metadata.cube.model.RuleBasedIndex; +import org.apache.kylin.metadata.model.DeriveInfo; +import org.apache.kylin.metadata.model.NDataModel; +import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.metadata.realization.CapabilityResult; +import org.apache.kylin.util.MetadataTestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +@MetadataInfo +class LayoutCandidateSortTest { + + @Test + void testPreferAggComparator() { + MockEntity mock1 = new MockEntity(IndexEntity.TABLE_INDEX_START_ID + 1, ImmutableList.of(1), 2); + MockEntity mock2 = new MockEntity(1L, ImmutableList.of(1), 3); + assertSortedResult(1L, NQueryLayoutChooser.preferAggComparator(), mock1, mock2); + } + + @Test + void testSegmentRangeComparator() { + MockEntity mock1 = new MockEntity(IndexEntity.TABLE_INDEX_START_ID + 1, ImmutableList.of(1, 2), 5000, 2000); + MockEntity mock2 = new MockEntity(IndexEntity.TABLE_INDEX_START_ID + IndexEntity.INDEX_ID_STEP + 1, + ImmutableList.of(1, 3), 2000, 2000); + assertSortedResult(IndexEntity.TABLE_INDEX_START_ID + 1, 
NQueryLayoutChooser.segmentRangeComparator(), mock1, + mock2); + } + + @Test + void testSegmentEffectivenessComparator() { + MockEntity mock1 = new MockEntity(IndexEntity.TABLE_INDEX_START_ID + 1, ImmutableList.of(1, 2), 1000, 3000); + MockEntity mock2 = new MockEntity(IndexEntity.TABLE_INDEX_START_ID + IndexEntity.INDEX_ID_STEP + 1, + ImmutableList.of(1, 3), 1000, 2000); + assertSortedResult(IndexEntity.TABLE_INDEX_START_ID + 1, NQueryLayoutChooser.segmentEffectivenessComparator(), + mock1, mock2); + } + + @Test + void testRowSizeComparator() { + MockEntity mock1 = new MockEntity(1L, ImmutableList.of(1, 2), 90); + MockEntity mock2 = new MockEntity(IndexEntity.INDEX_ID_STEP + 1L, ImmutableList.of(1, 4), 30); + MockEntity mock3 = new MockEntity(2 * IndexEntity.INDEX_ID_STEP + 1L, ImmutableList.of(1, 5), 10); + assertSortedResult(2 * IndexEntity.INDEX_ID_STEP + 1L, NQueryLayoutChooser.rowSizeComparator(), mock1, mock2, + mock3); + } + + @Test + void testDerivedLayoutComparator() { + DeriveInfo mockDeriveInfo = Mockito.mock(DeriveInfo.class); + MockEntity mock1 = new MockEntity(1L, ImmutableList.of(1, 2), ImmutableMap.of(5, mockDeriveInfo)); + MockEntity mock2 = new MockEntity(2 * IndexEntity.INDEX_ID_STEP + 1L, ImmutableList.of(1, 4), + ImmutableMap.of(3, mockDeriveInfo)); + MockEntity mock3 = new MockEntity(IndexEntity.INDEX_ID_STEP + 1L, ImmutableList.of(1, 3), ImmutableMap.of()); + + Comparator<NLayoutCandidate> comparator = NQueryLayoutChooser.derivedLayoutComparator(); + + // both not empty, choose the first one + assertSortedResult(1L, comparator, mock1, mock2); + + // choose the layout no need derivedInfo + assertSortedResult(IndexEntity.INDEX_ID_STEP + 1, comparator, mock1, mock3); + + // turn on table exclusion, but not prefer snapshot + MetadataTestUtils.updateProjectConfig("default", "kylin.metadata.table-exclusion-enabled", "true"); + MetadataTestUtils.updateProjectConfig("default", "kylin.query.snapshot-preferred-for-table-exclusion", "false"); + 
assertSortedResult(IndexEntity.INDEX_ID_STEP + 1, comparator, mock1, mock3); + + // turn on table exclusion and prefer snapshot + MetadataTestUtils.updateProjectConfig("default", "kylin.metadata.table-exclusion-enabled", "true"); + MetadataTestUtils.updateProjectConfig("default", "kylin.query.snapshot-preferred-for-table-exclusion", "true"); + assertSortedResult(1L, comparator, mock1, mock3); + } + + @Test + void testShardByComparator() { + + { + MockEntity mock1 = new MockEntity(1L, ImmutableList.of(0, 1, 2), ImmutableList.of(), ImmutableList.of(0)); + MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 0, 2), ImmutableList.of(), ImmutableList.of(1)); + + // all layout candidates have shardBy column + List<Integer> sortedFilters = Lists.newArrayList(2, 1, 0); + assertSortedResult(2L, NQueryLayoutChooser.shardByComparator(sortedFilters), mock1, mock2); + + sortedFilters = Lists.newArrayList(2, 0, 1); + assertSortedResult(1L, NQueryLayoutChooser.shardByComparator(sortedFilters), mock1, mock2); + } + + { + // the layout(id=2) has shardBy column(colId=1) + MockEntity mock1 = new MockEntity(1L, ImmutableList.of(0, 1, 2), ImmutableList.of(), ImmutableList.of()); + MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 0, 2), ImmutableList.of(), ImmutableList.of(1)); + + List<Integer> sortedFilters = Lists.newArrayList(2, 1, 0); + assertSortedResult(2L, NQueryLayoutChooser.shardByComparator(sortedFilters), mock1, mock2); + } + + { + // the layout(id=1) has shardBy column(colId=0) + MockEntity mock1 = new MockEntity(1L, ImmutableList.of(0, 1, 2), ImmutableList.of(), ImmutableList.of(0)); + MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 0, 2), ImmutableList.of(), ImmutableList.of()); + + List<Integer> sortedFilters = Lists.newArrayList(2, 1, 0); + assertSortedResult(1L, NQueryLayoutChooser.shardByComparator(sortedFilters), mock1, mock2); + } + } + + @Test + void testFilterColumnComparator() { + { + // both with shardBy, subject to the shardBy order + 
MockEntity mock1 = new MockEntity(1L, ImmutableList.of(1, 2, 3), ImmutableList.of(), ImmutableList.of(1)); + MockEntity mock2 = new MockEntity(2L, ImmutableList.of(2, 1, 3), ImmutableList.of(), ImmutableList.of(2)); + List<Integer> sortedFilters = Lists.newArrayList(1, 2, 3); + assertSortedResult(1L, NQueryLayoutChooser.filterColumnComparator(sortedFilters), mock1, mock2); + + sortedFilters = Lists.newArrayList(3, 2, 1); + assertSortedResult(2L, NQueryLayoutChooser.filterColumnComparator(sortedFilters), mock1, mock2); + } + + { + // one with shardBy + MockEntity mock1 = new MockEntity(1L, ImmutableList.of(2, 1, 3), ImmutableList.of(), ImmutableList.of()); + MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 2, 3), ImmutableList.of(), ImmutableList.of(1)); + List<Integer> sortedFilters = Lists.newArrayList(1, 2, 3); + assertSortedResult(2L, NQueryLayoutChooser.filterColumnComparator(sortedFilters), mock1, mock2); + } + + { + // one with shardBy + MockEntity mock1 = new MockEntity(1L, ImmutableList.of(2, 1, 3), ImmutableList.of(), ImmutableList.of(2)); + MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 2, 3), ImmutableList.of(), ImmutableList.of()); + List<Integer> sortedFilters = Lists.newArrayList(1, 2, 3); + assertSortedResult(1L, NQueryLayoutChooser.filterColumnComparator(sortedFilters), mock1, mock2); + } + + { + // both without shardBy + MockEntity mock1 = new MockEntity(1L, ImmutableList.of(2, 1, 3), ImmutableMap.of()); + MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 2, 3), ImmutableMap.of()); + List<Integer> sortedFilters = Lists.newArrayList(1, 2, 3); + assertSortedResult(2L, NQueryLayoutChooser.filterColumnComparator(sortedFilters), mock1, mock2); + } + } + + @Test + void testNonFilterColumnComparator() { + { + MockEntity mock1 = new MockEntity(1L, ImmutableList.of(2, 1, 3), ImmutableMap.of()); + MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 2, 3), ImmutableMap.of()); + List<Integer> sortedFilters = 
Lists.newArrayList(1, 2, 3); + assertSortedResult(2L, NQueryLayoutChooser.nonFilterColumnComparator(sortedFilters), mock1, mock2); + } + + { + MockEntity mock1 = new MockEntity(1L, ImmutableList.of(2, 1, 3), ImmutableMap.of()); + MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 2, 3), ImmutableMap.of()); + List<Integer> sortedFilters = Lists.newArrayList(2, 1, 3); + assertSortedResult(1L, NQueryLayoutChooser.nonFilterColumnComparator(sortedFilters), mock1, mock2); + } + } + + @Test + void testMeasureSizeComparator() { + MockEntity mock1 = new MockEntity(1L, ImmutableList.of(0), ImmutableList.of(100_000, 100_001)); + MockEntity mock2 = new MockEntity(10_001, ImmutableList.of(0), ImmutableList.of(100_000)); + List<NLayoutCandidate> layoutCandidates = mockLayouts(mock1, mock2); + layoutCandidates.sort(NQueryLayoutChooser.measureSizeComparator()); + Assertions.assertEquals(10_001L, layoutCandidates.get(0).getLayoutEntity().getId()); + } + + @Test + void testDimensionSizeComparator() { + MockEntity mock1 = new MockEntity(1L, ImmutableList.of(0, 1, 2), ImmutableList.of()); + MockEntity mock2 = new MockEntity(10_001L, ImmutableList.of(0, 1, 2, 3), ImmutableList.of()); + List<NLayoutCandidate> layoutCandidates = mockLayouts(mock1, mock2); + layoutCandidates.sort(NQueryLayoutChooser.dimensionSizeComparator()); + Assertions.assertEquals(1L, layoutCandidates.get(0).getLayoutEntity().getId()); + } + + private void assertSortedResult(long expectedId, Comparator<NLayoutCandidate> comparator, + MockEntity... 
mockEntities) { + List<NLayoutCandidate> layoutCandidates = mockLayouts(mockEntities); + layoutCandidates.sort(comparator); + Assertions.assertEquals(expectedId, layoutCandidates.get(0).getLayoutEntity().getId()); + } + + static class MockEntity { + long id; + List<Integer> dimensions; + List<Integer> measures; + List<Integer> shardByCols; + Map<Integer, DeriveInfo> deriveInfoMap; + long segRange; + long maxSegEnd; + double rowCost; + + MockEntity(long id, List<Integer> dimensions, Map<Integer, DeriveInfo> deriveInfoMap) { + this(id, dimensions, Lists.newArrayList(100_000), Lists.newArrayList()); + this.deriveInfoMap = deriveInfoMap; + } + + MockEntity(long id, List<Integer> dimensions, double rowCost) { + this(id, dimensions, Lists.newArrayList(100_000), Lists.newArrayList()); + this.rowCost = rowCost; + } + + MockEntity(long id, List<Integer> dimensions, long segRange, long maxSegEnd) { + this(id, dimensions, Lists.newArrayList(100_000), Lists.newArrayList()); + this.segRange = segRange; + this.maxSegEnd = maxSegEnd; + } + + MockEntity(long id, List<Integer> dimensions, List<Integer> measures) { + this(id, dimensions, measures, Lists.newArrayList()); + } + + MockEntity(long id, List<Integer> dimensions, List<Integer> measures, List<Integer> shardByCols) { + this.shardByCols = shardByCols; + this.id = id; + this.dimensions = dimensions; + this.measures = id < IndexEntity.TABLE_INDEX_START_ID ? measures : Lists.newArrayList(); + } + } + + private List<NLayoutCandidate> mockLayouts(MockEntity... 
entities) { + String project = "default"; + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + String modelId = "82fa7671-a935-45f5-8779-85703601f49a"; + NDataModelManager modelMgr = NDataModelManager.getInstance(kylinConfig, project); + modelMgr.updateDataModel(modelId, copyForWrite -> { + List<NDataModel.NamedColumn> allNamedColumns = copyForWrite.getAllNamedColumns(); + for (NDataModel.NamedColumn column : allNamedColumns) { + if (column.isExist()) { + column.setStatus(NDataModel.ColumnStatus.DIMENSION); + } + } + }); + Map<Long, MockEntity> idToMockEntityMap = Maps.newHashMap(); + NIndexPlanManager indexMgr = NIndexPlanManager.getInstance(kylinConfig, project); + indexMgr.updateIndexPlan(modelId, copyForWrite -> { + // clear ruleBasedIndex and auto indexes + copyForWrite.setRuleBasedIndex(new RuleBasedIndex()); + copyForWrite.getIndexes().clear(); + Map<Long, IndexEntity> indexMap = Maps.newHashMap(); + for (MockEntity entity : entities) { + idToMockEntityMap.put(entity.id, entity); + long indexId = entity.id - entity.id % IndexEntity.INDEX_ID_STEP; + IndexEntity index = indexMap.get(indexId); + if (index == null) { + index = new IndexEntity(); + index.setId(indexId); + index.setDimensions(entity.dimensions); + index.setMeasures(entity.measures); + } + // add layout + LayoutEntity layout = new LayoutEntity(); + List<Integer> colOrder = Lists.newArrayList(); + colOrder.addAll(entity.dimensions); + colOrder.addAll(entity.measures); + layout.setIndex(index); + layout.setAuto(true); + layout.setColOrder(colOrder); + layout.setShardByColumns(entity.shardByCols); + layout.setId(entity.id); + index.addLayout(layout); + indexMap.put(index.getId(), index); + } + copyForWrite.updateNextId(); + copyForWrite.getIndexes().addAll(indexMap.values()); + }); + List<LayoutEntity> allLayouts = indexMgr.getIndexPlan(modelId).getAllLayouts(); + return allLayouts.stream().map(layout -> { + NLayoutCandidate layoutCandidate = new NLayoutCandidate(layout, 0, new 
CapabilityResult()); + MockEntity mockEntity = idToMockEntityMap.get(layout.getId()); + layoutCandidate.setRange(mockEntity.segRange); + layoutCandidate.setMaxSegEnd(mockEntity.maxSegEnd); + layoutCandidate.setCost(mockEntity.rowCost); + layoutCandidate.setDerivedToHostMap(mockEntity.deriveInfoMap); + return layoutCandidate; + }).collect(Collectors.toList()); + } +} diff --git a/src/query/src/test/java/org/apache/kylin/query/routing/QueryRouterTest.java b/src/query/src/test/java/org/apache/kylin/query/routing/QueryRouterTest.java new file mode 100644 index 0000000000..6f5fc71a7d --- /dev/null +++ b/src/query/src/test/java/org/apache/kylin/query/routing/QueryRouterTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.query.routing; + +import java.util.List; + +import org.apache.kylin.common.KylinConfigBase; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.junit.annotation.MetadataInfo; +import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate; +import org.apache.kylin.metadata.realization.CapabilityResult; +import org.apache.kylin.metadata.realization.IRealization; +import org.apache.kylin.query.relnode.OLAPContext; +import org.apache.kylin.util.MetadataTestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import lombok.val; + +@MetadataInfo +class QueryRouterTest { + @Test + void testSort() { + { + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 1, 1); + Candidate c2 = CandidateTestUtils.mockCandidate("model0002", "modelB", 2, 2); + assertSortedResults(c1, Lists.newArrayList(c1, c2)); + } + + { + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 2, 1); + Candidate c2 = CandidateTestUtils.mockCandidate("model0002", "modelB", 2, 2); + List<Candidate> candidates = Lists.newArrayList(c1, c2); + QueryRouter.sortCandidates("default", candidates); + Assertions.assertEquals(c1, candidates.get(0)); + } + + { + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 2, 2); + Candidate c2 = CandidateTestUtils.mockCandidate("model0002", "modelB", 2, 2); + List<Candidate> candidates = Lists.newArrayList(c1, c2); + QueryRouter.sortCandidates("default", candidates); + Assertions.assertEquals(c1, candidates.get(0)); + } + + { + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 1, 1); + Candidate c2 = CandidateTestUtils.mockCandidate("model0002", "modelB", 2, 2); + Candidate c3 = CandidateTestUtils.mockCandidate("model0003", "modelC", 4, 4); + List<Candidate> candidates = Lists.newArrayList(c1, c2, c3); + QueryRouter.sortCandidates("default", 
candidates); + Assertions.assertEquals(c1, candidates.get(0)); + } + + { + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 1, 1); + Candidate c2 = mockEmptyCandidate("model0003", "modelC", 1); + List<Candidate> candidates = Lists.newArrayList(c1, c2); + QueryRouter.sortCandidates("default", candidates); + Assertions.assertEquals(c1, candidates.get(0)); + } + + { + Candidate c1 = mockStreamingCandidate("model0001", "modelA", 2, 1); + Candidate c2 = mockEmptyCandidate("model0002", "modelB", 2); + List<Candidate> candidates = Lists.newArrayList(c1, c2); + QueryRouter.sortCandidates("default", candidates); + Assertions.assertEquals(c1, candidates.get(0)); + } + + { + Candidate c1 = mockHybridCandidate("model0001", "modelA", 3, 1, 2); + Candidate c2 = mockEmptyCandidate("model0002", "modelB", 3); + + assertSortedResults(c1, Lists.newArrayList(c1, c2)); + } + + { + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 1, 3); + Candidate c2 = mockStreamingCandidate("model0002", "modelB", 1, 2); + Candidate c3 = mockHybridCandidate("model0003", "modelC", 1, 4, 2.5); + List<Candidate> candidates = Lists.newArrayList(c1, c2, c3); + QueryRouter.sortCandidates("default", candidates); + Assertions.assertEquals(c2, candidates.get(0)); + } + } + + @Test + void testSortWithVacantPruningRule() { + // This property should not affect the sorting of candidates in different models, so testSort() must still pass. 
+ MetadataTestUtils.updateProjectConfig("default", "kylin.query.index-match-rules", + KylinConfigBase.USE_VACANT_INDEXES); + testSort(); + } + + @Test + void testTableIndexAnswerSelectStar() { + MetadataTestUtils.updateProjectConfig("default", "kylin.query.index-match-rules", + KylinConfigBase.USE_TABLE_INDEX_ANSWER_SELECT_STAR); + Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 2, 1, 1); + Candidate c2 = CandidateTestUtils.mockCandidate("model0002", "modelB", 1, 1, 2); + assertSortedResults(c1, Lists.newArrayList(c1, c2)); + + MetadataTestUtils.updateProjectConfig("default", "kylin.query.index-match-rules", ""); + assertSortedResults(c2, Lists.newArrayList(c1, c2)); + } + + private void assertSortedResults(Candidate expectCandidate, List<Candidate> candidates) { + QueryRouter.sortCandidates("default", candidates); + Assertions.assertEquals(expectCandidate, candidates.get(0)); + } + + private Candidate mockStreamingCandidate(String modelId, String modelName, int realizationCost, + double candidateCost) { + IRealization realization = CandidateTestUtils.mockRealization(modelId, modelName, realizationCost); + OLAPContext olapContext = CandidateTestUtils.mockOlapContext(); + val candidate = new Candidate(realization, olapContext, Maps.newHashMap()); + val cap = new CapabilityResult(); + cap.setSelectedStreamingCandidate(() -> candidateCost); + cap.setCost(cap.getSelectedStreamingCandidate().getCost()); + candidate.setCapability(cap); + return candidate; + } + + private Candidate mockHybridCandidate(String modelId, String modelName, int realizationCost, double candidateCost, + double streamingCandidateCost) { + IRealization realization = CandidateTestUtils.mockRealization(modelId, modelName, realizationCost); + OLAPContext olapContext = CandidateTestUtils.mockOlapContext(); + val candidate = new Candidate(realization, olapContext, Maps.newHashMap()); + val cap = new CapabilityResult(); + cap.setSelectedCandidate(() -> candidateCost); + 
cap.setSelectedStreamingCandidate(() -> streamingCandidateCost); + cap.setCost( + (int) Math.min(cap.getSelectedCandidate().getCost(), cap.getSelectedStreamingCandidate().getCost())); + candidate.setCapability(cap); + return candidate; + } + + private Candidate mockEmptyCandidate(String modelId, String modelName, int realizationCost) { + IRealization realization = CandidateTestUtils.mockRealization(modelId, modelName, realizationCost); + OLAPContext olapContext = CandidateTestUtils.mockOlapContext(); + val candidate = new Candidate(realization, olapContext, Maps.newHashMap()); + candidate.realization = CandidateTestUtils.mockRealization(modelId, modelName, realizationCost); + val cap = new CapabilityResult(); + cap.setSelectedCandidate(NLayoutCandidate.EMPTY); + cap.setSelectedStreamingCandidate(NLayoutCandidate.EMPTY); + candidate.setCapability(cap); + return candidate; + } +} diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala index b664963211..483e35aa50 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala @@ -292,7 +292,9 @@ class FilePruner(val session: SparkSession, QueryContext.current().getMetrics.setFileCount(totalFileCount) val totalFileSize = selected.flatMap(partition => partition.files).map(_.getLen).sum val sourceRows = selected.map(seg => { - val layoutRows = dataflow.getSegment(seg.segmentID).getLayout(layout.getId).getRows + val segment = dataflow.getSegment(seg.segmentID) + val dataLayout = segment.getLayout(layout.getId) + val layoutRows = if (dataLayout == null) 0 else dataLayout.getRows logInfo(s"Source scan rows: Query Id: ${QueryContext.current().getQueryId}, Segment Id: ${seg.segmentID}, " + 
s"Layout Id: ${layout.getId}, rows: $layoutRows.") layoutRows @@ -361,7 +363,9 @@ class FilePruner(val session: SparkSession, private def pruneEmptySegments(segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = { segDirs.filter(seg => { - if (dataflow.getSegment(seg.segmentID).getLayout(layout.getId).isEmpty) { + val segment = dataflow.getSegment(seg.segmentID) + val dataLayout = segment.getLayout(layout.getId) + if (dataLayout == null || dataLayout.isEmpty) { logDebug(s"pruning empty segment: segment ${seg.segmentID} ${layout.getId} is empty.") false } else {