This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5_beta
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit fba85d28310e8d8eb01291cefdeec6a248336790
Author: Yaguang Jia <jiayagu...@foxmail.com>
AuthorDate: Mon May 8 17:49:35 2023 +0800

    KYLIN-5653 add page index filter log (#30361)
---
 pom.xml                                            |  4 +-
 .../src/main/resources/config/init.properties      |  4 +-
 .../src/main/resources/kylin-defaults0.properties  |  4 +-
 .../src/main/resources/config/init.properties      |  4 +-
 .../org/apache/kylin/newten/BloomFilterTest.java   |  7 +-
 .../src/main/resources/config/init.properties      |  4 +-
 .../apache/kylin/rest/service/QueryService.java    |  2 +
 .../src/main/resources/config/init.properties      |  4 +-
 .../scala/org/apache/spark/sql/SparderEnv.scala    |  4 +-
 .../spark/filter/ParquetPageFilterCollector.java   | 87 ++++++++++++++++++++++
 .../filter/ParquetPageFilterCollectorTest.java     | 49 ++++++++++++
 11 files changed, 153 insertions(+), 20 deletions(-)

diff --git a/pom.xml b/pom.xml
index 46f61ffa70..fed587f46c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,7 +124,7 @@
 
         <!-- Spark versions -->
         <delta.version>2.0.2</delta.version>
-        <spark.version>3.2.0-kylin-4.6.8.0-SNAPSHOT</spark.version>
+        <spark.version>3.2.0-kylin-4.6.9.0-SNAPSHOT</spark.version>
 
         <roaring.version>0.9.2-kylin-r4</roaring.version>
 
@@ -315,7 +315,7 @@
         <zkclient.version>0.8</zkclient.version>
         <grpc.version>1.0.2</grpc.version>
         <fastPFOR.version>0.0.13</fastPFOR.version>
-        <parquet.version>1.12.2-kylin-r4</parquet.version>
+        <parquet.version>1.12.2-kylin-r5</parquet.version>
         <quartz.version>2.1.1</quartz.version>
         <janino.version>3.0.9</janino.version>
 
diff --git a/src/common-booter/src/main/resources/config/init.properties 
b/src/common-booter/src/main/resources/config/init.properties
index 3ff1076256..2bddfae312 100644
--- a/src/common-booter/src/main/resources/config/init.properties
+++ b/src/common-booter/src/main/resources/config/init.properties
@@ -160,9 +160,7 @@ 
kylin.storage.columnar.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/sp
 # to avoid cartesian partition oom, set to -1 or empty to turn off
 kylin.storage.columnar.spark-conf.spark.sql.cartesianPartitionNumThreshold=-1
 
-# disable parquet columnindex to save the overhead on read column index
-# as column index won't be used by spark vectorized reader for now
-kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=false
+kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=true
 
 # spark3 legacy config after calendar switch
 
kylin.storage.columnar.spark-conf.spark.sql.legacy.parquet.int96RebaseModeInWrite=LEGACY
diff --git a/src/core-common/src/main/resources/kylin-defaults0.properties 
b/src/core-common/src/main/resources/kylin-defaults0.properties
index 657f55c1e2..022ac5e57d 100644
--- a/src/core-common/src/main/resources/kylin-defaults0.properties
+++ b/src/core-common/src/main/resources/kylin-defaults0.properties
@@ -163,9 +163,7 @@ 
kylin.storage.columnar.spark-conf.spark.sql.catalog.spark_catalog=org.apache.spa
 # to avoid cartesian partition oom, set to -1 or empty to turn off
 kylin.storage.columnar.spark-conf.spark.sql.cartesianPartitionNumThreshold=-1
 
-# disable parquet columnindex to save the overhead on read column index
-# as column index won't be used by spark vectorized reader for now
-kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=false
+kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=true
 
 # spark3 legacy config after calendar switch
 
kylin.storage.columnar.spark-conf.spark.sql.parquet.int96RebaseModeInWrite=LEGACY
diff --git a/src/data-loading-booter/src/main/resources/config/init.properties 
b/src/data-loading-booter/src/main/resources/config/init.properties
index 3ff1076256..2bddfae312 100644
--- a/src/data-loading-booter/src/main/resources/config/init.properties
+++ b/src/data-loading-booter/src/main/resources/config/init.properties
@@ -160,9 +160,7 @@ 
kylin.storage.columnar.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/sp
 # to avoid cartesian partition oom, set to -1 or empty to turn off
 kylin.storage.columnar.spark-conf.spark.sql.cartesianPartitionNumThreshold=-1
 
-# disable parquet columnindex to save the overhead on read column index
-# as column index won't be used by spark vectorized reader for now
-kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=false
+kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=true
 
 # spark3 legacy config after calendar switch
 
kylin.storage.columnar.spark-conf.spark.sql.legacy.parquet.int96RebaseModeInWrite=LEGACY
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
index 9dcebdebe8..996aef9571 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
@@ -115,6 +115,11 @@ public class BloomFilterTest extends 
NLocalWithSparkSessionTest implements Adapt
 
     @Test
     public void testBuildBloomFilter() throws Exception {
+        Path projectFilterPath = getProjectFiltersFile(SERVER_HOST, 
getProject());
+        FileSystem fs = 
HadoopUtil.getFileSystem(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+        if (fs.exists(projectFilterPath)) {
+            fs.delete(projectFilterPath, true);
+        }
         String dfID = "c41390c5-b93d-4db3-b167-029874b85a2c";
         NDataflow dataflow = dfMgr.getDataflow(dfID);
         LayoutEntity layout = 
dataflow.getIndexPlan().getLayoutEntity(20000000001L);
@@ -137,8 +142,6 @@ public class BloomFilterTest extends 
NLocalWithSparkSessionTest implements Adapt
         String sql1 = "select * from SSB.P_LINEORDER where LO_CUSTKEY in 
(13,8) and LO_SHIPPRIOTITY = 0 ";
         query.add(Pair.newPair("bloomfilter", sql1));
         ExecAndComp.execAndCompare(query, getProject(), 
ExecAndComp.CompareLevel.NONE, "inner");
-        Path projectFilterPath = getProjectFiltersFile(SERVER_HOST, 
getProject());
-        FileSystem fs = 
HadoopUtil.getFileSystem(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
         // wait until `QueryFiltersCollector` record filter info
         await().atMost(120, TimeUnit.SECONDS).until(() -> {
             try {
diff --git a/src/query-booter/src/main/resources/config/init.properties 
b/src/query-booter/src/main/resources/config/init.properties
index 3ff1076256..2bddfae312 100644
--- a/src/query-booter/src/main/resources/config/init.properties
+++ b/src/query-booter/src/main/resources/config/init.properties
@@ -160,9 +160,7 @@ 
kylin.storage.columnar.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/sp
 # to avoid cartesian partition oom, set to -1 or empty to turn off
 kylin.storage.columnar.spark-conf.spark.sql.cartesianPartitionNumThreshold=-1
 
-# disable parquet columnindex to save the overhead on read column index
-# as column index won't be used by spark vectorized reader for now
-kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=false
+kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=true
 
 # spark3 legacy config after calendar switch
 
kylin.storage.columnar.spark-conf.spark.sql.legacy.parquet.int96RebaseModeInWrite=LEGACY
diff --git 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 333dc2857b..aae6a5edfb 100644
--- 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -81,6 +81,7 @@ import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.engine.spark.filter.BloomFilterSkipCollector;
+import org.apache.kylin.engine.spark.filter.ParquetPageFilterCollector;
 import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
 import org.apache.kylin.guava30.shaded.common.base.Joiner;
 import org.apache.kylin.guava30.shaded.common.collect.Collections2;
@@ -438,6 +439,7 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
         }
 
         
BloomFilterSkipCollector.logAndCleanStatus(QueryContext.current().getQueryId());
+        
ParquetPageFilterCollector.logParquetPages(QueryContext.current().getQueryId());
         LogReport report = new LogReport().put(LogReport.QUERY_ID, 
QueryContext.current().getQueryId())
                 .put(LogReport.SQL, sql).put(LogReport.USER, user)
                 .put(LogReport.SUCCESS, null == 
response.getExceptionMessage()).put(LogReport.DURATION, duration)
diff --git a/src/server/src/main/resources/config/init.properties 
b/src/server/src/main/resources/config/init.properties
index 3ff1076256..2bddfae312 100644
--- a/src/server/src/main/resources/config/init.properties
+++ b/src/server/src/main/resources/config/init.properties
@@ -160,9 +160,7 @@ 
kylin.storage.columnar.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/sp
 # to avoid cartesian partition oom, set to -1 or empty to turn off
 kylin.storage.columnar.spark-conf.spark.sql.cartesianPartitionNumThreshold=-1
 
-# disable parquet columnindex to save the overhead on read column index
-# as column index won't be used by spark vectorized reader for now
-kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=false
+kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=true
 
 # spark3 legacy config after calendar switch
 
kylin.storage.columnar.spark-conf.spark.sql.legacy.parquet.int96RebaseModeInWrite=LEGACY
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
index c31e151202..8961a52708 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
@@ -31,7 +31,7 @@ import org.apache.kylin.common.exception.{KylinException, 
KylinTimeoutException,
 import org.apache.kylin.common.msg.MsgPicker
 import org.apache.kylin.common.util.{DefaultHostInfoFetcher, HadoopUtil, 
S3AUtil}
 import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
-import org.apache.kylin.engine.spark.filter.BloomFilterSkipCollector
+import org.apache.kylin.engine.spark.filter.{BloomFilterSkipCollector, 
ParquetPageFilterCollector}
 import org.apache.kylin.metadata.model.{NTableMetadataManager, TableExtDesc}
 import org.apache.kylin.metadata.project.NProjectManager
 import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache
@@ -326,6 +326,8 @@ object SparderEnv extends Logging {
               inputMetrics.totalBloomBlocks, inputMetrics.totalSkipBloomBlocks,
               inputMetrics.totalSkipBloomRows, inputMetrics.footerReadTime,
               inputMetrics.footerReadNumber)
+            ParquetPageFilterCollector.addQueryMetrics(taskEnd.queryId, 
inputMetrics.totalPagesCount,
+              inputMetrics.filteredPagesCount, 
inputMetrics.afterFilterPagesCount)
           }
         } catch {
           case e: Throwable => logWarning("error when add metrics for query", 
e)
diff --git 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetPageFilterCollector.java
 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetPageFilterCollector.java
new file mode 100644
index 0000000000..a50224e2cc
--- /dev/null
+++ 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetPageFilterCollector.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.filter;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.kylin.common.util.AutoReadWriteLock;
+import org.apache.kylin.guava30.shaded.common.cache.Cache;
+import org.apache.kylin.guava30.shaded.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ParquetPageFilterCollector {
+
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(ParquetPageFilterCollector.class);
+    public static final Cache<String, AtomicLong> queryTotalParquetPages = 
CacheBuilder.newBuilder()
+            .expireAfterAccess(10, TimeUnit.MINUTES).build();
+    public static final Cache<String, AtomicLong> queryFilteredParquetPages = 
CacheBuilder.newBuilder()
+            .expireAfterAccess(10, TimeUnit.MINUTES).build();
+    public static final Cache<String, AtomicLong> queryAfterFilterParquetPages 
= CacheBuilder.newBuilder()
+            .expireAfterAccess(10, TimeUnit.MINUTES).build();
+    private static final AutoReadWriteLock LOCK = new AutoReadWriteLock(new 
ReentrantReadWriteLock());
+
+    private ParquetPageFilterCollector() {
+        // for sonar
+    }
+
+    public static void addQueryMetrics(String queryId, long totalPages, long 
filteredPages, long afterFilterPages) {
+        long start = System.currentTimeMillis();
+        try (AutoReadWriteLock.AutoLock writeLock = LOCK.lockForWrite()) {
+            addQueryCounter(queryId, queryTotalParquetPages, totalPages);
+            addQueryCounter(queryId, queryFilteredParquetPages, filteredPages);
+            addQueryCounter(queryId, queryAfterFilterParquetPages, 
afterFilterPages);
+        } catch (Exception e) {
+            LOGGER.error("Error when add query metrics.", e);
+        }
+        long end = System.currentTimeMillis();
+        if ((end - start) > 100) {
+            LOGGER.warn("Parquet page filter collector cost too much time: {} 
ms ", (end - start));
+        }
+    }
+
+    private static void addQueryCounter(String queryId, Cache<String, 
AtomicLong> counter, long step)
+            throws ExecutionException {
+        AtomicLong pageCnt = counter.get(queryId, () -> new AtomicLong(0L));
+        pageCnt.addAndGet(step);
+    }
+
+    public static void logParquetPages(String queryId) {
+        try {
+            AtomicLong totalPages = 
queryTotalParquetPages.getIfPresent(queryId);
+            if (totalPages != null && totalPages.get() > 0) {
+                AtomicLong filteredPages = 
queryFilteredParquetPages.get(queryId, () -> new AtomicLong(0L));
+                AtomicLong afterFilteredPages = 
queryAfterFilterParquetPages.get(queryId, () -> new AtomicLong(0L));
+                LOGGER.info("Query total parquet pages {}, filtered pages {}, 
after filter pages {}, filter rate {}",
+                        totalPages.get(), filteredPages.get(), 
afterFilteredPages.get(),
+                        (double) filteredPages.get() / totalPages.get());
+            }
+            queryTotalParquetPages.invalidate(queryId);
+            queryFilteredParquetPages.invalidate(queryId);
+            queryAfterFilterParquetPages.invalidate(queryId);
+
+        } catch (ExecutionException e) {
+            LOGGER.error("Error when log query metrics.", e);
+        }
+
+    }
+}
diff --git 
a/src/spark-project/spark-common/src/test/java/org/apache/kylin/engine/spark/filter/ParquetPageFilterCollectorTest.java
 
b/src/spark-project/spark-common/src/test/java/org/apache/kylin/engine/spark/filter/ParquetPageFilterCollectorTest.java
new file mode 100644
index 0000000000..cda0c6bdb6
--- /dev/null
+++ 
b/src/spark-project/spark-common/src/test/java/org/apache/kylin/engine/spark/filter/ParquetPageFilterCollectorTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.filter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.junit.Test;
+
+public class ParquetPageFilterCollectorTest {
+
+    @Test
+    public void testParquetPageFilterCollector() {
+        String queryId = "test_query_id";
+        long totalPages = 10L;
+        long filteredPages = 5L;
+        long afterFilterPages = 3L;
+
+        ParquetPageFilterCollector.addQueryMetrics(queryId, totalPages, 
filteredPages, afterFilterPages);
+        
assertNotNull(ParquetPageFilterCollector.queryTotalParquetPages.getIfPresent(queryId));
+        
assertEquals(ParquetPageFilterCollector.queryTotalParquetPages.getIfPresent(queryId).get(),
 totalPages);
+        
assertNotNull(ParquetPageFilterCollector.queryFilteredParquetPages.getIfPresent(queryId));
+        
assertEquals(ParquetPageFilterCollector.queryFilteredParquetPages.getIfPresent(queryId).get(),
 filteredPages);
+        
assertNotNull(ParquetPageFilterCollector.queryAfterFilterParquetPages.getIfPresent(queryId));
+        
assertEquals(ParquetPageFilterCollector.queryAfterFilterParquetPages.getIfPresent(queryId).get(),
 afterFilterPages);
+
+        ParquetPageFilterCollector.logParquetPages(queryId);
+        
assertNull(ParquetPageFilterCollector.queryTotalParquetPages.getIfPresent(queryId));
+        
assertNull(ParquetPageFilterCollector.queryFilteredParquetPages.getIfPresent(queryId));
+        
assertNull(ParquetPageFilterCollector.queryAfterFilterParquetPages.getIfPresent(queryId));
+    }
+}

Reply via email to