This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c22a0aeb594990060688eb5726fd1d4bc8210886
Author: Yaguang Jia <jiayagu...@foxmail.com>
AuthorDate: Mon May 8 17:49:35 2023 +0800

    KYLIN-5653 add page index filter log (#30361)
---
 pom.xml                                            |  4 +-
 .../src/main/resources/config/init.properties      |  4 +-
 .../src/main/resources/kylin-defaults0.properties  |  4 +-
 .../src/main/resources/config/init.properties      |  4 +-
 .../org/apache/kylin/newten/BloomFilterTest.java   |  7 +-
 .../src/main/resources/config/init.properties      |  4 +-
 .../apache/kylin/rest/service/QueryService.java    |  2 +
 .../src/main/resources/config/init.properties      |  4 +-
 .../scala/org/apache/spark/sql/SparderEnv.scala    |  4 +-
 .../spark/filter/ParquetPageFilterCollector.java   | 87 ++++++++++++++++++++++
 .../filter/ParquetPageFilterCollectorTest.java     | 49 ++++++++++++
 11 files changed, 153 insertions(+), 20 deletions(-)

diff --git a/pom.xml b/pom.xml
index 46f61ffa70..fed587f46c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,7 +124,7 @@
 
         <!-- Spark versions -->
         <delta.version>2.0.2</delta.version>
-        <spark.version>3.2.0-kylin-4.6.8.0-SNAPSHOT</spark.version>
+        <spark.version>3.2.0-kylin-4.6.9.0-SNAPSHOT</spark.version>
 
         <roaring.version>0.9.2-kylin-r4</roaring.version>
 
@@ -315,7 +315,7 @@
         <zkclient.version>0.8</zkclient.version>
         <grpc.version>1.0.2</grpc.version>
         <fastPFOR.version>0.0.13</fastPFOR.version>
-        <parquet.version>1.12.2-kylin-r4</parquet.version>
+        <parquet.version>1.12.2-kylin-r5</parquet.version>
         <quartz.version>2.1.1</quartz.version>
         <janino.version>3.0.9</janino.version>

diff --git a/src/common-booter/src/main/resources/config/init.properties b/src/common-booter/src/main/resources/config/init.properties
index 3ff1076256..2bddfae312 100644
--- a/src/common-booter/src/main/resources/config/init.properties
+++ b/src/common-booter/src/main/resources/config/init.properties
@@ -160,9 +160,7 @@ kylin.storage.columnar.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/sp
 # to avoid cartesian partition oom, set to -1 or empty to turn off
 kylin.storage.columnar.spark-conf.spark.sql.cartesianPartitionNumThreshold=-1
 
-# disable parquet columnindex to save the overhead on read column index
-# as column index won't be used by spark vectorized reader for now
-kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=false
+kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=true
 
 # spark3 legacy config after calendar switch
 kylin.storage.columnar.spark-conf.spark.sql.legacy.parquet.int96RebaseModeInWrite=LEGACY

diff --git a/src/core-common/src/main/resources/kylin-defaults0.properties b/src/core-common/src/main/resources/kylin-defaults0.properties
index 657f55c1e2..022ac5e57d 100644
--- a/src/core-common/src/main/resources/kylin-defaults0.properties
+++ b/src/core-common/src/main/resources/kylin-defaults0.properties
@@ -163,9 +163,7 @@ kylin.storage.columnar.spark-conf.spark.sql.catalog.spark_catalog=org.apache.spa
 # to avoid cartesian partition oom, set to -1 or empty to turn off
 kylin.storage.columnar.spark-conf.spark.sql.cartesianPartitionNumThreshold=-1
 
-# disable parquet columnindex to save the overhead on read column index
-# as column index won't be used by spark vectorized reader for now
-kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=false
+kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=true
 
 # spark3 legacy config after calendar switch
 kylin.storage.columnar.spark-conf.spark.sql.parquet.int96RebaseModeInWrite=LEGACY
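[The key flipped above, parquet.filter.columnindex.enabled, is the standard parquet-mr switch for page-level (column index) filtering; Kylin forwards everything behind its kylin.storage.columnar.spark-conf. prefix into the Sparder Spark session. A minimal sketch of the underlying knob, assuming only the stock Hadoop Configuration API — illustration only, not Kylin code:]

    import org.apache.hadoop.conf.Configuration;

    public class ColumnIndexToggle {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Same key the properties files above now set to true: it tells
            // parquet-mr to consult column indexes and skip non-matching pages.
            conf.setBoolean("parquet.filter.columnindex.enabled", true);
            System.out.println(conf.getBoolean("parquet.filter.columnindex.enabled", false));
        }
    }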
diff --git a/src/data-loading-booter/src/main/resources/config/init.properties b/src/data-loading-booter/src/main/resources/config/init.properties
index 3ff1076256..2bddfae312 100644
--- a/src/data-loading-booter/src/main/resources/config/init.properties
+++ b/src/data-loading-booter/src/main/resources/config/init.properties
@@ -160,9 +160,7 @@ kylin.storage.columnar.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/sp
 # to avoid cartesian partition oom, set to -1 or empty to turn off
 kylin.storage.columnar.spark-conf.spark.sql.cartesianPartitionNumThreshold=-1
 
-# disable parquet columnindex to save the overhead on read column index
-# as column index won't be used by spark vectorized reader for now
-kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=false
+kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=true
 
 # spark3 legacy config after calendar switch
 kylin.storage.columnar.spark-conf.spark.sql.legacy.parquet.int96RebaseModeInWrite=LEGACY

diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
index 9dcebdebe8..996aef9571 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
@@ -115,6 +115,11 @@ public class BloomFilterTest extends NLocalWithSparkSessionTest implements Adapt
 
     @Test
     public void testBuildBloomFilter() throws Exception {
+        Path projectFilterPath = getProjectFiltersFile(SERVER_HOST, getProject());
+        FileSystem fs = HadoopUtil.getFileSystem(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+        if (fs.exists(projectFilterPath)) {
+            fs.delete(projectFilterPath, true);
+        }
         String dfID = "c41390c5-b93d-4db3-b167-029874b85a2c";
         NDataflow dataflow = dfMgr.getDataflow(dfID);
         LayoutEntity layout = dataflow.getIndexPlan().getLayoutEntity(20000000001L);
@@ -137,8 +142,6 @@ public class BloomFilterTest extends NLocalWithSparkSessionTest implements Adapt
         String sql1 = "select * from SSB.P_LINEORDER where LO_CUSTKEY in (13,8) and LO_SHIPPRIOTITY = 0 ";
         query.add(Pair.newPair("bloomfilter", sql1));
         ExecAndComp.execAndCompare(query, getProject(), ExecAndComp.CompareLevel.NONE, "inner");
-        Path projectFilterPath = getProjectFiltersFile(SERVER_HOST, getProject());
-        FileSystem fs = HadoopUtil.getFileSystem(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
         // wait until `QueryFiltersCollector` record filter info
         await().atMost(120, TimeUnit.SECONDS).until(() -> {
             try {

diff --git a/src/query-booter/src/main/resources/config/init.properties b/src/query-booter/src/main/resources/config/init.properties
index 3ff1076256..2bddfae312 100644
--- a/src/query-booter/src/main/resources/config/init.properties
+++ b/src/query-booter/src/main/resources/config/init.properties
@@ -160,9 +160,7 @@ kylin.storage.columnar.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/sp
 # to avoid cartesian partition oom, set to -1 or empty to turn off
 kylin.storage.columnar.spark-conf.spark.sql.cartesianPartitionNumThreshold=-1
 
-# disable parquet columnindex to save the overhead on read column index
-# as column index won't be used by spark vectorized reader for now
-kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=false
+kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=true
 
 # spark3 legacy config after calendar switch
 kylin.storage.columnar.spark-conf.spark.sql.legacy.parquet.int96RebaseModeInWrite=LEGACY
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 333dc2857b..aae6a5edfb 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -81,6 +81,7 @@ import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.engine.spark.filter.BloomFilterSkipCollector;
+import org.apache.kylin.engine.spark.filter.ParquetPageFilterCollector;
 import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
 import org.apache.kylin.guava30.shaded.common.base.Joiner;
 import org.apache.kylin.guava30.shaded.common.collect.Collections2;
@@ -438,6 +439,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         }
 
         BloomFilterSkipCollector.logAndCleanStatus(QueryContext.current().getQueryId());
+        ParquetPageFilterCollector.logParquetPages(QueryContext.current().getQueryId());
         LogReport report = new LogReport().put(LogReport.QUERY_ID, QueryContext.current().getQueryId())
                 .put(LogReport.SQL, sql).put(LogReport.USER, user)
                 .put(LogReport.SUCCESS, null == response.getExceptionMessage()).put(LogReport.DURATION, duration)

diff --git a/src/server/src/main/resources/config/init.properties b/src/server/src/main/resources/config/init.properties
index 3ff1076256..2bddfae312 100644
--- a/src/server/src/main/resources/config/init.properties
+++ b/src/server/src/main/resources/config/init.properties
@@ -160,9 +160,7 @@ kylin.storage.columnar.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/sp
 # to avoid cartesian partition oom, set to -1 or empty to turn off
 kylin.storage.columnar.spark-conf.spark.sql.cartesianPartitionNumThreshold=-1
 
-# disable parquet columnindex to save the overhead on read column index
-# as column index won't be used by spark vectorized reader for now
-kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=false
+kylin.storage.columnar.spark-conf.parquet.filter.columnindex.enabled=true
 
 # spark3 legacy config after calendar switch
 kylin.storage.columnar.spark-conf.spark.sql.legacy.parquet.int96RebaseModeInWrite=LEGACY
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
index c31e151202..8961a52708 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
@@ -31,7 +31,7 @@ import org.apache.kylin.common.exception.{KylinException, KylinTimeoutException,
 import org.apache.kylin.common.msg.MsgPicker
 import org.apache.kylin.common.util.{DefaultHostInfoFetcher, HadoopUtil, S3AUtil}
 import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
-import org.apache.kylin.engine.spark.filter.BloomFilterSkipCollector
+import org.apache.kylin.engine.spark.filter.{BloomFilterSkipCollector, ParquetPageFilterCollector}
 import org.apache.kylin.metadata.model.{NTableMetadataManager, TableExtDesc}
 import org.apache.kylin.metadata.project.NProjectManager
 import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache
@@ -326,6 +326,8 @@ object SparderEnv extends Logging {
             inputMetrics.totalBloomBlocks, inputMetrics.totalSkipBloomBlocks, inputMetrics.totalSkipBloomRows,
             inputMetrics.footerReadTime, inputMetrics.footerReadNumber)
+          ParquetPageFilterCollector.addQueryMetrics(taskEnd.queryId, inputMetrics.totalPagesCount,
+            inputMetrics.filteredPagesCount, inputMetrics.afterFilterPagesCount)
         }
       } catch {
         case e: Throwable => logWarning("error when add metrics for query", e)
diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetPageFilterCollector.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetPageFilterCollector.java
new file mode 100644
index 0000000000..a50224e2cc
--- /dev/null
+++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetPageFilterCollector.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.filter;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.kylin.common.util.AutoReadWriteLock;
+import org.apache.kylin.guava30.shaded.common.cache.Cache;
+import org.apache.kylin.guava30.shaded.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ParquetPageFilterCollector {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(ParquetPageFilterCollector.class);
+    public static final Cache<String, AtomicLong> queryTotalParquetPages = CacheBuilder.newBuilder()
+            .expireAfterAccess(10, TimeUnit.MINUTES).build();
+    public static final Cache<String, AtomicLong> queryFilteredParquetPages = CacheBuilder.newBuilder()
+            .expireAfterAccess(10, TimeUnit.MINUTES).build();
+    public static final Cache<String, AtomicLong> queryAfterFilterParquetPages = CacheBuilder.newBuilder()
+            .expireAfterAccess(10, TimeUnit.MINUTES).build();
+    private static final AutoReadWriteLock LOCK = new AutoReadWriteLock(new ReentrantReadWriteLock());
+
+    private ParquetPageFilterCollector() {
+        // for sonar
+    }
+
+    public static void addQueryMetrics(String queryId, long totalPages, long filteredPages, long afterFilterPages) {
+        long start = System.currentTimeMillis();
+        try (AutoReadWriteLock.AutoLock writeLock = LOCK.lockForWrite()) {
+            addQueryCounter(queryId, queryTotalParquetPages, totalPages);
+            addQueryCounter(queryId, queryFilteredParquetPages, filteredPages);
+            addQueryCounter(queryId, queryAfterFilterParquetPages, afterFilterPages);
+        } catch (Exception e) {
+            LOGGER.error("Error when add query metrics.", e);
+        }
+        long end = System.currentTimeMillis();
+        if ((end - start) > 100) {
+            LOGGER.warn("Parquet page filter collector cost too much time: {} ms ", (end - start));
+        }
+    }
+
+    private static void addQueryCounter(String queryId, Cache<String, AtomicLong> counter, long step)
+            throws ExecutionException {
+        AtomicLong pageCnt = counter.get(queryId, () -> new AtomicLong(0L));
+        pageCnt.addAndGet(step);
+    }
+
+    public static void logParquetPages(String queryId) {
+        try {
+            AtomicLong totalPages = queryTotalParquetPages.getIfPresent(queryId);
+            if (totalPages != null && totalPages.get() > 0) {
+                AtomicLong filteredPages = queryFilteredParquetPages.get(queryId, () -> new AtomicLong(0L));
+                AtomicLong afterFilteredPages = queryAfterFilterParquetPages.get(queryId, () -> new AtomicLong(0L));
+                LOGGER.info("Query total parquet pages {}, filtered pages {}, after filter pages {}, filter rate {}",
+                        totalPages.get(), filteredPages.get(), afterFilteredPages.get(),
+                        (double) filteredPages.get() / totalPages.get());
+            }
+            queryTotalParquetPages.invalidate(queryId);
+            queryFilteredParquetPages.invalidate(queryId);
+            queryAfterFilterParquetPages.invalidate(queryId);
+        } catch (ExecutionException e) {
+            LOGGER.error("Error when log query metrics.", e);
+        }
+    }
+}
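[For reference, a minimal sketch — not part of the commit — of the collector's per-query lifecycle as wired up above: SparderEnv's onTaskEnd hook feeds addQueryMetrics for every finished task, and QueryService calls logParquetPages once when the query returns, which logs the totals and evicts the counters.]

    public class CollectorUsageSketch {
        public static void main(String[] args) {
            String queryId = "q-123"; // hypothetical query id
            // two tasks report (totalPages, filteredPages, afterFilterPages)
            ParquetPageFilterCollector.addQueryMetrics(queryId, 80L, 48L, 32L);
            ParquetPageFilterCollector.addQueryMetrics(queryId, 20L, 12L, 8L);
            // emits one INFO line: "Query total parquet pages 100, filtered pages 60,
            // after filter pages 40, filter rate 0.6", then clears all three caches
            ParquetPageFilterCollector.logParquetPages(queryId);
        }
    }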
diff --git a/src/spark-project/spark-common/src/test/java/org/apache/kylin/engine/spark/filter/ParquetPageFilterCollectorTest.java b/src/spark-project/spark-common/src/test/java/org/apache/kylin/engine/spark/filter/ParquetPageFilterCollectorTest.java
new file mode 100644
index 0000000000..cda0c6bdb6
--- /dev/null
+++ b/src/spark-project/spark-common/src/test/java/org/apache/kylin/engine/spark/filter/ParquetPageFilterCollectorTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.filter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.junit.Test;
+
+public class ParquetPageFilterCollectorTest {
+
+    @Test
+    public void testParquetPageFilterCollector() {
+        String queryId = "test_query_id";
+        long totalPages = 10L;
+        long filteredPages = 5L;
+        long afterFilterPages = 3L;
+
+        ParquetPageFilterCollector.addQueryMetrics(queryId, totalPages, filteredPages, afterFilterPages);
+        assertNotNull(ParquetPageFilterCollector.queryTotalParquetPages.getIfPresent(queryId));
+        assertEquals(ParquetPageFilterCollector.queryTotalParquetPages.getIfPresent(queryId).get(), totalPages);
+        assertNotNull(ParquetPageFilterCollector.queryFilteredParquetPages.getIfPresent(queryId));
+        assertEquals(ParquetPageFilterCollector.queryFilteredParquetPages.getIfPresent(queryId).get(), filteredPages);
+        assertNotNull(ParquetPageFilterCollector.queryAfterFilterParquetPages.getIfPresent(queryId));
+        assertEquals(ParquetPageFilterCollector.queryAfterFilterParquetPages.getIfPresent(queryId).get(), afterFilterPages);
+
+        ParquetPageFilterCollector.logParquetPages(queryId);
+        assertNull(ParquetPageFilterCollector.queryTotalParquetPages.getIfPresent(queryId));
+        assertNull(ParquetPageFilterCollector.queryFilteredParquetPages.getIfPresent(queryId));
+        assertNull(ParquetPageFilterCollector.queryAfterFilterParquetPages.getIfPresent(queryId));
+    }
+}
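[As a worked check of the values used in the test above: with totalPages = 10, filteredPages = 5 and afterFilterPages = 3, logParquetPages reports filteredPages / totalPages = 0.5 as the filter rate; afterFilterPages is logged but does not enter the rate. A standalone sketch of that arithmetic:]

    public class FilterRateCheck {
        public static void main(String[] args) {
            long total = 10L, filtered = 5L, afterFilter = 3L; // values from the test above
            double rate = (double) filtered / total;           // same formula as logParquetPages
            System.out.printf("total=%d filtered=%d afterFilter=%d rate=%.1f%n",
                    total, filtered, afterFilter, rate);       // prints rate=0.5
        }
    }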