This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit bec0b64913bb53f2b836d9978a262a241b762d10
Author: ChenliangLu <31469905+yab...@users.noreply.github.com>
AuthorDate: Thu Mar 23 20:03:20 2023 +0800

    KYLIN-5582 Minor fix for query collectors on BloomFilter
---
 .../test/java/org/apache/kylin/job/execution/DagExecutableTest.java  | 2 +-
 .../org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java     | 5 ++++-
 .../org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java  | 3 ++-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git 
a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
 
b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
index 1aaf7c9d0c..9a5db64961 100644
--- 
a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
+++ 
b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
@@ -567,7 +567,7 @@ class DagExecutableTest {
         manager.addJob(job);
         job.doWork(context);
 
-        await().atMost(new Duration(20, TimeUnit.SECONDS)).untilAsserted(() -> 
{
+        await().atMost(new Duration(120, TimeUnit.SECONDS)).untilAsserted(() 
-> {
             assertEquals(ExecutableState.SUCCEED, executable1.getStatus());
             assertEquals(ExecutableState.SUCCEED, executable2.getStatus());
             assertEquals(ExecutableState.SUCCEED, executable22.getStatus());
diff --git 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
index c1f5cee65c..d0be382682 100644
--- 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
+++ 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
@@ -79,6 +79,9 @@ public class ParquetBloomFilter {
             for (FileStatus host : hostsDir) {
                 String hostName = host.getPath().getName();
                 Path projectFiltersFile = 
QueryFiltersCollector.getProjectFiltersFile(hostName, project);
+                if (!fs.exists(projectFiltersFile)) {
+                    continue;
+                }
                 Map<String, Map<String, Integer>> modelColumns = 
JsonUtil.readValue(
                         HadoopUtil.readStringFromHdfs(fs, projectFiltersFile), 
Map.class);
                 if (modelColumns.containsKey(modelId)) {
@@ -90,7 +93,7 @@ public class ParquetBloomFilter {
             }
             columnsHits.forEach((column, hit) -> columnFilters.add(new 
ColumnFilter(column, hit)));
             String columnFiltersLog = Arrays.toString(columnFilters.toArray());
-            LOGGER.info("register BloomFilter info : {}", columnFiltersLog);
+            LOGGER.info("Register BloomFilter info from HDFS: {}", 
columnFiltersLog);
         } catch (Exception e) {
             LOGGER.error("Error when register BloomFilter.", e);
         }
diff --git 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java
 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java
index 7c1fbb815c..af0405b3cb 100644
--- 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java
+++ 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java
@@ -50,8 +50,9 @@ public class QueryFiltersCollector {
 
     public static final String SERVER_HOST = AddressUtil.getLocalServerInfo();
 
+    // path should start with `_` to avoid being cleaned in storage
     public static final String FILTER_STORAGE_PATH =
-            KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + 
"/query_filter/";
+            KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + 
"/_query_filter/";
 
 
     public static void increaseHit(String project, String modelId, String 
columnId) {

Reply via email to