This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 180ff07afeaf64da8e565b7aa5882c3a89f1c268
Author: fengguangyuan <qq272101...@gmail.com>
AuthorDate: Sat Jul 1 12:07:43 2023 +0800

    KYLIN-5745 Using a global thread pool to clean underlying storages
    
    1. Use a global thread pool to clean the underlying storage;
    2. Launch cleaning tasks in the local thread and ignore
    FileNotFoundException while collecting HDFS files.
    
    Co-authored-by: Guangyuan Feng <guangyuan.f...@kyligence.io>
---
 .../apache/kylin/rest/config/AppInitializer.java   |   7 +
 .../kylin/rest/service/AsyncTaskService.java       |   9 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |   8 +
 .../kylin/metadata/query/QueryHistoryDAO.java      |   6 +-
 .../kylin/metadata/query/RDBMSQueryHistoryDAO.java |  13 +-
 .../metadata/query/RDBMSQueryHistoryDaoTest.java   |   2 +-
 .../apache/kylin/rest/service/ScheduleService.java |  10 +-
 .../kylin/rest/service/MetaStoreService.java       |  10 +-
 .../rest/service/MetaStoreTenantServiceTest.java   |  10 +-
 .../org/apache/kylin/rest/InitConfiguration.java   |   1 -
 .../org/apache/kylin/rest/HAConfigurationTest.java |   6 +-
 .../org/apache/kylin/helper/RoutineToolHelper.java |  55 +++-
 .../tool/garbage/AbstractComparableCleanTask.java  |  93 ++++++
 .../tool/garbage/CleanTaskExecutorService.java     | 236 +++++++++++++++
 .../kylin/tool/garbage/PriorityExecutor.java       |  71 +++++
 .../apache/kylin/tool/garbage/StorageCleaner.java  |  28 +-
 .../kylin/tool/garbage/StorageCleanerCLI.java      |  18 +-
 .../apache/kylin/tool/routine/FastRoutineTool.java |   1 +
 .../org/apache/kylin/tool/routine/RoutineTool.java |  24 +-
 .../org/apache/kylin/tool/util/MetadataUtil.java   |   9 +-
 .../garbage/CleanTaskExecutorServiceTests.java     | 324 +++++++++++++++++++++
 21 files changed, 889 insertions(+), 52 deletions(-)

diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
index 5a2a833305..35af91a415 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
@@ -60,6 +60,8 @@ import 
org.apache.kylin.rest.service.task.QueryHistoryTaskScheduler;
 import org.apache.kylin.rest.util.JStackDumpTask;
 import org.apache.kylin.streaming.jobs.StreamingJobListener;
 import org.apache.kylin.tool.daemon.KapGuardianHATask;
+import org.apache.kylin.tool.garbage.CleanTaskExecutorService;
+import org.apache.kylin.tool.garbage.PriorityExecutor;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.context.event.ApplicationReadyEvent;
 import org.springframework.context.ApplicationContext;
