This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4bda83a3388a0ac79a8989160068a790919d43fe
Author: Pengfei Zhan <dethr...@gmail.com>
AuthorDate: Sat Apr 8 23:21:35 2023 +0800

    KYLIN-5633 optimize the pruning process of heterogeneous segments
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  32 ++-
 .../java/org/apache/kylin/common/QueryContext.java |   3 +-
 .../kylin/metadata/cube/cuboid/ChooserContext.java |  11 +-
 .../metadata/cube/cuboid/NLayoutCandidate.java     | 129 ++++-----
 .../metadata/cube/cuboid/NLookupCandidate.java     |   4 +-
 .../metadata/cube/cuboid/NQueryLayoutChooser.java  | 184 +++++++++---
 .../cube/model/NDataflowCapabilityChecker.java     |  10 +-
 .../metadata/realization/CapabilityResult.java     |  17 +-
 .../metadata/realization/HybridRealization.java    |   6 +-
 .../routing/HeterogeneousSegmentPruningTest.java   | 143 ++++++++-
 .../routing/NDataflowCapabilityCheckerTest.java    |   2 +-
 .../query/routing/VacantIndexPruningRuleTest.java  | 212 ++++++++++++++
 .../org/apache/kylin/query/routing/Candidate.java  |   6 +-
 .../apache/kylin/query/routing/QueryRouter.java    |  17 +-
 .../kylin/query/routing/RealizationChooser.java    |  11 +-
 .../query/routing/VacantIndexPruningRule.java      | 142 +++++++++
 .../apache/kylin/rest/response/SQLResponse.java    |  15 +-
 .../apache/kylin/rest/service/QueryService.java    |  21 +-
 .../kylin/query/util/QueryContextCutter.java       |   9 +-
 .../kylin/query/routing/CandidateSortTest.java     | 320 +++++----------------
 .../kylin/query/routing/CandidateTestUtils.java    |  81 ++++++
 .../query/routing/LayoutCandidateSortTest.java     | 320 +++++++++++++++++++++
 .../kylin/query/routing/QueryRouterTest.java       | 167 +++++++++++
 .../sql/execution/datasource/FilePruner.scala      |   8 +-
 24 files changed, 1430 insertions(+), 440 deletions(-)

diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 9557ede4d5..c0797ebc18 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2308,6 +2308,34 @@ public abstract class KylinConfigBase implements 
Serializable {
                         
"org.apache.kylin.query.util.SparkSQLFunctionConverter" });
     }
 
+    public static final String USE_VACANT_INDEXES = "use-vacant-indexes";
+    public static final String USE_TABLE_INDEX_ANSWER_SELECT_STAR = 
"use-table-index-answer-select-star";
+
+    public String getQueryIndexMatchRules() {
+        return getOptional("kylin.query.index-match-rules", "");
+    }
+
+    private Set<String> getPruningRules() {
+        String queryIndexMatchRules = getQueryIndexMatchRules();
+        String[] splitRules = queryIndexMatchRules.split(",");
+        Set<String> configRules = Sets.newHashSet();
+        for (String splitRule : splitRules) {
+            if (StringUtils.isNotBlank(splitRule)) {
+                configRules.add(StringUtils.lowerCase(splitRule));
+            }
+        }
+        return configRules;
+    }
+
+    public boolean isVacantIndexPruningEnabled() {
+        return getPruningRules().contains(KylinConfigBase.USE_VACANT_INDEXES);
+    }
+
+    public boolean useTableIndexAnswerSelectStarEnabled() {
+        return 
getPruningRules().contains(KylinConfigBase.USE_TABLE_INDEX_ANSWER_SELECT_STAR)
+                || 
Boolean.parseBoolean(getOptional("kylin.query.use-tableindex-answer-select-star.enabled",
 FALSE));
+    }
+
     @ThirdPartyDependencies({
             @ThirdPartyDependencies.ThirdPartyDependent(repository = 
"static-user-manager", classes = {
                     "StaticAuthenticationProvider", "StaticUserGroupService", 
"StaticUserService" }) })
@@ -3761,10 +3789,6 @@ public abstract class KylinConfigBase implements 
Serializable {
         return 
Boolean.parseBoolean(getOptional("kylin.build.resource.skip-resource-check", 
FALSE));
     }
 
