This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 609deb43a06370bee98a5f6e8e8dcb32867ccba8
Author: Yaguang Jia <jiayagu...@foxmail.com>
AuthorDate: Fri Apr 7 14:47:28 2023 +0800

    KYLIN-5602 add log for derived segment prune (#30254)

    * KYLIN-5602 add log for derived segment prune
---
 .../apache/kylin/rest/service/QueryService.java    | 19 +++++++++++++++--
 .../sql/execution/KylinFileSourceScanExec.scala    |  4 ++--
 .../sql/execution/datasource/FilePruner.scala      | 24 ++++++++++++++++------
 .../execution/datasource/KylinSourceStrategy.scala |  9 ++++----
 4 files changed, 42 insertions(+), 14 deletions(-)

diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index d55ca6dcc5..3a821568d4 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -122,6 +122,7 @@ import org.apache.kylin.query.engine.data.QueryResult;
 import org.apache.kylin.query.engine.data.TableSchema;
 import org.apache.kylin.query.exception.NotSupportedSQLException;
 import org.apache.kylin.query.exception.UserStopQueryException;
+import org.apache.kylin.query.relnode.ContextUtil;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.util.QueryLimiter;
 import org.apache.kylin.query.util.QueryModelPriorities;
@@ -415,13 +416,20 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         sql = request.getSql();

         Collection<String> snapShots;
+        Collection<String> snapShotFilters;
         if (response.getNativeRealizations() == null) {
             snapShots = Lists.newArrayList();
+            snapShotFilters = Lists.newArrayList();
         } else {
             snapShots = response.getNativeRealizations().stream()
                     .flatMap(nativeQueryRealization -> nativeQueryRealization.getSnapshots().stream()).distinct()
                     .collect(Collectors.toList());
+            snapShotFilters = ContextUtil
+                    .listContexts().stream().flatMap(ctx -> ctx.filterColumns.stream()
+                            .filter(col -> snapShots.contains(col.getTable())).map(TblColRef::getCanonicalName))
+                    .collect(Collectors.toList());
         }
+        boolean isDerived = !snapShots.isEmpty() && layoutIds.stream().anyMatch(id -> !StringUtils.equals("-1", id));

         String errorMsg = response.getExceptionMessage();
         if (StringUtils.isNotBlank(errorMsg)) {
@@ -436,7 +444,8 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
                 .put(LogReport.PROJECT, request.getProject()).put(LogReport.REALIZATION_NAMES, modelNames)
                 .put(LogReport.INDEX_LAYOUT_IDS, layoutIds).put(LogReport.IS_PARTIAL_MATCH_MODEL, isPartialMatchModel)
                 .put(LogReport.SCAN_ROWS, response.getScanRows())
-                .put(LogReport.TOTAL_SCAN_ROWS, response.getTotalScanRows()).put(LogReport.SNAPSHOTS, snapShots)
+                .put(LogReport.TOTAL_SCAN_ROWS, response.getTotalScanRows()).put(LogReport.IS_DERIVED, isDerived)
+                .put(LogReport.SNAPSHOTS, snapShots).put(LogReport.SNAPSHOT_FILTERS, snapShotFilters)
                 .put(LogReport.SCAN_BYTES, response.getScanBytes())
                 .put(LogReport.TOTAL_SCAN_BYTES, response.getTotalScanBytes())
                 .put(LogReport.RESULT_ROW_COUNT, resultRowCount)
@@ -1461,6 +1470,9 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         static final String REALIZATION_NAMES = "realization";
         static final String INDEX_LAYOUT_IDS = "layout";
         static final String SNAPSHOTS = "snapshots";
+        static final String IS_DERIVED = "is_derived";
+        static final String SNAPSHOT_FILTERS = "snapshot_filters";
+
         static final String IS_PARTIAL_MATCH_MODEL = "is_partial_match";
         static final String SCAN_ROWS = "scan_rows";
         static final String TOTAL_SCAN_ROWS = "total_scan_rows";
@@ -1492,7 +1504,8 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         static final ImmutableMap<String, String> O2N = new ImmutableMap.Builder<String, String>()
                 .put(QUERY_ID, "Query Id: ").put(SQL, "SQL: ").put(USER, "User: ").put(SUCCESS, "Success: ")
                 .put(DURATION, "Duration: ").put(PROJECT, "Project: ").put(REALIZATION_NAMES, "Realization Names: ")
-                .put(INDEX_LAYOUT_IDS, "Index Layout Ids: ").put(SNAPSHOTS, "Snapshot Names: ")
+                .put(INDEX_LAYOUT_IDS, "Index Layout Ids: ").put(IS_DERIVED, "Is Dervied: ")
+                .put(SNAPSHOTS, "Snapshot Names: ").put(SNAPSHOT_FILTERS, "Snapshot Filter: ")
                 .put(IS_PARTIAL_MATCH_MODEL, "Is Partial Match Model: ").put(SCAN_ROWS, "Scan rows: ")
                 .put(TOTAL_SCAN_ROWS, "Total Scan rows: ").put(SCAN_BYTES, "Scan bytes: ")
                 .put(TOTAL_SCAN_BYTES, "Total Scan Bytes: ").put(RESULT_ROW_COUNT, "Result Row Count: ")
@@ -1540,7 +1553,9 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
                     + O2N.get(PROJECT) + get(PROJECT) + newLine //
                     + O2N.get(REALIZATION_NAMES) + get(REALIZATION_NAMES) + newLine //
                     + O2N.get(INDEX_LAYOUT_IDS) + get(INDEX_LAYOUT_IDS) + newLine //
+                    + O2N.get(IS_DERIVED) + get(IS_DERIVED) + newLine //
                     + O2N.get(SNAPSHOTS) + get(SNAPSHOTS) + newLine //
+                    + O2N.get(SNAPSHOT_FILTERS) + get(SNAPSHOT_FILTERS) + newLine //
                     + O2N.get(IS_PARTIAL_MATCH_MODEL) + get(IS_PARTIAL_MATCH_MODEL) + newLine //
                     + O2N.get(SCAN_ROWS) + get(SCAN_ROWS) + newLine //
                     + O2N.get(TOTAL_SCAN_ROWS) + get(TOTAL_SCAN_ROWS) + newLine //
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
index 1c47d99eb8..0c3cdc76d0 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
@@ -57,8 +57,8 @@ class KylinFileSourceScanExec(
       .flatMap(filter => filter.asInstanceOf[BloomAndRangeFilterExpression].rangeRow)
     logInfo(s"Extra runtime filters from BloomAndRangeFilterExpression to " +
       s"prune segment: ${rangeRuntimeFilters.mkString(",")}")
-    val collectFilters = dataFilters ++ rangeRuntimeFilters
-    val ret = relation.location.listFiles(partitionFilters, collectFilters)
+    val filePruner = relation.location.asInstanceOf[FilePruner]
+    val ret = filePruner.listFiles(partitionFilters, dataFilters, rangeRuntimeFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000

     metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index bcfc71910c..b664963211 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -214,16 +214,22 @@ class FilePruner(val session: SparkSession,
     }
   }

-  var cached = new java.util.HashMap[(Seq[Expression], Seq[Expression]), (Seq[PartitionDirectory], Long)]()
+  var cached = new java.util.HashMap[(Seq[Expression], Seq[Expression], Seq[Expression]), (Seq[PartitionDirectory], Long)]()

   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
-    if (cached.containsKey((partitionFilters, dataFilters))) {
-      return cached.get((partitionFilters, dataFilters))._1
+    listFiles(partitionFilters, dataFilters, Seq.empty[Expression]);
+  }
+
+  def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression],
+                derivedFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    if (cached.containsKey((partitionFilters, dataFilters, derivedFilters))) {
+      return cached.get((partitionFilters, dataFilters, derivedFilters))._1
     }
     require(isResolved)
     val timePartitionFilters = getSpecFilter(dataFilters, timePartitionColumn)
     val dimFilters = getDimFilter(dataFilters, timePartitionColumn, shardByColumn)
+    val derivedDimFilters = getDimFilter(derivedFilters, timePartitionColumn, shardByColumn)
     logInfoIf(timePartitionFilters.nonEmpty)(s"Applying time partition filters: ${timePartitionFilters.mkString(",")}")

     // segment pruning
@@ -241,18 +247,24 @@ class FilePruner(val session: SparkSession,
       pruneSegments
     }
     val filteredSizeAfterTimePartition = selected.size
+    var filteredSizeAfterDimensionFilter = selected.size

     if (projectKylinConfig.isDimensionRangeFilterEnabled) {
       selected = afterPruning("pruning segment with dimension range", dimFilters, selected) {
         pruneSegmentsDimRange
       }
+      filteredSizeAfterDimensionFilter = selected.size
+      selected = afterPruning("pruning segment with derived dimension range", derivedDimFilters, selected) {
+        pruneSegmentsDimRange
+      }
     }

     QueryContext.current().record("seg_pruning")
     QueryContext.current().getMetrics.setSegCount(selected.size)
     logInfo(s"Segment Num: Before filter: ${prunedSegmentDirs.size}, After time partition filter: " +
-      s"$filteredSizeAfterTimePartition, After dimension filter: ${selected.size}.")
+      s"$filteredSizeAfterTimePartition, After dimension filter: ${filteredSizeAfterDimensionFilter}, " +
+      s"After derived dimension filter: ${selected.size}.")
     selected = selected.par.map { e =>
       val logString = s"[fetch file status for Segment ID: ${e.segmentID}; Partition Num: ${e.partitions.size}]"
       logTime(logString, true) {
@@ -288,11 +300,11 @@ class FilePruner(val session: SparkSession,
     setShufflePartitions(totalFileSize, sourceRows, session)
     if (selected.isEmpty) {
       val value = Seq.empty[PartitionDirectory]
-      cached.put((partitionFilters, dataFilters), (value, sourceRows))
+      cached.put((partitionFilters, dataFilters, derivedFilters), (value, sourceRows))
       value
     } else {
       val value = Seq(PartitionDirectory(InternalRow.empty, selected.flatMap(_.files)))
-      cached.put((partitionFilters, dataFilters), (value, sourceRows))
+      cached.put((partitionFilters, dataFilters, derivedFilters), (value, sourceRows))
       value
     }

diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
index bd8784482d..f22842939e 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.execution.datasource
 import org.apache.kylin.common.QueryContext
 import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -48,8 +48,8 @@ import org.apache.spark.sql.{Strategy, execution}
  *  - If any file is larger than the threshold, split it into pieces based on that threshold
  *  - Sort the files by decreasing file size.
  *  - Assign the ordered files to buckets using the following algorithm. If the current partition
- *    is under the threshold with the addition of the next file, add it. If not, open a new bucket
- *    and add it. Proceed to the next file.
+ *    is under the threshold with the addition of the next file, add it. If not, open a new bucket
+ *    and add it. Proceed to the next file.
  */

 object KylinSourceStrategy extends Strategy with LogEx {
@@ -122,7 +122,8 @@ object KylinSourceStrategy extends Strategy with LogEx {
        logTime("listFiles", debug = true) {
          filePruner.listFiles(partitionKeyFilters.iterator.toSeq, dataFilters.iterator.toSeq)
        }
-      val sourceScanRows = filePruner.cached.get((partitionKeyFilters.iterator.toSeq, dataFilters.iterator.toSeq))._2
+      val sourceScanRows = filePruner.cached.get((partitionKeyFilters.iterator.toSeq,
+        dataFilters.iterator.toSeq, Seq.empty[Expression]))._2
       QueryContext.current().getMetrics.addAccumSourceScanRows(sourceScanRows)
       val scan = new KylinFileSourceScanExec(
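
For context, a minimal Scala sketch (not part of the commit) of how a caller is expected to use the new three-argument FilePruner.listFiles and the widened cache key after this change. The values relation, partitionFilters, dataFilters and rangeRuntimeFilters are assumed to be in scope, as they are in KylinFileSourceScanExec above:

    // Derived runtime filters are passed as a separate third argument, so FilePruner
    // can prune segments with them in their own step (logged as
    // "pruning segment with derived dimension range") and key its result cache on
    // all three filter sequences.
    val filePruner = relation.location.asInstanceOf[FilePruner]
    val partitions = filePruner.listFiles(partitionFilters, dataFilters, rangeRuntimeFilters)

    // The cached entry is (Seq[PartitionDirectory], Long); the Long is the source row
    // count and must be read back with the same three-part key. Callers that go through
    // the old two-argument overload (e.g. KylinSourceStrategy) use Seq.empty[Expression]
    // as the third component, since that overload delegates with an empty derived list.
    val sourceScanRows = filePruner.cached.get((partitionFilters, dataFilters, rangeRuntimeFilters))._2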