This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5d251e313959b13cb15c2789eb69b6bb9b72c6b5 Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com> AuthorDate: Tue Feb 28 15:26:19 2023 +0800 KYLIN-5536 Limit the segment range of MAX query to improve query performance Co-authored-by: sibing.zhang <sibing.zh...@qq.com> --- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../apache/kylin/common/KylinConfigBaseTest.java | 9 ++ .../kylin/query/routing/RealizationPruner.java | 177 +++++++++++++++------ .../kylin/rest/service/QueryServiceTest.java | 16 +- .../kylin/query/engine/SelectRealizationTest.java | 7 +- 5 files changed, 154 insertions(+), 59 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 7f90613fad..19021c4d73 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -3838,6 +3838,10 @@ public abstract class KylinConfigBase implements Serializable { && Boolean.parseBoolean(getOptional("kylin.query.diagnose-enabled", TRUE)); } + public long getMaxMeasureSegmentPrunerBeforeDays() { + return Long.parseLong(getOptional("kylin.query.max-measure-segment-pruner-before-days", "-1")); + } + // ============================================================================ // Cost based index Planner // ============================================================================ diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java index 5dfad30ca5..3bbd191aff 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java @@ -1427,6 +1427,15 @@ class KylinConfigBaseTest { config.setProperty("kylin.query.calcite.bindable.cache.concurrencyLevel", "3"); assertEquals(3, config.getCalciteBindableCacheConcurrencyLevel()); } + + @Test + void testGetMaxMeasureSegmentPrunerBeforeDays() { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + long defaultValue = config.getMaxMeasureSegmentPrunerBeforeDays(); + assertEquals(-1, defaultValue); + config.setProperty("kylin.query.max-measure-segment-pruner-before-days", "1"); + assertEquals(1, config.getMaxMeasureSegmentPrunerBeforeDays()); + } } class EnvironmentUpdateUtils { diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationPruner.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationPruner.java index 5c56de2fb3..905dcd02e1 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationPruner.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationPruner.java @@ -60,6 +60,8 @@ import org.apache.kylin.metadata.cube.model.NDataSegment; import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.MultiPartitionDesc; import org.apache.kylin.metadata.model.MultiPartitionKeyMapping; @@ -67,6 +69,7 @@ import org.apache.kylin.metadata.model.MultiPartitionKeyMappingProvider; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.model.PartitionDesc; +import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.query.relnode.OLAPContext; @@ -89,6 +92,7 @@ public class RealizationPruner { private static final String STRING = "string"; private static final String INTEGER = "integer"; private static final String BIGINT = "bigint"; + public static final long DAY = 24 * 3600 * 1000L; private static final TimeZone UTC_ZONE = TimeZone.getTimeZone("UTC"); private static final Pattern DATE_PATTERN = Pattern.compile("[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]"); private static final Pattern TIMESTAMP_PATTERN = Pattern.compile( @@ -102,80 +106,90 @@ public class RealizationPruner { private RealizationPruner() { } - public static List<NDataSegment> pruneSegments(NDataflow dataflow, OLAPContext olapContext) { - val kylinConfig = KylinConfig.getInstanceFromEnv(); - val projectName = dataflow.getProject(); - - val projectInstance = NProjectManager.getInstance(kylinConfig).getProject(projectName); - val allReadySegments = dataflow.getQueryableSegments(); - if (!projectInstance.getConfig().isHeterogeneousSegmentEnabled()) { - return allReadySegments; - } - val isStreamingFactTable = olapContext.firstTableScan.getOlapTable().getSourceTable() - .getSourceType() == ISourceAware.ID_STREAMING; - val isBatchFusionModel = isStreamingFactTable && dataflow.getModel().isFusionModel() && !dataflow.isStreaming(); - val partitionDesc = isBatchFusionModel - ? getStreamingPartitionDesc(dataflow.getModel(), kylinConfig, projectName) - : dataflow.getModel().getPartitionDesc(); - // no partition column - if (PartitionDesc.isEmptyPartitionDesc(partitionDesc)) { - log.info("No partition column"); + public static Segments<NDataSegment> pruneSegments(NDataflow dataflow, OLAPContext olapContext) { + Segments<NDataSegment> allReadySegments = dataflow.getQueryableSegments(); + if (!NProjectManager.getProjectConfig(dataflow.getProject()).isHeterogeneousSegmentEnabled()) { return allReadySegments; } - val partitionColumn = partitionDesc.getPartitionDateColumnRef(); - val dateFormat = partitionDesc.getPartitionDateFormat(); - val filterColumns = olapContext.filterColumns; - // sql filter columns do not include partition column - if (!filterColumns.contains(partitionColumn)) { - log.info("Filter columns do not contain partition column"); + // pruner segment by partition column and dataformat + PartitionDesc partitionCol = getPartitionDesc(dataflow, olapContext); + if (isFullBuildModel(partitionCol)) { + log.info("No partition column or partition column format is null."); return allReadySegments; } - val selectedSegments = Lists.<NDataSegment> newArrayList(); - var filterConditions = olapContext.getExpandedFilterConditions(); + // pruner segment by simplify sql filter val relOptCluster = olapContext.firstTableScan.getCluster(); val rexBuilder = relOptCluster.getRexBuilder(); val rexSimplify = new RexSimplify(relOptCluster.getRexBuilder(), RelOptPredicateList.EMPTY, true, relOptCluster.getPlanner().getExecutor()); - val partitionColInputRef = transformColumn2RexInputRef(partitionColumn, olapContext.allTableScans); - if (allReadySegments.size() > 0 && dateFormat != null) { + var filterConditions = olapContext.getExpandedFilterConditions(); + val dateFormat = partitionCol.getPartitionDateFormat(); + val partitionColRef = partitionCol.getPartitionDateColumnRef(); + RexInputRef partitionColInputRef = null; + if (needRewritePartitionColInFilter(dataflow, olapContext)) { + partitionColInputRef = transformColumn2RexInputRef(partitionColRef, olapContext.allTableScans); try { val firstSegmentRanges = transformSegment2RexCall(allReadySegments.get(0), dateFormat, rexBuilder, - partitionColInputRef, partitionColumn.getType(), dataflow.isStreaming()); + partitionColInputRef, partitionColRef.getType(), dataflow.isStreaming()); RelDataTypeFamily segmentLiteralTypeFamily = getSegmentLiteralTypeFamily(firstSegmentRanges.getFirst()); - filterConditions = filterConditions.stream()// - .map(filterCondition -> rewriteRexCall(filterCondition, rexBuilder, segmentLiteralTypeFamily, - partitionColInputRef, dateFormat)) - .collect(Collectors.toList()); + List<RexNode> filterRexNodeList = new ArrayList<>(); + for (RexNode filterCondition : filterConditions) { + RexNode rexNode = rewriteRexCall(filterCondition, rexBuilder, segmentLiteralTypeFamily, + partitionColInputRef, dateFormat); + filterRexNodeList.add(rexNode); + } + filterConditions = filterRexNodeList; } catch (Exception ex) { log.warn("Segment pruning error: ", ex); + if (canPruneSegmentsForMaxMeasure(dataflow, olapContext, partitionColRef)) { + return selectSegmentsForMaxMeasure(dataflow); + } return allReadySegments; } } - var simplifiedSqlFilter = rexSimplify.simplifyAnds(filterConditions); - // sql filter condition is always false + RexNode simplifiedSqlFilter = rexSimplify.simplifyAnds(filterConditions); if (simplifiedSqlFilter.isAlwaysFalse()) { log.info("SQL filter condition is always false, pruning all ready segments"); olapContext.storageContext.setFilterCondAlwaysFalse(true); - return selectedSegments; + return Segments.empty(); } - // sql filter condition is always true - if (simplifiedSqlFilter.isAlwaysTrue()) { - log.info("SQL filter condition is always true, pruning no segment"); + + // pruner segment by customized scene optimize + if (canPruneSegmentsForMaxMeasure(dataflow, olapContext, partitionColRef)) { + return selectSegmentsForMaxMeasure(dataflow); + } + + if (!olapContext.filterColumns.contains(partitionColRef)) { + log.info("Filter columns do not contain partition column"); return allReadySegments; } - if (dateFormat == null) { + if (simplifiedSqlFilter.isAlwaysTrue()) { + log.info("SQL filter condition is always true, pruning no segment"); return allReadySegments; } - for (NDataSegment dataSegment : allReadySegments) { + // prune segments by partition filter + Segments<NDataSegment> selectedSegments = pruneSegmentsByPartitionFilter(dataflow, olapContext, rexSimplify, + partitionColInputRef, simplifiedSqlFilter); + log.info("Scan segment.size: {} after segment pruning", selectedSegments.size()); + return selectedSegments; + } + + private static Segments<NDataSegment> pruneSegmentsByPartitionFilter(NDataflow dataflow, OLAPContext olapContext, + RexSimplify rexSimplify, RexInputRef partitionColInputRef, RexNode simplifiedSqlFilter) { + Segments<NDataSegment> selectedSegments = new Segments<>(); + PartitionDesc partitionCol = getPartitionDesc(dataflow, olapContext); + RexBuilder rexBuilder = olapContext.firstTableScan.getCluster().getRexBuilder(); + for (NDataSegment dataSegment : dataflow.getQueryableSegments()) { try { - val segmentRanges = transformSegment2RexCall(dataSegment, dateFormat, rexBuilder, partitionColInputRef, - partitionColumn.getType(), dataflow.isStreaming()); + val segmentRanges = transformSegment2RexCall(dataSegment, partitionCol.getPartitionDateFormat(), + rexBuilder, partitionColInputRef, partitionCol.getPartitionDateColumnRef().getType(), + dataflow.isStreaming()); // compare with segment start val segmentStartPredicate = RelOptPredicateList.of(rexBuilder, Lists.newArrayList(segmentRanges.getFirst())); @@ -197,10 +211,79 @@ public class RealizationPruner { selectedSegments.add(dataSegment); } } - log.info("Scan segment.size: {} after segment pruning", selectedSegments.size()); return selectedSegments; } + private static boolean needRewritePartitionColInFilter(NDataflow dataflow, OLAPContext olapContext) { + return !dataflow.getQueryableSegments().isEmpty() && olapContext.filterColumns + .contains(getPartitionDesc(dataflow, olapContext).getPartitionDateColumnRef()); + } + + private static boolean isFullBuildModel(PartitionDesc partitionCol) { + return PartitionDesc.isEmptyPartitionDesc(partitionCol) || partitionCol.getPartitionDateFormat() == null; + } + + private static Segments<NDataSegment> selectSegmentsForMaxMeasure(NDataflow dataflow) { + Segments<NDataSegment> selectedSegments = new Segments<>(); + long days = dataflow.getConfig().getMaxMeasureSegmentPrunerBeforeDays(); + // segment was sorted + Segments<NDataSegment> allReadySegments = dataflow.getQueryableSegments(); + long maxDt = allReadySegments.getLatestReadySegment().getTSRange().getEnd(); + long minDt = maxDt - DAY * days; + for (int i = allReadySegments.size() - 1; i >= 0; i--) { + if (allReadySegments.get(i).getTSRange().getEnd() > minDt) { + selectedSegments.add(allReadySegments.get(i)); + } else { + break; + } + } + log.info("Scan segment size: {} after max measure segment pruner. The before days: {}. Passed on segment: {}", + selectedSegments.size(), days, + selectedSegments.stream().map(ISegment::getName).collect(Collectors.joining(","))); + return selectedSegments; + } + + private static boolean canPruneSegmentsForMaxMeasure(NDataflow dataflow, OLAPContext olapContext, + TblColRef partitionColRef) { + if (dataflow.getConfig().getMaxMeasureSegmentPrunerBeforeDays() < 0) { + return false; + } + + if (CollectionUtils.isNotEmpty(olapContext.getGroupByColumns()) + && !olapContext.getGroupByColumns().stream().allMatch(partitionColRef::equals)) { + return false; + } + + if (CollectionUtils.isEmpty(olapContext.aggregations)) { + return false; + } + + for (FunctionDesc agg : olapContext.aggregations) { + if (FunctionDesc.FUNC_MAX.equalsIgnoreCase(agg.getExpression()) + && !partitionColRef.equals(agg.getParameters().get(0).getColRef())) { + return false; + } + if (!FunctionDesc.FUNC_MAX.equalsIgnoreCase(agg.getExpression()) + && CollectionUtils.isNotEmpty(agg.getParameters())) { + return false; + } + } + + return true; + } + + private static PartitionDesc getPartitionDesc(NDataflow dataflow, OLAPContext olapContext) { + NDataModel model = dataflow.getModel(); + val isStreamingFactTable = olapContext.firstTableScan.getOlapTable().getSourceTable() + .getSourceType() == ISourceAware.ID_STREAMING; + val isBatchFusionModel = isStreamingFactTable && dataflow.getModel().isFusionModel() && !dataflow.isStreaming(); + if (!isBatchFusionModel) { + return model.getPartitionDesc(); + } + return NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), dataflow.getProject()) + .getDataModelDesc(model.getFusionId()).getPartitionDesc(); + } + public static RexNode rewriteRexCall(RexNode rexNode, RexBuilder rexBuilder, RelDataTypeFamily relDataTypeFamily, RexInputRef partitionColInputRef, String dateFormat) { if (!(rexNode instanceof RexCall)) { @@ -306,12 +389,6 @@ public class RealizationPruner { return null; } - private static PartitionDesc getStreamingPartitionDesc(NDataModel model, KylinConfig kylinConfig, String project) { - NDataModelManager modelManager = NDataModelManager.getInstance(kylinConfig, project); - NDataModel streamingModel = modelManager.getDataModelDesc(model.getFusionId()); - return streamingModel.getPartitionDesc(); - } - private static Pair<RexNode, RexNode> transformSegment2RexCall(NDataSegment dataSegment, String dateFormat, RexBuilder rexBuilder, RexInputRef partitionColInputRef, DataType partitionColType, boolean isStreaming) { String start; diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java index 4f1c9c1fba..46a0ca4b2c 100644 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java +++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java @@ -2104,17 +2104,17 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase { PrepareSqlStateParam[] params1 = new PrepareSqlStateParam[2]; params1[0] = new PrepareSqlStateParam(Double.class.getCanonicalName(), "123.1"); params1[1] = new PrepareSqlStateParam(Integer.class.getCanonicalName(), "123"); - String originSql1 = "with t1 as (select ORDER_ID, PRICE > ? from test_kylin_fact where ORDER_ID = ?)\n, " - + "t2 as (select bf from test_kylin_fact)\n" + "select * from t1 where ORDER_ID = '100'\n" // + String originSql1 = "with t1 as (select ORDER_ID, PRICE > ? from test_kylin_fact where ORDER_ID > ?)\n, " + + "t2 as (select bf from test_kylin_fact)\n" + "select * from t1 where ORDER_ID = '125'\n" // + "union all\n" // - + "select * from t1 where ORDER_ID = '200'"; - String filledSql1 = "with t1 as (select ORDER_ID, PRICE > 123.1 from test_kylin_fact where ORDER_ID = 123)\n" - + ", t2 as (select bf from test_kylin_fact)\n" + "select * from t1 where ORDER_ID = '100'\n" - + "union all\n" + "select * from t1 where ORDER_ID = '200'"; + + "select * from t1 where ORDER_ID < '200'"; + String filledSql1 = "with t1 as (select ORDER_ID, PRICE > 123.1 from test_kylin_fact where ORDER_ID > 123)\n" + + ", t2 as (select bf from test_kylin_fact)\n" + "select * from t1 where ORDER_ID = '125'\n" + + "union all\n" + "select * from t1 where ORDER_ID < '200'"; String transformedFilledSql1 = "SELECT *\n" + "FROM (SELECT ORDER_ID, PRICE > 123.1\n" - + "FROM TEST_KYLIN_FACT\n" + "WHERE ORDER_ID = 123) AS T1\n" + "WHERE ORDER_ID = '100'\n" + + "FROM TEST_KYLIN_FACT\n" + "WHERE ORDER_ID > 123) AS T1\n" + "WHERE ORDER_ID = '125'\n" + "UNION ALL\n" + "SELECT *\n" + "FROM (SELECT ORDER_ID, PRICE > 123.1\n" + "FROM TEST_KYLIN_FACT\n" - + "WHERE ORDER_ID = 123) AS T1\n" + "WHERE ORDER_ID = '200'"; + + "WHERE ORDER_ID > 123) AS T1\n" + "WHERE ORDER_ID < '200'"; queryWithParamWhenTransformWithToSubQuery(params1, originSql1, filledSql1, transformedFilledSql1); PrepareSqlStateParam[] params2 = new PrepareSqlStateParam[1]; diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/SelectRealizationTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/SelectRealizationTest.java index f5ab4fbe1b..453ae67d60 100644 --- a/src/query/src/test/java/org/apache/kylin/query/engine/SelectRealizationTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/engine/SelectRealizationTest.java @@ -31,11 +31,13 @@ import org.apache.calcite.prepare.Prepare; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rex.RexExecutorImpl; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.kylin.common.debug.BackdoorToggles; -import org.apache.kylin.query.calcite.KylinRelDataTypeSystem; import org.apache.kylin.junit.annotation.MetadataInfo; import org.apache.kylin.query.QueryExtension; +import org.apache.kylin.query.calcite.KylinRelDataTypeSystem; +import org.apache.kylin.query.engine.meta.SimpleDataContext; import org.apache.kylin.query.util.QueryContextCutter; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -68,6 +70,9 @@ class SelectRealizationTest { String defaultSchemaName = schemaFactory.getDefaultSchema(); val catalogReader = createCatalogReader(config, rootSchema, defaultSchemaName); val planner = new PlannerFactory(kylinConfig).createVolcanoPlanner(config); + SimpleDataContext dataContext = new SimpleDataContext(rootSchema.plus(), TypeSystem.javaTypeFactory(), + kylinConfig); + planner.setExecutor(new RexExecutorImpl(dataContext)); val sqlConverter = SQLConverter.createConverter(config, planner, catalogReader); val queryOptimizer = new QueryOptimizer(planner); RelRoot relRoot = sqlConverter