-    public boolean useTableIndexAnswerSelectStarEnabled() {
-        return 
Boolean.parseBoolean(getOptional("kylin.query.use-tableindex-answer-select-star.enabled",
 FALSE));
-    }
-
     public int getSecondStorageSkippingIndexGranularity() {
         int granularity = 
Integer.parseInt(getOptional("kylin.second-storage.skipping-index.granularity", 
"3"));
         return granularity <= 0 ? 3 : granularity;
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java 
b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 4f766fb736..cda5ec2c33 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -30,9 +30,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 
 import com.alibaba.ttl.TransmittableThreadLocal;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -361,6 +361,7 @@ public class QueryContext implements Closeable {
         private String separator;
         private boolean isRefused;
         private boolean includeHeader;
+        private boolean isVacant;
     }
 
     @Getter
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/ChooserContext.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/ChooserContext.java
index 454cc3bc2f..bf6e8efd74 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/ChooserContext.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/ChooserContext.java
@@ -56,7 +56,7 @@ public class ChooserContext {
     final Map<Integer, TableExtDesc.ColumnStats> columnStatMap = 
Maps.newHashMap();
 
     final KylinConfig kylinConfig;
-
+    SQLDigest sqlDigest;
     AggIndexMatcher aggIndexMatcher;
     TableIndexMatcher tableIndexMatcher;
 
@@ -79,6 +79,7 @@ public class ChooserContext {
 
     public ChooserContext(SQLDigest sqlDigest, NDataflow dataflow) {
         this(dataflow.getModel());
+        this.sqlDigest = sqlDigest;
         prepareIndexMatchers(sqlDigest, dataflow);
     }
 
@@ -106,7 +107,13 @@ public class ChooserContext {
      *    see org.apache.kylin.query.DynamicQueryTest.testDynamicParamOnAgg.
      */
     public boolean isIndexMatchersInvalid() {
-        return !getAggIndexMatcher().isValid() && 
!getTableIndexMatcher().isValid();
+        boolean invalid = !getAggIndexMatcher().isValid() && 
!getTableIndexMatcher().isValid();
+        if (invalid) {
+            log.warn("Unfortunately, the fast check has failed. "
+                    + "It's possible that the queried columns contain null 
values, "
+                    + "which could be due to the computed column not being 
present in the model.");
+        }
+        return invalid;
     }
 
     public TableExtDesc.ColumnStats getColumnStats(TblColRef ref) {
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java
index 2ab1afccc0..19b1ba592a 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java
@@ -17,50 +17,44 @@
  */
 package org.apache.kylin.metadata.cube.cuboid;
 
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.annotation.Nonnull;
-
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.metadata.model.DeriveInfo;
-import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.realization.CapabilityResult;
 import org.apache.kylin.metadata.realization.IRealizationCandidate;
 
 import lombok.Getter;
 import lombok.Setter;
 
+@Getter
+@Setter
 public class NLayoutCandidate implements IRealizationCandidate {
-    private @Nonnull LayoutEntity layoutEntity;
-    @Setter
-    private double cost;
-
-    @Setter
-    @Getter
-    private CapabilityResult capabilityResult;
 
     public static final NLayoutCandidate EMPTY = new NLayoutCandidate(new 
LayoutEntity(), Double.MAX_VALUE,
             new CapabilityResult());
 
-    // derived
-    private @Nonnull Map<Integer, DeriveInfo> derivedToHostMap = 
Maps.newHashMap();
-
-    @Getter
-    @Setter
-    Set<String> derivedTableSnapshots = new HashSet<>();
+    private LayoutEntity layoutEntity;
+    private double cost;
+    private CapabilityResult capabilityResult;
+    private long range;
+    private long maxSegEnd;
+    private Map<Integer, DeriveInfo> derivedToHostMap = Maps.newHashMap();
+    Set<String> derivedTableSnapshots = Sets.newHashSet();
 
-    public NLayoutCandidate(@Nonnull LayoutEntity layoutEntity) {
+    public NLayoutCandidate(LayoutEntity layoutEntity) {
+        Preconditions.checkNotNull(layoutEntity);
         this.layoutEntity = layoutEntity;
     }
 
-    public NLayoutCandidate(@Nonnull LayoutEntity layoutEntity, double cost, 
CapabilityResult result) {
-        this.layoutEntity = layoutEntity;
+    public NLayoutCandidate(LayoutEntity layoutEntity, double cost, 
CapabilityResult result) {
+        this(layoutEntity);
         this.cost = cost;
         this.capabilityResult = result;
     }
@@ -69,66 +63,57 @@ public class NLayoutCandidate implements 
IRealizationCandidate {
         return this.getLayoutEntity().getIndex() == null;
     }
 
-    @Nonnull
-    public LayoutEntity getLayoutEntity() {
-        return layoutEntity;
-    }
-
-    public void setLayoutEntity(@Nonnull LayoutEntity cuboidLayout) {
-        this.layoutEntity = cuboidLayout;
-    }
-
-    @Nonnull
-    public Map<Integer, DeriveInfo> getDerivedToHostMap() {
-        return derivedToHostMap;
-    }
-
-    public void setDerivedToHostMap(@Nonnull Map<Integer, DeriveInfo> 
derivedToHostMap) {
-        this.derivedToHostMap = derivedToHostMap;
-    }
-
     public Map<List<Integer>, List<DeriveInfo>> makeHostToDerivedMap() {
         Map<List<Integer>, List<DeriveInfo>> hostToDerivedMap = 
Maps.newHashMap();
-
-        for (Map.Entry<Integer, DeriveInfo> entry : 
derivedToHostMap.entrySet()) {
-
-            Integer derCol = entry.getKey();
-            List<Integer> hostCols = entry.getValue().columns;
-            DeriveInfo.DeriveType type = entry.getValue().type;
-            JoinDesc join = entry.getValue().join;
-
-            List<DeriveInfo> infoList = 
hostToDerivedMap.computeIfAbsent(hostCols, k -> new ArrayList<>());
-
-            // Merged duplicated derived column
-            boolean merged = false;
-            for (DeriveInfo existing : infoList) {
-                if (existing.type == type && 
existing.join.getPKSide().equals(join.getPKSide())) {
-                    if (existing.columns.contains(derCol)) {
-                        merged = true;
-                        break;
-                    }
-                    if (type == DeriveInfo.DeriveType.LOOKUP || type == 
DeriveInfo.DeriveType.LOOKUP_NON_EQUI) {
-                        existing.columns.add(derCol);
-                        merged = true;
-                        break;
-                    }
-                }
+        derivedToHostMap.forEach((derivedColId, deriveInfo) -> {
+            DeriveInfo.DeriveType type = deriveInfo.type;
+            List<Integer> columns = deriveInfo.columns;
+            List<DeriveInfo> infoList = 
hostToDerivedMap.computeIfAbsent(columns, k -> Lists.newArrayList());
+            if (!isMerged(derivedColId, deriveInfo, infoList)) {
+                infoList.add(new DeriveInfo(type, deriveInfo.join, 
Lists.newArrayList(derivedColId), false));
             }
-            if (!merged)
-                infoList.add(new DeriveInfo(type, join, 
Lists.newArrayList(derCol), false));
-        }
-
+        });
         return hostToDerivedMap;
     }
 
-    @Override
-    public double getCost() {
-        return this.cost;
+    // Merged duplicated derived column
+    private static boolean isMerged(Integer derCol, DeriveInfo deriveInfo, 
List<DeriveInfo> infoList) {
+        DeriveInfo.DeriveType type = deriveInfo.type;
+        boolean merged = false;
+        for (DeriveInfo existing : infoList) {
+            if (existing.type == type && 
existing.join.getPKSide().equals(deriveInfo.join.getPKSide())) {
+                if (existing.columns.contains(derCol)) {
+                    merged = true;
+                }
+                if (type == DeriveInfo.DeriveType.LOOKUP || type == 
DeriveInfo.DeriveType.LOOKUP_NON_EQUI) {
+                    existing.columns.add(derCol);
+                    merged = true;
+                }
+            }
+            if (merged) {
+                break;
+            }
+        }
+        return merged;
     }
 
     @Override
     public String toString() {
-        return "LayoutCandidate{" + "cuboidLayout=" + layoutEntity + ", 
indexEntity=" + layoutEntity.getIndex()
-                + ", cost=" + cost + '}';
+        String type = "";
+        if (layoutEntity.isManual()) {
+            type += "manual";
+        } else if (layoutEntity.isAuto()) {
+            type += "auto";
+        }
+        if (layoutEntity.isBase()) {
+            type += type.isEmpty() ? "base" : ",base";
+        }
+        if (type.isEmpty()) {
+            type = "unknown";
+        }
+        return "LayoutCandidate{" + "layout=" + layoutEntity //
+                + ", type=" + type //
+                + ", cost=" + cost //
+                + "}";
     }
 }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLookupCandidate.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLookupCandidate.java
index a0901d4cb9..c65f1e1032 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLookupCandidate.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLookupCandidate.java
@@ -24,10 +24,10 @@ import lombok.Getter;
 
 public class NLookupCandidate implements IRealizationCandidate {
     @Getter
-    private String tableRef;
+    private final String tableRef;
 
     @Getter
-    private boolean isUsingSnapShot;
+    private final boolean isUsingSnapShot;
 
     public NLookupCandidate(String tableRef, boolean isUsingSnapShot) {
         this.tableRef = tableRef;
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NQueryLayoutChooser.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NQueryLayoutChooser.java
index 31572941e5..4b28ad7c7a 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NQueryLayoutChooser.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NQueryLayoutChooser.java
@@ -95,23 +95,13 @@ public class NQueryLayoutChooser {
         Map<Long, List<NDataLayout>> commonLayoutsMap = commonLayouts.stream()
                 .collect(Collectors.toMap(NDataLayout::getLayoutId, 
Lists::newArrayList));
         List<NLayoutCandidate> candidates = 
collectAllLayoutCandidates(dataflow, chooserContext, commonLayoutsMap);
-
-        QueryInterruptChecker.checkThreadInterrupted("Interrupted exception 
occurs.",
-                "Current step involves gathering all the layouts that "
-                        + "can potentially provide a response to this query.");
-
-        if (candidates.isEmpty()) {
-            return null;
-        }
-        log.info("Matched candidates num : {}", candidates.size());
-        sortCandidates(candidates, chooserContext, sqlDigest);
-        return candidates.get(0);
+        return chooseBestLayoutCandidate(dataflow, sqlDigest, chooserContext, 
candidates, "selectLayoutCandidate");
     }
 
     public static List<NLayoutCandidate> collectAllLayoutCandidates(NDataflow 
dataflow, ChooserContext chooserContext,
-            Map<Long, List<NDataLayout>> commonLayoutsMap) {
+            Map<Long, List<NDataLayout>> dataLayoutMap) {
         List<NLayoutCandidate> candidates = Lists.newArrayList();
-        for (Map.Entry<Long, List<NDataLayout>> entry : 
commonLayoutsMap.entrySet()) {
+        for (Map.Entry<Long, List<NDataLayout>> entry : 
dataLayoutMap.entrySet()) {
             LayoutEntity layout = 
dataflow.getIndexPlan().getLayoutEntity(entry.getKey());
             log.trace("Matching index: id = {}", entry.getKey());
             IndexMatcher.MatchResult matchResult = 
chooserContext.getTableIndexMatcher().match(layout);
@@ -120,7 +110,7 @@ public class NQueryLayoutChooser {
             }
 
             if (!matchResult.isMatched()) {
-                log.trace("Matching failed");
+                log.trace("The [{}] cannot match with the {}", 
chooserContext.sqlDigest.toString(), layout);
                 continue;
             }
 
@@ -131,14 +121,84 @@ public class NQueryLayoutChooser {
                 
candidate.setDerivedTableSnapshots(candidate.getDerivedToHostMap().keySet().stream()
                         .map(i -> 
chooserContext.convertToRef(i).getTable()).collect(Collectors.toSet()));
             }
-            long allRows = 
entry.getValue().stream().mapToLong(NDataLayout::getRows).sum();
+            List<NDataLayout> dataLayouts = entry.getValue();
+            long allRows = 
dataLayouts.stream().mapToLong(NDataLayout::getRows).sum();
             candidate.setCost(allRows * (tempResult.influences.size() + 
matchResult.getInfluenceFactor()));
             candidate.setCapabilityResult(tempResult);
+
+            long[] rangeAndLatest = calcSegRangeAndMaxEnd(chooserContext, 
dataflow, dataLayouts);
+            candidate.setRange(rangeAndLatest[0]);
+            candidate.setMaxSegEnd(rangeAndLatest[1]);
             candidates.add(candidate);
         }
         return candidates;
     }
 
+    private static long[] calcSegRangeAndMaxEnd(ChooserContext chooserContext, 
NDataflow df,
+            List<NDataLayout> dataLayouts) {
+        long[] rangeAndLatest = new long[2];
+        if (!chooserContext.getKylinConfig().isVacantIndexPruningEnabled()) {
+            return rangeAndLatest;
+        }
+        List<String> segmentNameList = Lists.newArrayList();
+        for (NDataLayout dataLayout : dataLayouts) {
+            NDataSegment segment = 
df.getSegment(dataLayout.getSegDetails().getId());
+            Long end = (Long) segment.getSegRange().getEnd();
+            Long start = (Long) segment.getSegRange().getStart();
+            rangeAndLatest[0] += (end - start);
+            rangeAndLatest[1] = Math.max(rangeAndLatest[1], end);
+            segmentNameList.add(segment.getName());
+        }
+        log.trace("All available segments are: {}", segmentNameList);
+        return rangeAndLatest;
+    }
+
+    public static NLayoutCandidate selectHighIntegrityCandidate(NDataflow 
dataflow, List<NDataSegment> prunedSegments,
+            SQLDigest digest) {
+        if 
(!NProjectManager.getProjectConfig(dataflow.getProject()).isVacantIndexPruningEnabled())
 {
+            return null;
+        }
+        if (CollectionUtils.isEmpty(prunedSegments)) {
+            log.info("There is no segment to answer sql");
+            return NLayoutCandidate.EMPTY;
+        }
+
+        ChooserContext chooserContext = new ChooserContext(digest, dataflow);
+        if (chooserContext.isIndexMatchersInvalid()) {
+            return null;
+        }
+
+        Map<Long, List<NDataLayout>> idToDataLayoutsMap = Maps.newHashMap();
+        for (NDataSegment segment : prunedSegments) {
+            segment.getLayoutsMap().forEach((id, dataLayout) -> {
+                idToDataLayoutsMap.putIfAbsent(id, Lists.newArrayList());
+                idToDataLayoutsMap.get(id).add(dataLayout);
+            });
+        }
+
+        List<NLayoutCandidate> allLayoutCandidates = 
NQueryLayoutChooser.collectAllLayoutCandidates(dataflow,
+                chooserContext, idToDataLayoutsMap);
+        return chooseBestLayoutCandidate(dataflow, digest, chooserContext, 
allLayoutCandidates,
+                "selectHighIntegrityCandidate");
+    }
+
+    private static NLayoutCandidate chooseBestLayoutCandidate(NDataflow 
dataflow, SQLDigest digest,
+            ChooserContext chooserContext, List<NLayoutCandidate> 
allLayoutCandidates, String invokedByMethod) {
+        QueryInterruptChecker.checkThreadInterrupted("Interrupted exception 
occurs.",
+                "Current step involves gathering all the layouts that "
+                        + "can potentially provide a response to this query.");
+
+        if (allLayoutCandidates.isEmpty()) {
+            log.info("There is no layouts can match with the [{}]", 
digest.toString());
+            return null;
+        }
+        sortCandidates(allLayoutCandidates, chooserContext, digest);
+        log.debug("Invoked by method {}. Successfully matched {} candidates 
within the model ({}/{}), " //
+                + "and {} has been selected.", invokedByMethod, 
allLayoutCandidates.size(), dataflow.getProject(),
+                dataflow.getId(), allLayoutCandidates.get(0).toString());
+        return allLayoutCandidates.get(0);
+    }
+
     private static Collection<NDataLayout> getCommonLayouts(List<NDataSegment> 
segments, NDataflow dataflow,
             Map<String, Set<Long>> chSegmentToLayoutsMap) {
         KylinConfig projectConfig = 
NProjectManager.getProjectConfig(dataflow.getProject());
@@ -174,35 +234,75 @@ public class NQueryLayoutChooser {
         return commonLayouts.values();
     }
 
-    private static void sortCandidates(List<NLayoutCandidate> candidates, 
ChooserContext chooserContext,
+    public static void sortCandidates(List<NLayoutCandidate> candidates, 
ChooserContext chooserContext,
             SQLDigest sqlDigest) {
-        final Set<TblColRef> filterColSet = 
ImmutableSet.copyOf(sqlDigest.filterColumns);
-        final List<TblColRef> filterCols = Lists.newArrayList(filterColSet);
-        val filterColIds = 
filterCols.stream().sorted(ComparatorUtils.filterColComparator(chooserContext))
-                .map(col -> 
chooserContext.getTblColMap().get(col)).collect(Collectors.toList());
+        List<Integer> filterColIds = getFilterColIds(chooserContext, 
sqlDigest);
+        List<Integer> nonFilterColIds = getNonFilterColIds(chooserContext, 
sqlDigest);
+        Ordering<NLayoutCandidate> ordering = 
chooserContext.getKylinConfig().isVacantIndexPruningEnabled()
+                ? getEnhancedSorter(filterColIds, nonFilterColIds)
+                : getDefaultSorter(filterColIds, nonFilterColIds);
+        candidates.sort(ordering);
+    }
 
-        final Set<TblColRef> nonFilterColSet = sqlDigest.isRawQuery ? 
sqlDigest.allColumns.stream()
-                .filter(colRef -> colRef.getFilterLevel() == 
TblColRef.FilterColEnum.NONE).collect(Collectors.toSet())
-                : sqlDigest.groupbyColumns.stream()
-                        .filter(colRef -> colRef.getFilterLevel() == 
TblColRef.FilterColEnum.NONE)
-                        .collect(Collectors.toSet());
-        final List<TblColRef> nonFilterColumns = 
Lists.newArrayList(nonFilterColSet);
-        nonFilterColumns.sort(ComparatorUtils.nonFilterColComparator());
-        val nonFilterColIds = nonFilterColumns.stream().map(col -> 
chooserContext.getTblColMap().get(col))
-                .collect(Collectors.toList());
+    private static Ordering<NLayoutCandidate> getEnhancedSorter(List<Integer> 
filterColIds,
+            List<Integer> nonFilterColIds) {
+        return Ordering.from(segmentRangeComparator()) // high data integrity
+                .compound(preferAggComparator()) //
+                .compound(derivedLayoutComparator()) //
+                .compound(rowSizeComparator()) // lower cost
+                .compound(filterColumnComparator(filterColIds)) //
+                .compound(dimensionSizeComparator()) //
+                .compound(measureSizeComparator()) //
+                .compound(nonFilterColumnComparator(nonFilterColIds)) //
+                .compound(segmentEffectivenessComparator()); // the latest 
segment
+    }
 
-        Ordering<NLayoutCandidate> ordering = Ordering //
-                .from(priorityLayoutComparator()) //
+    private static Ordering<NLayoutCandidate> getDefaultSorter(List<Integer> 
filterColIds,
+            List<Integer> nonFilterColIds) {
+        return Ordering //
+                .from(preferAggComparator()) //
                 .compound(derivedLayoutComparator()) //
                 .compound(rowSizeComparator()) // L1 comparator, compare 
cuboid rows
                 .compound(filterColumnComparator(filterColIds)) // L2 
comparator, order filter columns
                 .compound(dimensionSizeComparator()) // the lower dimension 
the best
                 .compound(measureSizeComparator()) // L3 comparator, order 
size of cuboid columns
-                .compound(nonFilterColumnComparator(nonFilterColIds)); // L4 
comparator, order non-filter columns
-        candidates.sort(ordering);
+                .compound(nonFilterColumnComparator(nonFilterColIds));
+    }
+
+    private static List<Integer> getFilterColIds(ChooserContext 
chooserContext, SQLDigest sqlDigest) {
+        Set<TblColRef> filterColSet = 
ImmutableSet.copyOf(sqlDigest.filterColumns);
+        List<TblColRef> filterCols = Lists.newArrayList(filterColSet);
+        return 
filterCols.stream().sorted(ComparatorUtils.filterColComparator(chooserContext))
+                .map(col -> 
chooserContext.getTblColMap().get(col)).collect(Collectors.toList());
+    }
+
+    private static List<Integer> getNonFilterColIds(ChooserContext 
chooserContext, SQLDigest sqlDigest) {
+
+        Set<TblColRef> nonFilterColSet;
+        if (sqlDigest.isRawQuery) {
+            nonFilterColSet = sqlDigest.allColumns.stream()
+                    .filter(colRef -> colRef.getFilterLevel() == 
TblColRef.FilterColEnum.NONE)
+                    .collect(Collectors.toSet());
+        } else {
+            nonFilterColSet = sqlDigest.groupbyColumns.stream()
+                    .filter(colRef -> colRef.getFilterLevel() == 
TblColRef.FilterColEnum.NONE)
+                    .collect(Collectors.toSet());
+        }
+        List<TblColRef> nonFilterColumns = Lists.newArrayList(nonFilterColSet);
+        nonFilterColumns.sort(ComparatorUtils.nonFilterColComparator());
+        return nonFilterColumns.stream().map(col -> 
chooserContext.getTblColMap().get(col))
+                .collect(Collectors.toList());
+    }
+
+    public static Comparator<NLayoutCandidate> segmentRangeComparator() {
+        return (c1, c2) -> Long.compare(c2.getRange(), c1.getRange());
+    }
+
+    public static Comparator<NLayoutCandidate> 
segmentEffectivenessComparator() {
+        return (c1, c2) -> Long.compare(c2.getMaxSegEnd(), c1.getMaxSegEnd());
     }
 
-    private static Comparator<NLayoutCandidate> priorityLayoutComparator() {
+    public static Comparator<NLayoutCandidate> preferAggComparator() {
         return (layoutCandidate1, layoutCandidate2) -> {
             if (!KylinConfig.getInstanceFromEnv().isPreferAggIndex()) {
                 return 0;
@@ -218,7 +318,7 @@ public class NQueryLayoutChooser {
         };
     }
 
-    private static Comparator<NLayoutCandidate> derivedLayoutComparator() {
+    public static Comparator<NLayoutCandidate> derivedLayoutComparator() {
         return (candidate1, candidate2) -> {
             int result = 0;
             if (candidate1.getDerivedToHostMap().isEmpty() && 
!candidate2.getDerivedToHostMap().isEmpty()) {
@@ -236,15 +336,15 @@ public class NQueryLayoutChooser {
         };
     }
 
-    private static Comparator<NLayoutCandidate> rowSizeComparator() {
+    public static Comparator<NLayoutCandidate> rowSizeComparator() {
         return Comparator.comparingDouble(NLayoutCandidate::getCost);
     }
 
-    private static Comparator<NLayoutCandidate> dimensionSizeComparator() {
+    public static Comparator<NLayoutCandidate> dimensionSizeComparator() {
         return Comparator.comparingInt(candidate -> 
candidate.getLayoutEntity().getOrderedDimensions().size());
     }
 
-    private static Comparator<NLayoutCandidate> measureSizeComparator() {
+    public static Comparator<NLayoutCandidate> measureSizeComparator() {
         return Comparator.comparingInt(candidate -> 
candidate.getLayoutEntity().getOrderedMeasures().size());
     }
 
@@ -253,18 +353,18 @@ public class NQueryLayoutChooser {
      * 1. choose the layout if its shardby column is found in filters
      * 2. otherwise, compare position of filter columns appear in the layout 
dims
      */
-    private static Comparator<NLayoutCandidate> 
filterColumnComparator(List<Integer> sortedFilters) {
+    public static Comparator<NLayoutCandidate> 
filterColumnComparator(List<Integer> sortedFilters) {
         return 
Ordering.from(shardByComparator(sortedFilters)).compound(colComparator(sortedFilters));
     }
 
-    private static Comparator<NLayoutCandidate> 
nonFilterColumnComparator(List<Integer> sortedNonFilters) {
+    public static Comparator<NLayoutCandidate> 
nonFilterColumnComparator(List<Integer> sortedNonFilters) {
         return colComparator(sortedNonFilters);
     }
 
     /**
      * compare filters with dim pos in layout, filter columns are sorted by 
filter type and selectivity (cardinality)
      */
-    private static Comparator<NLayoutCandidate> colComparator(List<Integer> 
sortedCols) {
+    public static Comparator<NLayoutCandidate> colComparator(List<Integer> 
sortedCols) {
         return (layoutCandidate1, layoutCandidate2) -> {
             List<Integer> position1 = getColumnsPos(layoutCandidate1, 
sortedCols);
             List<Integer> position2 = getColumnsPos(layoutCandidate2, 
sortedCols);
@@ -289,7 +389,7 @@ public class NQueryLayoutChooser {
      * 1. check if shardby columns appears in filters
      * 2. if both layout has shardy columns in filters, compare the filter 
type and selectivity (cardinality)
      */
-    private static Comparator<NLayoutCandidate> 
shardByComparator(List<Integer> columns) {
+    public static Comparator<NLayoutCandidate> shardByComparator(List<Integer> 
columns) {
         return (candidate1, candidate2) -> {
             int shardByCol1Idx = getShardByColIndex(candidate1, columns);
             int shardByCol2Idx = getShardByColIndex(candidate2, columns);
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowCapabilityChecker.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowCapabilityChecker.java
index c99cf27412..e066bbed45 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowCapabilityChecker.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflowCapabilityChecker.java
@@ -73,13 +73,17 @@ public class NDataflowCapabilityChecker {
         } else {
             // for query-on-fact-table
             logger.trace("Normal dataflow matching");
-            boolean partialMatchIndex = 
QueryContext.current().isPartialMatchIndex();
             NLayoutCandidate candidateAndInfluence = 
NQueryLayoutChooser.selectLayoutCandidate(dataflow, prunedSegments,
                     digest, secondStorageSegmentLayoutMap);
-            if (partialMatchIndex && candidateAndInfluence == null) {
+            if (candidateAndInfluence == null && 
QueryContext.current().isPartialMatchIndex()) {
+                // This branch is customized requirements
                 logger.trace("Partial dataflow matching");
                 candidateAndInfluence = 
NQueryLayoutChooser.selectPartialLayoutCandidate(dataflow, prunedSegments,
                         digest, secondStorageSegmentLayoutMap);
+            } else if (candidateAndInfluence == null) {
+                logger.debug("select the layout candidate with high data 
integrity.");
+                candidateAndInfluence = 
NQueryLayoutChooser.selectHighIntegrityCandidate(dataflow, prunedSegments,
+                        digest);
             }
             if (candidateAndInfluence != null) {
                 chosenCandidate = candidateAndInfluence;
@@ -94,7 +98,7 @@ public class NDataflowCapabilityChecker {
             } else {
                 result.setSelectedCandidate(chosenCandidate);
             }
-            result.cost = (int) chosenCandidate.getCost();
+            result.setCost(chosenCandidate.getCost());
         } else {
             result.setCapable(false);
         }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
index 42c1648507..ddd2f01d67 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
@@ -31,6 +31,8 @@ import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
 
+@Getter
+@Setter
 @NoArgsConstructor
 public class CapabilityResult {
 
@@ -43,35 +45,32 @@ public class CapabilityResult {
     /**
      * Is capable or not
      */
-    @Getter
-    @Setter
     private boolean capable = false;
 
     /**
      * selected capable candidate, like Lookup or layout
      */
-    @Getter
-    @Setter
     private IRealizationCandidate selectedCandidate;
 
-    @Getter
-    @Setter
     private IRealizationCandidate selectedStreamingCandidate;
 
-    @Getter
-    @Setter
     private int layoutUnmatchedColsSize;
 
     /**
      * The smaller the cost, the more capable the realization
      */
-    public int cost = Integer.MAX_VALUE;
+    private double cost = Integer.MAX_VALUE;
 
     /**
      * reason of incapable
      */
     public IncapableCause incapableCause;
 
+    /**
+     * The layout has not yet been built in any segment or is currently under 
building.
+     */
+    private boolean isVacant;
+
     /**
      * Marker objects to indicate all special features
      * (dimension-as-measure, topN etc.) that have influenced the capability 
check.
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/HybridRealization.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/HybridRealization.java
index fbd01d6c78..b70691de1a 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/HybridRealization.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/realization/HybridRealization.java
@@ -138,14 +138,14 @@ public class HybridRealization implements IRealization {
                 child = realization.isCapable(digest, 
queryableSeg.getStreamingSegments(), Maps.newHashMap());
                 
result.setSelectedStreamingCandidate(child.getSelectedStreamingCandidate());
                 if (child.isCapable()) {
-                    result.cost = Math.min(result.cost, (int) 
child.getSelectedStreamingCandidate().getCost());
+                    result.setCost(Math.min(result.getCost(), 
child.getSelectedStreamingCandidate().getCost()));
                 }
             } else {
                 child = realization.isCapable(digest, 
queryableSeg.getBatchSegments(),
                         queryableSeg.getChSegToLayoutsMap());
                 result.setSelectedCandidate(child.getSelectedCandidate());
                 if (child.isCapable()) {
-                    result.cost = Math.min(result.cost, (int) 
child.getSelectedCandidate().getCost());
+                    result.setCost(Math.min(result.getCost(), 
child.getSelectedCandidate().getCost()));
                 }
             }
             if (child.isCapable()) {
@@ -156,7 +156,7 @@ public class HybridRealization implements IRealization {
             }
         }
 
-        result.cost--; // let hybrid win its children
+        result.setCost(result.getCost() - 1); // let hybrid win its children
 
         return result;
     }
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/HeterogeneousSegmentPruningTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/HeterogeneousSegmentPruningTest.java
index f568c1b994..4d46270c35 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/HeterogeneousSegmentPruningTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/HeterogeneousSegmentPruningTest.java
@@ -26,18 +26,28 @@ import java.util.stream.Collectors;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.metadata.cube.model.LayoutPartition;
+import org.apache.kylin.metadata.cube.model.NDataLayout;
+import org.apache.kylin.metadata.cube.model.NDataSegDetails;
+import org.apache.kylin.metadata.cube.model.NDataSegment;
+import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
 import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.metadata.realization.NoRealizationFoundException;
 import org.apache.kylin.query.engine.QueryExec;
 import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.util.MetadataTestUtils;
 import org.apache.kylin.util.OlapContextUtil;
 import org.junit.Assert;
 import org.junit.Test;
@@ -389,7 +399,7 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest
         { // composite filters
             val sqlFilter = sql + "select * from T1\n"
                     + "where (cal_dt = DATE'2012-01-01' or (cast(cal_dt as 
date) = '2012-01-02' or cal_dt = '2012-01-03')) and (cal_dt is not null or 
cal_dt in ('2012-01-01', '2012-01-02'))";
-            val context = getMatchedContexts(project, sqlFilter).get(0);
+            val context = OlapContextUtil.getOlapContexts(project, 
sqlFilter).get(0);
             assertFiltersAndLayout(context, null,
                     ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03),"
                             + "OR(=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-01), =(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-02), 
=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03)),"
@@ -399,7 +409,7 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest
         { // invalid filters with or
             val sqlFilter = sql + "select * from T1\n"
                     + "where trans_id = 123  or trans_id + 1 = 123 or 
(trans_id + 2 = 234 and trans_id = 345)";
-            val context = getMatchedContexts(project, sqlFilter).get(0);
+            val context = OlapContextUtil.getOlapContexts(project, 
sqlFilter).get(0);
             assertFiltersAndLayout(context, null,
                     ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03)");
         }
@@ -407,7 +417,7 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest
         { // invalid filters with and
             val sqlFilter = sql + "select * from T1\n"
                     + "where trans_id = 123 and (trans_id + 2 = 234 or 
trans_id = 345)";
-            val context = getMatchedContexts(project, sqlFilter).get(0);
+            val context = OlapContextUtil.getOlapContexts(project, 
sqlFilter).get(0);
             assertFiltersAndLayout(context, null,
                     ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03),"
                             + "=(DEFAULT.TEST_KYLIN_FACT.TRANS_ID, 123)");
@@ -416,7 +426,7 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest
         { // invalid filters with not
             val sqlFilter = sql + "select * from T1\n"
                     + "where trans_id = 123 and (not(trans_id = 234) or 
trans_id = 345) and (not(trans_id + 1 = 132))";
-            val context = getMatchedContexts(project, sqlFilter).get(0);
+            val context = OlapContextUtil.getOlapContexts(project, 
sqlFilter).get(0);
             assertFiltersAndLayout(context, null,
                     ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03),"
                             + "=(DEFAULT.TEST_KYLIN_FACT.TRANS_ID, 
123),OR(<>(DEFAULT.TEST_KYLIN_FACT.TRANS_ID, 234), 
=(DEFAULT.TEST_KYLIN_FACT.TRANS_ID, 345))");
@@ -455,7 +465,7 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest
         { // inner join
             val sqlInnerJoin = sql + " select * from T1 inner join T2 on 
T1.cal_dt = T2.cal_dt \n"
                     + " where T1.cal_dt = '2012-01-01' and T2.cal_dt = DATE 
'2012-01-02'";
-            val contexts = getMatchedContexts(project, sqlInnerJoin);
+            val contexts = OlapContextUtil.getOlapContexts(project, 
sqlInnerJoin);
             assertFiltersAndLayout(contexts.get(0), seg1Id,
                     ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-03),=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01)");
             assertFiltersAndLayout(contexts.get(1), seg2Id,
@@ -465,7 +475,7 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest
         { // left join
             val sqlLeftJoin = sql + " select * from T1 left join T2 on 
T1.cal_dt = T2.cal_dt \n"
                     + " where T1.cal_dt = DATE '2012-01-01' and T2.cal_dt = 
DATE '2012-01-02'";
-            val contexts = getMatchedContexts(project, sqlLeftJoin);
+            val contexts = OlapContextUtil.getOlapContexts(project, 
sqlLeftJoin);
             assertFiltersAndLayout(contexts.get(0), seg1Id,
                     ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-03),=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-01)");
             assertFiltersAndLayout(contexts.get(1), null,
@@ -475,7 +485,7 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest
         { // right join
             val sqlRightJoin = sql + " select * from T1 right join T2 on 
T1.cal_dt = T2.cal_dt \n"
                     + " where T1.cal_dt = DATE '2012-01-01' and T2.cal_dt = 
DATE '2012-01-02'";
-            val contexts = getMatchedContexts(project, sqlRightJoin);
+            val contexts = OlapContextUtil.getOlapContexts(project, 
sqlRightJoin);
             assertFiltersAndLayout(contexts.get(0), seg2Id,
                     ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-02),=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-02)");
             assertFiltersAndLayout(contexts.get(1), null,
@@ -485,7 +495,7 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest
         { // full join
             val sqlFullJoin = sql + " select * from T1 full join T2 on 
T1.cal_dt = T2.cal_dt \n"
                     + " where T1.cal_dt = DATE '2012-01-01' and T2.cal_dt = 
DATE '2012-01-02'";
-            val contexts = getMatchedContexts(project, sqlFullJoin);
+            val contexts = OlapContextUtil.getOlapContexts(project, 
sqlFullJoin);
             assertFiltersAndLayout(contexts.get(0), null,
                     ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03)");
             assertFiltersAndLayout(contexts.get(1), null,
@@ -529,7 +539,7 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest
                     + "(cal_dt='2012-01-01' and trans_id = 15) or\n" + 
"(cal_dt='2012-01-01' and trans_id = 16) or\n"
                     + "(cal_dt='2012-01-01' and trans_id = 17) or\n" + 
"(cal_dt='2012-01-01' and trans_id = 18) or\n"
                     + "(cal_dt='2012-01-01' and trans_id = 19) or\n" + 
"(cal_dt='2012-01-01' and trans_id = 20)";
-            val contexts = getMatchedContexts(project, sqlWithTooManyOrs);
+            val contexts = OlapContextUtil.getOlapContexts(project, 
sqlWithTooManyOrs);
             Assert.assertEquals(
                     ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03)",
                     
contexts.get(0).getExpandedFilterConditions().stream().map(RexNode::toString)
@@ -543,7 +553,7 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest
                     + "(cal_dt='2012-01-01' and trans_id = 6) or\n" + 
"(cal_dt='2012-01-01' and trans_id = 7) or\n"
                     + "(cal_dt='2012-01-01' and trans_id = 8) or\n" + 
"(cal_dt='2012-01-01' and trans_id = 9) or\n"
                     + "(cal_dt='2012-01-01' and trans_id = 10)";
-            val contexts = getMatchedContexts(project, sqlWithFilter);
+            val contexts = OlapContextUtil.getOlapContexts(project, 
sqlWithFilter);
             Assert.assertNotEquals(
                     ">=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 
2012-01-01),<=(DEFAULT.TEST_KYLIN_FACT.CAL_DT, 2012-01-03)",
                     
contexts.get(0).getExpandedFilterConditions().stream().map(RexNode::toString)
@@ -552,8 +562,113 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest
 
     }
 
-    private List<OLAPContext> getMatchedContexts(String project, String sql) 
throws SqlParseException {
-        return OlapContextUtil.getOlapContexts(project, sql);
+    @Test
+    public void testQueryWithEmptySegment() throws SqlParseException {
+        // layout 20000000001, tableindex
+        // layout 20001, cal_dt & trans_id
+        // layout 10001, cal_dt
+        // layout 1, trans_id
+
+        // segment1 [2012-01-01, 2012-01-02] layout 20000000001, 20001
+        // segment2 [2012-01-02, 2012-01-03] layout 20000000001, 20001, 10001
+        // segment3 [2012-01-03, 2012-01-04] layout 20001, 10001, 1
+        // segment4 [2012-01-04, 2012-01-05] layout 10001, 1
+        // segment5 [2012-01-05, 2012-01-06] layout 20000000001, 20001, 10001, 
1
+
+        val project = "heterogeneous_segment";
+        val dfId = "747f864b-9721-4b97-acde-0aa8e8656cba";
+        // val seg1Id = "8892fa3f-f607-4eec-8159-7c5ae2f16942"   
[20120101000000_20120102000000]
+        // val seg2Id = "d75a822c-788a-4592-a500-cf20186dded1"   
[20120102000000_20120103000000]
+        val seg3Id = "54eaf96d-6146-45d2-b94e-d5d187f89919"; // 
[20120103000000_20120104000000]
+        // val seg4Id = "411f40b9-a80a-4453-90a9-409aac6f7632"   
[20120104000000_20120105000000]
+        // val seg5Id = "a8318597-cb75-416f-8eb8-96ea285dd2b4"   
[20120105000000_20120106000000]
+
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            NDataflowManager dfMgr = 
NDataflowManager.getInstance(getTestConfig(), project);
+            NDataflow dataflow = dfMgr.getDataflow(dfId);
+            NDataSegment latestReadySegment = dataflow.getSegment(seg3Id);
+            if (latestReadySegment != null) {
+                NDataSegDetails segDetails = 
latestReadySegment.getSegDetails();
+                List<NDataLayout> allLayouts = segDetails.getAllLayouts();
+
+                // update
+                NDataflowUpdate dataflowUpdate = new NDataflowUpdate(dfId);
+                NDataLayout[] toRemoveLayouts = allLayouts.stream()
+                        .filter(dataLayout -> dataLayout.getLayoutId() == 
20001).toArray(NDataLayout[]::new);
+                dataflowUpdate.setToRemoveLayouts(toRemoveLayouts);
+                dfMgr.updateDataflow(dataflowUpdate);
+            }
+            return null;
+        }, project);
+
+        val sql = "select cal_dt, sum(price), count(*) from test_kylin_fact 
inner join test_account \n"
+                + "on test_kylin_fact.seller_id = test_account.account_id \n"
+                + "where cal_dt between date'2012-01-01' and 
date'2012-01-03'\n" //
+                + "group by cal_dt\n";
+        // can not query
+        {
+            OLAPContext olapContext = OlapContextUtil.getOlapContexts(project, 
sql).get(0);
+            StorageContext storageContext = olapContext.storageContext;
+            Assert.assertEquals(-1L, storageContext.getLayoutId().longValue());
+        }
+
+        {
+            MetadataTestUtils.updateProjectConfig(project, 
"kylin.query.index-match-rules",
+                    KylinConfigBase.USE_VACANT_INDEXES);
+            try (QueryContext queryContext = QueryContext.current()) {
+                OLAPContext olapContext = 
OlapContextUtil.getOlapContexts(project, sql).get(0);
+                StorageContext storageContext = olapContext.storageContext;
+                Assert.assertEquals(10001L, 
storageContext.getLayoutId().longValue());
+                Assert.assertFalse(queryContext.getQueryTagInfo().isVacant());
+            }
+        }
+    }
+
+    @Test
+    public void testQueryWithAllSegmentsAreEmpty() throws SqlParseException {
+        // layout 20000000001, tableindex
+        // layout 20001, cal_dt & trans_id
+        // layout 10001, cal_dt
+        // layout 1, trans_id
+
+        // segment1 [2012-01-01, 2012-01-02] layout 20000000001, 20001
+        // segment2 [2012-01-02, 2012-01-03] layout 20000000001, 20001, 10001
+        // segment3 [2012-01-03, 2012-01-04] layout 20001, 10001, 1
+        // segment4 [2012-01-04, 2012-01-05] layout 10001, 1
+        // segment5 [2012-01-05, 2012-01-06] layout 20000000001, 20001, 10001, 
1
+
+        val project = "heterogeneous_segment";
+        val dfId = "747f864b-9721-4b97-acde-0aa8e8656cba";
+        // val seg1Id = "8892fa3f-f607-4eec-8159-7c5ae2f16942"   
[20120101000000_20120102000000]
+        // val seg2Id = "d75a822c-788a-4592-a500-cf20186dded1"   
[20120102000000_20120103000000]
+        // val seg3Id = "54eaf96d-6146-45d2-b94e-d5d187f89919"   
[20120103000000_20120104000000]
+        // val seg4Id = "411f40b9-a80a-4453-90a9-409aac6f7632"   
[20120104000000_20120105000000]
+        // val seg5Id = "a8318597-cb75-416f-8eb8-96ea285dd2b4"   
[20120105000000_20120106000000]
+
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            NDataflowManager dfMgr = 
NDataflowManager.getInstance(getTestConfig(), project);
+            NDataflow dataflow = dfMgr.getDataflow(dfId);
+            Segments<NDataSegment> segments = dataflow.getSegments();
+            // update
+            NDataflowUpdate dataflowUpdate = new NDataflowUpdate(dfId);
+            dataflowUpdate.setToRemoveSegs(segments.toArray(new 
NDataSegment[0]));
+            dfMgr.updateDataflow(dataflowUpdate);
+            return null;
+        }, project);
+
+        val sql = "select cal_dt, sum(price), count(*) from test_kylin_fact 
inner join test_account \n"
+                + "on test_kylin_fact.seller_id = test_account.account_id \n"
+                + "where cal_dt between date'2012-01-01' and 
date'2012-01-03'\n" //
+                + "group by cal_dt\n";
+
+        MetadataTestUtils.updateProjectConfig(project, 
"kylin.query.index-match-rules",
+                KylinConfigBase.USE_VACANT_INDEXES);
+        try (QueryContext queryContext = QueryContext.current()) {
+            OLAPContext olapContext = OlapContextUtil.getOlapContexts(project, 
sql).get(0);
+            StorageContext storageContext = olapContext.storageContext;
+            Assert.assertEquals(-1L, storageContext.getLayoutId().longValue());
+            Assert.assertFalse(queryContext.getQueryTagInfo().isVacant());
+        }
     }
 
     private void assertFiltersAndLayout(OLAPContext context, String segId, 
String expectedFilterCond) {
@@ -567,7 +682,7 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest
     }
 
     private void assertNoRealizationFound(String project, String sql) throws 
SqlParseException {
-        val context = getMatchedContexts(project, sql).get(0);
+        val context = OlapContextUtil.getOlapContexts(project, sql).get(0);
         Assert.assertNull(context.realization);
     }
 
@@ -580,7 +695,7 @@ public class HeterogeneousSegmentPruningTest extends 
NLocalWithSparkSessionTest
     private void assertPrunedSegmentsRange(String project, String sql, String 
dfId,
             List<Pair<String, String>> expectedRanges, long expectedLayoutId,
             Map<String, List<Long>> expectedPartitions, String 
expectedFilterCond) throws SqlParseException {
-        val context = getMatchedContexts(project, sql).get(0);
+        val context = OlapContextUtil.getOlapContexts(project, sql).get(0);
 
         if (expectedLayoutId == -1L) {
             Assert.assertTrue(context.storageContext.isEmptyLayout());
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/NDataflowCapabilityCheckerTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/NDataflowCapabilityCheckerTest.java
index b5782823c2..5199633912 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/NDataflowCapabilityCheckerTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/NDataflowCapabilityCheckerTest.java
@@ -49,7 +49,7 @@ public class NDataflowCapabilityCheckerTest extends 
NLocalWithSparkSessionTest {
         CapabilityResult result = NDataflowCapabilityChecker.check(dataflow, 
dataflow.getQueryableSegments(),
                 olapContext.getSQLDigest(), null);
         Assert.assertNotNull(result);
-        Assert.assertEquals((int) result.getSelectedCandidate().getCost(), 
result.cost);
+        Assert.assertEquals(result.getSelectedCandidate().getCost(), 
result.getCost(), 0.001);
     }
 
     @Test
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/VacantIndexPruningRuleTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/VacantIndexPruningRuleTest.java
new file mode 100644
index 0000000000..75ab026e9b
--- /dev/null
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/VacantIndexPruningRuleTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.routing;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
+import org.apache.kylin.metadata.cube.model.NDataLayout;
+import org.apache.kylin.metadata.cube.model.NDataSegDetails;
+import org.apache.kylin.metadata.cube.model.NDataSegment;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.util.MetadataTestUtils;
+import org.apache.kylin.util.OlapContextUtil;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+@MetadataInfo
+class VacantIndexPruningRuleTest extends NLocalWithSparkSessionTest {
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+    }
+
+    @AfterEach
+    public void teardown() throws Exception {
+        super.tearDown();
+    }
+
+    @Override
+    public String getProject() {
+        return "default";
+    }
+
+    @Test
+    void testWithIndexMatchRulesUseVacantIndexes() throws SqlParseException {
+        NDataModelManager modelMgr = 
NDataModelManager.getInstance(getTestConfig(), getProject());
+        modelMgr.listAllModels().stream().filter(model -> !model.isBroken())
+                .forEach(model -> 
cleanAlreadyExistingLayoutsInSegments(model.getId()));
+
+        MetadataTestUtils.updateProjectConfig(getProject(), 
"kylin.query.index-match-rules",
+                KylinConfigBase.USE_VACANT_INDEXES);
+        try (QueryContext queryContext = QueryContext.current()) {
+            String sql = "select max(LO_ORDERDATE) from ssb.lineorder";
+            List<OLAPContext> olapContexts = 
OlapContextUtil.getOlapContexts(getProject(), sql);
+            OLAPContext olapContext = olapContexts.get(0);
+            StorageContext storageContext = olapContext.storageContext;
+            Assertions.assertTrue(storageContext.isEmptyLayout());
+            Assertions.assertTrue(queryContext.getQueryTagInfo().isVacant());
+        }
+    }
+
+    @Test
+    void testUnmatchedWithNullResult() throws SqlParseException {
+        String modelId = "d67bf0e4-30f4-9248-2528-52daa80be91a";
+        String sql = "select max(LO_ORDERPRIOTITY) from ssb.lineorder";
+        List<OLAPContext> olapContexts = 
OlapContextUtil.getOlapContexts(getProject(), sql);
+        OLAPContext olapContext = olapContexts.get(0);
+        NDataflow df = NDataflowManager.getInstance(getTestConfig(), 
getProject()).getDataflow(modelId);
+        Map<String, String> matchedJoinGraphAliasMap = 
OlapContextUtil.matchJoins(df.getModel(), olapContext);
+        olapContext.fixModel(df.getModel(), matchedJoinGraphAliasMap);
+
+        Candidate candidate = new Candidate(df, olapContext, 
matchedJoinGraphAliasMap);
+        VacantIndexPruningRule vacantIndexPruningRule = new 
VacantIndexPruningRule();
+        vacantIndexPruningRule.apply(candidate);
+        Assertions.assertNull(candidate.getCapability());
+    }
+
+    @Test
+    void testUnmatchedWithRealizationIsStreaming() throws SqlParseException {
+        String modelId = "d67bf0e4-30f4-9248-2528-52daa80be91a";
+        String sql = "select max(LO_ORDERDATE) from ssb.lineorder";
+        NDataModelManager modelMgr = 
NDataModelManager.getInstance(getTestConfig(), getProject());
+        modelMgr.listAllModels().stream().filter(model -> !model.isBroken())
+                .forEach(model -> 
cleanAlreadyExistingLayoutsInSegments(model.getId()));
+        List<OLAPContext> olapContexts = 
OlapContextUtil.getOlapContexts(getProject(), sql);
+        OLAPContext olapContext = olapContexts.get(0);
+        NDataflow df = NDataflowManager.getInstance(getTestConfig(), 
getProject()).getDataflow(modelId);
+        Map<String, String> matchedJoinGraphAliasMap = 
OlapContextUtil.matchJoins(df.getModel(), olapContext);
+        olapContext.fixModel(df.getModel(), matchedJoinGraphAliasMap);
+
+        Candidate candidate = new Candidate(df, olapContext, 
matchedJoinGraphAliasMap);
+        {
+            df.getModel().setModelType(NDataModel.ModelType.STREAMING);
+            candidate.setCapability(new CapabilityResult());
+            VacantIndexPruningRule vacantIndexPruningRule = new 
VacantIndexPruningRule();
+            vacantIndexPruningRule.apply(candidate);
+            Assertions.assertFalse(candidate.getCapability().isCapable());
+        }
+
+        {
+            df.getModel().setModelType(NDataModel.ModelType.HYBRID);
+            candidate.setCapability(new CapabilityResult());
+            VacantIndexPruningRule vacantIndexPruningRule = new 
VacantIndexPruningRule();
+            vacantIndexPruningRule.apply(candidate);
+            Assertions.assertFalse(candidate.getCapability().isCapable());
+        }
+    }
+
+    @Test
+    void testUnmatchedAggIndex() throws SqlParseException {
+        String modelId = "d67bf0e4-30f4-9248-2528-52daa80be91a";
+        String sql = "select max(LO_ORDERPRIOTITY) from ssb.lineorder";
+        List<OLAPContext> olapContexts = 
OlapContextUtil.getOlapContexts(getProject(), sql);
+        OLAPContext olapContext = olapContexts.get(0);
+        NDataflow df = NDataflowManager.getInstance(getTestConfig(), 
getProject()).getDataflow(modelId);
+        Map<String, String> matchedJoinGraphAliasMap = 
OlapContextUtil.matchJoins(df.getModel(), olapContext);
+        olapContext.fixModel(df.getModel(), matchedJoinGraphAliasMap);
+
+        Candidate candidate = new Candidate(df, olapContext, 
matchedJoinGraphAliasMap);
+        candidate.setCapability(new CapabilityResult());
+        VacantIndexPruningRule vacantIndexPruningRule = new 
VacantIndexPruningRule();
+        vacantIndexPruningRule.apply(candidate);
+        Assertions.assertFalse(candidate.getCapability().isCapable());
+    }
+
+    @Test
+    void testMatchedTableIndex() throws SqlParseException {
+        String modelId = "d67bf0e4-30f4-9248-2528-52daa80be91a";
+        String sql = "select LO_QUANTITY from ssb.lineorder";
+        NDataModelManager modelMgr = 
NDataModelManager.getInstance(getTestConfig(), getProject());
+        modelMgr.listAllModels().stream().filter(model -> !model.isBroken())
+                .forEach(model -> 
cleanAlreadyExistingLayoutsInSegments(model.getId()));
+        List<OLAPContext> olapContexts = 
OlapContextUtil.getOlapContexts(getProject(), sql);
+        OLAPContext olapContext = olapContexts.get(0);
+        NDataflow df = NDataflowManager.getInstance(getTestConfig(), 
getProject()).getDataflow(modelId);
+        Map<String, String> matchedJoinGraphAliasMap = 
OlapContextUtil.matchJoins(df.getModel(), olapContext);
+        olapContext.fixModel(df.getModel(), matchedJoinGraphAliasMap);
+
+        Candidate candidate = new Candidate(df, olapContext, 
matchedJoinGraphAliasMap);
+        candidate.setCapability(new CapabilityResult());
+        VacantIndexPruningRule vacantIndexPruningRule = new 
VacantIndexPruningRule();
+        vacantIndexPruningRule.apply(candidate);
+        Assertions.assertTrue(candidate.getCapability().isCapable());
+        Assertions.assertTrue(candidate.getCapability().isVacant());
+        NLayoutCandidate selectedCandidate = (NLayoutCandidate) 
candidate.getCapability().getSelectedCandidate();
+        Assertions.assertEquals(20000000001L, 
selectedCandidate.getLayoutEntity().getId());
+    }
+
+    @Test
+    void testMatchedAggIndex() throws SqlParseException {
+        String modelId = "d67bf0e4-30f4-9248-2528-52daa80be91a";
+        String sql = "select max(LO_ORDERDATE) from ssb.lineorder";
+        NDataModelManager modelMgr = 
NDataModelManager.getInstance(getTestConfig(), getProject());
+        modelMgr.listAllModels().stream().filter(model -> !model.isBroken())
+                .forEach(model -> 
cleanAlreadyExistingLayoutsInSegments(model.getId()));
+        List<OLAPContext> olapContexts = 
OlapContextUtil.getOlapContexts(getProject(), sql);
+        OLAPContext olapContext = olapContexts.get(0);
+        NDataflow df = NDataflowManager.getInstance(getTestConfig(), 
getProject()).getDataflow(modelId);
+        Map<String, String> matchedJoinGraphAliasMap = 
OlapContextUtil.matchJoins(df.getModel(), olapContext);
+        olapContext.fixModel(df.getModel(), matchedJoinGraphAliasMap);
+
+        Candidate candidate = new Candidate(df, olapContext, 
matchedJoinGraphAliasMap);
+        candidate.setCapability(new CapabilityResult());
+        VacantIndexPruningRule vacantIndexPruningRule = new 
VacantIndexPruningRule();
+        vacantIndexPruningRule.apply(candidate);
+        Assertions.assertTrue(candidate.getCapability().isCapable());
+        Assertions.assertTrue(candidate.getCapability().isVacant());
+        NLayoutCandidate selectedCandidate = (NLayoutCandidate) 
candidate.getCapability().getSelectedCandidate();
+        Assertions.assertEquals(1L, 
selectedCandidate.getLayoutEntity().getId());
+    }
+
+    private void cleanAlreadyExistingLayoutsInSegments(String modelId) {
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            NDataflowManager dfMgr = 
NDataflowManager.getInstance(getTestConfig(), getProject());
+            NDataflow dataflow = dfMgr.getDataflow(modelId);
+            NDataSegment latestReadySegment = dataflow.getLatestReadySegment();
+            if (latestReadySegment != null) {
+                NDataSegDetails segDetails = 
latestReadySegment.getSegDetails();
+                List<NDataLayout> allLayouts = segDetails.getAllLayouts();
+
+                // update
+                NDataflowUpdate dataflowUpdate = new NDataflowUpdate(modelId);
+                dataflowUpdate.setToRemoveLayouts(allLayouts.toArray(new 
NDataLayout[0]));
+                dfMgr.updateDataflow(dataflowUpdate);
+            }
+            return null;
+        }, getProject());
+    }
+}
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/routing/Candidate.java 
b/src/query-common/src/main/java/org/apache/kylin/query/routing/Candidate.java
index 5fe45a9425..3db016ffbf 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/routing/Candidate.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/routing/Candidate.java
@@ -90,10 +90,6 @@ public class Candidate {
         this.matchedJoinsGraphAliasMap = matchedJoinsGraphAliasMap;
     }
 
-    // for testing only
-    Candidate() {
-    }
-
     @Override
     public String toString() {
         return realization.toString();
@@ -125,7 +121,7 @@ public class Candidate {
     }
 
     public static Comparator<Candidate> realizationCapabilityCostSorter() {
-        return Comparator.comparingDouble(c -> c.getCapability().cost);
+        return Comparator.comparingDouble(c -> c.getCapability().getCost());
     }
 
     public static Comparator<Candidate> modelUuidSorter() {
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
 
b/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
index 1105ee4e9c..3222c8dec6 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
@@ -25,7 +25,6 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Ordering;
 import org.apache.kylin.metadata.project.NProjectManager;
-import org.apache.kylin.query.relnode.OLAPContext;
 
 import lombok.Getter;
 
@@ -35,27 +34,26 @@ public class QueryRouter {
     }
 
     public static void applyRules(Candidate candidate) {
-        Strategy pruningStrategy = getStrategy(candidate.getCtx());
+        Strategy pruningStrategy = 
getStrategy(candidate.getCtx().olapSchema.getProjectName());
         for (PruningRule r : pruningStrategy.getRules()) {
             r.apply(candidate);
         }
     }
 
-    public static void sortCandidates(OLAPContext context, List<Candidate> 
candidates) {
-        Strategy strategy = getStrategy(context);
+    public static void sortCandidates(String project, List<Candidate> 
candidates) {
+        Strategy strategy = getStrategy(project);
         candidates.sort(strategy.getSorter());
     }
 
-    private static Strategy getStrategy(OLAPContext context) {
-        String project = context.olapSchema.getProjectName();
-        KylinConfig projectConfig = NProjectManager.getProjectConfig(project);
-        return new Strategy(projectConfig);
+    private static Strategy getStrategy(String project) {
+        return new Strategy(NProjectManager.getProjectConfig(project));
     }
 
     public static class Strategy {
         private static final PruningRule SEGMENT_PRUNING = new 
SegmentPruningRule();
         private static final PruningRule PARTITION_PRUNING = new 
PartitionPruningRule();
         private static final PruningRule REMOVE_INCAPABLE_REALIZATIONS = new 
RemoveIncapableRealizationsRule();
+        private static final PruningRule VACANT_INDEX_PRUNING = new 
VacantIndexPruningRule();
 
         @Getter
         List<PruningRule> rules = Lists.newArrayList();
@@ -72,6 +70,9 @@ public class QueryRouter {
             rules.add(SEGMENT_PRUNING);
             rules.add(PARTITION_PRUNING);
             rules.add(REMOVE_INCAPABLE_REALIZATIONS);
+            if (config.isVacantIndexPruningEnabled()) {
+                rules.add(VACANT_INDEX_PRUNING);
+            }
 
             // add all sorters
             if (config.useTableIndexAnswerSelectStarEnabled()) {
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java
 
b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java
index 13720f8932..0613f62d72 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java
@@ -227,7 +227,7 @@ public class RealizationChooser {
         }
 
         // Step 3. find the lowest-cost candidate
-        QueryRouter.sortCandidates(context, candidates);
+        QueryRouter.sortCandidates(context.olapSchema.getProjectName(), 
candidates);
         logger.trace("Cost Sorted Realizations {}", candidates);
         Candidate candidate = candidates.get(0);
         restoreOLAPContextProps(context, candidate.getRewrittenCtx());
@@ -235,6 +235,15 @@ public class RealizationChooser {
         adjustForCapabilityInfluence(candidate, context);
 
         context.realization = candidate.realization;
+        if (candidate.getCapability().isVacant()) {
+            QueryContext.current().getQueryTagInfo().setVacant(true);
+            NLayoutCandidate layoutCandidate = (NLayoutCandidate) 
candidate.capability.getSelectedCandidate();
+            context.storageContext.setCandidate(layoutCandidate);
+            
context.storageContext.setLayoutId(layoutCandidate.getLayoutEntity().getId());
+            context.storageContext.setEmptyLayout(true);
+            return;
+        }
+
         if (candidate.capability.getSelectedCandidate() instanceof 
NLookupCandidate) {
             boolean useSnapshot = 
context.isFirstTableLookupTableInModel(context.realization.getModel());
             context.storageContext.setUseSnapshot(useSnapshot);
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/routing/VacantIndexPruningRule.java
 
b/src/query-common/src/main/java/org/apache/kylin/query/routing/VacantIndexPruningRule.java
new file mode 100644
index 0000000000..ac5ef26d1c
--- /dev/null
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/routing/VacantIndexPruningRule.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.routing;
+
+import java.util.List;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.cube.cuboid.AggIndexMatcher;
+import org.apache.kylin.metadata.cube.cuboid.ChooserContext;
+import org.apache.kylin.metadata.cube.cuboid.IndexMatcher;
+import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
+import org.apache.kylin.metadata.cube.cuboid.TableIndexMatcher;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.model.AntiFlatChecker;
+import org.apache.kylin.metadata.model.ColExcludedChecker;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.HybridRealization;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.query.util.QueryInterruptChecker;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class VacantIndexPruningRule extends PruningRule {
+    @Override
+    public void apply(Candidate candidate) {
+        if (nonBatchRealizationSkipEmptySegments(candidate)) {
+            log.info("{}({}/{}): only batch model support this feature, but 
the type of this model is {}",
+                    this.getClass().getName(), 
candidate.getRealization().getProject(),
+                    candidate.getRealization().getCanonicalName(),
+                    candidate.getRealization().getModel().getModelType());
+            return;
+        }
+
+        if (candidate.getCapability() == null || 
candidate.getCapability().isCapable()) {
+            log.debug("skip the rule of {}.", this.getClass().getName());
+            return;
+        }
+
+        List<IRealization> realizations = 
candidate.getRealization().getRealizations();
+        if (CollectionUtils.isEmpty(realizations)) {
+            log.warn("It seems that unlikely things happened when matching 
indexes haven't built. "
+                    + "Expected size of realizations(models) is 1.");
+            return;
+        }
+        NDataflow dataflow = (NDataflow) realizations.get(0);
+        CapabilityResult capabilityResult = match(dataflow, 
candidate.getCtx().getSQLDigest());
+        candidate.setCapability(capabilityResult);
+    }
+
+    private CapabilityResult match(NDataflow dataflow, SQLDigest digest) {
+        CapabilityResult result = new CapabilityResult();
+        log.info("Try matching no built indexes from model.");
+
+        NLayoutCandidate layoutCandidate = selectLayoutCandidate(dataflow, 
digest);
+        if (layoutCandidate != null) {
+            
result.influences.addAll(layoutCandidate.getCapabilityResult().influences);
+            
result.setLayoutUnmatchedColsSize(layoutCandidate.getCapabilityResult().getLayoutUnmatchedColsSize());
+            result.setSelectedCandidate(layoutCandidate);
+            log.info("Matched layout {} snapshot in dataflow {} ", 
layoutCandidate, dataflow);
+            result.setCapable(true);
+            result.setVacant(true);
+        } else {
+            result.setCapable(false);
+        }
+
+        return result;
+    }
+
+    private NLayoutCandidate selectLayoutCandidate(NDataflow dataflow, 
SQLDigest sqlDigest) {
+
+        // This section is same as NQueryLayoutChooser#selectLayoutCandidate.
+        String project = dataflow.getProject();
+        NDataModel model = dataflow.getModel();
+        KylinConfig projectConfig = NProjectManager.getProjectConfig(project);
+        ChooserContext chooserContext = new ChooserContext(sqlDigest, 
dataflow);
+        ColExcludedChecker excludedChecker = new 
ColExcludedChecker(projectConfig, project, model);
+        AntiFlatChecker antiFlatChecker = new 
AntiFlatChecker(model.getJoinTables(), model);
+        AggIndexMatcher aggIndexMatcher = new AggIndexMatcher(sqlDigest, 
chooserContext, dataflow, excludedChecker,
+                antiFlatChecker);
+        TableIndexMatcher tableIndexMatcher = new TableIndexMatcher(sqlDigest, 
chooserContext, dataflow,
+                excludedChecker, antiFlatChecker);
+
+        if (chooserContext.isIndexMatchersInvalid()) {
+            return null;
+        }
+
+        // Find a layout that can match the sqlDigest, even if it hasn't been 
built yet.
+        NLayoutCandidate candidate = findCandidate(dataflow, aggIndexMatcher, 
tableIndexMatcher);
+        QueryInterruptChecker.checkThreadInterrupted("Interrupted exception 
occurs.",
+                "Current step were matching indexes haven't built ");
+        return candidate;
+    }
+
+    private static NLayoutCandidate findCandidate(NDataflow dataflow, 
AggIndexMatcher aggIndexMatcher,
+            TableIndexMatcher tableIndexMatcher) {
+        List<LayoutEntity> allLayouts = 
dataflow.getIndexPlan().getAllLayouts();
+        for (LayoutEntity layout : allLayouts) {
+            NLayoutCandidate candidate = new NLayoutCandidate(layout);
+            IndexMatcher.MatchResult matchResult = 
tableIndexMatcher.match(layout);
+            if (!matchResult.isMatched()) {
+                matchResult = aggIndexMatcher.match(layout);
+            }
+
+            if (!matchResult.isMatched()) {
+                continue;
+            }
+
+            CapabilityResult tempResult = new CapabilityResult();
+            tempResult.setSelectedCandidate(candidate);
+            candidate.setCapabilityResult(tempResult);
+            return candidate;
+        }
+        return null;
+    }
+
+    private boolean nonBatchRealizationSkipEmptySegments(Candidate candidate) {
+        IRealization realization = candidate.getRealization();
+        return realization instanceof HybridRealization || 
realization.isStreaming();
+    }
+}
diff --git 
a/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
 
b/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 455cf305c7..bbee32c7dc 100644
--- 
a/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -25,6 +25,8 @@ import java.util.List;
 
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
 import org.apache.kylin.metadata.query.NativeQueryRealization;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 import org.slf4j.Logger;
@@ -32,8 +34,6 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kylin.guava30.shaded.common.base.Preconditions;
-import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -68,6 +68,9 @@ public class SQLResponse implements Serializable {
 
     protected boolean isPartial = false;
 
+    @JsonProperty("vacant")
+    private boolean isVacant;
+
     private List<Long> scanRows;
 
     private List<Long> scanBytes;
@@ -148,9 +151,8 @@ public class SQLResponse implements Serializable {
         }
     }
 
-    public SQLResponse(List<SelectedColumnMeta> columnMetas, 
List<List<String>> results,
-            int affectedRowCount, boolean isException, String exceptionMessage,
-            boolean isPartial, boolean isPushDown) {
+    public SQLResponse(List<SelectedColumnMeta> columnMetas, 
List<List<String>> results, int affectedRowCount,
+            boolean isException, String exceptionMessage, boolean isPartial, 
boolean isPushDown) {
         this.columnMetas = columnMetas;
         this.results = results;
         this.affectedRowCount = affectedRowCount;
@@ -165,8 +167,7 @@ public class SQLResponse implements Serializable {
     }
 
     public SQLResponse(List<SelectedColumnMeta> columnMetas, 
Iterable<List<String>> results, int resultSize,
-            int affectedRowCount, boolean isException, String exceptionMessage,
-            boolean isPartial, boolean isPushDown) {
+            int affectedRowCount, boolean isException, String 
exceptionMessage, boolean isPartial, boolean isPushDown) {
         this.columnMetas = columnMetas;
         this.results = results;
         this.affectedRowCount = affectedRowCount;
diff --git 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 3a821568d4..3896737392 100644
--- 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -81,6 +81,14 @@ import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.engine.spark.filter.BloomFilterSkipCollector;
+import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
+import org.apache.kylin.guava30.shaded.common.base.Joiner;
+import org.apache.kylin.guava30.shaded.common.collect.Collections2;
+import org.apache.kylin.guava30.shaded.common.collect.HashMultimap;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.SetMultimap;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.acl.AclTCR;
@@ -162,14 +170,6 @@ import org.springframework.stereotype.Component;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
-import org.apache.kylin.guava30.shaded.common.base.Joiner;
-import org.apache.kylin.guava30.shaded.common.collect.Collections2;
-import org.apache.kylin.guava30.shaded.common.collect.HashMultimap;
-import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import org.apache.kylin.guava30.shaded.common.collect.SetMultimap;
 import com.google.gson.Gson;
 
 import lombok.AllArgsConstructor;
@@ -1364,7 +1364,9 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
 
         response.setNativeRealizations(OLAPContext.getNativeRealizations());
 
-        setAppMaterURL(response);
+        if (!queryContext.getQueryTagInfo().isVacant()) {
+            setAppMaterURL(response);
+        }
 
         if (isPushDown) {
             response.setNativeRealizations(Lists.newArrayList());
@@ -1381,6 +1383,7 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
 
         response.setEngineType(QueryHistory.EngineType.NATIVE.name());
         
response.setSignature(QueryCacheSignatureUtil.createCacheSignature(response, 
project));
+        
response.setVacant(QueryContext.current().getQueryTagInfo().isVacant());
 
         if (QueryContext.current().getMetrics().getQueryExecutedPlan() != 
null) {
             
response.setExecutedPlan(QueryContext.current().getMetrics().getQueryExecutedPlan());
diff --git 
a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java 
b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java
index 0355fb60bd..bd41205514 100644
--- 
a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java
+++ 
b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java
@@ -82,11 +82,10 @@ public class QueryContextCutter {
             }
         }
 
-        ContextUtil.dumpCalcitePlan(
-                "cannot find proper realizations After re-cut " + 
MAX_RETRY_TIMES_OF_CONTEXT_CUT + " times", root, log);
-        log.error("too many unmatched join in this query, please check it or 
create correspond realization");
-        throw new NoRealizationFoundException(
-                "too many unmatched join in this query, please check it or 
create correspond realization");
+        String errorMsg = "too many unmatched joins in this query, please 
check it or create corresponding realization.";
+        ContextUtil.dumpCalcitePlan("cannot find proper realizations After 
re-cut " + MAX_RETRY_TIMES_OF_CONTEXT_CUT
+                + " times. \nError: " + errorMsg, root, log);
+        throw new NoRealizationFoundException(errorMsg);
     }
 
     private static List<OLAPContext> 
collectContextInfoAndSelectRealization(RelNode queryRoot) {
diff --git 
a/src/query/src/test/java/org/apache/kylin/query/routing/CandidateSortTest.java 
b/src/query/src/test/java/org/apache/kylin/query/routing/CandidateSortTest.java
index f9799f42ff..530bafff85 100644
--- 
a/src/query/src/test/java/org/apache/kylin/query/routing/CandidateSortTest.java
+++ 
b/src/query/src/test/java/org/apache/kylin/query/routing/CandidateSortTest.java
@@ -18,287 +18,107 @@
 
 package org.apache.kylin.query.routing;
 
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Ordering;
-import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
-import org.apache.kylin.metadata.cube.model.NDataSegment;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.NDataModel;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.CapabilityResult;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.QueryableSeg;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.junit.Assert;
-import org.junit.Test;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
-import lombok.val;
-
-public class CandidateSortTest {
+class CandidateSortTest {
 
     @Test
-    public void testModelHintCandidateSort() {
+    void testModelPrioritySorter() {
         try (QueryContext queryContext = QueryContext.current()) {
+            Comparator<Candidate> sorter = Candidate.modelPrioritySorter();
+
+            // assert that c1 is more prioritary than c2
             {
-                queryContext.setModelPriorities(new String[] {});
-                val model1 = mockCandidate("model0001", "modelA", 1, 1);
-                val model2 = mockCandidate("model0002", "modelB", 2, 2);
-                sort(model1, model2).assertFirst(model1);
+                String[] modelPriorities = CandidateTestUtils
+                        .mockModelPriorityValues(new String[] { "modelA", 
"modelB" });
+                queryContext.setModelPriorities(modelPriorities);
+                Candidate c1 = CandidateTestUtils.mockCandidate("model0001", 
"modelA", 1, 1);
+                Candidate c2 = CandidateTestUtils.mockCandidate("model0001", 
"modelB", 1, 1);
+
+                Assertions.assertEquals(-1, sorter.compare(c1, c2));
+                Assertions.assertEquals(1, sorter.compare(c2, c1));
+                assertSortResult(c1, sorter, Lists.newArrayList(c1, c2));
             }
 
+            // with empty model priorities, c1 and c2 has the same priority
             {
-                queryContext.setModelPriorities(new String[] { "MODELB" });
-                val model1 = mockCandidate("model0001", "modelA", 1, 1);
-                val model2 = mockCandidate("model0002", "modelB", 2, 2);
-                sort(model1, model2).assertFirst(model2);
+                queryContext.setModelPriorities(new String[] {});
+                Candidate c1 = CandidateTestUtils.mockCandidate("model0001", 
"modelA", 1, 1);
+                Candidate c2 = CandidateTestUtils.mockCandidate("model0002", 
"modelB", 2, 2);
+                assertSortResult(c1, sorter, Lists.newArrayList(c1, c2));
             }
 
             {
-                queryContext.setModelPriorities(new String[] { "MODELB", 
"MODELA" });
-                val model1 = mockCandidate("model0001", "modelA", 1, 1);
-                val model2 = mockCandidate("model0002", "modelB", 2, 2);
-                sort(model1, model2).assertFirst(model2);
+                
queryContext.setModelPriorities(CandidateTestUtils.mockModelPriorityValues(new 
String[] { "modelB" }));
+                Candidate c1 = CandidateTestUtils.mockCandidate("model0001", 
"modelA", 1, 1);
+                Candidate c2 = CandidateTestUtils.mockCandidate("model0002", 
"modelB", 2, 2);
+                Assertions.assertEquals(Integer.MAX_VALUE, sorter.compare(c1, 
c2));
+                assertSortResult(c2, sorter, Lists.newArrayList(c1, c2));
             }
 
             {
-                queryContext.setModelPriorities(new String[] { "MODELC", 
"MODELA" });
-                val model1 = mockCandidate("model0001", "modelA", 1, 1);
-                val model2 = mockCandidate("model0002", "modelB", 2, 2);
-                val model3 = mockCandidate("model0003", "modelC", 4, 4);
-                sort(model1, model2, model3).assertFirst(model3);
+                queryContext.setModelPriorities(new String[] { "MODELB", 
"MODELA" });
+                Candidate c1 = CandidateTestUtils.mockCandidate("model0001", 
"modelA", 1, 1);
+                Candidate c2 = CandidateTestUtils.mockCandidate("model0002", 
"modelB", 2, 2);
+                Assertions.assertEquals(1, sorter.compare(c1, c2));
+                assertSortResult(c2, sorter, Lists.newArrayList(c1, c2));
             }
         }
     }
 
     @Test
-    public void testSort() {
-        {
-            val model1 = mockCandidate("model0001", "modelA", 1, 1);
-            val model2 = mockCandidate("model0002", "modelB", 2, 2);
-            sort(model1, model2).assertFirst(model1);
-        }
-
-        {
-            val model1 = mockCandidate("model0001", "modelA", 2, 1);
-            val model2 = mockCandidate("model0002", "modelB", 2, 2);
-            sort(model1, model2).assertFirst(model1);
-        }
-
-        {
-            val model1 = mockCandidate("model0001", "modelA", 2, 2);
-            val model2 = mockCandidate("model0002", "modelB", 2, 2);
-            sort(model1, model2).assertFirst(model1);
-        }
-
-        {
-            val model1 = mockCandidate("model0001", "modelA", 1, 1);
-            val model2 = mockCandidate("model0002", "modelB", 2, 2);
-            val model3 = mockCandidate("model0003", "modelC", 4, 4);
-            sort(model1, model2, model3).assertFirst(model1);
-        }
-
-        {
-            val model1 = mockCandidate("model0001", "modelA", 1, 1);
-            val model2 = mockEmptyCandidate("model0002", "modelB", 1);
-            sort(model1, model2).assertFirst(model1);
-        }
-
-        {
-            val model1 = mockStreamingCandidate("model0001", "modelA", 1, 1);
-            val model2 = mockEmptyCandidate("model0002", "modelB", 1);
-            sort(model1, model2).assertFirst(model1);
-        }
-
-        {
-            val model1 = mockHybridCandidate("model0001", "modelA", 1, 1, 2);
-            val model2 = mockEmptyCandidate("model0002", "modelB", 1);
-            sort(model1, model2).assertFirst(model1);
-        }
-
-        {
-            val model1 = mockCandidate("model0001", "modelA", 1, 3);
-            val model2 = mockStreamingCandidate("model0002", "modelB", 1, 2);
-            val model3 = mockHybridCandidate("model0003", "modelC", 1, 4, 2);
-            sort(model1, model2, model3).assertFirst(model2);
-        }
-    }
-
-    private interface SortedCandidate {
-
-        void assertFirst(Candidate candidate);
-    }
-
-    private SortedCandidate sort(Candidate... candidates) {
-        List<Comparator<Candidate>> sorters = Lists.newArrayList();
-        sorters.add(Candidate.modelPrioritySorter());
-        sorters.add(Candidate.realizationCostSorter());
-        sorters.add(Candidate.realizationCapabilityCostSorter());
-        sorters.add(Candidate.modelUuidSorter());
-
-        return candidate -> {
-            Arrays.sort(candidates, Ordering.compound(sorters));
-            
Assert.assertEquals(candidate.getRealization().getModel().getAlias(),
-                    candidates[0].getRealization().getModel().getAlias());
-        };
+    void realizationCostSorterTest() {
+        Comparator<Candidate> comparator = Candidate.realizationCostSorter();
+        NDataflow df1 = Mockito.mock(NDataflow.class);
+        NDataflow df2 = Mockito.mock(NDataflow.class);
+        OLAPContext olapContext = Mockito.mock(OLAPContext.class);
+        Candidate c1 = new Candidate(df1, olapContext, Maps.newHashMap());
+        Candidate c2 = new Candidate(df2, olapContext, Maps.newHashMap());
+        Mockito.when(c1.getRealization().getCost()).thenReturn(1);
+        Mockito.when(c2.getRealization().getCost()).thenReturn(2);
+
+        // Assert that the Comparator sorts the Candidates correctly
+        assertSortResult(c1, comparator, Lists.newArrayList(c1, c2));
     }
 
-    private Candidate mockCandidate(String modelId, String modelName, int 
modelCost, double candidateCost) {
-        val candidate = new Candidate();
-        candidate.realization = mockRealization(modelId, modelName, modelCost);
-        val cap = new CapabilityResult();
-        cap.setSelectedCandidate(() -> candidateCost);
-        cap.cost = (int) cap.getSelectedCandidate().getCost();
-        candidate.setCapability(cap);
-        return candidate;
+    @Test
+    void realizationCapabilityCostSorter() {
+        Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 
1, 1);
+        Candidate c2 = CandidateTestUtils.mockCandidate("model0001", "modelA", 
1, 2);
+        Candidate c3 = CandidateTestUtils.mockCandidate("model0001", "modelA", 
1, 2);
+        Comparator<Candidate> comparator = 
Candidate.realizationCapabilityCostSorter();
+        assertSortResult(c1, comparator, Lists.newArrayList(c1, c2));
+        assertSortResult(c2, comparator, Lists.newArrayList(c2, c3));
     }
 
-    private Candidate mockStreamingCandidate(String modelId, String modelName, 
int modelCost, double candidateCost) {
-        val candidate = new Candidate();
-        candidate.realization = mockRealization(modelId, modelName, modelCost);
-        val cap = new CapabilityResult();
-        cap.setSelectedStreamingCandidate(() -> candidateCost);
-        cap.cost = (int) cap.getSelectedStreamingCandidate().getCost();
-        candidate.setCapability(cap);
-        return candidate;
+    @Test
+    void testModelUuidSorter() {
+        Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 
1, 1);
+        Candidate c2 = CandidateTestUtils.mockCandidate("model0002", "modelB", 
1, 1);
+        Comparator<Candidate> comparator = Candidate.modelUuidSorter();
+        assertSortResult(c1, comparator, Lists.newArrayList(c1, c2));
     }
 
-    private Candidate mockHybridCandidate(String modelId, String modelName, 
int modelCost, double candidateCost,
-            double streamingCandidateCost) {
-        val candidate = new Candidate();
-        candidate.realization = mockRealization(modelId, modelName, modelCost);
-        val cap = new CapabilityResult();
-        cap.setSelectedCandidate(() -> candidateCost);
-        cap.setSelectedStreamingCandidate(() -> streamingCandidateCost);
-        cap.cost = (int) Math.min(cap.getSelectedCandidate().getCost(), 
cap.getSelectedStreamingCandidate().getCost());
-        candidate.setCapability(cap);
-        return candidate;
+    @Test
+    void testTableIndexUnmatchedColSizeComparator() {
+        Comparator<Candidate> comparator = 
Candidate.tableIndexUnmatchedColSizeSorter();
+        Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 
1, 1, 1);
+        Candidate c2 = CandidateTestUtils.mockCandidate("model0002", "modelB", 
1, 2, 2);
+        assertSortResult(c1, comparator, Lists.newArrayList(c2, c1));
     }
 
-    private Candidate mockEmptyCandidate(String modelId, String modelName, int 
modelCost) {
-        val candidate = new Candidate();
-        candidate.realization = mockRealization(modelId, modelName, modelCost);
-        val cap = new CapabilityResult();
-        cap.setSelectedCandidate(NLayoutCandidate.EMPTY);
-        cap.setSelectedStreamingCandidate(NLayoutCandidate.EMPTY);
-        candidate.setCapability(cap);
-        return candidate;
+    private void assertSortResult(Candidate expected, Comparator<Candidate> 
comparator, List<Candidate> candidates) {
+        candidates.sort(comparator);
+        Assertions.assertEquals(expected, candidates.get(0));
     }
-
-    private IRealization mockRealization(String modelId, String modelName, int 
cost) {
-        return new IRealization() {
-            @Override
-            public CapabilityResult isCapable(SQLDigest digest, 
List<NDataSegment> prunedSegments,
-                    Map<String, Set<Long>> chSegToLayoutsMap) {
-                return null;
-            }
-
-            @Override
-            public CapabilityResult isCapable(SQLDigest digest, QueryableSeg 
queryableSeg) {
-                return null;
-            }
-
-            @Override
-            public String getType() {
-                return null;
-            }
-
-            @Override
-            public KylinConfig getConfig() {
-                return null;
-            }
-
-            @Override
-            public NDataModel getModel() {
-                val model = new NDataModel();
-                model.setAlias(modelName);
-                model.setUuid(modelId);
-                return model;
-            }
-
-            @Override
-            public Set<TblColRef> getAllColumns() {
-                return null;
-            }
-
-            @Override
-            public List<TblColRef> getAllDimensions() {
-                return null;
-            }
-
-            @Override
-            public List<MeasureDesc> getMeasures() {
-                return null;
-            }
-
-            @Override
-            public List<IRealization> getRealizations() {
-                return null;
-            }
-
-            @Override
-            public FunctionDesc findAggrFunc(FunctionDesc aggrFunc) {
-                return null;
-            }
-
-            @Override
-            public boolean isOnline() {
-                return true;
-            }
-
-            @Override
-            public String getUuid() {
-                return null;
-            }
-
-            @Override
-            public String getCanonicalName() {
-                return null;
-            }
-
-            @Override
-            public long getDateRangeStart() {
-                return 0;
-            }
-
-            @Override
-            public long getDateRangeEnd() {
-                return 0;
-            }
-
-            @Override
-            public int getCost() {
-                return cost;
-            }
-
-            @Override
-            public boolean hasPrecalculatedFields() {
-                return false;
-            }
-
-            @Override
-            public int getStorageType() {
-                return 0;
-            }
-
-            @Override
-            public boolean isStreaming() {
-                return false;
-            }
-
-            @Override
-            public String getProject() {
-                return null;
-            }
-        };
-    }
-
 }
diff --git 
a/src/query/src/test/java/org/apache/kylin/query/routing/CandidateTestUtils.java
 
b/src/query/src/test/java/org/apache/kylin/query/routing/CandidateTestUtils.java
new file mode 100644
index 0000000000..341a5a8686
--- /dev/null
+++ 
b/src/query/src/test/java/org/apache/kylin/query/routing/CandidateTestUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.routing;
+
+import java.util.Arrays;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.query.relnode.OLAPContext;
+
+import lombok.val;
+
+public class CandidateTestUtils {
+
+    static Candidate mockCandidate(String modelId, String modelName, int 
realizationCost, double candidateCost) {
+        return mockCandidate(modelId, modelName, realizationCost, 
candidateCost, 0);
+    }
+
+    static Candidate mockCandidate(String modelId, String modelName, int 
realizationCost, double candidateCost,
+            int unmatchedColSize) {
+        IRealization realization = mockRealization(modelId, modelName, 
realizationCost);
+        OLAPContext olapContext = mockOlapContext();
+        val candidate = new Candidate(realization, olapContext, 
Maps.newHashMap());
+        val cap = new CapabilityResult();
+        cap.setSelectedCandidate(() -> candidateCost);
+        cap.setCost(cap.getSelectedCandidate().getCost());
+        cap.setLayoutUnmatchedColsSize(unmatchedColSize);
+        candidate.setCapability(cap);
+        return candidate;
+    }
+
+    static OLAPContext mockOlapContext() {
+        return new OLAPContext(-1);
+    }
+
+    static IRealization mockRealization(String modelId, String modelName, int 
cost) {
+        return new NDataflow() {
+            @Override
+            public NDataModel getModel() {
+                val model = new NDataModel();
+                model.setAlias(modelName);
+                model.setUuid(modelId);
+                return model;
+            }
+
+            @Override
+            public boolean isOnline() {
+                return true;
+            }
+
+            @Override
+            public int getCost() {
+                return cost;
+            }
+        };
+    }
+
+    static String[] mockModelPriorityValues(String[] arr) {
+        return 
Arrays.stream(arr).map(StringUtils::upperCase).toArray(String[]::new);
+    }
+}
diff --git 
a/src/query/src/test/java/org/apache/kylin/query/routing/LayoutCandidateSortTest.java
 
b/src/query/src/test/java/org/apache/kylin/query/routing/LayoutCandidateSortTest.java
new file mode 100644
index 0000000000..0666ef3a72
--- /dev/null
+++ 
b/src/query/src/test/java/org/apache/kylin/query/routing/LayoutCandidateSortTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.routing;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
+import org.apache.kylin.metadata.cube.cuboid.NQueryLayoutChooser;
+import org.apache.kylin.metadata.cube.model.IndexEntity;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
+import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
+import org.apache.kylin.metadata.cube.model.RuleBasedIndex;
+import org.apache.kylin.metadata.model.DeriveInfo;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.util.MetadataTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+@MetadataInfo
+class LayoutCandidateSortTest {
+
+    @Test
+    void testPreferAggComparator() {
+        MockEntity mock1 = new MockEntity(IndexEntity.TABLE_INDEX_START_ID + 
1, ImmutableList.of(1), 2);
+        MockEntity mock2 = new MockEntity(1L, ImmutableList.of(1), 3);
+        assertSortedResult(1L, NQueryLayoutChooser.preferAggComparator(), 
mock1, mock2);
+    }
+
+    @Test
+    void testSegmentRangeComparator() {
+        MockEntity mock1 = new MockEntity(IndexEntity.TABLE_INDEX_START_ID + 
1, ImmutableList.of(1, 2), 5000, 2000);
+        MockEntity mock2 = new MockEntity(IndexEntity.TABLE_INDEX_START_ID + 
IndexEntity.INDEX_ID_STEP + 1,
+                ImmutableList.of(1, 3), 2000, 2000);
+        assertSortedResult(IndexEntity.TABLE_INDEX_START_ID + 1, 
NQueryLayoutChooser.segmentRangeComparator(), mock1,
+                mock2);
+    }
+
+    @Test
+    void testSegmentEffectivenessComparator() {
+        MockEntity mock1 = new MockEntity(IndexEntity.TABLE_INDEX_START_ID + 
1, ImmutableList.of(1, 2), 1000, 3000);
+        MockEntity mock2 = new MockEntity(IndexEntity.TABLE_INDEX_START_ID + 
IndexEntity.INDEX_ID_STEP + 1,
+                ImmutableList.of(1, 3), 1000, 2000);
+        assertSortedResult(IndexEntity.TABLE_INDEX_START_ID + 1, 
NQueryLayoutChooser.segmentEffectivenessComparator(),
+                mock1, mock2);
+    }
+
+    @Test
+    void testRowSizeComparator() {
+        MockEntity mock1 = new MockEntity(1L, ImmutableList.of(1, 2), 90);
+        MockEntity mock2 = new MockEntity(IndexEntity.INDEX_ID_STEP + 1L, 
ImmutableList.of(1, 4), 30);
+        MockEntity mock3 = new MockEntity(2 * IndexEntity.INDEX_ID_STEP + 1L, 
ImmutableList.of(1, 5), 10);
+        assertSortedResult(2 * IndexEntity.INDEX_ID_STEP + 1L, 
NQueryLayoutChooser.rowSizeComparator(), mock1, mock2,
+                mock3);
+    }
+
+    @Test
+    void testDerivedLayoutComparator() {
+        DeriveInfo mockDeriveInfo = Mockito.mock(DeriveInfo.class);
+        MockEntity mock1 = new MockEntity(1L, ImmutableList.of(1, 2), 
ImmutableMap.of(5, mockDeriveInfo));
+        MockEntity mock2 = new MockEntity(2 * IndexEntity.INDEX_ID_STEP + 1L, 
ImmutableList.of(1, 4),
+                ImmutableMap.of(3, mockDeriveInfo));
+        MockEntity mock3 = new MockEntity(IndexEntity.INDEX_ID_STEP + 1L, 
ImmutableList.of(1, 3), ImmutableMap.of());
+
+        Comparator<NLayoutCandidate> comparator = 
NQueryLayoutChooser.derivedLayoutComparator();
+
+        // both not empty, choose the first one
+        assertSortedResult(1L, comparator, mock1, mock2);
+
+        // choose the layout no need derivedInfo
+        assertSortedResult(IndexEntity.INDEX_ID_STEP + 1, comparator, mock1, 
mock3);
+
+        // turn on table exclusion, but not prefer snapshot
+        MetadataTestUtils.updateProjectConfig("default", 
"kylin.metadata.table-exclusion-enabled", "true");
+        MetadataTestUtils.updateProjectConfig("default", 
"kylin.query.snapshot-preferred-for-table-exclusion", "false");
+        assertSortedResult(IndexEntity.INDEX_ID_STEP + 1, comparator, mock1, 
mock3);
+
+        // turn on table exclusion and prefer snapshot
+        MetadataTestUtils.updateProjectConfig("default", 
"kylin.metadata.table-exclusion-enabled", "true");
+        MetadataTestUtils.updateProjectConfig("default", 
"kylin.query.snapshot-preferred-for-table-exclusion", "true");
+        assertSortedResult(1L, comparator, mock1, mock3);
+    }
+
+    @Test
+    void testShardByComparator() {
+
+        {
+            MockEntity mock1 = new MockEntity(1L, ImmutableList.of(0, 1, 2), 
ImmutableList.of(), ImmutableList.of(0));
+            MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 0, 2), 
ImmutableList.of(), ImmutableList.of(1));
+
+            // all layout candidates have shardBy column
+            List<Integer> sortedFilters = Lists.newArrayList(2, 1, 0);
+            assertSortedResult(2L, 
NQueryLayoutChooser.shardByComparator(sortedFilters), mock1, mock2);
+
+            sortedFilters = Lists.newArrayList(2, 0, 1);
+            assertSortedResult(1L, 
NQueryLayoutChooser.shardByComparator(sortedFilters), mock1, mock2);
+        }
+
+        {
+            // the layout(id=2) has shardBy column(colId=1)
+            MockEntity mock1 = new MockEntity(1L, ImmutableList.of(0, 1, 2), 
ImmutableList.of(), ImmutableList.of());
+            MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 0, 2), 
ImmutableList.of(), ImmutableList.of(1));
+
+            List<Integer> sortedFilters = Lists.newArrayList(2, 1, 0);
+            assertSortedResult(2L, 
NQueryLayoutChooser.shardByComparator(sortedFilters), mock1, mock2);
+        }
+
+        {
+            // the layout(id=1) has shardBy column(colId=0)
+            MockEntity mock1 = new MockEntity(1L, ImmutableList.of(0, 1, 2), 
ImmutableList.of(), ImmutableList.of(0));
+            MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 0, 2), 
ImmutableList.of(), ImmutableList.of());
+
+            List<Integer> sortedFilters = Lists.newArrayList(2, 1, 0);
+            assertSortedResult(1L, 
NQueryLayoutChooser.shardByComparator(sortedFilters), mock1, mock2);
+        }
+    }
+
+    @Test
+    void testFilterColumnComparator() {
+        {
+            // both with shardBy, subject to the shardBy order
+            MockEntity mock1 = new MockEntity(1L, ImmutableList.of(1, 2, 3), 
ImmutableList.of(), ImmutableList.of(1));
+            MockEntity mock2 = new MockEntity(2L, ImmutableList.of(2, 1, 3), 
ImmutableList.of(), ImmutableList.of(2));
+            List<Integer> sortedFilters = Lists.newArrayList(1, 2, 3);
+            assertSortedResult(1L, 
NQueryLayoutChooser.filterColumnComparator(sortedFilters), mock1, mock2);
+
+            sortedFilters = Lists.newArrayList(3, 2, 1);
+            assertSortedResult(2L, 
NQueryLayoutChooser.filterColumnComparator(sortedFilters), mock1, mock2);
+        }
+
+        {
+            // one with shardBy
+            MockEntity mock1 = new MockEntity(1L, ImmutableList.of(2, 1, 3), 
ImmutableList.of(), ImmutableList.of());
+            MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 2, 3), 
ImmutableList.of(), ImmutableList.of(1));
+            List<Integer> sortedFilters = Lists.newArrayList(1, 2, 3);
+            assertSortedResult(2L, 
NQueryLayoutChooser.filterColumnComparator(sortedFilters), mock1, mock2);
+        }
+
+        {
+            // one with shardBy
+            MockEntity mock1 = new MockEntity(1L, ImmutableList.of(2, 1, 3), 
ImmutableList.of(), ImmutableList.of(2));
+            MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 2, 3), 
ImmutableList.of(), ImmutableList.of());
+            List<Integer> sortedFilters = Lists.newArrayList(1, 2, 3);
+            assertSortedResult(1L, 
NQueryLayoutChooser.filterColumnComparator(sortedFilters), mock1, mock2);
+        }
+
+        {
+            // both without shardBy
+            MockEntity mock1 = new MockEntity(1L, ImmutableList.of(2, 1, 3), 
ImmutableMap.of());
+            MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 2, 3), 
ImmutableMap.of());
+            List<Integer> sortedFilters = Lists.newArrayList(1, 2, 3);
+            assertSortedResult(2L, 
NQueryLayoutChooser.filterColumnComparator(sortedFilters), mock1, mock2);
+        }
+    }
+
+    @Test
+    void testNonFilterColumnComparator() {
+        {
+            MockEntity mock1 = new MockEntity(1L, ImmutableList.of(2, 1, 3), 
ImmutableMap.of());
+            MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 2, 3), 
ImmutableMap.of());
+            List<Integer> sortedFilters = Lists.newArrayList(1, 2, 3);
+            assertSortedResult(2L, 
NQueryLayoutChooser.nonFilterColumnComparator(sortedFilters), mock1, mock2);
+        }
+
+        {
+            MockEntity mock1 = new MockEntity(1L, ImmutableList.of(2, 1, 3), 
ImmutableMap.of());
+            MockEntity mock2 = new MockEntity(2L, ImmutableList.of(1, 2, 3), 
ImmutableMap.of());
+            List<Integer> sortedFilters = Lists.newArrayList(2, 1, 3);
+            assertSortedResult(1L, 
NQueryLayoutChooser.nonFilterColumnComparator(sortedFilters), mock1, mock2);
+        }
+    }
+
+    @Test
+    void testMeasureSizeComparator() {
+        MockEntity mock1 = new MockEntity(1L, ImmutableList.of(0), 
ImmutableList.of(100_000, 100_001));
+        MockEntity mock2 = new MockEntity(10_001, ImmutableList.of(0), 
ImmutableList.of(100_000));
+        List<NLayoutCandidate> layoutCandidates = mockLayouts(mock1, mock2);
+        layoutCandidates.sort(NQueryLayoutChooser.measureSizeComparator());
+        Assertions.assertEquals(10_001L, 
layoutCandidates.get(0).getLayoutEntity().getId());
+    }
+
+    @Test
+    void testDimensionSizeComparator() {
+        MockEntity mock1 = new MockEntity(1L, ImmutableList.of(0, 1, 2), 
ImmutableList.of());
+        MockEntity mock2 = new MockEntity(10_001L, ImmutableList.of(0, 1, 2, 
3), ImmutableList.of());
+        List<NLayoutCandidate> layoutCandidates = mockLayouts(mock1, mock2);
+        layoutCandidates.sort(NQueryLayoutChooser.dimensionSizeComparator());
+        Assertions.assertEquals(1L, 
layoutCandidates.get(0).getLayoutEntity().getId());
+    }
+
+    private void assertSortedResult(long expectedId, 
Comparator<NLayoutCandidate> comparator,
+            MockEntity... mockEntities) {
+        List<NLayoutCandidate> layoutCandidates = mockLayouts(mockEntities);
+        layoutCandidates.sort(comparator);
+        Assertions.assertEquals(expectedId, 
layoutCandidates.get(0).getLayoutEntity().getId());
+    }
+
+    static class MockEntity {
+        long id;
+        List<Integer> dimensions;
+        List<Integer> measures;
+        List<Integer> shardByCols;
+        Map<Integer, DeriveInfo> deriveInfoMap;
+        long segRange;
+        long maxSegEnd;
+        double rowCost;
+
+        MockEntity(long id, List<Integer> dimensions, Map<Integer, DeriveInfo> 
deriveInfoMap) {
+            this(id, dimensions, Lists.newArrayList(100_000), 
Lists.newArrayList());
+            this.deriveInfoMap = deriveInfoMap;
+        }
+
+        MockEntity(long id, List<Integer> dimensions, double rowCost) {
+            this(id, dimensions, Lists.newArrayList(100_000), 
Lists.newArrayList());
+            this.rowCost = rowCost;
+        }
+
+        MockEntity(long id, List<Integer> dimensions, long segRange, long 
maxSegEnd) {
+            this(id, dimensions, Lists.newArrayList(100_000), 
Lists.newArrayList());
+            this.segRange = segRange;
+            this.maxSegEnd = maxSegEnd;
+        }
+
+        MockEntity(long id, List<Integer> dimensions, List<Integer> measures) {
+            this(id, dimensions, measures, Lists.newArrayList());
+        }
+
+        MockEntity(long id, List<Integer> dimensions, List<Integer> measures, 
List<Integer> shardByCols) {
+            this.shardByCols = shardByCols;
+            this.id = id;
+            this.dimensions = dimensions;
+            this.measures = id < IndexEntity.TABLE_INDEX_START_ID ? measures : 
Lists.newArrayList();
+        }
+    }
+
+    private List<NLayoutCandidate> mockLayouts(MockEntity... entities) {
+        String project = "default";
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        String modelId = "82fa7671-a935-45f5-8779-85703601f49a";
+        NDataModelManager modelMgr = 
NDataModelManager.getInstance(kylinConfig, project);
+        modelMgr.updateDataModel(modelId, copyForWrite -> {
+            List<NDataModel.NamedColumn> allNamedColumns = 
copyForWrite.getAllNamedColumns();
+            for (NDataModel.NamedColumn column : allNamedColumns) {
+                if (column.isExist()) {
+                    column.setStatus(NDataModel.ColumnStatus.DIMENSION);
+                }
+            }
+        });
+        Map<Long, MockEntity> idToMockEntityMap = Maps.newHashMap();
+        NIndexPlanManager indexMgr = 
NIndexPlanManager.getInstance(kylinConfig, project);
+        indexMgr.updateIndexPlan(modelId, copyForWrite -> {
+            // clear ruleBasedIndex and auto indexes
+            copyForWrite.setRuleBasedIndex(new RuleBasedIndex());
+            copyForWrite.getIndexes().clear();
+            Map<Long, IndexEntity> indexMap = Maps.newHashMap();
+            for (MockEntity entity : entities) {
+                idToMockEntityMap.put(entity.id, entity);
+                long indexId = entity.id - entity.id % 
IndexEntity.INDEX_ID_STEP;
+                IndexEntity index = indexMap.get(indexId);
+                if (index == null) {
+                    index = new IndexEntity();
+                    index.setId(indexId);
+                    index.setDimensions(entity.dimensions);
+                    index.setMeasures(entity.measures);
+                }
+                // add layout
+                LayoutEntity layout = new LayoutEntity();
+                List<Integer> colOrder = Lists.newArrayList();
+                colOrder.addAll(entity.dimensions);
+                colOrder.addAll(entity.measures);
+                layout.setIndex(index);
+                layout.setAuto(true);
+                layout.setColOrder(colOrder);
+                layout.setShardByColumns(entity.shardByCols);
+                layout.setId(entity.id);
+                index.addLayout(layout);
+                indexMap.put(index.getId(), index);
+            }
+            copyForWrite.updateNextId();
+            copyForWrite.getIndexes().addAll(indexMap.values());
+        });
+        List<LayoutEntity> allLayouts = 
indexMgr.getIndexPlan(modelId).getAllLayouts();
+        return allLayouts.stream().map(layout -> {
+            NLayoutCandidate layoutCandidate = new NLayoutCandidate(layout, 0, 
new CapabilityResult());
+            MockEntity mockEntity = idToMockEntityMap.get(layout.getId());
+            layoutCandidate.setRange(mockEntity.segRange);
+            layoutCandidate.setMaxSegEnd(mockEntity.maxSegEnd);
+            layoutCandidate.setCost(mockEntity.rowCost);
+            layoutCandidate.setDerivedToHostMap(mockEntity.deriveInfoMap);
+            return layoutCandidate;
+        }).collect(Collectors.toList());
+    }
+}
diff --git 
a/src/query/src/test/java/org/apache/kylin/query/routing/QueryRouterTest.java 
b/src/query/src/test/java/org/apache/kylin/query/routing/QueryRouterTest.java
new file mode 100644
index 0000000000..6f5fc71a7d
--- /dev/null
+++ 
b/src/query/src/test/java/org/apache/kylin/query/routing/QueryRouterTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.routing;
+
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.util.MetadataTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import lombok.val;
+
+@MetadataInfo
+class QueryRouterTest {
+    @Test
+    void testSort() {
+        {
+            Candidate c1 = CandidateTestUtils.mockCandidate("model0001", 
"modelA", 1, 1);
+            Candidate c2 = CandidateTestUtils.mockCandidate("model0002", 
"modelB", 2, 2);
+            assertSortedResults(c1, Lists.newArrayList(c1, c2));
+        }
+
+        {
+            Candidate c1 = CandidateTestUtils.mockCandidate("model0001", 
"modelA", 2, 1);
+            Candidate c2 = CandidateTestUtils.mockCandidate("model0002", 
"modelB", 2, 2);
+            List<Candidate> candidates = Lists.newArrayList(c1, c2);
+            QueryRouter.sortCandidates("default", candidates);
+            Assertions.assertEquals(c1, candidates.get(0));
+        }
+
+        {
+            Candidate c1 = CandidateTestUtils.mockCandidate("model0001", 
"modelA", 2, 2);
+            Candidate c2 = CandidateTestUtils.mockCandidate("model0002", 
"modelB", 2, 2);
+            List<Candidate> candidates = Lists.newArrayList(c1, c2);
+            QueryRouter.sortCandidates("default", candidates);
+            Assertions.assertEquals(c1, candidates.get(0));
+        }
+
+        {
+            Candidate c1 = CandidateTestUtils.mockCandidate("model0001", 
"modelA", 1, 1);
+            Candidate c2 = CandidateTestUtils.mockCandidate("model0002", 
"modelB", 2, 2);
+            Candidate c3 = CandidateTestUtils.mockCandidate("model0003", 
"modelC", 4, 4);
+            List<Candidate> candidates = Lists.newArrayList(c1, c2, c3);
+            QueryRouter.sortCandidates("default", candidates);
+            Assertions.assertEquals(c1, candidates.get(0));
+        }
+
+        {
+            Candidate c1 = CandidateTestUtils.mockCandidate("model0001", 
"modelA", 1, 1);
+            Candidate c2 = mockEmptyCandidate("model0003", "modelC", 1);
+            List<Candidate> candidates = Lists.newArrayList(c1, c2);
+            QueryRouter.sortCandidates("default", candidates);
+            Assertions.assertEquals(c1, candidates.get(0));
+        }
+
+        {
+            Candidate c1 = mockStreamingCandidate("model0001", "modelA", 2, 1);
+            Candidate c2 = mockEmptyCandidate("model0002", "modelB", 2);
+            List<Candidate> candidates = Lists.newArrayList(c1, c2);
+            QueryRouter.sortCandidates("default", candidates);
+            Assertions.assertEquals(c1, candidates.get(0));
+        }
+
+        {
+            Candidate c1 = mockHybridCandidate("model0001", "modelA", 3, 1, 2);
+            Candidate c2 = mockEmptyCandidate("model0002", "modelB", 3);
+
+            assertSortedResults(c1, Lists.newArrayList(c1, c2));
+        }
+
+        {
+            Candidate c1 = CandidateTestUtils.mockCandidate("model0001", 
"modelA", 1, 3);
+            Candidate c2 = mockStreamingCandidate("model0002", "modelB", 1, 2);
+            Candidate c3 = mockHybridCandidate("model0003", "modelC", 1, 4, 
2.5);
+            List<Candidate> candidates = Lists.newArrayList(c1, c2, c3);
+            QueryRouter.sortCandidates("default", candidates);
+            Assertions.assertEquals(c2, candidates.get(0));
+        }
+    }
+
+    @Test
+    void testSortWithVacantPruningRule() {
+        // This property does affect the sorting of candidates in different 
models.
+        MetadataTestUtils.updateProjectConfig("default", 
"kylin.query.index-match-rules",
+                KylinConfigBase.USE_VACANT_INDEXES);
+        testSort();
+    }
+
+    @Test
+    void testTableIndexAnswerSelectStar() {
+        MetadataTestUtils.updateProjectConfig("default", 
"kylin.query.index-match-rules",
+                KylinConfigBase.USE_TABLE_INDEX_ANSWER_SELECT_STAR);
+        Candidate c1 = CandidateTestUtils.mockCandidate("model0001", "modelA", 
2, 1, 1);
+        Candidate c2 = CandidateTestUtils.mockCandidate("model0002", "modelB", 
1, 1, 2);
+        assertSortedResults(c1, Lists.newArrayList(c1, c2));
+
+        MetadataTestUtils.updateProjectConfig("default", 
"kylin.query.index-match-rules", "");
+        assertSortedResults(c2, Lists.newArrayList(c1, c2));
+    }
+
+    private void assertSortedResults(Candidate expectCandidate, 
List<Candidate> candidates) {
+        QueryRouter.sortCandidates("default", candidates);
+        Assertions.assertEquals(expectCandidate, candidates.get(0));
+    }
+
+    private Candidate mockStreamingCandidate(String modelId, String modelName, 
int realizationCost,
+            double candidateCost) {
+        IRealization realization = CandidateTestUtils.mockRealization(modelId, 
modelName, realizationCost);
+        OLAPContext olapContext = CandidateTestUtils.mockOlapContext();
+        val candidate = new Candidate(realization, olapContext, 
Maps.newHashMap());
+        val cap = new CapabilityResult();
+        cap.setSelectedStreamingCandidate(() -> candidateCost);
+        cap.setCost(cap.getSelectedStreamingCandidate().getCost());
+        candidate.setCapability(cap);
+        return candidate;
+    }
+
+    private Candidate mockHybridCandidate(String modelId, String modelName, 
int realizationCost, double candidateCost,
+            double streamingCandidateCost) {
+        IRealization realization = CandidateTestUtils.mockRealization(modelId, 
modelName, realizationCost);
+        OLAPContext olapContext = CandidateTestUtils.mockOlapContext();
+        val candidate = new Candidate(realization, olapContext, 
Maps.newHashMap());
+        val cap = new CapabilityResult();
+        cap.setSelectedCandidate(() -> candidateCost);
+        cap.setSelectedStreamingCandidate(() -> streamingCandidateCost);
+        cap.setCost(
+                (int) Math.min(cap.getSelectedCandidate().getCost(), 
cap.getSelectedStreamingCandidate().getCost()));
+        candidate.setCapability(cap);
+        return candidate;
+    }
+
+    private Candidate mockEmptyCandidate(String modelId, String modelName, int 
realizationCost) {
+        IRealization realization = CandidateTestUtils.mockRealization(modelId, 
modelName, realizationCost);
+        OLAPContext olapContext = CandidateTestUtils.mockOlapContext();
+        val candidate = new Candidate(realization, olapContext, 
Maps.newHashMap());
+        candidate.realization = CandidateTestUtils.mockRealization(modelId, 
modelName, realizationCost);
+        val cap = new CapabilityResult();
+        cap.setSelectedCandidate(NLayoutCandidate.EMPTY);
+        cap.setSelectedStreamingCandidate(NLayoutCandidate.EMPTY);
+        candidate.setCapability(cap);
+        return candidate;
+    }
+}
diff --git 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index b664963211..483e35aa50 100644
--- 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -292,7 +292,9 @@ class FilePruner(val session: SparkSession,
     QueryContext.current().getMetrics.setFileCount(totalFileCount)
     val totalFileSize = selected.flatMap(partition => 
partition.files).map(_.getLen).sum
     val sourceRows = selected.map(seg => {
-      val layoutRows = 
dataflow.getSegment(seg.segmentID).getLayout(layout.getId).getRows
+      val segment = dataflow.getSegment(seg.segmentID)
+      val dataLayout = segment.getLayout(layout.getId)
+      val layoutRows = if (dataLayout == null) 0 else dataLayout.getRows
       logInfo(s"Source scan rows: Query Id: 
${QueryContext.current().getQueryId}, Segment Id: ${seg.segmentID}, " +
         s"Layout Id: ${layout.getId}, rows: $layoutRows.")
       layoutRows
@@ -361,7 +363,9 @@ class FilePruner(val session: SparkSession,
 
   private def pruneEmptySegments(segDirs: Seq[SegmentDirectory]): 
Seq[SegmentDirectory] = {
     segDirs.filter(seg => {
-      if (dataflow.getSegment(seg.segmentID).getLayout(layout.getId).isEmpty) {
+      val segment = dataflow.getSegment(seg.segmentID)
+      val dataLayout = segment.getLayout(layout.getId)
+      if (dataLayout == null || dataLayout.isEmpty) {
         logDebug(s"pruning empty segment: segment ${seg.segmentID} 
${layout.getId} is empty.")
         false
       } else {

Reply via email to