This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5d251e313959b13cb15c2789eb69b6bb9b72c6b5
Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
AuthorDate: Tue Feb 28 15:26:19 2023 +0800

    KYLIN-5536 Limit the segment range of MAX query to improve query performance
    
    Co-authored-by: sibing.zhang <sibing.zh...@qq.com>
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../apache/kylin/common/KylinConfigBaseTest.java   |   9 ++
 .../kylin/query/routing/RealizationPruner.java     | 177 +++++++++++++++------
 .../kylin/rest/service/QueryServiceTest.java       |  16 +-
 .../kylin/query/engine/SelectRealizationTest.java  |   7 +-
 5 files changed, 154 insertions(+), 59 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7f90613fad..19021c4d73 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3838,6 +3838,10 @@ public abstract class KylinConfigBase implements Serializable {
                 && Boolean.parseBoolean(getOptional("kylin.query.diagnose-enabled", TRUE));
     }
 
+    public long getMaxMeasureSegmentPrunerBeforeDays() {
+        return Long.parseLong(getOptional("kylin.query.max-measure-segment-pruner-before-days", "-1"));
+    }
+
     // ============================================================================
     // Cost based index Planner
     // ============================================================================
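
For context: the new knob defaults to -1, which leaves query behavior unchanged; a non-negative value N makes qualifying MAX() queries read only segments whose time range ends within the last N days, counted back from the newest ready segment's end. A minimal sketch of those semantics (the helper class is hypothetical, not Kylin API; only the property name and default come from the patch):

    // Hypothetical sketch of the knob's semantics, not Kylin code.
    import java.util.Properties;

    class MaxMeasurePrunerKnob {
        static long beforeDays(Properties props) {
            // Mirrors the getter above: unset or "-1" disables the pruner.
            return Long.parseLong(
                    props.getProperty("kylin.query.max-measure-segment-pruner-before-days", "-1"));
        }

        static boolean enabled(Properties props) {
            return beforeDays(props) >= 0;
        }
    }
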
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index 5dfad30ca5..3bbd191aff 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -1427,6 +1427,15 @@ class KylinConfigBaseTest {
         config.setProperty("kylin.query.calcite.bindable.cache.concurrencyLevel", "3");
         assertEquals(3, config.getCalciteBindableCacheConcurrencyLevel());
     }
+
+    @Test
+    void testGetMaxMeasureSegmentPrunerBeforeDays() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        long defaultValue = config.getMaxMeasureSegmentPrunerBeforeDays();
+        assertEquals(-1, defaultValue);
+        config.setProperty("kylin.query.max-measure-segment-pruner-before-days", "1");
+        assertEquals(1, config.getMaxMeasureSegmentPrunerBeforeDays());
+    }
 }
 
 class EnvironmentUpdateUtils {
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationPruner.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationPruner.java
index 5c56de2fb3..905dcd02e1 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationPruner.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationPruner.java
@@ -60,6 +60,8 @@ import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.MultiPartitionDesc;
 import org.apache.kylin.metadata.model.MultiPartitionKeyMapping;
@@ -67,6 +69,7 @@ import org.apache.kylin.metadata.model.MultiPartitionKeyMappingProvider;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.query.relnode.OLAPContext;
@@ -89,6 +92,7 @@ public class RealizationPruner {
     private static final String STRING = "string";
     private static final String INTEGER = "integer";
     private static final String BIGINT = "bigint";
+    public static final long DAY = 24 * 3600 * 1000L;
     private static final TimeZone UTC_ZONE = TimeZone.getTimeZone("UTC");
     private static final Pattern DATE_PATTERN = Pattern.compile("[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]");
     private static final Pattern TIMESTAMP_PATTERN = Pattern.compile(
@@ -102,80 +106,90 @@ public class RealizationPruner {
     private RealizationPruner() {
     }
 
-    public static List<NDataSegment> pruneSegments(NDataflow dataflow, OLAPContext olapContext) {
-        val kylinConfig = KylinConfig.getInstanceFromEnv();
-        val projectName = dataflow.getProject();
-
-        val projectInstance = NProjectManager.getInstance(kylinConfig).getProject(projectName);
-        val allReadySegments = dataflow.getQueryableSegments();
-        if (!projectInstance.getConfig().isHeterogeneousSegmentEnabled()) {
-            return allReadySegments;
-        }
-        val isStreamingFactTable = olapContext.firstTableScan.getOlapTable().getSourceTable()
-                .getSourceType() == ISourceAware.ID_STREAMING;
-        val isBatchFusionModel = isStreamingFactTable && dataflow.getModel().isFusionModel() && !dataflow.isStreaming();
-        val partitionDesc = isBatchFusionModel
-                ? getStreamingPartitionDesc(dataflow.getModel(), kylinConfig, projectName)
-                : dataflow.getModel().getPartitionDesc();
-        // no partition column
-        if (PartitionDesc.isEmptyPartitionDesc(partitionDesc)) {
-            log.info("No partition column");
+    public static Segments<NDataSegment> pruneSegments(NDataflow dataflow, OLAPContext olapContext) {
+        Segments<NDataSegment> allReadySegments = dataflow.getQueryableSegments();
+        if (!NProjectManager.getProjectConfig(dataflow.getProject()).isHeterogeneousSegmentEnabled()) {
             return allReadySegments;
         }
 
-        val partitionColumn = partitionDesc.getPartitionDateColumnRef();
-        val dateFormat = partitionDesc.getPartitionDateFormat();
-        val filterColumns = olapContext.filterColumns;
-        // sql filter columns do not include partition column
-        if (!filterColumns.contains(partitionColumn)) {
-            log.info("Filter columns do not contain partition column");
+        // prune segments by partition column and date format
+        PartitionDesc partitionCol = getPartitionDesc(dataflow, olapContext);
+        if (isFullBuildModel(partitionCol)) {
+            log.info("No partition column or partition column format is null.");
             return allReadySegments;
         }
 
-        val selectedSegments = Lists.<NDataSegment> newArrayList();
-        var filterConditions = olapContext.getExpandedFilterConditions();
+        // prune segments by simplified SQL filter
         val relOptCluster = olapContext.firstTableScan.getCluster();
         val rexBuilder = relOptCluster.getRexBuilder();
         val rexSimplify = new RexSimplify(relOptCluster.getRexBuilder(), RelOptPredicateList.EMPTY, true,
                 relOptCluster.getPlanner().getExecutor());
 
-        val partitionColInputRef = transformColumn2RexInputRef(partitionColumn, olapContext.allTableScans);
-        if (allReadySegments.size() > 0 && dateFormat != null) {
+        var filterConditions = olapContext.getExpandedFilterConditions();
+        val dateFormat = partitionCol.getPartitionDateFormat();
+        val partitionColRef = partitionCol.getPartitionDateColumnRef();
+        RexInputRef partitionColInputRef = null;
+        if (needRewritePartitionColInFilter(dataflow, olapContext)) {
+            partitionColInputRef = transformColumn2RexInputRef(partitionColRef, olapContext.allTableScans);
             try {
                 val firstSegmentRanges = transformSegment2RexCall(allReadySegments.get(0), dateFormat, rexBuilder,
-                        partitionColInputRef, partitionColumn.getType(), dataflow.isStreaming());
+                        partitionColInputRef, partitionColRef.getType(), dataflow.isStreaming());
                 RelDataTypeFamily segmentLiteralTypeFamily = getSegmentLiteralTypeFamily(firstSegmentRanges.getFirst());
-                filterConditions = filterConditions.stream()//
-                        .map(filterCondition -> rewriteRexCall(filterCondition, rexBuilder, segmentLiteralTypeFamily,
-                                partitionColInputRef, dateFormat))
-                        .collect(Collectors.toList());
+                List<RexNode> filterRexNodeList = new ArrayList<>();
+                for (RexNode filterCondition : filterConditions) {
+                    RexNode rexNode = rewriteRexCall(filterCondition, rexBuilder, segmentLiteralTypeFamily,
+                            partitionColInputRef, dateFormat);
+                    filterRexNodeList.add(rexNode);
+                }
+                filterConditions = filterRexNodeList;
             } catch (Exception ex) {
                 log.warn("Segment pruning error: ", ex);
+                if (canPruneSegmentsForMaxMeasure(dataflow, olapContext, partitionColRef)) {
+                    return selectSegmentsForMaxMeasure(dataflow);
+                }
                 return allReadySegments;
             }
         }
-        var simplifiedSqlFilter = rexSimplify.simplifyAnds(filterConditions);
 
-        // sql filter condition is always false
+        RexNode simplifiedSqlFilter = rexSimplify.simplifyAnds(filterConditions);
         if (simplifiedSqlFilter.isAlwaysFalse()) {
             log.info("SQL filter condition is always false, pruning all ready segments");
             olapContext.storageContext.setFilterCondAlwaysFalse(true);
-            return selectedSegments;
+            return Segments.empty();
         }
-        // sql filter condition is always true
-        if (simplifiedSqlFilter.isAlwaysTrue()) {
-            log.info("SQL filter condition is always true, pruning no segment");
+
+        // prune segments by customized scene optimization
+        if (canPruneSegmentsForMaxMeasure(dataflow, olapContext, partitionColRef)) {
+            return selectSegmentsForMaxMeasure(dataflow);
+        }
+
+        if (!olapContext.filterColumns.contains(partitionColRef)) {
+            log.info("Filter columns do not contain partition column");
             return allReadySegments;
         }
 
-        if (dateFormat == null) {
+        if (simplifiedSqlFilter.isAlwaysTrue()) {
+            log.info("SQL filter condition is always true, pruning no segment");
             return allReadySegments;
         }
 
-        for (NDataSegment dataSegment : allReadySegments) {
+        // prune segments by partition filter
+        Segments<NDataSegment> selectedSegments = pruneSegmentsByPartitionFilter(dataflow, olapContext, rexSimplify,
+                partitionColInputRef, simplifiedSqlFilter);
+        log.info("Scan segment.size: {} after segment pruning", selectedSegments.size());
+        return selectedSegments;
+    }
+
+    private static Segments<NDataSegment> pruneSegmentsByPartitionFilter(NDataflow dataflow, OLAPContext olapContext,
+            RexSimplify rexSimplify, RexInputRef partitionColInputRef, RexNode simplifiedSqlFilter) {
+        Segments<NDataSegment> selectedSegments = new Segments<>();
+        PartitionDesc partitionCol = getPartitionDesc(dataflow, olapContext);
+        RexBuilder rexBuilder = olapContext.firstTableScan.getCluster().getRexBuilder();
+        for (NDataSegment dataSegment : dataflow.getQueryableSegments()) {
             try {
-                val segmentRanges = transformSegment2RexCall(dataSegment, dateFormat, rexBuilder, partitionColInputRef,
-                        partitionColumn.getType(), dataflow.isStreaming());
+                val segmentRanges = transformSegment2RexCall(dataSegment, partitionCol.getPartitionDateFormat(),
+                        rexBuilder, partitionColInputRef, partitionCol.getPartitionDateColumnRef().getType(),
+                        dataflow.isStreaming());
                 // compare with segment start
                 val segmentStartPredicate = RelOptPredicateList.of(rexBuilder,
                         Lists.newArrayList(segmentRanges.getFirst()));
@@ -197,10 +211,79 @@ public class RealizationPruner {
                 selectedSegments.add(dataSegment);
             }
         }
-        log.info("Scan segment.size: {} after segment pruning", selectedSegments.size());
         return selectedSegments;
     }
 
+    private static boolean needRewritePartitionColInFilter(NDataflow dataflow, OLAPContext olapContext) {
+        return !dataflow.getQueryableSegments().isEmpty() && olapContext.filterColumns
+                .contains(getPartitionDesc(dataflow, olapContext).getPartitionDateColumnRef());
+    }
+
+    private static boolean isFullBuildModel(PartitionDesc partitionCol) {
+        return PartitionDesc.isEmptyPartitionDesc(partitionCol) || partitionCol.getPartitionDateFormat() == null;
+    }
+
+    private static Segments<NDataSegment> selectSegmentsForMaxMeasure(NDataflow dataflow) {
+        Segments<NDataSegment> selectedSegments = new Segments<>();
+        long days = dataflow.getConfig().getMaxMeasureSegmentPrunerBeforeDays();
+        // segments are already sorted by time range
+        Segments<NDataSegment> allReadySegments = dataflow.getQueryableSegments();
+        long maxDt = allReadySegments.getLatestReadySegment().getTSRange().getEnd();
+        long minDt = maxDt - DAY * days;
+        for (int i = allReadySegments.size() - 1; i >= 0; i--) {
+            if (allReadySegments.get(i).getTSRange().getEnd() > minDt) {
+                selectedSegments.add(allReadySegments.get(i));
+            } else {
+                break;
+            }
+        }
+        log.info("Scan segment size: {} after max measure segment pruner. The before days: {}. Passed on segment: {}",
+                selectedSegments.size(), days,
+                selectedSegments.stream().map(ISegment::getName).collect(Collectors.joining(",")));
+        return selectedSegments;
+    }
+
+    private static boolean canPruneSegmentsForMaxMeasure(NDataflow dataflow, OLAPContext olapContext,
+            TblColRef partitionColRef) {
+        if (dataflow.getConfig().getMaxMeasureSegmentPrunerBeforeDays() < 0) {
+            return false;
+        }
+
+        if (CollectionUtils.isNotEmpty(olapContext.getGroupByColumns())
+                && !olapContext.getGroupByColumns().stream().allMatch(partitionColRef::equals)) {
+            return false;
+        }
+
+        if (CollectionUtils.isEmpty(olapContext.aggregations)) {
+            return false;
+        }
+
+        for (FunctionDesc agg : olapContext.aggregations) {
+            if (FunctionDesc.FUNC_MAX.equalsIgnoreCase(agg.getExpression())
+                    && !partitionColRef.equals(agg.getParameters().get(0).getColRef())) {
+                return false;
+            }
+            if (!FunctionDesc.FUNC_MAX.equalsIgnoreCase(agg.getExpression())
+                    && CollectionUtils.isNotEmpty(agg.getParameters())) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private static PartitionDesc getPartitionDesc(NDataflow dataflow, OLAPContext olapContext) {
+        NDataModel model = dataflow.getModel();
+        val isStreamingFactTable = olapContext.firstTableScan.getOlapTable().getSourceTable()
+                .getSourceType() == ISourceAware.ID_STREAMING;
+        val isBatchFusionModel = isStreamingFactTable && dataflow.getModel().isFusionModel() && !dataflow.isStreaming();
+        if (!isBatchFusionModel) {
+            return model.getPartitionDesc();
+        }
+        return NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), dataflow.getProject())
+                .getDataModelDesc(model.getFusionId()).getPartitionDesc();
+    }
+
     public static RexNode rewriteRexCall(RexNode rexNode, RexBuilder rexBuilder, RelDataTypeFamily relDataTypeFamily,
             RexInputRef partitionColInputRef, String dateFormat) {
         if (!(rexNode instanceof RexCall)) {
@@ -306,12 +389,6 @@ public class RealizationPruner {
         return null;
     }
 
-    private static PartitionDesc getStreamingPartitionDesc(NDataModel model, KylinConfig kylinConfig, String project) {
-        NDataModelManager modelManager = NDataModelManager.getInstance(kylinConfig, project);
-        NDataModel streamingModel = modelManager.getDataModelDesc(model.getFusionId());
-        return streamingModel.getPartitionDesc();
-    }
-
     private static Pair<RexNode, RexNode> transformSegment2RexCall(NDataSegment dataSegment, String dateFormat,
             RexBuilder rexBuilder, RexInputRef partitionColInputRef, DataType partitionColType, boolean isStreaming) {
         String start;
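
To make the new path concrete: canPruneSegmentsForMaxMeasure admits a query only when the knob is set, every MAX aggregate takes the partition column as its argument (for example, SELECT MAX(part_dt) FROM fact), no other aggregate carries parameters, and any GROUP BY touches only the partition column; selectSegmentsForMaxMeasure then keeps just the trailing window of segments. A self-contained sketch of that window arithmetic, assuming segments arrive sorted ascending by end time as the patch relies on (the list-of-longs types here are illustrative, not Kylin's):

    import java.util.ArrayList;
    import java.util.List;

    class MaxMeasureWindowSketch {
        static final long DAY = 24 * 3600 * 1000L;

        // segmentEnds: each ready segment's time-range end in epoch millis,
        // sorted ascending, standing in for Segments<NDataSegment>.
        static List<Long> selectRecent(List<Long> segmentEnds, long beforeDays) {
            List<Long> selected = new ArrayList<>();
            long maxDt = segmentEnds.get(segmentEnds.size() - 1); // newest end
            long minDt = maxDt - DAY * beforeDays;                // window floor
            for (int i = segmentEnds.size() - 1; i >= 0; i--) {   // newest -> oldest
                if (segmentEnds.get(i) > minDt) {
                    selected.add(segmentEnds.get(i));
                } else {
                    break; // input is sorted, so older segments cannot qualify
                }
            }
            return selected;
        }
    }

The intuition: MAX over the partition column is monotone across time-ordered segments, so the newest segments already contain the maximum and dropping older ones cannot change the answer.
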
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index 4f1c9c1fba..46a0ca4b2c 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -2104,17 +2104,17 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
         PrepareSqlStateParam[] params1 = new PrepareSqlStateParam[2];
         params1[0] = new PrepareSqlStateParam(Double.class.getCanonicalName(), "123.1");
         params1[1] = new PrepareSqlStateParam(Integer.class.getCanonicalName(), "123");
-        String originSql1 = "with t1 as (select ORDER_ID, PRICE > ? from test_kylin_fact where ORDER_ID = ?)\n, "
-                + "t2 as (select bf from test_kylin_fact)\n" + "select * from t1 where ORDER_ID = '100'\n" //
+        String originSql1 = "with t1 as (select ORDER_ID, PRICE > ? from test_kylin_fact where ORDER_ID > ?)\n, "
+                + "t2 as (select bf from test_kylin_fact)\n" + "select * from t1 where ORDER_ID = '125'\n" //
                 + "union all\n" //
-                + "select * from t1 where ORDER_ID = '200'";
-        String filledSql1 = "with t1 as (select ORDER_ID, PRICE > 123.1 from test_kylin_fact where ORDER_ID = 123)\n"
-                + ", t2 as (select bf from test_kylin_fact)\n" + "select * from t1 where ORDER_ID = '100'\n"
-                + "union all\n" + "select * from t1 where ORDER_ID = '200'";
+                + "select * from t1 where ORDER_ID < '200'";
+        String filledSql1 = "with t1 as (select ORDER_ID, PRICE > 123.1 from test_kylin_fact where ORDER_ID > 123)\n"
+                + ", t2 as (select bf from test_kylin_fact)\n" + "select * from t1 where ORDER_ID = '125'\n"
+                + "union all\n" + "select * from t1 where ORDER_ID < '200'";
         String transformedFilledSql1 = "SELECT *\n" + "FROM (SELECT ORDER_ID, PRICE > 123.1\n"
-                + "FROM TEST_KYLIN_FACT\n" + "WHERE ORDER_ID = 123) AS T1\n" + "WHERE ORDER_ID = '100'\n"
+                + "FROM TEST_KYLIN_FACT\n" + "WHERE ORDER_ID > 123) AS T1\n" + "WHERE ORDER_ID = '125'\n"
                 + "UNION ALL\n" + "SELECT *\n" + "FROM (SELECT ORDER_ID, PRICE > 123.1\n" + "FROM TEST_KYLIN_FACT\n"
-                + "WHERE ORDER_ID = 123) AS T1\n" + "WHERE ORDER_ID = '200'";
+                + "WHERE ORDER_ID > 123) AS T1\n" + "WHERE ORDER_ID < '200'";
         queryWithParamWhenTransformWithToSubQuery(params1, originSql1, filledSql1, transformedFilledSql1);
 
         PrepareSqlStateParam[] params2 = new PrepareSqlStateParam[1];
diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/SelectRealizationTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/SelectRealizationTest.java
index f5ab4fbe1b..453ae67d60 100644
--- a/src/query/src/test/java/org/apache/kylin/query/engine/SelectRealizationTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/engine/SelectRealizationTest.java
@@ -31,11 +31,13 @@ import org.apache.calcite.prepare.Prepare;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexExecutorImpl;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.kylin.common.debug.BackdoorToggles;
-import org.apache.kylin.query.calcite.KylinRelDataTypeSystem;
 import org.apache.kylin.junit.annotation.MetadataInfo;
 import org.apache.kylin.query.QueryExtension;
+import org.apache.kylin.query.calcite.KylinRelDataTypeSystem;
+import org.apache.kylin.query.engine.meta.SimpleDataContext;
 import org.apache.kylin.query.util.QueryContextCutter;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -68,6 +70,9 @@ class SelectRealizationTest {
         String defaultSchemaName = schemaFactory.getDefaultSchema();
         val catalogReader = createCatalogReader(config, rootSchema, defaultSchemaName);
         val planner = new PlannerFactory(kylinConfig).createVolcanoPlanner(config);
+        SimpleDataContext dataContext = new SimpleDataContext(rootSchema.plus(), TypeSystem.javaTypeFactory(),
+                kylinConfig);
+        planner.setExecutor(new RexExecutorImpl(dataContext));
         val sqlConverter = SQLConverter.createConverter(config, planner, catalogReader);
         val queryOptimizer = new QueryOptimizer(planner);
         RelRoot relRoot = sqlConverter
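
The executor wiring added above matters because RealizationPruner now runs RexSimplify while choosing realizations, and RexSimplify folds constants through planner.getExecutor(); the test's planner previously registered none. A minimal sketch of equivalent wiring with stock Calcite types (an assumption-laden standalone example: DataContexts.EMPTY needs a recent Calcite, and inside Kylin the test's SimpleDataContext plays that role instead):

    import org.apache.calcite.DataContexts;
    import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
    import org.apache.calcite.plan.RelOptPredicateList;
    import org.apache.calcite.plan.volcano.VolcanoPlanner;
    import org.apache.calcite.rex.RexBuilder;
    import org.apache.calcite.rex.RexExecutorImpl;
    import org.apache.calcite.rex.RexSimplify;

    class ExecutorWiringSketch {
        static RexSimplify newSimplifier() {
            VolcanoPlanner planner = new VolcanoPlanner();
            // Register a constant-folding executor; any DataContext
            // implementation suffices for folding pure literals.
            planner.setExecutor(new RexExecutorImpl(DataContexts.EMPTY));
            RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl());
            // Same constructor shape as RealizationPruner's RexSimplify call.
            return new RexSimplify(rexBuilder, RelOptPredicateList.EMPTY, true, planner.getExecutor());
        }
    }
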
