This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 180ff07afeaf64da8e565b7aa5882c3a89f1c268 Author: fengguangyuan <qq272101...@gmail.com> AuthorDate: Sat Jul 1 12:07:43 2023 +0800 KYLIN-5745 Using a global thread pool to clean underlying storages 1. Using a global thread pool to clean underlying storages; 2. Launching cleaning tasks in the local thread and to ignore FileNotFoundException while collecting HDFS files. Co-authored-by: Guangyuan Feng <guangyuan.f...@kyligence.io> --- .../apache/kylin/rest/config/AppInitializer.java | 7 + .../kylin/rest/service/AsyncTaskService.java | 9 +- .../org/apache/kylin/common/KylinConfigBase.java | 8 + .../kylin/metadata/query/QueryHistoryDAO.java | 6 +- .../kylin/metadata/query/RDBMSQueryHistoryDAO.java | 13 +- .../metadata/query/RDBMSQueryHistoryDaoTest.java | 2 +- .../apache/kylin/rest/service/ScheduleService.java | 10 +- .../kylin/rest/service/MetaStoreService.java | 10 +- .../rest/service/MetaStoreTenantServiceTest.java | 10 +- .../org/apache/kylin/rest/InitConfiguration.java | 1 - .../org/apache/kylin/rest/HAConfigurationTest.java | 6 +- .../org/apache/kylin/helper/RoutineToolHelper.java | 55 +++- .../tool/garbage/AbstractComparableCleanTask.java | 93 ++++++ .../tool/garbage/CleanTaskExecutorService.java | 236 +++++++++++++++ .../kylin/tool/garbage/PriorityExecutor.java | 71 +++++ .../apache/kylin/tool/garbage/StorageCleaner.java | 28 +- .../kylin/tool/garbage/StorageCleanerCLI.java | 18 +- .../apache/kylin/tool/routine/FastRoutineTool.java | 1 + .../org/apache/kylin/tool/routine/RoutineTool.java | 24 +- .../org/apache/kylin/tool/util/MetadataUtil.java | 9 +- .../garbage/CleanTaskExecutorServiceTests.java | 324 +++++++++++++++++++++ 21 files changed, 889 insertions(+), 52 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java index 5a2a833305..35af91a415 100644 --- 
a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java @@ -60,6 +60,8 @@ import org.apache.kylin.rest.service.task.QueryHistoryTaskScheduler; import org.apache.kylin.rest.util.JStackDumpTask; import org.apache.kylin.streaming.jobs.StreamingJobListener; import org.apache.kylin.tool.daemon.KapGuardianHATask; +import org.apache.kylin.tool.garbage.CleanTaskExecutorService; +import org.apache.kylin.tool.garbage.PriorityExecutor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationContext; @@ -175,6 +177,11 @@ public class AppInitializer { log.info("KylinConfig in env, ID is {}", kylinConfig.hashCode()); log.info("KylinConfig in env, metadata is {}", kylinConfig.getMetadataUrl()); log.info("KylinConfig in env, working dir is {}", kylinConfig.getHdfsWorkingDirectory()); + + // Init global static instances + CleanTaskExecutorService.getInstance().bindWorkingPool( + () -> PriorityExecutor.newWorkingThreadPool( + "clean-storages-pool", kylinConfig.getStorageCleanTaskConcurrency())); } private void warmUpSystemCache() { diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java index a343a464c6..8046f0c62a 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java @@ -27,6 +27,7 @@ import java.time.ZoneOffset; import java.util.List; import java.util.Locale; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.servlet.ServletOutputStream; @@ -50,6 +51,7 @@ import org.apache.kylin.metadata.query.QueryHistoryInfo; 
import org.apache.kylin.metadata.query.QueryHistoryRequest; import org.apache.kylin.metadata.query.QueryHistorySql; import org.apache.kylin.metadata.query.util.QueryHistoryUtil; +import org.apache.kylin.tool.garbage.CleanTaskExecutorService; import org.apache.kylin.tool.garbage.StorageCleaner; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; @@ -69,8 +71,11 @@ public class AsyncTaskService implements AsyncTaskServiceSupporter { long startAt = System.currentTimeMillis(); try { - val storageCleaner = new StorageCleaner(); - storageCleaner.execute(); + CleanTaskExecutorService.getInstance() + .submit( + new StorageCleaner().withTag(StorageCleaner.CleanerTag.SERVICE), + KylinConfig.getInstanceFromEnv().getStorageCleanTaskTimeout(), TimeUnit.MILLISECONDS) + .get(); } catch (Exception e) { MetricsGroup.hostTagCounterInc(MetricsName.STORAGE_CLEAN_FAILED, MetricsCategory.GLOBAL, GLOBAL); throw e; diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index cbd82ed108..7f5f2fb9e3 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2576,6 +2576,14 @@ public abstract class KylinConfigBase implements Serializable { * 1024)).longValue(); } + public long getStorageCleanTaskTimeout() { + return TimeUtil.timeStringAs(getOptional("kylin.storage.clean-timeout", "4h"), TimeUnit.MILLISECONDS); + } + + public int getStorageCleanTaskConcurrency() { + return Integer.parseInt(getOptional("kylin.storage.clean-tasks-concurrency", "5")); + } + public long getSourceUsageQuota() { Double d = Double.parseDouble(getOptional("kylin.storage.source-quota-in-giga-bytes", "-1")); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java index 66f1d4aa22..2921098c70 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java @@ -51,11 +51,11 @@ public interface QueryHistoryDAO { String getQueryMetricMeasurement(); - void deleteQueryHistoriesIfMaxSizeReached(); + void deleteQueryHistoriesIfMaxSizeReached() throws InterruptedException; - void deleteQueryHistoriesIfRetainTimeReached(); + void deleteQueryHistoriesIfRetainTimeReached() throws InterruptedException; - void deleteOldestQueryHistoriesByProject(String project, int deleteCount); + void deleteOldestQueryHistoriesByProject(String project, int deleteCount) throws InterruptedException; long getQueryHistoriesSize(QueryHistoryRequest request, String project); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java index 3592dbec22..a6ed94675f 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java @@ -109,7 +109,7 @@ public class RDBMSQueryHistoryDAO implements QueryHistoryDAO { return jdbcQueryHisStore.queryByQueryId(queryId); } - public void deleteQueryHistoriesIfMaxSizeReached() { + public void deleteQueryHistoriesIfMaxSizeReached() throws InterruptedException { long maxSize = KylinConfig.getInstanceFromEnv().getQueryHistoryMaxSize(); long totalCount = jdbcQueryHisStore.getCountOnQueryHistory(); if (totalCount > maxSize) { @@ -117,14 +117,14 @@ public class RDBMSQueryHistoryDAO implements QueryHistoryDAO { } } - public void deleteQueryHistoriesIfRetainTimeReached() { + public void deleteQueryHistoriesIfRetainTimeReached() throws InterruptedException { long 
rangeOutCount = jdbcQueryHisStore.getCountOnQueryHistory(getRetainTime()); if (rangeOutCount > 0) { deleteQueryHistoryAndRealization((int) rangeOutCount); } } - public void deleteQueryHistoryAndRealization(int deleteCount) { + public void deleteQueryHistoryAndRealization(int deleteCount) throws InterruptedException { int singleLimit = KylinConfig.getInstanceFromEnv().getQueryHistorySingleDeletionSize(); largeSplitToSmallTask(deleteCount, singleLimit, currentCount -> { QueryHistory queryHistory = jdbcQueryHisStore.getOldestQueryHistory(currentCount); @@ -134,7 +134,7 @@ public class RDBMSQueryHistoryDAO implements QueryHistoryDAO { }, "Cleanup all query history"); } - public void deleteOldestQueryHistoriesByProject(String project, int deleteCount) { + public void deleteOldestQueryHistoriesByProject(String project, int deleteCount) throws InterruptedException { int singleLimit = KylinConfig.getInstanceFromEnv().getQueryHistorySingleDeletionSize(); largeSplitToSmallTask(deleteCount, singleLimit, currentCount -> { QueryHistory queryHistory = jdbcQueryHisStore.getOldestQueryHistory(project, currentCount); @@ -280,9 +280,12 @@ public class RDBMSQueryHistoryDAO implements QueryHistoryDAO { } public static void largeSplitToSmallTask(int totalCount, int singleSize, IntFunction<Integer> function, - String description) { + String description) throws InterruptedException { int retainCount = totalCount; while (retainCount > 0) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Interrupted during splitting a large query task!"); + } int currentCount = Math.min(retainCount, singleSize); int actualCount = function.apply(currentCount); if (currentCount != actualCount && logger.isWarnEnabled()) { diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java index 30cd2d37d0..2842fd111c 100644 --- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java @@ -836,7 +836,7 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase { } @Test - public void testLargeSplitToSmallTask() { + public void testLargeSplitToSmallTask() throws InterruptedException { AtomicInteger executions = new AtomicInteger(0); AtomicInteger actualSize = new AtomicInteger(0); largeSplitToSmallTask(105, 10, currentCount -> { diff --git a/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java b/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java index 2e80515367..af489e9933 100644 --- a/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java +++ b/src/job-service/src/main/java/org/apache/kylin/rest/service/ScheduleService.java @@ -22,7 +22,6 @@ import static org.apache.kylin.common.constant.Constants.METADATA_FILE; import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; @@ -55,7 +54,6 @@ import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.helper.MetadataToolHelper; import org.apache.kylin.helper.RoutineToolHelper; import org.apache.kylin.metadata.resourcegroup.KylinInstance; import org.apache.kylin.metadata.resourcegroup.RequestTypeEnum; @@ -107,7 +105,7 @@ public class ScheduleService { private String tmpMetadataBackupFilePath; private static final ThreadLocal<Future<?>> CURRENT_FUTURE = new ThreadLocal<>(); - private MetadataToolHelper metadataToolHelper = new MetadataToolHelper(); + private static 
final Map<Future<?>, Long> ASYNC_FUTURES = Maps.newConcurrentMap(); @Scheduled(cron = "${kylin.metadata.ops-cron:0 0 0 * * *}") @@ -126,7 +124,8 @@ public class ScheduleService { AtomicReference<Pair<String, String>> backupFolder = new AtomicReference<>(null); executeTask(() -> backupFolder.set(backupService.backupAll()), "MetadataBackup", startTime); executeMetadataBackupInTenantMode(kylinConfig, startTime, backupFolder); - executeTask(RoutineToolHelper::cleanQueryHistories, "QueryHistoriesCleanup", startTime); + executeTask(() -> RoutineToolHelper.cleanQueryHistoriesAsync(getRemainingTime(startTime), + TimeUnit.MILLISECONDS), "QueryHistoriesCleanup", startTime); executeTask(RoutineToolHelper::cleanStreamingStats, "StreamingStatsCleanup", startTime); executeTask(RoutineToolHelper::deleteRawRecItems, "RawRecItemsDeletion", startTime); executeTask(RoutineToolHelper::cleanGlobalSourceUsage, "SourceUsageCleanup", startTime); @@ -134,8 +133,7 @@ public class ScheduleService { } executeTask(() -> projectService.garbageCleanup(getRemainingTime(startTime)), "ProjectGarbageCleanup", startTime); - executeTask(() -> metadataToolHelper.cleanStorage(true, Collections.emptyList(), 0, 0), "HdfsCleanup", - startTime); + executeTask(RoutineToolHelper::cleanStorageForRoutine, "HdfsCleanup", startTime); log.info("Finish to work, cost {}ms", System.currentTimeMillis() - startTime); } } catch (InterruptedException e) { diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java index 4976131faa..0a45d90c11 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java @@ -79,7 +79,6 @@ import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.guava30.shaded.common.collect.Maps; import 
org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.guava30.shaded.common.io.ByteSource; -import org.apache.kylin.helper.MetadataToolHelper; import org.apache.kylin.helper.RoutineToolHelper; import org.apache.kylin.metadata.cube.model.IndexEntity; import org.apache.kylin.metadata.cube.model.IndexPlan; @@ -101,7 +100,6 @@ import org.apache.kylin.metadata.model.schema.SchemaNodeType; import org.apache.kylin.metadata.model.schema.SchemaUtil; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.metadata.query.util.QueryHisStoreUtil; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore; import org.apache.kylin.metadata.recommendation.candidate.RawRecItem; @@ -122,6 +120,7 @@ import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.rest.util.AclPermissionUtil; import org.apache.kylin.source.ISourceMetadataExplorer; import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.tool.garbage.CleanTaskExecutorService; import org.apache.kylin.tool.util.HashFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,8 +160,6 @@ public class MetaStoreService extends BasicService { @Autowired(required = false) private List<ModelChangeSupporter> modelChangeSupporters = Lists.newArrayList(); - MetadataToolHelper metadataToolHelper = new MetadataToolHelper(); - public List<ModelPreviewResponse> getPreviewModels(String project, List<String> ids) { aclEvaluate.checkProjectWritePermission(project); return modelService.getManager(NDataflowManager.class, project).listAllDataflows(true).stream() @@ -806,14 +803,15 @@ public class MetaStoreService extends BasicService { public void cleanupMeta(String project) { if (project.equals(UnitOfWork.GLOBAL_UNIT)) { RoutineToolHelper.cleanGlobalSourceUsage(); - QueryHisStoreUtil.cleanQueryHistory(); + 
RoutineToolHelper.cleanQueryHistoriesAsync().join(); } else { RoutineToolHelper.cleanMetaByProject(project); } } public void cleanupStorage(String[] projectsToClean, boolean cleanupStorage) { - metadataToolHelper.cleanStorage(cleanupStorage, Arrays.asList(projectsToClean), 0D, 0); + CleanTaskExecutorService.getInstance().cleanStorageForService(cleanupStorage, Arrays.asList(projectsToClean), + 0D, 0); } public void cleanupStorage(StorageCleanupRequest request, HttpServletRequest servletRequest) { diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreTenantServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreTenantServiceTest.java index 1c34c4b841..94ad1e8d3c 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreTenantServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreTenantServiceTest.java @@ -19,17 +19,18 @@ package org.apache.kylin.rest.service; import java.io.IOException; +import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.response.RestResponse; import org.apache.kylin.common.util.JsonUtil; -import org.apache.kylin.helper.MetadataToolHelper; import org.apache.kylin.metadata.resourcegroup.ResourceGroup; import org.apache.kylin.metadata.resourcegroup.ResourceGroupManager; import org.apache.kylin.metadata.streaming.ReflectionUtils; import org.apache.kylin.rest.request.StorageCleanupRequest; +import org.apache.kylin.tool.garbage.CleanTaskExecutorService; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -63,8 +64,6 @@ public class MetaStoreTenantServiceTest { private ResourceGroupManager rgManager = Mockito.mock(ResourceGroupManager.class); @Mock private KylinConfig kylinConfig = Mockito.mock(KylinConfig.class); - @Mock - private 
MetadataToolHelper metadataToolHelper = Mockito.spy(MetadataToolHelper.class); @Test public void cleanupStorageMultiTenantMode() throws IOException { @@ -76,7 +75,6 @@ public class MetaStoreTenantServiceTest { Mockito.when(kylinConfig.isKylinMultiTenantEnabled()).thenReturn(false); ReflectionUtils.setField(metaStoreService, "routeService", routeService); - ReflectionUtils.setField(metaStoreService, "metadataToolHelper", metadataToolHelper); ReflectionUtils.setField(routeService, "restTemplate", restTemplate); val restResult = JsonUtil.writeValueAsBytes(RestResponse.ok(true)); @@ -89,8 +87,8 @@ public class MetaStoreTenantServiceTest { Mockito.when(rgManager.getResourceGroup()) .thenReturn(JsonUtil.readValue(resourceGroupJson, new TypeReference<ResourceGroup>() { })); - Mockito.doNothing().when(metadataToolHelper).cleanStorage(ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), - ArgumentMatchers.anyDouble(), ArgumentMatchers.anyInt()); + CleanTaskExecutorService.getInstance().cleanStorageForRoutine(ArgumentMatchers.anyBoolean(), Collections.emptyList(), + ArgumentMatchers.anyDouble(), ArgumentMatchers.anyInt()); val storageCleanupRequest = new StorageCleanupRequest(); storageCleanupRequest.setCleanupStorage(false); diff --git a/src/server/src/main/java/org/apache/kylin/rest/InitConfiguration.java b/src/server/src/main/java/org/apache/kylin/rest/InitConfiguration.java index 99e675154c..aeaa76ad5c 100644 --- a/src/server/src/main/java/org/apache/kylin/rest/InitConfiguration.java +++ b/src/server/src/main/java/org/apache/kylin/rest/InitConfiguration.java @@ -31,7 +31,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; - import lombok.val; import lombok.extern.slf4j.Slf4j; diff --git a/src/server/src/test/java/org/apache/kylin/rest/HAConfigurationTest.java b/src/server/src/test/java/org/apache/kylin/rest/HAConfigurationTest.java index 
d0e35ad355..3b25cd6717 100644 --- a/src/server/src/test/java/org/apache/kylin/rest/HAConfigurationTest.java +++ b/src/server/src/test/java/org/apache/kylin/rest/HAConfigurationTest.java @@ -23,9 +23,9 @@ import javax.sql.DataSource; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil; -import org.apache.kylin.helper.MetadataToolHelper; import org.apache.kylin.junit.annotation.MetadataInfo; import org.apache.kylin.junit.annotation.OverwriteProp; +import org.apache.kylin.tool.util.MetadataUtil; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -48,13 +48,11 @@ class HAConfigurationTest { @Mock SessionProperties sessionProperties; - MetadataToolHelper metadataToolHelper = new MetadataToolHelper(); - DataSource dataSource; @BeforeEach public void setup() throws Exception { - dataSource = Mockito.spy(metadataToolHelper.getDataSource(getTestConfig())); + dataSource = Mockito.spy(MetadataUtil.getDataSource(getTestConfig())); ReflectionTestUtils.setField(configuration, "dataSource", dataSource); } diff --git a/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java b/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java index 8c9b06d441..7d796992b4 100644 --- a/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java +++ b/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java @@ -18,8 +18,12 @@ package org.apache.kylin.helper; -import lombok.extern.slf4j.Slf4j; -import lombok.val; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.SetThreadName; @@ -30,12 +34,15 @@ import 
org.apache.kylin.metadata.query.util.QueryHisStoreUtil; import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore; import org.apache.kylin.metadata.streaming.util.StreamingJobRecordStoreUtil; import org.apache.kylin.metadata.streaming.util.StreamingJobStatsStoreUtil; +import org.apache.kylin.tool.garbage.AbstractComparableCleanTask; +import org.apache.kylin.tool.garbage.CleanTaskExecutorService; import org.apache.kylin.tool.garbage.GarbageCleaner; +import org.apache.kylin.tool.garbage.PriorityExecutor; import org.apache.kylin.tool.garbage.SourceUsageCleaner; import org.apache.kylin.tool.garbage.StorageCleaner; -import java.util.List; -import java.util.stream.Collectors; +import lombok.val; +import lombok.extern.slf4j.Slf4j; /* * this class is only for removing dependency of kylin-tool module, and should be refactor later @@ -46,8 +53,42 @@ public class RoutineToolHelper { private RoutineToolHelper() { } - public static void cleanQueryHistories() { - QueryHisStoreUtil.cleanQueryHistory(); + public static CompletableFuture<Void> cleanQueryHistoriesAsync(long timeout, TimeUnit timeUnit) { + tryInitCleanTaskExecutorService(); + return CleanTaskExecutorService.getInstance().submit(new AbstractComparableCleanTask() { + @Override + public String getName() { + return "cleanQueryHistoriesForAllProjects"; + } + + @Override + protected void doRun() { + QueryHisStoreUtil.cleanQueryHistory(); + } + + @Override + public StorageCleaner.CleanerTag getCleanerTag() { + return StorageCleaner.CleanerTag.ROUTINE; + } + }, timeout, timeUnit); + } + + public static CompletableFuture<Void> cleanQueryHistoriesAsync() { + return cleanQueryHistoriesAsync(KylinConfig.getInstanceFromEnv().getStorageCleanTaskTimeout(), + TimeUnit.MILLISECONDS); + } + + public static void cleanStorageForRoutine() { + tryInitCleanTaskExecutorService(); + CleanTaskExecutorService.getInstance().cleanStorageForRoutine(true, Collections.emptyList(), 0, 0); + } + + private static void 
tryInitCleanTaskExecutorService() { + CleanTaskExecutorService.getInstance().bindWorkingPool(() -> { + log.warn("Init the cleaning task thread pool from thread {}.", Thread.currentThread().getName()); + return PriorityExecutor.newWorkingThreadPool("routine-tool-helper-pool", + KylinConfig.getInstanceFromEnv().getStorageCleanTaskConcurrency()); + }); } public static void cleanStreamingStats() { @@ -100,7 +141,7 @@ public class RoutineToolHelper { for (String projName : projectsToCleanup) { cleanMetaByProject(projName); } - cleanQueryHistories(); + cleanQueryHistoriesAsync(); cleanStreamingStats(); deleteRawRecItems(); System.out.println("Metadata cleanup finished"); diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/AbstractComparableCleanTask.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/AbstractComparableCleanTask.java new file mode 100644 index 0000000000..35bc86c16f --- /dev/null +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/AbstractComparableCleanTask.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.tool.garbage; + +import javax.validation.constraints.NotNull; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +/** + * The priority of the task to be executed is: + * ROUTINE < CLI < API + */ +public abstract class AbstractComparableCleanTask implements Runnable, Comparable<AbstractComparableCleanTask> { + private final CompletableFuture<Void> watcher = new CompletableFuture<>(); + + public String getName() { + return getClass().getName(); + } + + public String getBrief() { + return String.format("Task-%s: {%s}", getName(), details()); + } + + protected String details() { + return String.format("tag: %s, class: %s", getCleanerTag(), getClass().getName()); + } + + public CompletableFuture<Void> getWatcher() { + return watcher; + } + + public abstract StorageCleaner.CleanerTag getCleanerTag(); + + @Override + public void run() { + try { + doRun(); + watcher.complete(null); + } catch (Throwable t) { + watcher.completeExceptionally(t); + } + } + + protected void doRun() { + } + + /** + * Note: this class has a natural ordering that is inconsistent with equals. 
+ */ + @Override + public int compareTo(@NotNull AbstractComparableCleanTask t) { + if (this == t || getCleanerTag() == t.getCleanerTag()) { + return 0; + } else if (getCleanerTag().ordinal() > t.getCleanerTag().ordinal()) { + return -1; + } else { + return 1; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AbstractComparableCleanTask that = (AbstractComparableCleanTask) o; + return Objects.equals(watcher, that.watcher) && Objects.equals(getCleanerTag(), that.getCleanerTag()); + } + + @Override + public int hashCode() { + return Objects.hash(watcher, getCleanerTag()); + } +} diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java new file mode 100644 index 0000000000..da12dd15bb --- /dev/null +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/CleanTaskExecutorService.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.tool.garbage; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import javax.annotation.concurrent.ThreadSafe; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.exception.KylinRuntimeException; +import org.apache.kylin.common.util.DaemonThreadFactory; +import org.apache.kylin.common.util.RandomUtil; +import org.apache.kylin.guava30.shaded.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Process-wide singleton that runs storage cleaning tasks on a working pool bound at most once via + * {@link #bindWorkingPool(Supplier)}. Each submitted task is guarded by a timeout enforced from a dedicated + * single-thread scheduler. A JVM shutdown hook closes the singleton. + */ +@ThreadSafe +public class CleanTaskExecutorService implements Closeable { + + private static final Logger LOGGER = LoggerFactory.getLogger(CleanTaskExecutorService.class); + + private static final CleanTaskExecutorService INSTANCE = new CleanTaskExecutorService(); + + private static final int SHUTDOWN_TIMEOUT_SECONDS = 2; + + // Completes a task's watcher with a TimeoutException once its deadline passes; shut down in close(). + private final ScheduledExecutorService timeoutCheckerPool = Executors + .newSingleThreadScheduledExecutor(new DaemonThreadFactory("storage-cleaner-timeout-checker")); + + private final AtomicBoolean bound = new AtomicBoolean(false); + // Should move it outside to gain a more sensible composition code. 
+ // Volatile: written by bindWorkingPool() and read by submit()/close(), possibly on other threads. + private volatile ExecutorService pool; + + static { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + LOGGER.info("[{}] Shutdown when JVM is to shutdown.", CleanTaskExecutorService.class.getName()); + INSTANCE.close(); + LOGGER.info("[{}] Shutdown successfully.", CleanTaskExecutorService.class.getName()); + } catch (Exception e) { + LOGGER.error("[{}] Occurring exceptions to shutdown!", CleanTaskExecutorService.class.getName(), e); + } + })); + } + + private CleanTaskExecutorService() { + } + + /** + * Returns the process-wide instance. + */ + public static CleanTaskExecutorService getInstance() { + return INSTANCE; + } + + /** + * Binds the working pool used by {@code submit}. Only the first successful call binds a pool; later calls + * leave the bound pool untouched. If the provider throws or returns null, nothing stays bound, so a later + * call can retry. + * + * @param workingPoolProvider supplies the pool; invoked only while no pool is bound + * @return true if this call bound the pool, false if a pool had already been bound + * @throws IllegalArgumentException if {@code workingPoolProvider} is null + * @throws IllegalStateException if {@code workingPoolProvider} returns null + */ + public boolean bindWorkingPool(Supplier<ExecutorService> workingPoolProvider) { + Preconditions.checkArgument(workingPoolProvider != null, "workingPool is null"); + if (!bound.compareAndSet(false, true)) { + return false; + } + ExecutorService provided = null; + try { + provided = workingPoolProvider.get(); + } finally { + if (provided == null) { + // Roll back the claim instead of staying "bound" without a pool forever. + bound.set(false); + } + } + Preconditions.checkState(provided != null, "workingPoolProvider returned null"); + pool = provided; + return true; + } + + /** + * Cleans the storage of {@code projects} asynchronously, using the {@code SERVICE} tag and a fresh trace id. + * + * @return the future of the cleaning task; already completed normally if the cleaner could not be created + * @throws IllegalArgumentException if {@code projects} is null + * @throws RejectedExecutionException if no working pool has been bound + */ + public CompletableFuture<Void> cleanStorageForService(boolean storageCleanup, List<String> projects, + double requestFSRate, int retryTimes) { + return cleanStorageAsync(RandomUtil.randomUUIDStr(), StorageCleaner.CleanerTag.SERVICE, storageCleanup, + projects, requestFSRate, retryTimes); + } + + /** + * Cleans the storage of {@code projects} using the {@code ROUTINE} tag, and blocks until the task finishes. + * + * @throws java.util.concurrent.CompletionException if the cleaning task fails or times out + * @throws IllegalArgumentException if {@code projects} is null + * @throws RejectedExecutionException if no working pool has been bound + */ + public void cleanStorageForRoutine(boolean storageCleanup, List<String> projects, double requestFSRate, + int retryTimes) { + cleanStorageSync(StorageCleaner.CleanerTag.ROUTINE, storageCleanup, projects, requestFSRate, retryTimes); + } + + private void cleanStorageSync(StorageCleaner.CleanerTag tag, boolean storageCleanup, List<String> projects, + double requestFSRate, int retryTimes) { + cleanStorageAsync(RandomUtil.randomUUIDStr(), tag, storageCleanup, projects, requestFSRate, retryTimes).join(); + } + + private CompletableFuture<Void> cleanStorageAsync(String traceId, StorageCleaner.CleanerTag tag, + boolean storageCleanup, List<String> projects, double requestFSRate, int retryTimes) { + StorageCleaner cleaner = makeCleaner(traceId, tag, projects, 
requestFSRate, retryTimes, storageCleanup); + if (cleaner != null) { + return cleanStorageAsync(cleaner); + } + return CompletableFuture.completedFuture(null); + } + + /** + * Creates a cleaner tagged with {@code tag} and {@code traceId}. If construction fails, it logs the error + * and returns null. + */ + private StorageCleaner makeCleaner(String traceId, StorageCleaner.CleanerTag tag, List<String> projects, + double requestFSRate, int retryTimes, boolean storageCleanup) { + Preconditions.checkArgument(projects != null, "projects is null"); + StorageCleaner storageCleaner = null; + try { + storageCleaner = new StorageCleaner(storageCleanup, projects, requestFSRate, retryTimes); + storageCleaner.withTag(tag); + storageCleaner.withTraceId(traceId); + } catch (Exception e) { + LOGGER.error( + "Failed to create storage cleaner for projects: {}. TraceId: {}", projects, traceId, + e); + } + return storageCleaner; + } + + /** + * Submits {@code cleaner} with the configured storage-clean timeout and logs the outcome. + */ + private CompletableFuture<Void> cleanStorageAsync(StorageCleaner cleaner) { + LOGGER.info("To submit cleaning hdfs files task. TraceId: {}.", cleaner.getTraceId()); + CompletableFuture<Void> f = submit(cleaner, KylinConfig.getInstanceFromEnv().getStorageCleanTaskTimeout(), + TimeUnit.MILLISECONDS); + f.whenComplete((v, t) -> { + if (t == null) { + LOGGER.info("HDFS files cleaning task has successfully completed. TraceId: {}", cleaner.getTraceId()); + return; + } + LOGGER.error(StorageCleaner.ANSI_RED + + "cleanup HDFS failed. Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr" + + StorageCleaner.ANSI_RESET + ". 
TraceId: " + cleaner.getTraceId(), t); + }); + return f; + } + + /** + * Wraps {@code cleaner} in a task named after its trace id and carrying its tag, then submits that task. + * + * @see #submit(AbstractComparableCleanTask, long, TimeUnit) + */ + public CompletableFuture<Void> submit(StorageCleaner cleaner, long timeout, TimeUnit timeUnit) { + return submit(new AbstractComparableCleanTask() { + @Override + public String getName() { + return cleaner.getTraceId(); + } + + @Override + protected String details() { + return String.format("traceId: %s, tag: %s, projects: %s", cleaner.getTraceId(), cleaner.getTag(), + cleaner.getProjectNames().toString()); + } + + @Override + public StorageCleaner.CleanerTag getCleanerTag() { + return cleaner.getTag(); + } + + @Override + protected void doRun() { + try { + cleaner.execute(); + } catch (Exception e) { + throw new KylinRuntimeException(e); + } + } + }, timeout, timeUnit); + } + + /** + * Submits {@code task} to the bound working pool and arms a timeout. When the timeout expires, it cancels the + * task (with interruption) and fails the returned future with a {@link TimeoutException}. If the task is + * rejected, the returned future fails with the {@link RejectedExecutionException}. If the task's watcher is + * already complete, nothing is submitted. + * + * @return the task's watcher future + * @throws RejectedExecutionException if no working pool has been bound yet + */ + public CompletableFuture<Void> submit(AbstractComparableCleanTask task, long timeout, TimeUnit timeUnit) { + ExecutorService workingPool = pool; + if (workingPool == null) { + throw new RejectedExecutionException("The working pool has not been initialized, please bind it at first!"); + } + + LOGGER.debug("To submit storage cleaning task {}.", task.getBrief()); + CompletableFuture<Void> resultFuture = task.getWatcher(); + AtomicReference<Future<?>> cancelFuture = new AtomicReference<>(null); + try { + if (!resultFuture.isDone()) { + Future<?> workingFuture = workingPool.submit(task); + LOGGER.info("Submitted storage cleaning task {}.", task.getBrief()); + + cancelFuture.set(timeoutCheckerPool.schedule(() -> { + if (!workingFuture.isDone() && !workingFuture.cancel(true)) { + LOGGER.warn("You may have leaked threads, failed to cancel task {}!", task.getBrief()); + } + resultFuture.completeExceptionally(new TimeoutException("Timeout for cleaning!")); + }, timeout, timeUnit)); + } + } catch (RejectedExecutionException re) { + resultFuture.completeExceptionally(re); + } + + resultFuture.whenComplete((v, t) -> { + if (cancelFuture.get() != null) { + cancelFuture.get().cancel(true); + } + if (t == null) { + LOGGER.info("Cleaning task {} successfully completed!", task.getBrief()); + } 
else { + // Only log here: the stage returned by whenComplete is discarded, so an exception thrown here + // would reach no caller. + LOGGER.warn("Cleaning task {} completed exceptionally.", task.getBrief(), t); + } + }); + + return resultFuture; + } + + /** + * Shuts down the bound working pool. If the pool has not terminated within {@value #SHUTDOWN_TIMEOUT_SECONDS}s, + * it is forced down with interruption. The timeout checker then stops accepting new checks. Checks that are + * already scheduled still fire, so waiters on a task that ignores interruption still receive a timeout. + */ + @Override + public void close() throws IOException { + ExecutorService workingPool = pool; + try { + if (workingPool != null) { + workingPool.shutdown(); + if (!workingPool.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + LOGGER.warn("The working thread pool couldn't shutdown before timeout {}s!", + SHUTDOWN_TIMEOUT_SECONDS); + workingPool.shutdownNow(); + } + } + } catch (InterruptedException ie) { + workingPool.shutdownNow(); + Thread.currentThread().interrupt(); + } finally { + timeoutCheckerPool.shutdown(); + } + } +} diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/PriorityExecutor.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/PriorityExecutor.java new file mode 100644 index 0000000000..8ec0d3d51b --- /dev/null +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/PriorityExecutor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.tool.garbage; + +import lombok.EqualsAndHashCode; +import org.apache.kylin.common.util.DaemonThreadFactory; +import org.apache.kylin.guava30.shaded.common.base.Preconditions; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * A fixed-size {@link ThreadPoolExecutor} whose work queue is a {@link PriorityBlockingQueue}. Queued tasks are + * therefore taken in their natural ({@link Comparable}) order instead of FIFO. Each submitted task must itself + * be {@link Comparable}; it is wrapped in a {@link ComparableFutureTask} so that the queue can order it. + */ +public class PriorityExecutor extends ThreadPoolExecutor { + + /** + * A {@link FutureTask} that delegates its ordering to the wrapped {@link Comparable} task. + * Equality is generated by Lombok from the wrapped task only. + */ + @EqualsAndHashCode + public static class ComparableFutureTask<T extends Comparable<?>, R> extends FutureTask<R> + implements Comparable<ComparableFutureTask<T, R>> { + + private final Comparable<T> task; + + /** + * @throws IllegalArgumentException if {@code runnable} is not {@link Comparable} + */ + @SuppressWarnings("unchecked") + public ComparableFutureTask(Runnable runnable, R result) { + super(runnable, result); + Preconditions.checkArgument(runnable instanceof Comparable, "runnable should also be comparable!"); + this.task = (Comparable<T>) runnable; + } + + /** + * @throws IllegalArgumentException if {@code callable} is not {@link Comparable} + */ + @SuppressWarnings("unchecked") + public ComparableFutureTask(Callable<R> callable) { + super(callable); + Preconditions.checkArgument(callable instanceof Comparable, "callable should also be comparable!"); + this.task = (Comparable<T>) callable; + } + + /** + * Note: this class has a natural ordering that is inconsistent with equals.
+ */ + @Override + @SuppressWarnings("unchecked") + public int compareTo(ComparableFutureTask<T, R> o) { + return task.compareTo((T) o.task); + } + } + + private PriorityExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + /** + * Creates a pool of exactly {@code maxPoolSize} threads, produced by a {@link DaemonThreadFactory} for {@code name}. + */ + public static PriorityExecutor newWorkingThreadPool(String name, int maxPoolSize) { + return new PriorityExecutor(maxPoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<>(), + new DaemonThreadFactory(name)); + } + + @Override + protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { + // Keep the caller's result value (e.g. from submit(Runnable, T)) instead of silently replacing it with null. + return new ComparableFutureTask<>(runnable, value); + } + + /** + * Also wraps callables. Otherwise submit(Callable) would enqueue a plain, non-comparable FutureTask, which + * the priority queue rejects with a ClassCastException. + */ + @Override + protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { + return new ComparableFutureTask<>(callable); + } + +} diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java index 249814b9f4..30d052cf10 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleaner.java @@ -98,6 +98,8 @@ public class StorageCleaner { private final boolean cleanup; private final boolean timeMachineEnabled; + + @Getter private final Collection<String> projectNames; private final KylinConfig kylinConfig; @@ -108,6 +110,16 @@ public class StorageCleaner { private final Map<String, String> trashRecord; private final ResourceStore resourceStore; + public enum CleanerTag { + ROUTINE, CLI, SERVICE + } + + @Getter + private CleanerTag tag = CleanerTag.ROUTINE; + + @Getter + private String traceId = ""; + public StorageCleaner() throws Exception { this(true); } @@ -138,6 +150,16 @@ public class StorageCleaner { } } + public StorageCleaner withTag(CleanerTag tag) { + this.tag = tag; + return this; + } + + public StorageCleaner withTraceId(String id) { + this.traceId = id; + return this; + } + @Getter private Set<StorageItem> outdatedItems = Sets.newHashSet(); 
@@ -171,7 +193,11 @@ public class StorageCleaner { log.info("all file systems are {}", allFileSystems); for (StorageItem allFileSystem : allFileSystems) { log.debug("start to collect HDFS from {}", allFileSystem.getPath()); - collectFromHDFS(allFileSystem); + try { + collectFromHDFS(allFileSystem); + } catch (FileNotFoundException e) { + log.warn("No garbage files collected from {}", allFileSystem.getPath()); + } log.debug("folder {} is collected,detailed -> {}", allFileSystem.getPath(), allFileSystems); } UnitOfWork.doInTransactionWithRetry(() -> { diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleanerCLI.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleanerCLI.java index 0b2b7bc2b1..865f5c570c 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleanerCLI.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/StorageCleanerCLI.java @@ -17,19 +17,29 @@ */ package org.apache.kylin.tool.garbage; +import java.util.concurrent.TimeUnit; + +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Unsafe; -import lombok.val; import lombok.extern.slf4j.Slf4j; +import org.apache.kylin.guava30.shaded.common.util.concurrent.MoreExecutors; @Slf4j public class StorageCleanerCLI { public static void main(String[] args) { + System.out.println("start to cleanup HDFS."); try { - val storageCleaner = new StorageCleaner(); - log.info("Start cleanup HDFS."); - storageCleaner.execute(); + log.info("Init cleaning task thread pool as the direct executor service."); + CleanTaskExecutorService.getInstance().bindWorkingPool(MoreExecutors::newDirectExecutorService); + + StorageCleaner cleaner = new StorageCleaner().withTag(StorageCleaner.CleanerTag.CLI); + CleanTaskExecutorService.getInstance() + .submit( + cleaner, + KylinConfig.getInstanceFromEnv().getStorageCleanTaskTimeout(), TimeUnit.MILLISECONDS) + .get(); } catch (Exception e) { log.warn("cleanup HDFS failed.", e); } diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java b/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java index a623ef8046..a9ef41094c 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/routine/FastRoutineTool.java @@ -38,6 +38,7 @@ public class FastRoutineTool extends RoutineTool { return; } initOptionValues(optionsHelper); + List<String> projectsToCleanup = getProjectsToCleanup(); try { if (isMetadataCleanup()) { diff --git a/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java b/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java index a1f0500f8e..0de65f1780 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/routine/RoutineTool.java @@ -27,11 +27,14 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ExecutableApplication; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.common.util.Unsafe; +import org.apache.kylin.guava30.shaded.common.util.concurrent.MoreExecutors; import org.apache.kylin.helper.MetadataToolHelper; import org.apache.kylin.helper.RoutineToolHelper; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.tool.MaintainModeTool; +import org.apache.kylin.tool.garbage.CleanTaskExecutorService; +import org.apache.kylin.tool.garbage.StorageCleaner; import org.apache.kylin.tool.util.ToolMainWrapper; import org.apache.kylin.metadata.epoch.EpochManager; @@ -56,6 +59,11 @@ public class RoutineTool extends ExecutableApplication { private MetadataToolHelper helper = new MetadataToolHelper(); + static { + log.info("Init cleaning task thread pool as the direct executor service."); + 
CleanTaskExecutorService.getInstance().bindWorkingPool(MoreExecutors::newDirectExecutorService); + } + public static void main(String[] args) { ToolMainWrapper.wrap(args, () -> { RoutineTool tool = new RoutineTool(); @@ -69,7 +77,7 @@ public class RoutineTool extends ExecutableApplication { } public static void cleanQueryHistories() { - RoutineToolHelper.cleanQueryHistories(); + RoutineToolHelper.cleanQueryHistoriesAsync(); } public static void cleanStreamingStats() { @@ -105,6 +113,7 @@ public class RoutineTool extends ExecutableApplication { return; } initOptionValues(optionsHelper); + System.out.println("Start to cleanup metadata"); List<String> projectsToCleanup = getProjectsToCleanup(); MaintainModeTool maintainModeTool = new MaintainModeTool("routine tool"); @@ -113,6 +122,7 @@ public class RoutineTool extends ExecutableApplication { if (EpochManager.getInstance().isMaintenanceMode()) { Runtime.getRuntime().addShutdownHook(new Thread(maintainModeTool::releaseEpochs)); } + doCleanup(projectsToCleanup); } @@ -124,11 +134,21 @@ public class RoutineTool extends ExecutableApplication { cleanStorage(); } catch (Exception e) { log.error("Failed to execute routintool", e); + throw e; } } public void cleanStorage() { - helper.cleanStorage(storageCleanup, Arrays.asList(projects), requestFSRate, retryTimes); + try { + System.out.println("Start to cleanup HDFS"); + CleanTaskExecutorService.getInstance().cleanStorageForRoutine( + storageCleanup, Arrays.asList(projects), requestFSRate, retryTimes); + System.out.println("cleanup HDFS finished"); + } catch (Exception e) { + System.out.println(StorageCleaner.ANSI_RED + + "cleanup HDFS failed. 
Detailed Message is at ${KYLIN_HOME}/logs/shell.stderr" + + StorageCleaner.ANSI_RESET); + // Also log the cause. Failures thrown before the cleaning task runs are not logged anywhere else, + // e.g. the RejectedExecutionException raised when no working pool is bound. + log.error("Failed to cleanup HDFS.", e); + } } protected boolean printUsage(OptionsHelper optionsHelper) { diff --git a/src/tool/src/main/java/org/apache/kylin/tool/util/MetadataUtil.java b/src/tool/src/main/java/org/apache/kylin/tool/util/MetadataUtil.java index ae8cb074a1..4a58bc2903 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/util/MetadataUtil.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/util/MetadataUtil.java @@ -36,18 +36,17 @@ import org.apache.commons.dbcp2.BasicDataSource; import org.apache.ibatis.jdbc.ScriptRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.logging.LogOutputStream; +import org.apache.kylin.common.persistence.metadata.JdbcDataSource; import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil; -import org.apache.kylin.helper.MetadataToolHelper; - import org.apache.kylin.guava30.shaded.common.collect.Lists; +import lombok.val; import lombok.extern.slf4j.Slf4j; @Slf4j public class MetadataUtil { private static final Charset DEFAULT_CHARSET = Charset.defaultCharset(); - private static MetadataToolHelper metadataToolHelper = new MetadataToolHelper(); private MetadataUtil() { } @@ -60,7 +59,9 @@ public class MetadataUtil { } public static DataSource getDataSource(KylinConfig kylinConfig) throws Exception { - return metadataToolHelper.getDataSource(kylinConfig); + val url = kylinConfig.getMetadataUrl(); + val props = JdbcUtil.datasourceParameters(url); + return JdbcDataSource.getDataSource(props); } public static void createTableIfNotExist(BasicDataSource dataSource, String tableName, String tableSql, diff --git a/src/tool/src/test/java/org/apache/kylin/tool/garbage/CleanTaskExecutorServiceTests.java new file mode 100644 index 0000000000..1b0e368eca --- /dev/null +++ 
b/src/tool/src/test/java/org/apache/kylin/tool/garbage/CleanTaskExecutorServiceTests.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.tool.garbage; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; +import 
org.awaitility.core.ConditionTimeoutException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +/** + * Tests {@link CleanTaskExecutorService} (pool binding, timeout, rejection, shutdown and tag-priority ordering), + * together with the ordering and equality of {@link AbstractComparableCleanTask} and + * {@link PriorityExecutor.ComparableFutureTask}. + */ +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class CleanTaskExecutorServiceTests extends NLocalFileMetadataTestCase { + + // A private, non-singleton instance created via reflection, so tests never touch the process-wide instance. + CleanTaskExecutorService mockedHelper = getTempInstance(); + + @BeforeEach + public void setup() { + createTestMetadata(); + } + + /** + * Cleaner stub. Its execute() runs the optional {@code runnable} hook, then waits roughly {@code timeout} ms + * (default 1000) before returning. + */ + static class TestStorageCleaner extends StorageCleaner { + + private long timeout = 1000; + + private Runnable runnable; + + public TestStorageCleaner(Runnable runnable) throws Exception { + this.runnable = runnable; + } + + public TestStorageCleaner(long timeout) throws Exception { + this.timeout = timeout; + } + + @Override + public void execute() throws Exception { + if (runnable != null) { + runnable.run(); + } + try { + await().atMost(timeout, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(false)); + } catch (ConditionTimeoutException cte) { + // Ignore + } + } + } + + /** Minimal task with a fixed cleaner tag, used to check naming, the brief format and ordering. */ + static class TestComparableCleanTask extends AbstractComparableCleanTask { + private final StorageCleaner.CleanerTag tag; + public TestComparableCleanTask(StorageCleaner.CleanerTag tag) { + this.tag = tag; + } + + @Override + public StorageCleaner.CleanerTag getCleanerTag() { + return tag; + } + } + + @Test + @Order(1) + void testCreateCleanTask() { + AbstractComparableCleanTask t1 = new TestComparableCleanTask(StorageCleaner.CleanerTag.ROUTINE); + assertTrue(t1.getName().startsWith(CleanTaskExecutorServiceTests.class.getName())); + assertEquals(String.format("Task-%s: {tag: %s, class: %s}", t1.getName(), StorageCleaner.CleanerTag.ROUTINE, + t1.getClass().getName()), t1.getBrief()); + + AbstractComparableCleanTask t2 = new TestComparableCleanTask(StorageCleaner.CleanerTag.SERVICE); + assertEquals(TestComparableCleanTask.class.getName(), t2.getName()); + 
assertEquals(String.format("Task-%s: {tag: %s, class: %s}", t2.getName(), StorageCleaner.CleanerTag.SERVICE, + t2.getClass().getName()), t2.getBrief()); + + // t2 has a higher priority + assertTrue(t1.compareTo(t2) > 0); + + AbstractComparableCleanTask t3 = new TestComparableCleanTask(StorageCleaner.CleanerTag.SERVICE); + AbstractComparableCleanTask t4 = t3; + + assertNotEquals(null, t1); + assertEquals(0, t2.compareTo(t3)); + assertEquals(0, t3.compareTo(t4)); + assertNotEquals(t2, t3); + assertEquals(t3, t4); + } + + @Test + @Order(1) + void testComparableFutureTask() { + AbstractComparableCleanTask ct1 = new AbstractComparableCleanTask() { + @Override + public StorageCleaner.CleanerTag getCleanerTag() { + return StorageCleaner.CleanerTag.ROUTINE; + } + }; + + PriorityExecutor.ComparableFutureTask<AbstractComparableCleanTask, Boolean> t1 = + new PriorityExecutor.ComparableFutureTask<>(ct1, true); + PriorityExecutor.ComparableFutureTask<AbstractComparableCleanTask, Boolean> t11 = + new PriorityExecutor.ComparableFutureTask<>(ct1, true); + + assertNotEquals(null, t1); + + PriorityExecutor.ComparableFutureTask<AbstractComparableCleanTask, Boolean> t1Ref = t1; + assertEquals(t1, t1Ref); + // Two future tasks wrapping the same task are equal: equality follows the wrapped task. + assertEquals(t1, t11); + + AbstractComparableCleanTask ct2 = new AbstractComparableCleanTask() { + @Override + public StorageCleaner.CleanerTag getCleanerTag() { + return StorageCleaner.CleanerTag.ROUTINE; + } + }; + + PriorityExecutor.ComparableFutureTask<AbstractComparableCleanTask, Boolean> t21 = + new PriorityExecutor.ComparableFutureTask<>(ct2, true); + + assertNotEquals(t11, t21); + + // Same tag means the same priority even though the tasks are not equal (ordering is inconsistent with equals). + assertEquals(0, t21.compareTo(t11)); + assertNotEquals(t11, t21); + + AbstractComparableCleanTask ct3 = new AbstractComparableCleanTask() { + @Override + public StorageCleaner.CleanerTag getCleanerTag() { + return StorageCleaner.CleanerTag.SERVICE; + } + }; + + PriorityExecutor.ComparableFutureTask<AbstractComparableCleanTask, Boolean> t31 = + new PriorityExecutor.ComparableFutureTask<>(ct3, 
true); + + assertTrue(t31.compareTo(t21) < 0, "t31 should be the predecessor task"); + } + + @Test + @Order(1) + void testRunWithoutThreadPool() { + // No pool has been bound on mockedHelper here, so submit() must reject synchronously. + assertThrows(RejectedExecutionException.class, + () -> mockedHelper.submit(new TestStorageCleaner(1), 1, TimeUnit.MILLISECONDS)); + } + + @Test + @Order(1) + void testBindThreadPool() throws NoSuchFieldException, IllegalAccessException { + resetInnerPool(); + assertTrue(mockedHelper.bindWorkingPool(this::createPriorityPool)); + assertFalse(mockedHelper.bindWorkingPool(this::createPriorityPool)); + } + + @Test + @Order(1) + void testCleanTaskTimeout() throws Exception { + // The cleaner waits about 5s but the timeout is 1s, so the future must fail with a TimeoutException. + StorageCleaner routineSc = new TestStorageCleaner(5000).withTag(StorageCleaner.CleanerTag.ROUTINE) + .withTraceId("1"); + + overwriteSystemProp("kylin.storage.clean-tasks-concurrency", "1"); + + mockedHelper.bindWorkingPool(this::createPriorityPool); + CompletableFuture<?> f = mockedHelper.submit(routineSc, 1, TimeUnit.SECONDS); + CompletionException ce = assertThrows(CompletionException.class, f::join); + + assertTrue(ce.getCause() instanceof TimeoutException); + } + + @Test + @Order(10) + void testCleanTaskRejected() throws Exception { + // After close() the working pool is shut down, so the submission is rejected and reported through the future. + StorageCleaner routineSc = new TestStorageCleaner(10000).withTag(StorageCleaner.CleanerTag.ROUTINE) + .withTraceId("1"); + + overwriteSystemProp("kylin.storage.clean-tasks-concurrency", "1"); + + mockedHelper.bindWorkingPool(this::createPriorityPool); + mockedHelper.close(); + + CompletableFuture<?> f = mockedHelper.submit(routineSc, 1, TimeUnit.SECONDS); + CompletionException ce = assertThrows(CompletionException.class, f::join); + + assertTrue(ce.getCause() instanceof RejectedExecutionException); + assertTrue(f.isDone()); + resetInnerPool(); + } + + @Test + @Order(1) + void testCleanTasksCompletedInOrderByTag() throws Exception { + StorageCleaner routineSc = new TestStorageCleaner(1000).withTag(StorageCleaner.CleanerTag.ROUTINE) + .withTraceId("1"); + StorageCleaner cliSc = new 
TestStorageCleaner(200).withTag(StorageCleaner.CleanerTag.CLI).withTraceId("2"); + StorageCleaner serviceSc = new TestStorageCleaner(200).withTag(StorageCleaner.CleanerTag.SERVICE) + .withTraceId("3"); + + overwriteSystemProp("kylin.storage.clean-tasks-concurrency", "1"); + + resetInnerPool(); + mockedHelper.bindWorkingPool(this::createPriorityPool); + + List<String> completedTasks = new ArrayList<>(); + List<CompletableFuture<?>> tasks = new ArrayList<>(); + + StorageCleaner initSc = new TestStorageCleaner(() -> { + tasks.add(mockedHelper.submit(routineSc, 6, TimeUnit.SECONDS) + .whenComplete((v, t) -> completedTasks.add(routineSc.getTraceId()))); + tasks.add(mockedHelper.submit(cliSc, 6, TimeUnit.SECONDS) + .whenComplete((v, t) -> completedTasks.add(cliSc.getTraceId()))); + tasks.add(mockedHelper.submit(serviceSc, 6, TimeUnit.SECONDS) + .whenComplete((v, t) -> completedTasks.add(serviceSc.getTraceId()))); + }); + + CompletableFuture<?> initFuture = mockedHelper.submit(initSc, 6, TimeUnit.SECONDS); + CompletableFuture.allOf(initFuture).join(); + CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join(); + + // initSc is submitted first and occupies the only worker thread (concurrency is 1). While initSc runs, it + // submits routineSc, cliSc and serviceSc. These wait in the priority queue, ordered by cleaner tag as + // [serviceSc, cliSc, routineSc], so after initSc finishes they complete in that order: "3", "2", "1". 
+ assertEquals(ImmutableList.of("3", "2", "1"), completedTasks); + } + + // Priority pool sized by getStorageCleanTaskConcurrency(), presumably kylin.storage.clean-tasks-concurrency, + // which the tests override to 1. + private ExecutorService createPriorityPool() { + return PriorityExecutor.newWorkingThreadPool("test-pool", getTestConfig().getStorageCleanTaskConcurrency()); + } + + @Test + @Order(11) + void testDoubleRetriesToShutdown() throws NoSuchFieldException, IllegalAccessException { + // The task loops until it is interrupted. After about 1s, close() is triggered from another thread. Its + // graceful shutdown is expected to time out, and the forced shutdownNow() should interrupt the task. + overwriteSystemProp("kylin.storage.clean-tasks-concurrency", "1"); + mockedHelper.bindWorkingPool(this::createPriorityPool); + + AtomicBoolean shutdownTriggered = new AtomicBoolean(false); + CompletableFuture<?> future = mockedHelper.submit(new AbstractComparableCleanTask() { + @Override + protected void doRun() { + while (!Thread.currentThread().isInterrupted()) { + try { + await().atMost(10, TimeUnit.SECONDS).untilTrue(new AtomicBoolean(false)); + } catch (Exception e) { + shutdownTriggered.set(true); + Thread.currentThread().interrupt(); + } + } + } + + @Override + public StorageCleaner.CleanerTag getCleanerTag() { + return StorageCleaner.CleanerTag.SERVICE; + } + }, 10, TimeUnit.SECONDS); + + new Thread(() -> { + try { + await().atMost(1, TimeUnit.SECONDS).untilTrue(new AtomicBoolean(false)); + } catch (Exception ignored) { + try { + mockedHelper.close(); + } catch (IOException ignored1) { + } + } + }).start(); + future.join(); + assertTrue(future.isDone() && shutdownTriggered.get()); + } + + // Bypasses the private constructor. Returns null if reflection fails, in which case the tests fail with an NPE. + private CleanTaskExecutorService getTempInstance() { + try { + Constructor<CleanTaskExecutorService> constructor = CleanTaskExecutorService.class.getDeclaredConstructor(); + constructor.setAccessible(true); + return constructor.newInstance(); + } catch (Exception ignored) { + return null; + } + } + + // Clears the bound pool via reflection, so that a test can bind its own pool again. + private void resetInnerPool() throws NoSuchFieldException, IllegalAccessException { + Field poolField = CleanTaskExecutorService.class.getDeclaredField("pool"); + poolField.setAccessible(true); + poolField.set(mockedHelper, null); + + Field boundField = CleanTaskExecutorService.class.getDeclaredField("bound"); + 
boundField.setAccessible(true); + boundField.set(mockedHelper, new AtomicBoolean(false)); + } + +}