This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 609deb43a06370bee98a5f6e8e8dcb32867ccba8
Author: Yaguang Jia <jiayagu...@foxmail.com>
AuthorDate: Fri Apr 7 14:47:28 2023 +0800

    KYLIN-5602 add log for derived segment prune (#30254)
    
    * KYLIN-5602 add log for derived segment prune
---
 .../apache/kylin/rest/service/QueryService.java    | 19 +++++++++++++++--
 .../sql/execution/KylinFileSourceScanExec.scala    |  4 ++--
 .../sql/execution/datasource/FilePruner.scala      | 24 ++++++++++++++++------
 .../execution/datasource/KylinSourceStrategy.scala |  9 ++++----
 4 files changed, 42 insertions(+), 14 deletions(-)

diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index d55ca6dcc5..3a821568d4 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -122,6 +122,7 @@ import org.apache.kylin.query.engine.data.QueryResult;
 import org.apache.kylin.query.engine.data.TableSchema;
 import org.apache.kylin.query.exception.NotSupportedSQLException;
 import org.apache.kylin.query.exception.UserStopQueryException;
+import org.apache.kylin.query.relnode.ContextUtil;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.util.QueryLimiter;
 import org.apache.kylin.query.util.QueryModelPriorities;
@@ -415,13 +416,20 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
             sql = request.getSql();
 
         Collection<String> snapShots;
+        Collection<String> snapShotFilters;
         if (response.getNativeRealizations() == null) {
             snapShots = Lists.newArrayList();
+            snapShotFilters = Lists.newArrayList();
         } else {
             snapShots = response.getNativeRealizations().stream()
                     .flatMap(nativeQueryRealization -> nativeQueryRealization.getSnapshots().stream()).distinct()
                     .collect(Collectors.toList());
+            snapShotFilters = ContextUtil
+                    .listContexts().stream().flatMap(ctx -> ctx.filterColumns.stream()
+                            .filter(col -> snapShots.contains(col.getTable())).map(TblColRef::getCanonicalName))
+                    .collect(Collectors.toList());
         }
+        boolean isDerived = !snapShots.isEmpty() && layoutIds.stream().anyMatch(id -> !StringUtils.equals("-1", id));
 
         String errorMsg = response.getExceptionMessage();
         if (StringUtils.isNotBlank(errorMsg)) {
@@ -436,7 +444,8 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
                 .put(LogReport.PROJECT, request.getProject()).put(LogReport.REALIZATION_NAMES, modelNames)
                 .put(LogReport.INDEX_LAYOUT_IDS, layoutIds).put(LogReport.IS_PARTIAL_MATCH_MODEL, isPartialMatchModel)
                 .put(LogReport.SCAN_ROWS, response.getScanRows())
-                .put(LogReport.TOTAL_SCAN_ROWS, response.getTotalScanRows()).put(LogReport.SNAPSHOTS, snapShots)
+                .put(LogReport.TOTAL_SCAN_ROWS, response.getTotalScanRows()).put(LogReport.IS_DERIVED, isDerived)
+                .put(LogReport.SNAPSHOTS, snapShots).put(LogReport.SNAPSHOT_FILTERS, snapShotFilters)
                 .put(LogReport.SCAN_BYTES, response.getScanBytes())
                 .put(LogReport.TOTAL_SCAN_BYTES, response.getTotalScanBytes())
                 .put(LogReport.RESULT_ROW_COUNT, resultRowCount)
@@ -1461,6 +1470,9 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         static final String REALIZATION_NAMES = "realization";
         static final String INDEX_LAYOUT_IDS = "layout";
         static final String SNAPSHOTS = "snapshots";
+        static final String IS_DERIVED = "is_derived";
+        static final String SNAPSHOT_FILTERS = "snapshot_filters";
+
         static final String IS_PARTIAL_MATCH_MODEL = "is_partial_match";
         static final String SCAN_ROWS = "scan_rows";
         static final String TOTAL_SCAN_ROWS = "total_scan_rows";
@@ -1492,7 +1504,8 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         static final ImmutableMap<String, String> O2N = new ImmutableMap.Builder<String, String>()
                 .put(QUERY_ID, "Query Id: ").put(SQL, "SQL: ").put(USER, "User: ").put(SUCCESS, "Success: ")
                 .put(DURATION, "Duration: ").put(PROJECT, "Project: ").put(REALIZATION_NAMES, "Realization Names: ")
-                .put(INDEX_LAYOUT_IDS, "Index Layout Ids: ").put(SNAPSHOTS, "Snapshot Names: ")
+                .put(INDEX_LAYOUT_IDS, "Index Layout Ids: ").put(IS_DERIVED, "Is Dervied: ")
+                .put(SNAPSHOTS, "Snapshot Names: ").put(SNAPSHOT_FILTERS, "Snapshot Filter: ")
                 .put(IS_PARTIAL_MATCH_MODEL, "Is Partial Match Model: ").put(SCAN_ROWS, "Scan rows: ")
                 .put(TOTAL_SCAN_ROWS, "Total Scan rows: ").put(SCAN_BYTES, "Scan bytes: ")
                 .put(TOTAL_SCAN_BYTES, "Total Scan Bytes: ").put(RESULT_ROW_COUNT, "Result Row Count: ")
@@ -1540,7 +1553,9 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
                     + O2N.get(PROJECT) + get(PROJECT) + newLine //
                     + O2N.get(REALIZATION_NAMES) + get(REALIZATION_NAMES) + newLine //
                     + O2N.get(INDEX_LAYOUT_IDS) + get(INDEX_LAYOUT_IDS) + newLine //
+                    + O2N.get(IS_DERIVED) + get(IS_DERIVED) + newLine //
                     + O2N.get(SNAPSHOTS) + get(SNAPSHOTS) + newLine //
+                    + O2N.get(SNAPSHOT_FILTERS) + get(SNAPSHOT_FILTERS) + newLine //
                     + O2N.get(IS_PARTIAL_MATCH_MODEL) + get(IS_PARTIAL_MATCH_MODEL) + newLine //
                     + O2N.get(SCAN_ROWS) + get(SCAN_ROWS) + newLine //
                     + O2N.get(TOTAL_SCAN_ROWS) + get(TOTAL_SCAN_ROWS) + newLine //
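
For illustration only: with the fields added above, the per-query log block built by LogReport gains two lines between the layout ids and the partial-match flag. The labels below are taken verbatim from the O2N map in this hunk (including its "Is Dervied" spelling); the model, layout, snapshot and column values are made-up placeholders, not output from a real query, so the exact rendering may differ slightly.

    Realization Names: [test_model]
    Index Layout Ids: [20001]
    Is Dervied: true
    Snapshot Names: [SSB.CUSTOMER]
    Snapshot Filter: [SSB.CUSTOMER.C_CITY]
    Is Partial Match Model: false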
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
index 1c47d99eb8..0c3cdc76d0 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
@@ -57,8 +57,8 @@ class KylinFileSourceScanExec(
       .flatMap(filter => filter.asInstanceOf[BloomAndRangeFilterExpression].rangeRow)
     logInfo(s"Extra runtime filters from BloomAndRangeFilterExpression to " +
       s"prune segment: ${rangeRuntimeFilters.mkString(",")}")
-    val collectFilters = dataFilters ++ rangeRuntimeFilters
-    val ret = relation.location.listFiles(partitionFilters, collectFilters)
+    val filePruner = relation.location.asInstanceOf[FilePruner]
+    val ret = filePruner.listFiles(partitionFilters, dataFilters, rangeRuntimeFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
 
     metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
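
In the hunk above, the runtime range filters extracted from BloomAndRangeFilterExpression are no longer folded into dataFilters; they are handed to FilePruner as a separate, derived-filter argument. This lets the FilePruner change below run an extra dimension-range pruning pass over them and extend its segment-count summary. Purely as an illustration (the counts are invented), the enriched log line then has this shape:

    Segment Num: Before filter: 12, After time partition filter: 8, After dimension filter: 5, After derived dimension filter: 3.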
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index bcfc71910c..b664963211 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -214,16 +214,22 @@ class FilePruner(val session: SparkSession,
     }
   }
 
-  var cached = new java.util.HashMap[(Seq[Expression], Seq[Expression]), (Seq[PartitionDirectory], Long)]()
+  var cached = new java.util.HashMap[(Seq[Expression], Seq[Expression], Seq[Expression]), (Seq[PartitionDirectory], Long)]()
 
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
-    if (cached.containsKey((partitionFilters, dataFilters))) {
-      return cached.get((partitionFilters, dataFilters))._1
+    listFiles(partitionFilters, dataFilters, Seq.empty[Expression]);
+  }
+
+  def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression],
+                derivedFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    if (cached.containsKey((partitionFilters, dataFilters, derivedFilters))) {
+      return cached.get((partitionFilters, dataFilters, derivedFilters))._1
     }
 
     require(isResolved)
     val timePartitionFilters = getSpecFilter(dataFilters, timePartitionColumn)
     val dimFilters = getDimFilter(dataFilters, timePartitionColumn, shardByColumn)
+    val derivedDimFilters = getDimFilter(derivedFilters, timePartitionColumn, shardByColumn)
     logInfoIf(timePartitionFilters.nonEmpty)(s"Applying time partition filters: ${timePartitionFilters.mkString(",")}")
 
     // segment pruning
@@ -241,18 +247,24 @@ class FilePruner(val session: SparkSession,
       pruneSegments
     }
     val filteredSizeAfterTimePartition = selected.size
+    var filteredSizeAfterDimensionFilter = selected.size
 
     if (projectKylinConfig.isDimensionRangeFilterEnabled) {
       selected = afterPruning("pruning segment with dimension range", dimFilters, selected) {
         pruneSegmentsDimRange
       }
+      filteredSizeAfterDimensionFilter = selected.size
+      selected = afterPruning("pruning segment with derived dimension range", derivedDimFilters, selected) {
+        pruneSegmentsDimRange
+      }
     }
 
     QueryContext.current().record("seg_pruning")
     QueryContext.current().getMetrics.setSegCount(selected.size)
 
     logInfo(s"Segment Num: Before filter: ${prunedSegmentDirs.size}, After time partition filter: " +
-      s"$filteredSizeAfterTimePartition, After dimension filter: ${selected.size}.")
+      s"$filteredSizeAfterTimePartition, After dimension filter: ${filteredSizeAfterDimensionFilter}, " +
+      s"After derived dimension filter: ${selected.size}.")
     selected = selected.par.map { e =>
       val logString = s"[fetch file status for Segment ID: ${e.segmentID}; Partition Num: ${e.partitions.size}]"
       logTime(logString, true) {
@@ -288,11 +300,11 @@ class FilePruner(val session: SparkSession,
     setShufflePartitions(totalFileSize, sourceRows, session)
     if (selected.isEmpty) {
       val value = Seq.empty[PartitionDirectory]
-      cached.put((partitionFilters, dataFilters), (value, sourceRows))
+      cached.put((partitionFilters, dataFilters, derivedFilters), (value, sourceRows))
       value
     } else {
       val value = Seq(PartitionDirectory(InternalRow.empty, selected.flatMap(_.files)))
-      cached.put((partitionFilters, dataFilters), (value, sourceRows))
+      cached.put((partitionFilters, dataFilters, derivedFilters), (value, sourceRows))
       value
     }
 
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
index bd8784482d..f22842939e 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.execution.datasource
 import org.apache.kylin.common.QueryContext
 import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -48,8 +48,8 @@ import org.apache.spark.sql.{Strategy, execution}
  *   - If any file is larger than the threshold, split it into pieces based on that threshold
  *   - Sort the files by decreasing file size.
  *   - Assign the ordered files to buckets using the following algorithm.  If the current partition
- * is under the threshold with the addition of the next file, add it.  If not, open a new bucket
- * and add it.  Proceed to the next file.
+ *     is under the threshold with the addition of the next file, add it.  If not, open a new bucket
+ *     and add it.  Proceed to the next file.
  */
 object KylinSourceStrategy extends Strategy with LogEx {
 
@@ -122,7 +122,8 @@ object KylinSourceStrategy extends Strategy with LogEx {
       logTime("listFiles", debug = true) {
         filePruner.listFiles(partitionKeyFilters.iterator.toSeq, dataFilters.iterator.toSeq)
       }
-      val sourceScanRows = filePruner.cached.get((partitionKeyFilters.iterator.toSeq, dataFilters.iterator.toSeq))._2
+      val sourceScanRows = filePruner.cached.get((partitionKeyFilters.iterator.toSeq,
+        dataFilters.iterator.toSeq, Seq.empty[Expression]))._2
       QueryContext.current().getMetrics.addAccumSourceScanRows(sourceScanRows)
       val scan =
         new KylinFileSourceScanExec(
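
One detail worth calling out: FilePruner's result cache is now keyed by a 3-tuple of (partition, data, derived) filters, and the two-argument listFiles overload stores its result under that key with an empty third element, which is why the lookup above adds Seq.empty[Expression]. A small self-contained Scala sketch of that keying scheme (illustrative only, not Kylin code; String stands in for Expression and the cached values are dummies):

import scala.collection.mutable

object TripleKeyCacheSketch {
  // Results are cached under (partitionFilters, dataFilters, derivedFilters).
  type Key = (Seq[String], Seq[String], Seq[String])
  private val cached = mutable.HashMap.empty[Key, (Seq[String], Long)]

  // The two-argument entry point keeps the old signature by delegating with Seq.empty,
  // so its results land under a key whose third element is the empty sequence.
  def listFiles(partitionFilters: Seq[String], dataFilters: Seq[String]): Seq[String] =
    listFiles(partitionFilters, dataFilters, Seq.empty)

  def listFiles(partitionFilters: Seq[String], dataFilters: Seq[String],
                derivedFilters: Seq[String]): Seq[String] = {
    val key = (partitionFilters, dataFilters, derivedFilters)
    // Dummy "pruning" result: one segment directory name plus a fake source row count.
    cached.getOrElseUpdate(key, (Seq("segment-0"), 42L))._1
  }

  def main(args: Array[String]): Unit = {
    listFiles(Seq("p0"), Seq("d0")) // stored under (Seq("p0"), Seq("d0"), Seq.empty)
    // Mirrors the KylinSourceStrategy lookup: the explicit empty third element is
    // required to hit the entry written by the two-argument call above.
    val sourceScanRows = cached((Seq("p0"), Seq("d0"), Seq.empty))._2
    println(sourceScanRows) // prints 42
  }
}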
