This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit bec0b64913bb53f2b836d9978a262a241b762d10
Author: ChenliangLu <31469905+yab...@users.noreply.github.com>
AuthorDate: Thu Mar 23 20:03:20 2023 +0800

    KYLIN-5582 Minor fix for query collectors on BloomFilter
---
 .../test/java/org/apache/kylin/job/execution/DagExecutableTest.java | 2 +-
 .../org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java    | 5 ++++-
 .../org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java | 3 ++-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
index 1aaf7c9d0c..9a5db64961 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java
@@ -567,7 +567,7 @@ class DagExecutableTest {
         manager.addJob(job);
         job.doWork(context);
 
-        await().atMost(new Duration(20, TimeUnit.SECONDS)).untilAsserted(() -> {
+        await().atMost(new Duration(120, TimeUnit.SECONDS)).untilAsserted(() -> {
             assertEquals(ExecutableState.SUCCEED, executable1.getStatus());
             assertEquals(ExecutableState.SUCCEED, executable2.getStatus());
             assertEquals(ExecutableState.SUCCEED, executable22.getStatus());
diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
index c1f5cee65c..d0be382682 100644
--- a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
+++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
@@ -79,6 +79,9 @@ public class ParquetBloomFilter {
             for (FileStatus host : hostsDir) {
                 String hostName = host.getPath().getName();
                 Path projectFiltersFile = QueryFiltersCollector.getProjectFiltersFile(hostName, project);
+                if (!fs.exists(projectFiltersFile)) {
+                    continue;
+                }
                 Map<String, Map<String, Integer>> modelColumns = JsonUtil.readValue(
                         HadoopUtil.readStringFromHdfs(fs, projectFiltersFile), Map.class);
                 if (modelColumns.containsKey(modelId)) {
@@ -90,7 +93,7 @@ public class ParquetBloomFilter {
             }
             columnsHits.forEach((column, hit) -> columnFilters.add(new ColumnFilter(column, hit)));
             String columnFiltersLog = Arrays.toString(columnFilters.toArray());
-            LOGGER.info("register BloomFilter info : {}", columnFiltersLog);
+            LOGGER.info("Register BloomFilter info from HDFS: {}", columnFiltersLog);
         } catch (Exception e) {
             LOGGER.error("Error when register BloomFilter.", e);
         }
diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java
index 7c1fbb815c..af0405b3cb 100644
--- a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java
+++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java
@@ -50,8 +50,9 @@ public class QueryFiltersCollector {
 
     public static final String SERVER_HOST = AddressUtil.getLocalServerInfo();
 
+    // path should start with `_` to avoid being cleaned in storage
     public static final String FILTER_STORAGE_PATH =
-            KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/query_filter/";
+            KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/_query_filter/";
 
     public static void increaseHit(String project, String modelId, String columnId) {
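
For context on the ParquetBloomFilter hunk above: the fix guards the HDFS read with fs.exists(), so hosts that have not yet written a per-project filter file are skipped instead of failing the whole BloomFilter registration. A minimal, self-contained sketch of that pattern using only the stock Hadoop FileSystem API (the class name ExistenceGuardSketch, the file name project_filters.json, and the paths below are illustrative, not Kylin's actual layout):

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ExistenceGuardSketch {

        // Reads each host's filter file only when it exists; hosts that have not
        // flushed a filter file yet are skipped instead of raising
        // FileNotFoundException and aborting the whole loop.
        public static void readPerHostFilters(FileSystem fs, Path hostsRoot) throws IOException {
            for (FileStatus host : fs.listStatus(hostsRoot)) {
                Path filtersFile = new Path(host.getPath(), "project_filters.json"); // illustrative name
                if (!fs.exists(filtersFile)) {
                    continue; // same guard the patch adds in ParquetBloomFilter
                }
                try (InputStream in = fs.open(filtersFile)) {
                    String json = new String(in.readAllBytes(), StandardCharsets.UTF_8);
                    System.out.println(json);
                }
            }
        }

        public static void main(String[] args) throws IOException {
            FileSystem fs = FileSystem.get(new Configuration());
            readPerHostFilters(fs, new Path("/tmp/query_filter")); // illustrative path
        }
    }

Skipping a missing per-host file keeps one absent host from discarding the filter hits collected from the remaining hosts.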