This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 275d57dea54a3752cf621e86b7660d4a7eddb13e Author: jlf <longfei.ji...@kyligence.io> AuthorDate: Fri Oct 14 10:01:24 2022 +0800 KYLIN-5315 update AutoRefreshSnapshotScheduler afterPropertiesSet --- .../rest/scheduler/AutoRefreshSnapshotRunner.java | 64 ++++++++------------- .../scheduler/AutoRefreshSnapshotScheduler.java | 38 +++++++++++-- ...pshotThread.java => BuildSnapshotRunnable.java} | 4 +- ...leThread.java => CheckSourceTableRunnable.java} | 2 +- .../scheduler/AutoRefreshSnapshotConfigTest.java | 58 +++++++++++++++++++ .../scheduler/AutoRefreshSnapshotRunnerTest.java | 66 ++++++---------------- ...eadTest.java => BuildSnapshotRunnableTest.java} | 20 +++---- ...Test.java => CheckSourceTableRunnableTest.java} | 11 ++-- .../SnapshotSourceTableStatsServiceTest.scala | 7 +-- .../service/SnapshotSourceTableStatsService.java | 21 ++++--- .../TestSnapshotSourceTableStatsService.java | 36 ------------ 11 files changed, 166 insertions(+), 161 deletions(-) diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java index c3e7ce5046..294b6ddb75 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunner.java @@ -87,8 +87,6 @@ public class AutoRefreshSnapshotRunner implements Runnable { @Getter private Map<Future<String>, Long> checkSourceTableFutures = Maps.newConcurrentMap(); @Getter - private Map<Future<String>, Long> buildSnapshotFutures = Maps.newConcurrentMap(); - @Getter private final String project; @Setter @Getter @@ -141,6 +139,8 @@ public class AutoRefreshSnapshotRunner implements Runnable { poolExecutor.getPoolSize(), poolExecutor.getCorePoolSize(), poolExecutor.getActiveCount(), poolExecutor.getMaximumPoolSize()); } + projectConfig = NProjectManager.getInstance(KylinConfig.readSystemKylinConfig()).getProject(project) + .getConfig(); saveSnapshotViewMapping(project, restTemplate); val tables = SnapshotJobUtils.getSnapshotTables(projectConfig, project); val viewTableMapping = readViewTableMapping(); @@ -152,8 +152,6 @@ public class AutoRefreshSnapshotRunner implements Runnable { waitCheckSourceTableTaskDone(); - waitBuildSnapshotTaskDone(); - log.info("Project[{}] stop check and refresh snapshot", project); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); @@ -163,9 +161,7 @@ public class AutoRefreshSnapshotRunner implements Runnable { } finally { checkSourceTableQueue = new LinkedBlockingQueue<>(); cancelFuture(checkSourceTableFutures); - cancelFuture(buildSnapshotFutures); checkSourceTableFutures = Maps.newConcurrentMap(); - buildSnapshotFutures = Maps.newConcurrentMap(); sourceTableSnapshotMapping = Maps.newHashMap(); buildSnapshotCount = Maps.newConcurrentMap(); } @@ -214,10 +210,12 @@ public class AutoRefreshSnapshotRunner implements Runnable { } } for (TableDesc tableDesc : tables) { - val source = tableDesc.getIdentity().toLowerCase(Locale.ROOT); - val snapshots = result.getOrDefault(source, Lists.newArrayList()); - snapshots.add(tableDesc); - result.put(source, snapshots.stream().distinct().collect(Collectors.toList())); + if (!tableDesc.isView()) { + val source = tableDesc.getIdentity().toLowerCase(Locale.ROOT); + val snapshots = result.getOrDefault(source, Lists.newArrayList()); + snapshots.add(tableDesc); + result.put(source, snapshots.stream().distinct().collect(Collectors.toList())); + } } return result; } @@ -259,16 +257,16 @@ public class AutoRefreshSnapshotRunner implements Runnable { public void checkSourceTable(Set<String> allSourceTable) { for (String table : allSourceTable) { - val thread = new CheckSourceTableThread(); - thread.setProject(project); - thread.setConfig(projectConfig); - thread.setTableIdentity(table); - thread.setRestTemplate(restTemplate); - thread.setCheckSourceTableQueue(checkSourceTableQueue); + val runnable = new CheckSourceTableRunnable(); + runnable.setProject(project); + runnable.setConfig(projectConfig); + runnable.setTableIdentity(table); + runnable.setRestTemplate(restTemplate); + runnable.setCheckSourceTableQueue(checkSourceTableQueue); sourceTableSnapshotMapping.get(table).stream() .filter(tableDesc -> StringUtils.equalsIgnoreCase(table, tableDesc.getIdentity())).findFirst() - .ifPresent(tableDesc -> thread.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol())); - val submit = jobPool.submit(thread, "success"); + .ifPresent(tableDesc -> runnable.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol())); + val submit = jobPool.submit(runnable, "success"); checkSourceTableFutures.put(submit, System.currentTimeMillis()); } } @@ -304,33 +302,21 @@ public class AutoRefreshSnapshotRunner implements Runnable { } } - public void waitBuildSnapshotTaskDone() throws InterruptedException { - while (true) { - val doneCount = buildSnapshotFutures.keySet().stream().filter(Future::isDone).count(); - if (buildSnapshotFutures.size() == doneCount) { - break; - } - cancelTimeoutFuture(buildSnapshotFutures); - TimeUnit.SECONDS.sleep(10); - } - } - public void buildSnapshot(CheckSourceTableResult result) { val needBuildSnapshots = sourceTableSnapshotMapping.get(result.getTableIdentity()); for (TableDesc tableDesc : needBuildSnapshots) { val sourceTableCount = buildSnapshotCount.getOrDefault(tableDesc.getIdentity(), new AtomicInteger(0)); log.info("buildSnapshotCount is [{}], tableIdentity is [{}]", sourceTableCount, tableDesc.getIdentity()); if (sourceTableCount.getAndIncrement() == 0) { - val thread = new BuildSnapshotThread(); - thread.setProject(project); - thread.setConfig(projectConfig); - thread.setRestTemplate(restTemplate); - thread.setNeedRefresh(result.getNeedRefresh()); - thread.setNeedRefreshPartitionsValue(result.getNeedRefreshPartitionsValue()); - thread.setTableIdentity(tableDesc.getIdentity()); - thread.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol()); - val submit = jobPool.submit(thread, "success"); - buildSnapshotFutures.put(submit, System.currentTimeMillis()); + val runnable = new BuildSnapshotRunnable(); + runnable.setProject(project); + runnable.setConfig(projectConfig); + runnable.setRestTemplate(restTemplate); + runnable.setNeedRefresh(result.getNeedRefresh()); + runnable.setNeedRefreshPartitionsValue(result.getNeedRefreshPartitionsValue()); + runnable.setTableIdentity(tableDesc.getIdentity()); + runnable.setPartitionColumn(tableDesc.getSelectedSnapshotPartitionCol()); + runnable.run(); } buildSnapshotCount.put(tableDesc.getIdentity(), sourceTableCount); } diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotScheduler.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotScheduler.java index 539bf6ac08..8639c09d30 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotScheduler.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotScheduler.java @@ -26,17 +26,23 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.PostConstruct; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.scheduler.EpochStartedNotifier; +import org.apache.kylin.common.scheduler.EventBusFactory; import org.apache.kylin.common.util.AddressUtil; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; -import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -46,7 +52,7 @@ import org.springframework.web.client.RestTemplate; import com.google.common.collect.Maps; -import org.apache.kylin.metadata.epoch.EpochManager; +import io.kyligence.kap.guava20.shaded.common.eventbus.Subscribe; import lombok.Getter; import lombok.val; import lombok.extern.slf4j.Slf4j; @@ -66,7 +72,7 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j @Component -public class AutoRefreshSnapshotScheduler implements InitializingBean { +public class AutoRefreshSnapshotScheduler { private static final Integer THREAD_POOL_TASK_SCHEDULER_DEFAULT_POOL_SIZE = 20; @Autowired @Qualifier("projectScheduler") @@ -228,7 +234,6 @@ public class AutoRefreshSnapshotScheduler implements InitializingBean { } } - @Override public void afterPropertiesSet() throws Exception { log.info("AutoRefreshSnapshotScheduler init..."); val fs = HadoopUtil.getWorkingFileSystem(); @@ -257,3 +262,28 @@ public class AutoRefreshSnapshotScheduler implements InitializingBean { } } } + +@Slf4j +@Configuration +@Order +class AutoRefreshSnapshotConfig { + @Autowired + private AutoRefreshSnapshotScheduler scheduler; + + @PostConstruct + public void init() { + val kylinConfig = KylinConfig.getInstanceFromEnv(); + if (kylinConfig.isJobNode()) { + EventBusFactory.getInstance().register(this, false); + } + } + + @Subscribe + public void registerScheduler(EpochStartedNotifier notifier) { + try { + scheduler.afterPropertiesSet(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } +} diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotThread.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnable.java similarity index 99% rename from src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotThread.java rename to src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnable.java index 4759260d54..e9ebd6c67a 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotThread.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnable.java @@ -42,6 +42,7 @@ import org.apache.kylin.common.exception.KylinRuntimeException; import org.apache.kylin.common.response.RestResponse; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.engine.spark.job.NSparkSnapshotJob; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.NExecutableManager; @@ -53,7 +54,6 @@ import org.springframework.http.HttpMethod; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; -import org.apache.kylin.engine.spark.job.NSparkSnapshotJob; import io.kyligence.kap.guava20.shaded.common.collect.Lists; import io.kyligence.kap.guava20.shaded.common.collect.Maps; import io.kyligence.kap.guava20.shaded.common.collect.Sets; @@ -64,7 +64,7 @@ import lombok.val; import lombok.extern.slf4j.Slf4j; @Slf4j -public class BuildSnapshotThread extends AbstractSchedulerRunnable { +public class BuildSnapshotRunnable extends AbstractSchedulerRunnable { private static final String BUILD_SNAPSHOT_ERROR_MESSAGE = "Project[%s] Snapshot[%s] buildSnapshot failed"; @Override diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableThread.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnable.java similarity index 98% rename from src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableThread.java rename to src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnable.java index d983fec99f..9fc0fd9d1d 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableThread.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnable.java @@ -46,7 +46,7 @@ import lombok.val; import lombok.extern.slf4j.Slf4j; @Slf4j -public class CheckSourceTableThread extends AbstractSchedulerRunnable { +public class CheckSourceTableRunnable extends AbstractSchedulerRunnable { private static final String SNAPSHOT_TABLE_CHECK_ERROR_MESSAGE = "Project[%s] Snapshot source table[%s] check table stats Failed"; diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotConfigTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotConfigTest.java new file mode 100644 index 0000000000..450f0bc088 --- /dev/null +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotConfigTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.scheduler; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.scheduler.EpochStartedNotifier; +import org.apache.kylin.common.scheduler.EventBusFactory; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.test.util.ReflectionTestUtils; + +import lombok.val; + +class AutoRefreshSnapshotConfigTest { + @Test + void testRegisterScheduler() throws Exception { + registerScheduler(true); + registerScheduler(false); + } + + void registerScheduler(Boolean isJobNode) throws Exception { + try (val mockStatic = Mockito.mockStatic(EventBusFactory.class); + val configStatic = Mockito.mockStatic(KylinConfig.class)) { + val config = Mockito.mock(KylinConfig.class); + Mockito.when(config.isJobNode()).thenReturn(isJobNode); + configStatic.when(KylinConfig::getInstanceFromEnv).thenReturn(config); + + val buildConfig = new AutoRefreshSnapshotConfig(); + val eventBus = Mockito.mock(EventBusFactory.class); + mockStatic.when(EventBusFactory::getInstance).thenReturn(eventBus); + buildConfig.init(); + val scheduler = Mockito.mock(AutoRefreshSnapshotScheduler.class); + ReflectionTestUtils.setField(buildConfig, "scheduler", scheduler); + + Mockito.doNothing().when(scheduler).afterPropertiesSet(); + buildConfig.registerScheduler(new EpochStartedNotifier()); + + Mockito.doThrow(new Exception("test")).when(scheduler).afterPropertiesSet(); + buildConfig.registerScheduler(new EpochStartedNotifier()); + } + } +} diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunnerTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunnerTest.java index ae62d5c7a4..5dd01bf60a 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunnerTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AutoRefreshSnapshotRunnerTest.java @@ -35,7 +35,6 @@ import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.apache.commons.collections4.CollectionUtils; @@ -67,7 +66,6 @@ import io.kyligence.kap.guava20.shaded.common.collect.Lists; import io.kyligence.kap.guava20.shaded.common.collect.Maps; import io.kyligence.kap.guava20.shaded.common.collect.Sets; import lombok.val; -import lombok.var; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -116,7 +114,6 @@ class AutoRefreshSnapshotRunnerTest { runner.doRun(); assertTrue(CollectionUtils.isEmpty(runner.getCheckSourceTableQueue())); assertTrue(MapUtils.isEmpty(runner.getBuildSnapshotCount())); - assertTrue(MapUtils.isEmpty(runner.getBuildSnapshotFutures())); assertTrue(MapUtils.isEmpty(runner.getCheckSourceTableFutures())); assertTrue(MapUtils.isEmpty(runner.getSourceTableSnapshotMapping())); } finally { @@ -247,6 +244,16 @@ class AutoRefreshSnapshotRunnerTest { if (i < 14) { tables.add(allTables.get(i)); } + if (allTables.get(i).isView()) { + tables.add(allTables.get(i)); + val sourceTables = Sets.<String> newHashSet(); + for (int j = 0; j < 7; j++) { + sourceTables.add("default.table_" + j); + excepted.add("default.table_" + j); + } + sourceTables.add(allTables.get(i).getIdentity().toLowerCase(Locale.ROOT)); + viewTableMapping.put(allTables.get(i).getIdentity(), sourceTables); + } if (i > 7) { val sourceTables = Sets.<String> newHashSet(); for (int j = 0; j < 7; j++) { @@ -307,9 +314,9 @@ class AutoRefreshSnapshotRunnerTest { val sourceTables = sourceTableSnapshotMapping.keySet(); runner.getSourceTableSnapshotMapping().putAll(sourceTableSnapshotMapping); - try (val ignored = Mockito.mockConstruction(CheckSourceTableThread.class, + try (val ignored = Mockito.mockConstruction(CheckSourceTableRunnable.class, (mock, context) -> Mockito.doNothing().when(mock).checkTable())) { - try (val ignored2 = Mockito.mockConstruction(BuildSnapshotThread.class, + try (val ignored2 = Mockito.mockConstruction(BuildSnapshotRunnable.class, (mock, context) -> Mockito.doNothing().when(mock).buildSnapshot())) { runner.checkSourceTable(sourceTables); @@ -334,9 +341,6 @@ class AutoRefreshSnapshotRunnerTest { exceptedTmp.addAll(tableDescs); val excepted = exceptedTmp.stream().distinct().collect(Collectors.toList()); assertEquals(excepted.size(), buildSnapshotCount.size()); - - val buildSnapshotFutures = runner.getBuildSnapshotFutures(); - assertEquals(excepted.size(), buildSnapshotFutures.size()); } } } finally { @@ -344,34 +348,6 @@ class AutoRefreshSnapshotRunnerTest { } } - @Test - void waitBuildSnapshotTaskDone() { - val project = "default"; - try { - val runner = AutoRefreshSnapshotRunner.getInstance(project); - val tasks = Lists.<Future<String>> newArrayList(); - for (int i = 0; i < 5; i++) { - val futureTask = new FutureTask<String>(() -> null); - tasks.add(futureTask); - runner.getBuildSnapshotFutures().put(futureTask, System.currentTimeMillis()); - } - val result = new AtomicBoolean(false); - val thread = new Thread(() -> { - try { - runner.waitBuildSnapshotTaskDone(); - result.set(true); - } catch (InterruptedException e) { - log.error(e.getMessage(), e); - } - }); - thread.start(); - tasks.forEach(task -> task.cancel(true)); - await().atMost(new Duration(12, SECONDS)).untilAsserted(() -> assertTrue(result.get())); - } finally { - AutoRefreshSnapshotRunner.shutdown(project); - } - } - @Test void cancelTimeoutFuture() { val project = RandomUtil.randomUUIDStr(); @@ -385,11 +361,11 @@ class AutoRefreshSnapshotRunnerTest { for (int i = 0; i < 5; i++) { val futureTask = new FutureTask<String>(() -> null); tasks.add(futureTask); - runner.getBuildSnapshotFutures().put(futureTask, System.currentTimeMillis()); + runner.getCheckSourceTableFutures().put(futureTask, System.currentTimeMillis()); } await().pollDelay(new Duration(2, SECONDS)).until(() -> true); - runner.cancelTimeoutFuture(runner.getBuildSnapshotFutures()); - runner.getBuildSnapshotFutures().keySet().forEach(future -> { + runner.cancelTimeoutFuture(runner.getCheckSourceTableFutures()); + runner.getCheckSourceTableFutures().keySet().forEach(future -> { assertTrue(future.isCancelled()); assertTrue(future.isDone()); }); @@ -457,16 +433,6 @@ class AutoRefreshSnapshotRunnerTest { val overrideProps = Maps.<String, String> newLinkedHashMap(); projectManager.createProject(project, "test", "", overrideProps); val runner = AutoRefreshSnapshotRunner.getInstance(project); - for (int i = 0; i < 5; i++) { - val futureTask = new FutureTask<String>(() -> null); - runner.getBuildSnapshotFutures().put(futureTask, System.currentTimeMillis()); - if (i % 2 == 0) { - futureTask.cancel(true); - } - } - runner.cancelFuture(runner.getBuildSnapshotFutures()); - var actual = runner.getBuildSnapshotFutures().keySet().stream().filter(Future::isDone).count(); - assertEquals(runner.getBuildSnapshotFutures().size(), actual); for (int i = 0; i < 5; i++) { val futureTask = new FutureTask<String>(() -> null); @@ -476,7 +442,7 @@ class AutoRefreshSnapshotRunnerTest { } } runner.cancelFuture(runner.getCheckSourceTableFutures()); - actual = runner.getCheckSourceTableFutures().keySet().stream().filter(Future::isDone).count(); + val actual = runner.getCheckSourceTableFutures().keySet().stream().filter(Future::isDone).count(); assertEquals(runner.getCheckSourceTableFutures().size(), actual); } finally { AutoRefreshSnapshotRunner.shutdown(project); diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotThreadTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnableTest.java similarity index 97% rename from src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotThreadTest.java rename to src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnableTest.java index 214ac7954d..b2d856887e 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotThreadTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/BuildSnapshotRunnableTest.java @@ -36,6 +36,7 @@ import org.apache.kylin.common.exception.KylinRuntimeException; import org.apache.kylin.common.response.RestResponse; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.RandomUtil; +import org.apache.kylin.engine.spark.job.NSparkSnapshotJob; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.NExecutableManager; @@ -51,7 +52,6 @@ import org.springframework.web.client.RestTemplate; import com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.kylin.engine.spark.job.NSparkSnapshotJob; import io.kyligence.kap.guava20.shaded.common.collect.Lists; import io.kyligence.kap.guava20.shaded.common.collect.Maps; import io.kyligence.kap.guava20.shaded.common.collect.Sets; @@ -59,12 +59,12 @@ import lombok.val; import lombok.var; @MetadataInfo -class BuildSnapshotThreadTest { +class BuildSnapshotRunnableTest { private final RestTemplate restTemplate = Mockito.mock(RestTemplate.class); @Test void buildSnapshot() throws JsonProcessingException { - val thread = new BuildSnapshotThread(); + val thread = new BuildSnapshotRunnable(); thread.setProject("project"); thread.setConfig(KylinConfig.readSystemKylinConfig()); thread.setRestTemplate(restTemplate); @@ -112,7 +112,7 @@ class BuildSnapshotThreadTest { @Test void buildSnapshotFailed() throws JsonProcessingException { - val thread = new BuildSnapshotThread(); + val thread = new BuildSnapshotRunnable(); thread.setProject("project"); thread.setConfig(KylinConfig.readSystemKylinConfig()); thread.setRestTemplate(restTemplate); @@ -149,7 +149,7 @@ class BuildSnapshotThreadTest { @Test void checkSnapshotJobFile() { - val thread = new BuildSnapshotThread(); + val thread = new BuildSnapshotRunnable(); thread.setConfig(KylinConfig.getInstanceFromEnv()); thread.setTableIdentity("default.table_" + RandomUtil.randomUUIDStr().replace("-", "_")); val jobId = RandomUtil.randomUUIDStr(); @@ -182,7 +182,7 @@ class BuildSnapshotThreadTest { @Test void checkAutoRefreshJobSuccessOrRunning() { val jobId = RandomUtil.randomUUIDStr(); - val thread = new BuildSnapshotThread(); + val thread = new BuildSnapshotRunnable(); thread.setConfig(KylinConfig.getInstanceFromEnv()); assertFalse(thread.checkAutoRefreshJobSuccessOrRunning(jobId)); @@ -205,7 +205,7 @@ class BuildSnapshotThreadTest { @Test void snapshotJobFile() { - val thread = new BuildSnapshotThread(); + val thread = new BuildSnapshotRunnable(); thread.setConfig(KylinConfig.getInstanceFromEnv()); thread.setTableIdentity("default.table_" + RandomUtil.randomUUIDStr().replace("-", "_")); val jobId = RandomUtil.randomUUIDStr(); @@ -219,7 +219,7 @@ class BuildSnapshotThreadTest { @Test void snapshotJobFileNotExists() { - val thread = new BuildSnapshotThread(); + val thread = new BuildSnapshotRunnable(); thread.setConfig(KylinConfig.getInstanceFromEnv()); thread.setTableIdentity("default.table_" + RandomUtil.randomUUIDStr().replace("-", "_")); val snapshotJob = thread.readSnapshotJobFile(); @@ -228,7 +228,7 @@ class BuildSnapshotThreadTest { @Test void checkNeedBuildPartitionAndSetTableOption() throws JsonProcessingException { - val thread = new BuildSnapshotThread(); + val thread = new BuildSnapshotRunnable(); thread.setTableIdentity("default.table"); val req = Maps.newHashMap(); val runningJobs = Lists.<NSparkSnapshotJob> newArrayList(); @@ -279,7 +279,7 @@ class BuildSnapshotThreadTest { Mockito.when(executableManager.listExecByJobTypeAndStatus(ExecutableState::isRunning, SNAPSHOT_BUILD, SNAPSHOT_REFRESH)).thenReturn(runningJobs); - val thread = new BuildSnapshotThread(); + val thread = new BuildSnapshotRunnable(); thread.setTableIdentity("default.table"); thread.setProject("default"); try { diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableThreadTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnableTest.java similarity index 93% rename from src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableThreadTest.java rename to src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnableTest.java index 920e89bd76..db82b60d66 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableThreadTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/CheckSourceTableRunnableTest.java @@ -42,12 +42,12 @@ import io.kyligence.kap.guava20.shaded.common.collect.Sets; import lombok.val; @MetadataInfo -class CheckSourceTableThreadTest { +class CheckSourceTableRunnableTest { private final RestTemplate restTemplate = Mockito.mock(RestTemplate.class); @Test void checkTable() throws JsonProcessingException { - val thread = new CheckSourceTableThread(); + val thread = new CheckSourceTableRunnable(); thread.setProject("project"); thread.setConfig(KylinConfig.readSystemKylinConfig()); thread.setTableIdentity("default.table"); @@ -72,7 +72,7 @@ class CheckSourceTableThreadTest { @Test void checkTableFailed() { try { - val thread = new CheckSourceTableThread(); + val thread = new CheckSourceTableRunnable(); thread.setProject("project"); thread.setConfig(KylinConfig.readSystemKylinConfig()); thread.setTableIdentity("default.table"); @@ -83,9 +83,8 @@ class CheckSourceTableThreadTest { thread.checkTable(); } catch (Exception e) { assertTrue(e instanceof KylinRuntimeException); - assertEquals( - "Project[project] Snapshot source table[default.table] check table stats Failed", + assertEquals("Project[project] Snapshot source table[default.table] check table stats Failed", e.getMessage()); } } -} \ No newline at end of file +} diff --git a/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala b/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala index 635b6e54f7..6663a71041 100644 --- a/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala +++ b/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala @@ -194,9 +194,8 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local val tableIdentity = table.qualifiedName.toLowerCase(Locale.ROOT) val locationPath = table.location.getPath val locationFilesStatus: util.List[FileStatus] = snapshotSourceTableStatsService.getLocationFileStatus(locationPath) - val snapshotTablesLocationsJson = Maps.newHashMap[String, SnapshotSourceTableStats]() - snapshotSourceTableStatsService.createSnapshotSourceTableStats(locationPath, config, - locationFilesStatus, snapshotTablesLocationsJson) + val snapshotTablesLocationsJson = snapshotSourceTableStatsService.createSnapshotSourceTableStats(locationPath, config, + locationFilesStatus) snapshotSourceTableStatsService.writeSourceTableStats(DEFAULT_PROJECT, tableIdentity, snapshotTablesLocationsJson) val fromJson = snapshotSourceTableStatsService.getSnapshotSourceTableStatsJsonFromHDFS(DEFAULT_PROJECT, tableIdentity).getSecond @@ -522,4 +521,4 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local assertFalse(checkStatsFile) }) } -} \ No newline at end of file +} diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java index 360ecad17f..8dcc09980d 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java @@ -141,6 +141,7 @@ public class SnapshotSourceTableStatsService extends BasicService { } catch (Exception e) { log.info("Project[{}] [{}.{}] refresh check and save snapshot table location files failed", project, database, table); + log.error(e.getMessage(), e); return new SnapshotSourceTableStatsResponse(false); } } @@ -197,8 +198,8 @@ public class SnapshotSourceTableStatsService extends BasicService { } val needRefresh = checkLocation(location, filesStatus, snapshotSourceTableStatsJson, projectConfig); if (Boolean.FALSE.equals(snapshotSourceTableStatsJsonExist) || Boolean.TRUE.equals(needRefresh)) { - createSnapshotSourceTableStats(location, projectConfig, filesStatus, snapshotSourceTableStatsJson); - writeSourceTableStats(project, tableIdentity, snapshotSourceTableStatsJson); + val newSnapshotSourceTableStatsJson = createSnapshotSourceTableStats(location, projectConfig, filesStatus); + writeSourceTableStats(project, tableIdentity, newSnapshotSourceTableStatsJson); } if (Boolean.FALSE.equals(snapshotSourceTableStatsJsonExist)) { return projectConfig.isSnapshotFirstAutoRefreshEnabled(); @@ -279,10 +280,10 @@ public class SnapshotSourceTableStatsService extends BasicService { tableFilesModifyTimesAndSize.get(FILES_SIZE)); } - public void createSnapshotSourceTableStats(String location, KylinConfig config, - List<FileStatus> locationFilesStatus, Map<String, SnapshotSourceTableStats> snapshotSourceTableStatsJson) { - val sourceTableStats = snapshotSourceTableStatsJson.computeIfAbsent(location, - key -> new SnapshotSourceTableStats()); + public Map<String, SnapshotSourceTableStats> createSnapshotSourceTableStats(String location, KylinConfig config, + List<FileStatus> locationFilesStatus) { + Map<String, SnapshotSourceTableStats> newSnapshotSourceTableStatsJson = Maps.newHashMap(); + val sourceTableStats = new SnapshotSourceTableStats(); val filesSize = Lists.<Long> newArrayList(); val filesModificationTime = Lists.<Long> newArrayList(); locationFilesStatus.stream().limit(config.getSnapshotAutoRefreshFetchFilesCount()).forEach(fileStatus -> { @@ -293,7 +294,8 @@ public class SnapshotSourceTableStatsService extends BasicService { sourceTableStats.setFilesModificationTime(filesModificationTime); sourceTableStats.setFilesCount(locationFilesStatus.size()); - snapshotSourceTableStatsJson.put(location, sourceTableStats); + newSnapshotSourceTableStatsJson.put(location, sourceTableStats); + return newSnapshotSourceTableStatsJson; } public void writeSourceTableStats(String project, String tableIdentity, @@ -332,11 +334,12 @@ public class SnapshotSourceTableStatsService extends BasicService { val needRefresh = checkPartitionsLocation(partitions, snapshotSourceTableStatsJson, needRefreshPartitions, needSavePartitionsFilesStatus, projectConfig); if (Boolean.FALSE.equals(snapshotSourceTableStatsJsonExist) || Boolean.TRUE.equals(needRefresh)) { + Map<String, SnapshotSourceTableStats> newSnapshotSourceTableStatsJson = Maps.newHashMap(); for (CatalogTablePartition partition : partitions) { createPartitionSnapshotSourceTableStats(partition, needSavePartitionsFilesStatus, - snapshotSourceTableStatsJson, projectConfig); + newSnapshotSourceTableStatsJson, projectConfig); } - writeSourceTableStats(project, tableIdentity, snapshotSourceTableStatsJson); + writeSourceTableStats(project, tableIdentity, newSnapshotSourceTableStatsJson); } if (Boolean.FALSE.equals(snapshotSourceTableStatsJsonExist)) { return projectConfig.isSnapshotFirstAutoRefreshEnabled(); diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/TestSnapshotSourceTableStatsService.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/TestSnapshotSourceTableStatsService.java deleted file mode 100644 index 45b6850d92..0000000000 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/TestSnapshotSourceTableStatsService.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.rest.service; - -import org.apache.kylin.junit.annotation.MetadataInfo; -import org.junit.Test; -import org.mockito.InjectMocks; -import org.mockito.Mockito; - -@MetadataInfo -class TestSnapshotSourceTableStatsService { - private static final String DEFAULT_PROJECT = "default"; - @InjectMocks - private final SnapshotSourceTableStatsService locationService = Mockito.spy(SnapshotSourceTableStatsService.class); - - @Test - void saveSnapshotViewMapping() { - - } -}