@@ -175,6 +177,11 @@ public class AppInitializer {
         log.info("KylinConfig in env, ID is {}", kylinConfig.hashCode());
         log.info("KylinConfig in env, metadata is {}", 
kylinConfig.getMetadataUrl());
         log.info("KylinConfig in env, working dir is {}", 
kylinConfig.getHdfsWorkingDirectory());
+
+        // Init global static instances
+        CleanTaskExecutorService.getInstance().bindWorkingPool(
+            () -> PriorityExecutor.newWorkingThreadPool(
+                "clean-storages-pool", 
kylinConfig.getStorageCleanTaskConcurrency()));
     }
 
     private void warmUpSystemCache() {
diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java
index a343a464c6..8046f0c62a 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java
@@ -27,6 +27,7 @@ import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import javax.servlet.ServletOutputStream;
@@ -50,6 +51,7 @@ import org.apache.kylin.metadata.query.QueryHistoryInfo;
 import org.apache.kylin.metadata.query.QueryHistoryRequest;
 import org.apache.kylin.metadata.query.QueryHistorySql;
 import org.apache.kylin.metadata.query.util.QueryHistoryUtil;
+import org.apache.kylin.tool.garbage.CleanTaskExecutorService;
 import org.apache.kylin.tool.garbage.StorageCleaner;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.AsyncResult;
@@ -69,8 +71,11 @@ public class AsyncTaskService implements 
AsyncTaskServiceSupporter {
 
         long startAt = System.currentTimeMillis();
         try {
-            val storageCleaner = new StorageCleaner();
-            storageCleaner.execute();
+            CleanTaskExecutorService.getInstance()
+                .submit(
+                    new 
StorageCleaner().withTag(StorageCleaner.CleanerTag.SERVICE),
+                    
KylinConfig.getInstanceFromEnv().getStorageCleanTaskTimeout(), 
TimeUnit.MILLISECONDS)
+                .get();
         } catch (Exception e) {
             MetricsGroup.hostTagCounterInc(MetricsName.STORAGE_CLEAN_FAILED, 
MetricsCategory.GLOBAL, GLOBAL);
             throw e;
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index cbd82ed108..7f5f2fb9e3 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2576,6 +2576,14 @@ public abstract class KylinConfigBase implements 
Serializable {
                 * 1024)).longValue();
     }
 
+    public long getStorageCleanTaskTimeout() {
+        return 
TimeUtil.timeStringAs(getOptional("kylin.storage.clean-timeout", "4h"), 
TimeUnit.MILLISECONDS);
+    }
+
+    public int getStorageCleanTaskConcurrency() {
+        return 
Integer.parseInt(getOptional("kylin.storage.clean-tasks-concurrency", "5"));
+    }
+
     public long getSourceUsageQuota() {
         Double d = 
Double.parseDouble(getOptional("kylin.storage.source-quota-in-giga-bytes", 
"-1"));
 
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
index 66f1d4aa22..2921098c70 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
@@ -51,11 +51,11 @@ public interface QueryHistoryDAO {
 
     String getQueryMetricMeasurement();
 
-    void deleteQueryHistoriesIfMaxSizeReached();
+    void deleteQueryHistoriesIfMaxSizeReached() throws InterruptedException;
 
-    void deleteQueryHistoriesIfRetainTimeReached();
+    void deleteQueryHistoriesIfRetainTimeReached() throws InterruptedException;
 
-    void deleteOldestQueryHistoriesByProject(String project, int deleteCount);
+    void deleteOldestQueryHistoriesByProject(String project, int deleteCount) 
throws InterruptedException;
 
     long getQueryHistoriesSize(QueryHistoryRequest request, String project);
 
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
index 3592dbec22..a6ed94675f 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
@@ -109,7 +109,7 @@ public class RDBMSQueryHistoryDAO implements 
QueryHistoryDAO {
         return jdbcQueryHisStore.queryByQueryId(queryId);
     }
 
-    public void deleteQueryHistoriesIfMaxSizeReached() {
+    public void deleteQueryHistoriesIfMaxSizeReached() throws 
InterruptedException {
         long maxSize = 
KylinConfig.getInstanceFromEnv().getQueryHistoryMaxSize();
         long totalCount = jdbcQueryHisStore.getCountOnQueryHistory();
         if (totalCount > maxSize) {
@@ -117,14 +117,14 @@ public class RDBMSQueryHistoryDAO implements 
QueryHistoryDAO {
         }
     }
 
-    public void deleteQueryHistoriesIfRetainTimeReached() {
+    public void deleteQueryHistoriesIfRetainTimeReached() throws 
InterruptedException {
         long rangeOutCount = 
jdbcQueryHisStore.getCountOnQueryHistory(getRetainTime());
         if (rangeOutCount > 0) {
             deleteQueryHistoryAndRealization((int) rangeOutCount);
         }
     }
 
-    public void deleteQueryHistoryAndRealization(int deleteCount) {
+    public void deleteQueryHistoryAndRealization(int deleteCount) throws 
InterruptedException {
         int singleLimit = 
KylinConfig.getInstanceFromEnv().getQueryHistorySingleDeletionSize();
         largeSplitToSmallTask(deleteCount, singleLimit, currentCount -> {
             QueryHistory queryHistory = 
jdbcQueryHisStore.getOldestQueryHistory(currentCount);
@@ -134,7 +134,7 @@ public class RDBMSQueryHistoryDAO implements 
QueryHistoryDAO {
         }, "Cleanup all query history");
     }
 
-    public void deleteOldestQueryHistoriesByProject(String project, int 
deleteCount) {
+    public void deleteOldestQueryHistoriesByProject(String project, int 
deleteCount) throws InterruptedException {
         int singleLimit = 
KylinConfig.getInstanceFromEnv().getQueryHistorySingleDeletionSize();
         largeSplitToSmallTask(deleteCount, singleLimit, currentCount -> {
             QueryHistory queryHistory = 
jdbcQueryHisStore.getOldestQueryHistory(project, currentCount);
@@ -280,9 +280,12 @@ public class RDBMSQueryHistoryDAO implements 
QueryHistoryDAO {
     }
 
     public static void largeSplitToSmallTask(int totalCount, int singleSize, 
IntFunction<Integer> function,
-            String description) {
+            String description) throws InterruptedException {
         int retainCount = totalCount;
         while (retainCount > 0) {
+            if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException("Interrupted during splitting a 
large query task!");
+            }
             int currentCount = Math.min(retainCount, singleSize);
             int actualCount = function.apply(currentCount);
             if (currentCount != actualCount && logger.isWarnEnabled()) {
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
index 30cd2d37d0..2842fd111c 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
@@ -836,7 +836,7 @@ public class RDBMSQueryHistoryDaoTest extends 
NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testLargeSplitToSmallTask() {
+    public void testLargeSplitToSmallTask() throws InterruptedException {
         AtomicInteger executions = new AtomicInteger(0);
         AtomicInteger actualSize = new AtomicInteger(0);
         largeSplitToSmallTask(105, 10, currentCount -> {
diff --git 
a/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java
 
b/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java
index 2e80515367..af489e9933 100644
--- 
a/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java
+++ 
b/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java
@@ -22,7 +22,6 @@ import static 
org.apache.kylin.common.constant.Constants.METADATA_FILE;
 import static 
org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -55,7 +54,6 @@ import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import org.apache.kylin.helper.MetadataToolHelper;
 import org.apache.kylin.helper.RoutineToolHelper;
 import org.apache.kylin.metadata.resourcegroup.KylinInstance;
 import org.apache.kylin.metadata.resourcegroup.RequestTypeEnum;
@@ -107,7 +105,7 @@ public class ScheduleService {
     private String tmpMetadataBackupFilePath;
 
     private static final ThreadLocal<Future<?>> CURRENT_FUTURE = new 
ThreadLocal<>();
-    private MetadataToolHelper metadataToolHelper = new MetadataToolHelper();
+
     private static final Map<Future<?>, Long> ASYNC_FUTURES = 
Maps.newConcurrentMap();
 
     @Scheduled(cron = "${kylin.metadata.ops-cron:0 0 0 * * *}")
@@ -126,7 +124,8 @@ public class ScheduleService {
                     AtomicReference<Pair<String, String>> backupFolder = new 
AtomicReference<>(null);
                     executeTask(() -> 
backupFolder.set(backupService.backupAll()), "MetadataBackup", startTime);
                     executeMetadataBackupInTenantMode(kylinConfig, startTime, 
backupFolder);
-                    executeTask(RoutineToolHelper::cleanQueryHistories, 
"QueryHistoriesCleanup", startTime);
+                    executeTask(() -> 
RoutineToolHelper.cleanQueryHistoriesAsync(getRemainingTime(startTime),
+                            TimeUnit.MILLISECONDS), "QueryHistoriesCleanup", 
startTime);
                     executeTask(RoutineToolHelper::cleanStreamingStats, 
"StreamingStatsCleanup", startTime);
                     executeTask(RoutineToolHelper::deleteRawRecItems, 
"RawRecItemsDeletion", startTime);
                     executeTask(RoutineToolHelper::cleanGlobalSourceUsage, 
"SourceUsageCleanup", startTime);
@@ -134,8 +133,7 @@ public class ScheduleService {
                 }
                 executeTask(() -> 
projectService.garbageCleanup(getRemainingTime(startTime)), 
"ProjectGarbageCleanup",
                         startTime);
-                executeTask(() -> metadataToolHelper.cleanStorage(true, 
Collections.emptyList(), 0, 0), "HdfsCleanup",
-                        startTime);
+                executeTask(RoutineToolHelper::cleanStorageForRoutine, 
"HdfsCleanup", startTime);
                 log.info("Finish to work, cost {}ms", 
System.currentTimeMillis() - startTime);
             }
         } catch (InterruptedException e) {
diff --git 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
index 4976131faa..0a45d90c11 100644
--- 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
+++ 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
@@ -79,7 +79,6 @@ import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.guava30.shaded.common.io.ByteSource;
-import org.apache.kylin.helper.MetadataToolHelper;
 import org.apache.kylin.helper.RoutineToolHelper;
 import org.apache.kylin.metadata.cube.model.IndexEntity;
 import org.apache.kylin.metadata.cube.model.IndexPlan;
@@ -101,7 +100,6 @@ import 
org.apache.kylin.metadata.model.schema.SchemaNodeType;
 import org.apache.kylin.metadata.model.schema.SchemaUtil;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.query.util.QueryHisStoreUtil;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
 import org.apache.kylin.metadata.recommendation.candidate.RawRecItem;
@@ -122,6 +120,7 @@ import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.AclPermissionUtil;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.tool.garbage.CleanTaskExecutorService;
 import org.apache.kylin.tool.util.HashFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -161,8 +160,6 @@ public class MetaStoreService extends BasicService {
     @Autowired(required = false)
     private List<ModelChangeSupporter> modelChangeSupporters = 
Lists.newArrayList();
 
-    MetadataToolHelper metadataToolHelper = new MetadataToolHelper();
-
     public List<ModelPreviewResponse> getPreviewModels(String project, 
List<String> ids) {
         aclEvaluate.checkProjectWritePermission(project);
         return modelService.getManager(NDataflowManager.class, 
project).listAllDataflows(true).stream()
@@ -806,14 +803,15 @@ public class MetaStoreService extends BasicService {
     public void cleanupMeta(String project) {
         if (project.equals(UnitOfWork.GLOBAL_UNIT)) {
             RoutineToolHelper.cleanGlobalSourceUsage();
-            QueryHisStoreUtil.cleanQueryHistory();
+            RoutineToolHelper.cleanQueryHistoriesAsync().join();
         } else {
             RoutineToolHelper.cleanMetaByProject(project);
         }
     }
 
     public void cleanupStorage(String[] projectsToClean, boolean 
cleanupStorage) {
-        metadataToolHelper.cleanStorage(cleanupStorage, 
Arrays.asList(projectsToClean), 0D, 0);
+        
CleanTaskExecutorService.getInstance().cleanStorageForService(cleanupStorage, 
Arrays.asList(projectsToClean),
+                0D, 0);
     }
 
     public void cleanupStorage(StorageCleanupRequest request, 
HttpServletRequest servletRequest) {
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreTenantServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreTenantServiceTest.java
index 1c34c4b841..94ad1e8d3c 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreTenantServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreTenantServiceTest.java
@@ -19,17 +19,18 @@
 package org.apache.kylin.rest.service;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.response.RestResponse;
 import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.helper.MetadataToolHelper;
 import org.apache.kylin.metadata.resourcegroup.ResourceGroup;
 import org.apache.kylin.metadata.resourcegroup.ResourceGroupManager;
 import org.apache.kylin.metadata.streaming.ReflectionUtils;
 import org.apache.kylin.rest.request.StorageCleanupRequest;
+import org.apache.kylin.tool.garbage.CleanTaskExecutorService;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -63,8 +64,6 @@ public class MetaStoreTenantServiceTest {
     private ResourceGroupManager rgManager = 
Mockito.mock(ResourceGroupManager.class);
     @Mock
     private KylinConfig kylinConfig = Mockito.mock(KylinConfig.class);
-    @Mock
-    private MetadataToolHelper metadataToolHelper = 
Mockito.spy(MetadataToolHelper.class);
 
     @Test
     public void cleanupStorageMultiTenantMode() throws IOException {
@@ -76,7 +75,6 @@ public class MetaStoreTenantServiceTest {
         
Mockito.when(kylinConfig.isKylinMultiTenantEnabled()).thenReturn(false);
 
         ReflectionUtils.setField(metaStoreService, "routeService", 
routeService);
-        ReflectionUtils.setField(metaStoreService, "metadataToolHelper", 
metadataToolHelper);
         ReflectionUtils.setField(routeService, "restTemplate", restTemplate);
 
         val restResult = JsonUtil.writeValueAsBytes(RestResponse.ok(true));
@@ -89,8 +87,8 @@ public class MetaStoreTenantServiceTest {
         Mockito.when(rgManager.getResourceGroup())
                 .thenReturn(JsonUtil.readValue(resourceGroupJson, new 
TypeReference<ResourceGroup>() {
                 }));
-        
Mockito.doNothing().when(metadataToolHelper).cleanStorage(ArgumentMatchers.anyBoolean(),
 ArgumentMatchers.any(),
-                ArgumentMatchers.anyDouble(), ArgumentMatchers.anyInt());
+        
CleanTaskExecutorService.getInstance().cleanStorageForRoutine(ArgumentMatchers.anyBoolean(),
 Collections.emptyList(),
+            ArgumentMatchers.anyDouble(), ArgumentMatchers.anyInt());
 
         val storageCleanupRequest = new StorageCleanupRequest();
         storageCleanupRequest.setCleanupStorage(false);
diff --git 
a/src/server/src/main/java/org/apache/kylin/rest/InitConfiguration.java 
b/src/server/src/main/java/org/apache/kylin/rest/InitConfiguration.java
index 99e675154c..aeaa76ad5c 100644
--- a/src/server/src/main/java/org/apache/kylin/rest/InitConfiguration.java
+++ b/src/server/src/main/java/org/apache/kylin/rest/InitConfiguration.java
@@ -31,7 +31,6 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Profile;
 
-
 import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
diff --git 
a/src/server/src/test/java/org/apache/kylin/rest/HAConfigurationTest.java 
b/src/server/src/test/java/org/apache/kylin/rest/HAConfigurationTest.java
index d0e35ad355..3b25cd6717 100644
--- a/src/server/src/test/java/org/apache/kylin/rest/HAConfigurationTest.java
+++ b/src/server/src/test/java/org/apache/kylin/rest/HAConfigurationTest.java
@@ -23,9 +23,9 @@ import javax.sql.DataSource;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil;
-import org.apache.kylin.helper.MetadataToolHelper;
 import org.apache.kylin.junit.annotation.MetadataInfo;
 import org.apache.kylin.junit.annotation.OverwriteProp;
+import org.apache.kylin.tool.util.MetadataUtil;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -48,13 +48,11 @@ class HAConfigurationTest {
     @Mock
     SessionProperties sessionProperties;
 
-    MetadataToolHelper metadataToolHelper = new MetadataToolHelper();
-
     DataSource dataSource;
 
     @BeforeEach
     public void setup() throws Exception {
-        dataSource = 
Mockito.spy(metadataToolHelper.getDataSource(getTestConfig()));
+        dataSource = Mockito.spy(MetadataUtil.getDataSource(getTestConfig()));
         ReflectionTestUtils.setField(configuration, "dataSource", dataSource);
     }
 
diff --git 
a/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java 
b/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java
index 8c9b06d441..7d796992b4 100644
--- a/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java
+++ b/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java
@@ -18,8 +18,12 @@
 
 package org.apache.kylin.helper;
 
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.SetThreadName;
@@ -30,12 +34,15 @@ import 
org.apache.kylin.metadata.query.util.QueryHisStoreUtil;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
 import org.apache.kylin.metadata.streaming.util.StreamingJobRecordStoreUtil;
 import org.apache.kylin.metadata.streaming.util.StreamingJobStatsStoreUtil;
+import org.apache.kylin.tool.garbage.AbstractComparableCleanTask;
+import org.apache.kylin.tool.garbage.CleanTaskExecutorService;
 import org.apache.kylin.tool.garbage.GarbageCleaner;
+import org.apache.kylin.tool.garbage.PriorityExecutor;
 import org.apache.kylin.tool.garbage.SourceUsageCleaner;
 import org.apache.kylin.tool.garbage.StorageCleaner;
 
-import java.util.List;
-import java.util.stream.Collectors;
+import lombok.val;
+import lombok.extern.slf4j.Slf4j;
 
 /*
  * this class is only for removing dependency of kylin-tool module, and should 
be refactor later
@@ -46,8 +53,42 @@ public class RoutineToolHelper {
     private RoutineToolHelper() {
     }
 
-    public static void cleanQueryHistories() {
-        QueryHisStoreUtil.cleanQueryHistory();
+    public static CompletableFuture<Void> cleanQueryHistoriesAsync(long 
timeout, TimeUnit timeUnit) {
+        tryInitCleanTaskExecutorService();
+        return CleanTaskExecutorService.getInstance().submit(new 
AbstractComparableCleanTask() {
+            @Override
+            public String getName() {
+                return "cleanQueryHistoriesForAllProjects";
+            }
+
+            @Override
+            protected void doRun() {
+                QueryHisStoreUtil.cleanQueryHistory();
+            }
+
+            @Override
+            public StorageCleaner.CleanerTag getCleanerTag() {
+                return StorageCleaner.CleanerTag.ROUTINE;
+            }
+        }, timeout, timeUnit);
+    }
+
+    public static CompletableFuture<Void> cleanQueryHistoriesAsync() {
+        return 
cleanQueryHistoriesAsync(KylinConfig.getInstanceFromEnv().getStorageCleanTaskTimeout(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    public static void cleanStorageForRoutine() {
+        tryInitCleanTaskExecutorService();
+        CleanTaskExecutorService.getInstance().cleanStorageForRoutine(true, 
Collections.emptyList(), 0, 0);
+    }
+
+    private static void tryInitCleanTaskExecutorService() {
+        CleanTaskExecutorService.getInstance().bindWorkingPool(() -> {
+            log.warn("Init the cleaning task thread pool from thread {}.", 
Thread.currentThread().getName());
+            return 
PriorityExecutor.newWorkingThreadPool("routine-tool-helper-pool",
+                    
KylinConfig.getInstanceFromEnv().getStorageCleanTaskConcurrency());
+        });
     }
 
     public static void cleanStreamingStats() {
@@ -100,7 +141,7 @@ public class RoutineToolHelper {
             for (String projName : projectsToCleanup) {
                 cleanMetaByProject(projName);
             }
-            cleanQueryHistories();
+            cleanQueryHistoriesAsync();
             cleanStreamingStats();
             deleteRawRecItems();
             System.out.println("Metadata cleanup finished");
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/AbstractComparableCleanTask.java
 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/AbstractComparableCleanTask.java
new file mode 100644
index 0000000000..35bc86c16f
--- /dev/null
+++ 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/AbstractComparableCleanTask.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.garbage;
+
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The priority of the task to be executed is:
+ *   ROUTINE < CLI < API
+ */
+public abstract class AbstractComparableCleanTask implements Runnable, 
Comparable<AbstractComparableCleanTask> {
+    private final CompletableFuture<Void> watcher = new CompletableFuture<>();
+
+    public String getName() {
+        return getClass().getName();
+    }
+
+    public String getBrief() {
+        return String.format("Task-%s: {%s}", getName(), details());
+    }
+
+    protected String details() {
+        return String.format("tag: %s, class: %s", getCleanerTag(), 
getClass().getName());
+    }
+
+    public CompletableFuture<Void> getWatcher() {
+        return watcher;
+    }
+
+    public abstract StorageCleaner.CleanerTag getCleanerTag();
+
+    @Override
+    public void run() {
+        try {
+            doRun();
+            watcher.complete(null);
+        } catch (Throwable t) {
+            watcher.completeExceptionally(t);
+        }
+    }
+
+    protected void doRun() {
+    }
+
+    /**
+     * Note: this class has a natural ordering that is inconsistent with 
equals.
+     */
+    @Override
+    public int compareTo(@NotNull AbstractComparableCleanTask t) {
+        if (this == t || getCleanerTag() == t.getCleanerTag()) {
+            return 0;
+        } else if (getCleanerTag().ordinal() > t.getCleanerTag().ordinal()) {
+            return -1;
+        } else {
+            return 1;
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AbstractComparableCleanTask that = (AbstractComparableCleanTask) o;
+        return Objects.equals(watcher, that.watcher) && 
Objects.equals(getCleanerTag(), that.getCleanerTag());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(watcher, getCleanerTag());
+    }
+}
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java
 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java
new file mode 100644
index 0000000000..da12dd15bb
--- /dev/null
+++ 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.garbage;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinRuntimeException;
+import org.apache.kylin.common.util.DaemonThreadFactory;
+import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ThreadSafe
+public class CleanTaskExecutorService implements Closeable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CleanTaskExecutorService.class);
+
+    private static final CleanTaskExecutorService INSTANCE = new 
CleanTaskExecutorService();
+
+    private static final int SHUTDOWN_TIMEOUT_SECONDS = 2;
+
+    private final ScheduledExecutorService timeoutCheckerPool = Executors
+            .newSingleThreadScheduledExecutor(new 
DaemonThreadFactory("storage-cleaner-timeout-checker"));
+
+    private final AtomicBoolean bound = new AtomicBoolean(false);
+    // Should move it outside to gain a more sensible composition code.
+    private ExecutorService pool;
+
+    static {
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+                LOGGER.info("[{}] Shutdown when JVM is to shutdown.", 
CleanTaskExecutorService.class.getName());
+                INSTANCE.close();
+                LOGGER.info("[{}] Shutdown successfully.", 
CleanTaskExecutorService.class.getName());
+            } catch (Exception e) {
+                LOGGER.error("[{}] Occurring exceptions to shutdown!", 
CleanTaskExecutorService.class.getName());
+            }
+        }));
+    }
+
+    private CleanTaskExecutorService() {
+    }
+
+    public static CleanTaskExecutorService getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * True if the instance binds nothing, else false.
+     */
+    public boolean bindWorkingPool(Supplier<ExecutorService> 
workingPoolProvider) {
+        Preconditions.checkArgument(workingPoolProvider != null, "workingPool 
is null");
+        if (bound.compareAndSet(false, true)) {
+            pool = workingPoolProvider.get();
+            return true;
+        }
+        return false;
+    }
+
+    public CompletableFuture<Void> cleanStorageForService(boolean 
storageCleanup, List<String> projects,
+            double requestFSRate, int retryTimes) {
+        return cleanStorageAsync(RandomUtil.randomUUIDStr(), 
StorageCleaner.CleanerTag.SERVICE, storageCleanup,
+                projects, requestFSRate, retryTimes);
+    }
+
+    public void cleanStorageForRoutine(boolean storageCleanup, List<String> 
projects, double requestFSRate,
+            int retryTimes) {
+        cleanStorageSync(StorageCleaner.CleanerTag.ROUTINE, storageCleanup, 
projects, requestFSRate, retryTimes);
+    }
+
+    private void cleanStorageSync(StorageCleaner.CleanerTag tag, boolean 
storageCleanup, List<String> projects,
+            double requestFSRate, int retryTimes) {
+        cleanStorageAsync(RandomUtil.randomUUIDStr(), tag, storageCleanup, 
projects, requestFSRate, retryTimes).join();
+    }
+
+    private CompletableFuture<Void> cleanStorageAsync(String traceId, 
StorageCleaner.CleanerTag tag,
+            boolean storageCleanup, List<String> projects, double 
requestFSRate, int retryTimes) {
+        StorageCleaner cleaner = makeCleaner(traceId, tag, projects, 
requestFSRate, retryTimes, storageCleanup);
+        if (cleaner != null) {
+            return cleanStorageAsync(cleaner);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private StorageCleaner makeCleaner(String traceId, 
StorageCleaner.CleanerTag tag, List<String> projects,
+            double requestFSRate, int retryTimes, boolean storageCleanup) {
+        Preconditions.checkArgument(projects != null, "projects is null");
+        StorageCleaner storageCleaner = null;
+        try {
+            storageCleaner = new StorageCleaner(storageCleanup, projects, 
requestFSRate, retryTimes);
+            storageCleaner.withTag(tag);
+            storageCleaner.withTraceId(traceId);
+        } catch (Exception e) {
+            LOGGER.error(
+                    "Failed to create storage cleaner for projects: {}. 
TraceId: {}", projects, traceId,
+                    e);
+        }
+        return storageCleaner;
+    }
+
+    private CompletableFuture<Void> cleanStorageAsync(StorageCleaner cleaner) {
+        LOGGER.info("To submit cleaning hdfs files task. TraceId: {}.", 
cleaner.getTraceId());
+        CompletableFuture<Void> f = submit(cleaner, 
KylinConfig.getInstanceFromEnv().getStorageCleanTaskTimeout(),
+                TimeUnit.MILLISECONDS);
+        f.whenComplete((v, t) -> {
+            if (t == null) {
+                LOGGER.info("HDFS files cleaning task has successfully 
completed. TraceId: {}", cleaner.getTraceId());
+                return;
+            }
+            LOGGER.error(StorageCleaner.ANSI_RED
+                    + "cleanup HDFS failed. Detailed Message is at 
${KYLIN_HOME}/logs/shell.stderr"
+                    + StorageCleaner.ANSI_RESET + ". TraceId: " + 
cleaner.getTraceId(), t);
+        });
+        return f;
+    }
+
+    public CompletableFuture<Void> submit(StorageCleaner cleaner, long 
timeout, TimeUnit timeUnit) {
+        return submit(new AbstractComparableCleanTask() {
+            @Override
+            public String getName() {
+                return cleaner.getTraceId();
+            }
+
+            @Override
+            protected String details() {
+                return String.format("traceId: %s, tag: %s, projects: %s", 
cleaner.getTraceId(), cleaner.getTag(),
+                        cleaner.getProjectNames().toString());
+            }
+
+            @Override
+            public StorageCleaner.CleanerTag getCleanerTag() {
+                return cleaner.getTag();
+            }
+
+            @Override
+            protected void doRun() {
+                try {
+                    cleaner.execute();
+                } catch (Exception e) {
+                    throw new KylinRuntimeException(e);
+                }
+            }
+        }, timeout, timeUnit);
+    }
+
+    public CompletableFuture<Void> submit(AbstractComparableCleanTask task, 
long timeout, TimeUnit timeUnit) {
+        if (pool == null) {
+            throw new RejectedExecutionException("The working pool has not 
been initialized, please bind it at first!");
+        }
+
+        LOGGER.debug("To submit storage cleaning task {}.", task.getBrief());
+        CompletableFuture<Void> resultFuture = task.getWatcher();
+        AtomicReference<Future<?>> cancelFuture = new AtomicReference<>(null);
+        try {
+            if (!resultFuture.isDone()) {
+                Future<?> workingFuture = pool.submit(task);
+                LOGGER.info("Submitted storage cleaning task {}.", 
task.getBrief());
+
+                cancelFuture.set(timeoutCheckerPool.schedule(() -> {
+                    if (!workingFuture.isDone() && 
!workingFuture.cancel(true)) {
+                        LOGGER.warn("You may have leaked threads, failed to 
cancel task {}!", task.getBrief());
+                    }
+                    resultFuture.completeExceptionally(new 
TimeoutException("Timeout for cleaning!"));
+                }, timeout, timeUnit));
+            }
+        } catch (RejectedExecutionException re) {
+            resultFuture.completeExceptionally(re);
+        }
+
+        resultFuture.whenComplete((v, t) -> {
+            if (cancelFuture.get() != null) {
+                cancelFuture.get().cancel(true);
+            }
+            if (t == null) {
+                LOGGER.info("Cleaning task {} successfully completed!", 
task.getBrief());
+            } else {
+                throw new CompletionException(
+                        "To cancel task watcher because of " + "encountering 
exceptions for task " + task.getBrief(),
+                        t);
+            }
+        });
+
+        return resultFuture;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (pool != null) {
+            try {
+                pool.shutdown();
+                if (!pool.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, 
TimeUnit.SECONDS)) {
+                    LOGGER.warn("The working thread pool couldn't shutdown 
before timeout {}s!",
+                            SHUTDOWN_TIMEOUT_SECONDS);
+                    pool.shutdownNow();
+                }
+            } catch (InterruptedException ie) {
+                pool.shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/PriorityExecutor.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/PriorityExecutor.java
new file mode 100644
index 0000000000..8ec0d3d51b
--- /dev/null
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/PriorityExecutor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.garbage;
+
+import lombok.EqualsAndHashCode;
+import org.apache.kylin.common.util.DaemonThreadFactory;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A fixed-size {@link ThreadPoolExecutor} whose work queue is a {@link PriorityBlockingQueue}, so queued tasks
+ * are taken in their natural order rather than FIFO.
+ * <p>
+ * Runnables passed to {@code submit} must themselves implement {@link Comparable}. They are wrapped in a
+ * {@link ComparableFutureTask} that delegates ordering to them. Tasks given to {@code execute} directly, and
+ * callables given to {@code submit}, are not wrapped, so queueing them behind other tasks may fail.
+ */
+public class PriorityExecutor extends ThreadPoolExecutor {
+
+    /**
+     * A {@link FutureTask} ordered by the wrapped runnable's natural ordering.
+     * <p>
+     * The Lombok-generated equality looks only at the wrapped task, so two futures wrapping the same runnable
+     * are equal. Note: this class has a natural ordering that is inconsistent with equals.
+     *
+     * @param <T> the type the wrapped runnable compares itself against
+     * @param <R> the result type of the future
+     */
+    @EqualsAndHashCode
+    public static class ComparableFutureTask<T extends Comparable<?>, R> extends FutureTask<R>
+            implements Comparable<ComparableFutureTask<T, R>> {
+
+        private final Comparable<T> task;
+
+        /**
+         * @param runnable the work to run; must also implement {@link Comparable}
+         * @param result   the value the future yields on successful completion
+         * @throws IllegalArgumentException if {@code runnable} is not {@link Comparable}
+         */
+        @SuppressWarnings("unchecked") // the Comparable type argument cannot be checked at runtime
+        public ComparableFutureTask(Runnable runnable, R result) {
+            super(runnable, result);
+            Preconditions.checkArgument(runnable instanceof Comparable, "runnable should also be comparable!");
+            this.task = (Comparable<T>) runnable;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked") // both sides wrap runnables of the same comparable family
+        public int compareTo(ComparableFutureTask<T, R> o) {
+            return task.compareTo((T) o.task);
+        }
+    }
+
+    private PriorityExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+    }
+
+    /**
+     * Creates a pool of exactly {@code maxPoolSize} threads. The threads come from a {@code DaemonThreadFactory}
+     * built with {@code name}, and queued work waits in an unbounded priority queue.
+     */
+    public static PriorityExecutor newWorkingThreadPool(String name, int maxPoolSize) {
+        return new PriorityExecutor(maxPoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<>(),
+                new DaemonThreadFactory(name));
+    }
+
+    /**
+     * Wraps submitted runnables so the priority queue can order them. {@code value} is passed through so that
+     * {@code submit(Runnable, T)} yields the caller's result; it was previously replaced by {@code null}.
+     */
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+        return new ComparableFutureTask<>(runnable, value);
+    }
+
+}
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java
index 249814b9f4..30d052cf10 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java
@@ -98,6 +98,8 @@ public class StorageCleaner {
 
     private final boolean cleanup;
     private final boolean timeMachineEnabled;
+
+    @Getter
     private final Collection<String> projectNames;
     private final KylinConfig kylinConfig;
 
@@ -108,6 +110,16 @@ public class StorageCleaner {
     private final Map<String, String> trashRecord;
     private final ResourceStore resourceStore;
 
+    /**
+     * Identifies who launched a cleaner. Declaration order is significant: AbstractComparableCleanTask orders
+     * tasks by this enum's ordinal, so later constants (e.g. SERVICE) are scheduled ahead of earlier ones
+     * (e.g. ROUTINE). Keep that in mind when adding or reordering constants.
+     */
+    public enum CleanerTag {
+        ROUTINE, CLI, SERVICE
+    }
+
+    /** Origin of this cleaner; ROUTINE unless overridden via {@code withTag}. */
+    @Getter
+    private CleanerTag tag = CleanerTag.ROUTINE;
+
+    /** Correlation id included in cleaning log messages; empty unless set via {@code withTraceId}. */
+    @Getter
+    private String traceId = "";
+
     public StorageCleaner() throws Exception {
         this(true);
     }
@@ -138,6 +150,16 @@ public class StorageCleaner {
         }
     }
 
+    /**
+     * Sets the origin tag that determines this cleaner's scheduling priority.
+     *
+     * @param tag the origin; must not be null, because task ordering dereferences it
+     * @return this cleaner, for chaining
+     * @throws NullPointerException if {@code tag} is null (fail fast instead of inside the priority queue)
+     */
+    public StorageCleaner withTag(CleanerTag tag) {
+        this.tag = java.util.Objects.requireNonNull(tag, "tag");
+        return this;
+    }
+
+    /**
+     * Sets the trace id attached to this cleaner's log messages.
+     *
+     * @param id the trace id
+     * @return this cleaner, for chaining
+     */
+    public StorageCleaner withTraceId(String id) {
+        this.traceId = id;
+        return this;
+    }
+
     @Getter
     private Set<StorageItem> outdatedItems = Sets.newHashSet();
 
@@ -171,7 +193,11 @@ public class StorageCleaner {
         log.info("all file systems are {}", allFileSystems);
         for (StorageItem allFileSystem : allFileSystems) {
             log.debug("start to collect HDFS from {}", 
allFileSystem.getPath());
-            collectFromHDFS(allFileSystem);
+            try {
+                collectFromHDFS(allFileSystem);
+            } catch (FileNotFoundException e) {
+                log.warn("No garbage files collected from {}", 
allFileSystem.getPath());
+            }
             log.debug("folder {} is collected,detailed -> {}", 
allFileSystem.getPath(), allFileSystems);
         }
         UnitOfWork.doInTransactionWithRetry(() -> {
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleanerCLI.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleanerCLI.java
index 0b2b7bc2b1..865f5c570c 100644
--- 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleanerCLI.java
+++ 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleanerCLI.java
@@ -17,19 +17,29 @@
  */
 package org.apache.kylin.tool.garbage;
 
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Unsafe;
 
-import lombok.val;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kylin.guava30.shaded.common.util.concurrent.MoreExecutors;
 
 @Slf4j
 public class StorageCleanerCLI {
 
     public static void main(String[] args) {
+        System.out.println("start to cleanup HDFS.");
         try {
-            val storageCleaner = new StorageCleaner();
-            log.info("Start cleanup HDFS.");
-            storageCleaner.execute();
+            log.info("Init cleaning task thread pool as the direct executor 
service.");
+            
CleanTaskExecutorService.getInstance().bindWorkingPool(MoreExecutors::newDirectExecutorService);
+
+            StorageCleaner cleaner = new 
StorageCleaner().withTag(StorageCleaner.CleanerTag.CLI);
+            CleanTaskExecutorService.getInstance()
+                .submit(
+                    cleaner,
+                    
KylinConfig.getInstanceFromEnv().getStorageCleanTaskTimeout(), 
TimeUnit.MILLISECONDS)
+                .get();
         } catch (Exception e) {
             log.warn("cleanup HDFS failed.", e);
         }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java 
b/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java
index a623ef8046..a9ef41094c 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java
@@ -38,6 +38,7 @@ public class FastRoutineTool extends RoutineTool {
             return;
         }
         initOptionValues(optionsHelper);
+
         List<String> projectsToCleanup = getProjectsToCleanup();
         try {
             if (isMetadataCleanup()) {
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java 
b/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java
index a1f0500f8e..0de65f1780 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java
@@ -27,11 +27,14 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ExecutableApplication;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.common.util.Unsafe;
+import org.apache.kylin.guava30.shaded.common.util.concurrent.MoreExecutors;
 import org.apache.kylin.helper.MetadataToolHelper;
 import org.apache.kylin.helper.RoutineToolHelper;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.tool.MaintainModeTool;
+import org.apache.kylin.tool.garbage.CleanTaskExecutorService;
+import org.apache.kylin.tool.garbage.StorageCleaner;
 import org.apache.kylin.tool.util.ToolMainWrapper;
 import org.apache.kylin.metadata.epoch.EpochManager;
 
@@ -56,6 +59,11 @@ public class RoutineTool extends ExecutableApplication {
 
     private MetadataToolHelper helper = new MetadataToolHelper();
 
+    static {
+        log.info("Init cleaning task thread pool as the direct executor 
service.");
+        
CleanTaskExecutorService.getInstance().bindWorkingPool(MoreExecutors::newDirectExecutorService);
+    }
+
     public static void main(String[] args) {
         ToolMainWrapper.wrap(args, () -> {
             RoutineTool tool = new RoutineTool();
@@ -69,7 +77,7 @@ public class RoutineTool extends ExecutableApplication {
     }
 
     public static void cleanQueryHistories() {
-        RoutineToolHelper.cleanQueryHistories();
+        // NOTE(review): this delegates to the "Async" helper and does not wait on anything it may return, so
+        // cleanup may still be in progress when this method returns. Confirm that callers (e.g. a CLI process
+        // about to exit) do not rely on completion.
+        RoutineToolHelper.cleanQueryHistoriesAsync();
     }
 
     public static void cleanStreamingStats() {
@@ -105,6 +113,7 @@ public class RoutineTool extends ExecutableApplication {
             return;
         }
         initOptionValues(optionsHelper);
+
         System.out.println("Start to cleanup metadata");
         List<String> projectsToCleanup = getProjectsToCleanup();
         MaintainModeTool maintainModeTool = new MaintainModeTool("routine 
tool");
@@ -113,6 +122,7 @@ public class RoutineTool extends ExecutableApplication {
         if (EpochManager.getInstance().isMaintenanceMode()) {
             Runtime.getRuntime().addShutdownHook(new 
Thread(maintainModeTool::releaseEpochs));
         }
+
         doCleanup(projectsToCleanup);
     }
 
@@ -124,11 +134,21 @@ public class RoutineTool extends ExecutableApplication {
             cleanStorage();
         } catch (Exception e) {
             log.error("Failed to execute routintool", e);
+            throw e;
         }
     }
 
     public void cleanStorage() {
-        helper.cleanStorage(storageCleanup, Arrays.asList(projects), requestFSRate, retryTimes);
+        // Storage cleanup is best-effort for the routine tool: a failure is reported and logged, not rethrown.
+        try {
+            System.out.println("Start to cleanup HDFS");
+            CleanTaskExecutorService.getInstance().cleanStorageForRoutine(
+                storageCleanup, Arrays.asList(projects), requestFSRate, retryTimes);
+            System.out.println("cleanup HDFS finished");
+        } catch (Exception e) {
+            // Record the cause; the console message below does not include it.
+            log.error("Failed to cleanup HDFS", e);
+            System.out.println(StorageCleaner.ANSI_RED
+                + "cleanup HDFS failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr"
+                + StorageCleaner.ANSI_RESET);
+        }
     }
 
     protected boolean printUsage(OptionsHelper optionsHelper) {
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/util/MetadataUtil.java 
b/src/tool/src/main/java/org/apache/kylin/tool/util/MetadataUtil.java
index ae8cb074a1..4a58bc2903 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/util/MetadataUtil.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/util/MetadataUtil.java
@@ -36,18 +36,17 @@ import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.ibatis.jdbc.ScriptRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.logging.LogOutputStream;
+import org.apache.kylin.common.persistence.metadata.JdbcDataSource;
 import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil;
-import org.apache.kylin.helper.MetadataToolHelper;
-
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 
+import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class MetadataUtil {
 
     private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();
-    private static MetadataToolHelper metadataToolHelper = new 
MetadataToolHelper();
 
     private MetadataUtil() {
     }
@@ -60,7 +59,9 @@ public class MetadataUtil {
     }
 
     public static DataSource getDataSource(KylinConfig kylinConfig) throws Exception {
-        return metadataToolHelper.getDataSource(kylinConfig);
+        // Derive the JDBC connection properties from the configured metadata URL, then obtain a DataSource.
+        val props = JdbcUtil.datasourceParameters(kylinConfig.getMetadataUrl());
+        return JdbcDataSource.getDataSource(props);
     }
 
     public static void createTableIfNotExist(BasicDataSource dataSource, 
String tableName, String tableSql,
diff --git 
a/src/tool/src/test/java/org/apache/kylin/tool/garbage/CleanTaskExecutorServiceTests.java
 
b/src/tool/src/test/java/org/apache/kylin/tool/garbage/CleanTaskExecutorServiceTests.java
new file mode 100644
index 0000000000..1b0e368eca
--- /dev/null
+++ 
b/src/tool/src/test/java/org/apache/kylin/tool/garbage/CleanTaskExecutorServiceTests.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.garbage;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
+import org.awaitility.core.ConditionTimeoutException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class CleanTaskExecutorServiceTests extends NLocalFileMetadataTestCase {
+
+    // Service instance under test. NOTE(review): getTempInstance() is defined outside this view; presumably it
+    // builds a fresh instance separate from the shared singleton, so tests can bind/close pools freely. Confirm.
+    CleanTaskExecutorService mockedHelper = getTempInstance();
+
+    // Creates the local-file test metadata before each test.
+    @BeforeEach
+    public void setup() {
+        createTestMetadata();
+    }
+
+    /**
+     * Stub cleaner for these tests. {@link #execute()} optionally runs a hook and then blocks for about
+     * {@code timeout} milliseconds (1000 unless the long constructor is used) instead of touching storage.
+     */
+    static class TestStorageCleaner extends StorageCleaner {
+
+        // How long execute() blocks, in milliseconds.
+        private long timeout = 1000;
+
+        // Optional hook run at the start of execute(); stays null when the timeout constructor is used.
+        private Runnable runnable;
+
+        // Both constructors implicitly invoke the no-arg StorageCleaner constructor.
+        public TestStorageCleaner(Runnable runnable) throws Exception {
+            this.runnable = runnable;
+        }
+
+        public TestStorageCleaner(long timeout) throws Exception {
+            this.timeout = timeout;
+        }
+
+        @Override
+        public void execute() throws Exception {
+            if (runnable != null) {
+                runnable.run();
+            }
+            try {
+                // The flag never becomes true, so this waits until the timeout elapses: a bounded sleep.
+                await().atMost(timeout, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(false));
+            } catch (ConditionTimeoutException cte) {
+                // Ignore: timing out is the expected way out of the wait above.
+            }
+        }
+    }
+
+    /** Minimal task whose priority tag is fixed at construction; doRun is left as the inherited no-op. */
+    static class TestComparableCleanTask extends AbstractComparableCleanTask {
+
+        private final StorageCleaner.CleanerTag fixedTag;
+
+        public TestComparableCleanTask(StorageCleaner.CleanerTag cleanerTag) {
+            super();
+            fixedTag = cleanerTag;
+        }
+
+        @Override
+        public StorageCleaner.CleanerTag getCleanerTag() {
+            return fixedTag;
+        }
+    }
+
+    @Test
+    @Order(1)
+    void testCreateCleanTask() {
+        AbstractComparableCleanTask t1 = new 
TestComparableCleanTask(StorageCleaner.CleanerTag.ROUTINE);
+        
assertTrue(t1.getName().startsWith(CleanTaskExecutorServiceTests.class.getName()));
+        assertEquals(String.format("Task-%s: {tag: %s, class: %s}", 
t1.getName(), StorageCleaner.CleanerTag.ROUTINE,
+                t1.getClass().getName()), t1.getBrief());
+
+        AbstractComparableCleanTask t2 = new 
TestComparableCleanTask(StorageCleaner.CleanerTag.SERVICE);
+        assertEquals(TestComparableCleanTask.class.getName(), t2.getName());
+        assertEquals(String.format("Task-%s: {tag: %s, class: %s}", 
t2.getName(), StorageCleaner.CleanerTag.SERVICE,
+                t2.getClass().getName()), t2.getBrief());
+
+        // t2 has a higher priority
+        assertTrue(t1.compareTo(t2) > 0);
+
+        AbstractComparableCleanTask t3 = new 
TestComparableCleanTask(StorageCleaner.CleanerTag.SERVICE);
+        AbstractComparableCleanTask t4 = t3;
+
+        assertNotEquals(null, t1);
+        assertEquals(0, t2.compareTo(t3));
+        assertEquals(0, t3.compareTo(t4));
+        assertNotEquals(t2, t3);
+        assertEquals(t3, t4);
+    }
+
+    @Test
+    @Order(1)
+    void testComparableFutureTask() {
+        // TestComparableCleanTask supplies the fixed-tag runnables wrapped by the futures below.
+        AbstractComparableCleanTask ct1 = new TestComparableCleanTask(StorageCleaner.CleanerTag.ROUTINE);
+
+        PriorityExecutor.ComparableFutureTask<AbstractComparableCleanTask, Boolean> t1 =
+            new PriorityExecutor.ComparableFutureTask<>(ct1, true);
+        PriorityExecutor.ComparableFutureTask<AbstractComparableCleanTask, Boolean> t11 =
+            new PriorityExecutor.ComparableFutureTask<>(ct1, true);
+
+        assertNotEquals(null, t1);
+
+        // Future equality follows the wrapped task: wrapping the same task yields equal futures.
+        PriorityExecutor.ComparableFutureTask<AbstractComparableCleanTask, Boolean> t1Ref = t1;
+        assertEquals(t1, t1Ref);
+        assertEquals(t1, t11);
+
+        // A different task with the same tag ties in ordering, yet the futures are not equal.
+        AbstractComparableCleanTask ct2 = new TestComparableCleanTask(StorageCleaner.CleanerTag.ROUTINE);
+        PriorityExecutor.ComparableFutureTask<AbstractComparableCleanTask, Boolean> t21 =
+            new PriorityExecutor.ComparableFutureTask<>(ct2, true);
+
+        assertEquals(0, t21.compareTo(t11));
+        assertNotEquals(t11, t21);
+
+        // A SERVICE task outranks a ROUTINE task, so it sorts first.
+        AbstractComparableCleanTask ct3 = new TestComparableCleanTask(StorageCleaner.CleanerTag.SERVICE);
+        PriorityExecutor.ComparableFutureTask<AbstractComparableCleanTask, Boolean> t31 =
+            new PriorityExecutor.ComparableFutureTask<>(ct3, true);
+
+        assertTrue(t31.compareTo(t21) < 0, "t31 should be the predecessor task");
+    }
+
+    @Test
+    @Order(1)
+    void testRunWithoutThreadPool() {
+        assertThrows(RejectedExecutionException.class,
+            () -> mockedHelper.submit(new TestStorageCleaner(1), 1, 
TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    @Order(1)
+    void testBindThreadPool() throws NoSuchFieldException, IllegalAccessException {
+        resetInnerPool();
+        // Only the first bind takes effect; a second attempt is refused.
+        boolean firstBind = mockedHelper.bindWorkingPool(this::createPriorityPool);
+        assertTrue(firstBind);
+        boolean secondBind = mockedHelper.bindWorkingPool(this::createPriorityPool);
+        assertFalse(secondBind);
+    }
+
+    @Test
+    @Order(1)
+    void testCleanTaskTimeout() throws Exception {
+        // The stub cleaner blocks for about 5 s, well past the 1 s task timeout used below.
+        StorageCleaner routineSc = new TestStorageCleaner(5000).withTag(StorageCleaner.CleanerTag.ROUTINE)
+                .withTraceId("1");
+
+        overwriteSystemProp("kylin.storage.clean-tasks-concurrency", "1");
+
+        mockedHelper.bindWorkingPool(this::createPriorityPool);
+        CompletableFuture<?> f = mockedHelper.submit(routineSc, 1, TimeUnit.SECONDS);
+        // The timeout checker completes the watcher with a TimeoutException, which join() wraps.
+        CompletionException ce = assertThrows(CompletionException.class, f::join);
+
+        assertTrue(ce.getCause() instanceof TimeoutException);
+    }
+
+    @Test
+    @Order(10)
+    void testCleanTaskRejected() throws Exception {
+        StorageCleaner routineSc = new TestStorageCleaner(10000).withTag(StorageCleaner.CleanerTag.ROUTINE)
+                .withTraceId("1");
+
+        overwriteSystemProp("kylin.storage.clean-tasks-concurrency", "1");
+
+        // Bind a pool (if none is bound yet) and shut it down, so the next submission is refused by the pool.
+        mockedHelper.bindWorkingPool(this::createPriorityPool);
+        mockedHelper.close();
+
+        // submit() itself does not throw: the pool's RejectedExecutionException is delivered via the future.
+        CompletableFuture<?> f = mockedHelper.submit(routineSc, 1, TimeUnit.SECONDS);
+        CompletionException ce = assertThrows(CompletionException.class, f::join);
+
+        assertTrue(ce.getCause() instanceof RejectedExecutionException);
+        assertTrue(f.isDone());
+        // NOTE(review): resetInnerPool is defined outside this view; presumably it restores the pool state.
+        resetInnerPool();
+    }
+
+    @Test
+    @Order(1)
+    void testCleanTasksCompletedInOrderByTag() throws Exception {
+        StorageCleaner routineSc = new TestStorageCleaner(1000).withTag(StorageCleaner.CleanerTag.ROUTINE)
+                .withTraceId("1");
+        StorageCleaner cliSc = new TestStorageCleaner(200).withTag(StorageCleaner.CleanerTag.CLI).withTraceId("2");
+        StorageCleaner serviceSc = new TestStorageCleaner(200).withTag(StorageCleaner.CleanerTag.SERVICE)
+                .withTraceId("3");
+
+        // NOTE(review): presumably read by createPriorityPool (outside this view) to make the pool single-threaded,
+        // which is what forces the three tasks below to queue up. Confirm.
+        overwriteSystemProp("kylin.storage.clean-tasks-concurrency", "1");
+
+        resetInnerPool();
+        mockedHelper.bindWorkingPool(this::createPriorityPool);
+
+        // Written by completion callbacks; the final join()s below make those writes visible to this thread.
+        List<String> completedTasks = new ArrayList<>();
+        List<CompletableFuture<?>> tasks = new ArrayList<>();
+
+        // initSc enqueues the three tasks from inside its own execution, then keeps its worker busy for
+        // TestStorageCleaner's default 1000 ms, so all three wait in the priority queue together.
+        StorageCleaner initSc = new TestStorageCleaner(() -> {
+            tasks.add(mockedHelper.submit(routineSc, 6, TimeUnit.SECONDS)
+                    .whenComplete((v, t) -> completedTasks.add(routineSc.getTraceId())));
+            tasks.add(mockedHelper.submit(cliSc, 6, TimeUnit.SECONDS)
+                    .whenComplete((v, t) -> completedTasks.add(cliSc.getTraceId())));
+            tasks.add(mockedHelper.submit(serviceSc, 6, TimeUnit.SECONDS)
+                    .whenComplete((v, t) -> completedTasks.add(serviceSc.getTraceId())));
+        });
+
+        CompletableFuture<?> initFuture = mockedHelper.submit(initSc, 6, TimeUnit.SECONDS);
+        CompletableFuture.allOf(initFuture).join();
+        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
+
+        // initSc is the first task in the working queue, so it runs first. While it runs, routineSc, cliSc
+        // and serviceSc are queued and ordered by cleaner tag as [serviceSc, cliSc, routineSc]. Once initSc
+        // finishes they complete in that order, i.e. trace ids "3", "2", "1".
+        assertEquals(ImmutableList.of("3", "2", "1"), completedTasks);
+    }
+
+    /**
+     * Pool supplier for {@code bindWorkingPool}. It builds a new priority working pool named
+     * "test-pool", sized by the test config's storage clean-task concurrency.
+     */
+    private ExecutorService createPriorityPool() {
+        int workerCount = getTestConfig().getStorageCleanTaskConcurrency();
+        return PriorityExecutor.newWorkingThreadPool("test-pool", workerCount);
+    }
+
+    /**
+     * Submits a task that keeps waiting until its thread is interrupted. About one second later,
+     * the helper is closed from a background thread. The test then checks two things:
+     * <ul>
+     *   <li>the task's future completes and the task observed an exception from its inner wait
+     *       (expected to be caused by the shutdown);</li>
+     *   <li>{@code close()} itself did not fail.</li>
+     * </ul>
+     */
+    @Test
+    @Order(11)
+    void testDoubleRetriesToShutdown() throws Exception {
+        overwriteSystemProp("kylin.storage.clean-tasks-concurrency", "1");
+        mockedHelper.bindWorkingPool(this::createPriorityPool);
+
+        AtomicBoolean shutdownTriggered = new AtomicBoolean(false);
+        CompletableFuture<?> future = mockedHelper.submit(new AbstractComparableCleanTask() {
+            @Override
+            protected void doRun() {
+                while (!Thread.currentThread().isInterrupted()) {
+                    try {
+                        // The condition never becomes true, so only the 10 s timeout or an
+                        // interruption ends this wait.
+                        await().atMost(10, TimeUnit.SECONDS).untilTrue(new AtomicBoolean(false));
+                    } catch (Exception e) {
+                        shutdownTriggered.set(true);
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+
+            @Override
+            public StorageCleaner.CleanerTag getCleanerTag() {
+                return StorageCleaner.CleanerTag.SERVICE;
+            }
+        }, 10, TimeUnit.SECONDS);
+
+        // Record a failing close() instead of swallowing it, so it fails this test explicitly.
+        java.util.concurrent.atomic.AtomicReference<IOException> closeFailure =
+                new java.util.concurrent.atomic.AtomicReference<>();
+        Thread closer = new Thread(() -> {
+            try {
+                // Used as a ~1 s delay: the condition never becomes true, so this times out.
+                await().atMost(1, TimeUnit.SECONDS).untilTrue(new AtomicBoolean(false));
+            } catch (Exception expectedTimeout) {
+                try {
+                    mockedHelper.close();
+                } catch (IOException e) {
+                    closeFailure.set(e);
+                }
+            }
+        }, "clean-task-test-closer");
+        closer.setDaemon(true);
+        closer.start();
+
+        future.join();
+        // close() may still be running when the task's future completes; wait for it (bounded).
+        closer.join(TimeUnit.SECONDS.toMillis(30));
+
+        assertTrue(closeFailure.get() == null, "close() failed: " + closeFailure.get());
+        assertTrue(future.isDone() && shutdownTriggered.get());
+    }
+
+    /**
+     * Creates a separate {@link CleanTaskExecutorService} through its declared no-arg
+     * constructor. The constructor is made accessible reflectively, presumably because it is
+     * not public; verify against the class.
+     *
+     * @return a newly constructed instance, never {@code null}
+     * @throws IllegalStateException if the constructor cannot be found or invoked
+     */
+    private CleanTaskExecutorService getTempInstance() {
+        try {
+            Constructor<CleanTaskExecutorService> constructor =
+                    CleanTaskExecutorService.class.getDeclaredConstructor();
+            constructor.setAccessible(true);
+            return constructor.newInstance();
+        } catch (ReflectiveOperationException e) {
+            // Fail loudly here rather than returning null and surfacing a confusing NPE later.
+            throw new IllegalStateException("Cannot instantiate CleanTaskExecutorService reflectively", e);
+        }
+    }
+
+    /**
+     * Returns {@code mockedHelper} to the "no pool bound" state. It does this reflectively by
+     * setting the private {@code pool} field to {@code null} and replacing the {@code bound}
+     * field with a fresh {@code AtomicBoolean(false)}.
+     *
+     * <p>A previously bound pool is shut down first (when it is an {@link ExecutorService}).
+     * This keeps its worker threads, and any task still running from an earlier test, from
+     * leaking into later tests.
+     *
+     * @throws NoSuchFieldException if either private field no longer exists
+     * @throws IllegalAccessException if a field cannot be read or written reflectively
+     */
+    private void resetInnerPool() throws NoSuchFieldException, IllegalAccessException {
+        Field poolField = CleanTaskExecutorService.class.getDeclaredField("pool");
+        poolField.setAccessible(true);
+        Object previousPool = poolField.get(mockedHelper);
+        if (previousPool instanceof ExecutorService) {
+            ((ExecutorService) previousPool).shutdownNow();
+        }
+        poolField.set(mockedHelper, null);
+
+        Field boundField = CleanTaskExecutorService.class.getDeclaredField("bound");
+        boundField.setAccessible(true);
+        boundField.set(mockedHelper, new AtomicBoolean(false));
+    }
+
+}

Reply via email to