This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 132f4367b1220dac0206e0054f4c4bf9c52dbca4 Author: Zhiting Guo <35057824+fre...@users.noreply.github.com> AuthorDate: Mon Oct 30 15:01:10 2023 +0800 KYLIN-5862 Fix jobScheduler when resourceGroup is enabled Co-authored-by: Xuecheng Shan <xuecheng.s...@kyligence.io> Co-authored-by: Zhiting Guo <zhiting....@kyligence.io> --- .../kap/secondstorage/SecondStorageUtil.java | 2 +- .../apache/kylin/rest/service/ProjectService.java | 9 +- .../config/initialize/MetricsRegistryTest.java | 2 +- .../org/apache/kylin/common/KylinConfigBase.java | 10 +-- .../src/main/resources/metadata-jdbc-h2.properties | 3 +- .../apache/kylin/job/config/JobMybatisConfig.java | 25 ++++-- .../kylin/job/config/JobTableInterceptor.java | 29 +++---- .../java/org/apache/kylin/job/dao/JobInfoDao.java | 16 +++- .../java/org/apache/kylin/job/domain/JobLock.java | 5 +- .../org/apache/kylin/job/mapper/JobLockMapper.java | 3 +- .../kylin/job/scheduler/JdbcJobScheduler.java | 16 ++-- .../org/apache/kylin/job/util/JobContextUtil.java | 89 +++++-------------- .../org/apache/kylin/job/util/JobInfoUtil.java | 6 +- .../resources/mybatis-mapper/JobLockMapper.xml | 20 ++++- .../main/resources/script/schema_job_info_h2.sql | 4 +- .../resources/script/schema_job_info_mysql.sql | 4 +- .../script/schema_job_info_postgresql.sql | 2 +- .../main/resources/script/schema_job_lock_h2.sql | 3 +- .../resources/script/schema_job_lock_mysql.sql | 3 +- .../script/schema_job_lock_postgresql.sql | 1 + .../kylin/job/execution/JobMailUtilTest.java | 2 +- .../job/impl/threadpool/BaseSchedulerTest.java | 2 +- .../kylin/job/scheduler/JdbcJobSchedulerTest.java | 14 +-- .../org/apache/kylin/mapper/JobLockMapperTest.java | 4 +- .../apache/kylin/metadata/epoch/EpochManager.java | 19 ++++- .../metadata/favorite/FavoriteRuleManager.java | 4 + .../kylin/metadata/epoch/EpochManagerTest.java | 41 +++++++++ .../kylin/rest/controller/JobControllerTest.java | 2 +- .../org/apache/kylin/job/util/JobFilterUtil.java | 5 +- .../kylin/job/service/JobInfoServiceTest.java | 2 +- .../rest/service/FusionModelServiceBuildTest.java | 2 +- .../apache/kylin/rest/service/JobErrorTest.java | 2 +- .../kylin/rest/service/JobResourceServiceTest.java | 2 +- .../kylin/rest/service/ModelServiceBuildTest.java | 2 +- .../kylin/rest/service/SnapshotServiceTest.java | 2 +- .../org/apache/kylin/rest/service/StageTest.java | 2 +- .../rest/service/TableSamplingServiceTest.java | 2 +- .../newten/BuildAndQueryEmptySegmentsTest.java | 2 +- .../org/apache/kylin/newten/CharNColumnTest.java | 2 +- .../kylin/newten/EnhancedAggPushDownTest.java | 2 +- .../org/apache/kylin/newten/ExactlyMatchTest.java | 2 +- .../kylin/newten/MultiPartitionPruningTest.java | 2 +- .../org/apache/kylin/newten/NAggPushDownTest.java | 2 +- .../apache/kylin/newten/NBitmapFunctionTest.java | 2 +- .../kylin/newten/NBuildAndQuerySnapshotTest.java | 2 +- .../apache/kylin/newten/NComputedColumnTest.java | 2 +- .../newten/NCountDistinctWithoutEncodeTest.java | 2 +- .../org/apache/kylin/newten/NFilePruningTest.java | 2 +- .../apache/kylin/newten/NFilePruningV2Test.java | 2 +- .../newten/NFlattableJoinWithoutLookupTest.java | 2 +- .../java/org/apache/kylin/newten/NJoinOptTest.java | 2 +- .../newten/NManualBuildAndQueryCuboidTest.java | 2 +- .../org/apache/kylin/newten/NMatchingTest.java | 2 +- .../kylin/newten/NMultiPartitionJobTest.java | 2 +- .../kylin/newten/NMultipleColumnsInTest.java | 2 +- .../kylin/newten/NOptIntersectCountTest.java | 2 +- .../apache/kylin/newten/NPartitionColumnTest.java | 2 +- .../org/apache/kylin/newten/NTopNResultTest.java | 2 +- .../apache/kylin/newten/NTopNWithChineseTest.java | 2 +- .../apache/kylin/newten/ReuseFlatTableTest.java | 2 +- .../kylin/newten/SimilarToEscapeFunctionTest.java | 2 +- .../org/apache/kylin/newten/SumLCResultTest.java | 2 +- .../org/apache/kylin/newten/TableIndexTest.java | 2 +- .../org/apache/kylin/newten/TimeZoneQueryTest.java | 2 +- .../kylin/query/routing/QueryLayoutFilterTest.java | 2 +- .../apache/kylin/event/ITStorageCleanerTest.java | 2 +- .../config/initialize/ModelBrokenListenerTest.java | 3 +- .../apache/kylin/rest/service/BaseIndexTest.java | 2 +- .../kylin/rest/service/FusionIndexServiceTest.java | 2 +- .../kylin/rest/service/FusionModelServiceTest.java | 2 +- .../kylin/rest/service/IndexPlanServiceTest.java | 2 +- .../service/ModelServiceSemanticUpdateTest.java | 2 +- .../kylin/rest/service/ModelServiceTest.java | 2 +- .../kylin/rest/service/ProjectServiceTest.java | 2 +- .../kylin/rest/service/TableReloadServiceTest.java | 2 +- .../kylin/rest/service/TableServiceTest.java | 2 +- .../util/SecondStorageJobUtilTest.java | 99 ++++++++++++++++++++++ .../spark/job/NSparkCubingJobOnYarnTest.java | 2 +- .../org/apache/kylin/it/TestModelViewQuery.scala | 6 +- .../org/apache/kylin/tool/AuditLogToolTest.java | 2 +- .../org/apache/kylin/tool/JobDiagInfoToolTest.java | 2 +- .../org/apache/kylin/tool/StorageCleanerTest.java | 2 +- .../org/apache/kylin/tool/SystemUsageToolTest.java | 4 +- .../apache/kylin/tool/YarnApplicationToolTest.java | 2 +- .../kylin/tool/garbage/ExecutableCleanerTest.java | 2 +- .../kylin/tool/upgrade/MigrateJobToolTest.java | 2 +- 86 files changed, 364 insertions(+), 199 deletions(-) diff --git a/outdated/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/SecondStorageUtil.java b/outdated/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/SecondStorageUtil.java index ce3eb656b2..5b92f57d36 100644 --- a/outdated/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/SecondStorageUtil.java +++ b/outdated/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/SecondStorageUtil.java @@ -91,7 +91,7 @@ import io.kyligence.kap.secondstorage.response.SecondStorageNode; // CALL FROM CORE public class SecondStorageUtil { public static final Set<ExecutableState> RUNNING_STATE = ImmutableSet.of(ExecutableState.RUNNING, - ExecutableState.READY, ExecutableState.PAUSED); + ExecutableState.READY, ExecutableState.PAUSED, ExecutableState.PENDING); public static final Set<JobTypeEnum> RELATED_JOBS = ImmutableSet.of(JobTypeEnum.INDEX_BUILD, JobTypeEnum.INDEX_REFRESH, JobTypeEnum.INC_BUILD, JobTypeEnum.INDEX_MERGE, JobTypeEnum.EXPORT_TO_SECOND_STORAGE, JobTypeEnum.SECOND_STORAGE_REFRESH_SECONDARY_INDEXES); diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java index 1b00f17b8f..151fc595bd 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java @@ -96,6 +96,7 @@ import org.apache.kylin.metadata.cube.storage.ProjectStorageInfoCollector; import org.apache.kylin.metadata.cube.storage.StorageInfoEnum; import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.kylin.metadata.favorite.FavoriteRuleManager; +import org.apache.kylin.metadata.favorite.QueryHistoryIdOffsetManager; import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.model.NTableMetadataManager; @@ -650,7 +651,7 @@ public class ProjectService extends BasicService { response.setJdbcSourceDriver(config.getJdbcDriver()); response.setOverrideKylinProps(projectInstance.getOverrideKylinProps()); - + Pair<String, String> infos = KylinVersion.getGitCommitInfo(); response.setGitCommit(infos.getFirst()); response.setPackageVersion(KylinVersion.getCurrentVersion().toString()); @@ -968,10 +969,16 @@ public class ProjectService extends BasicService { NProjectManager prjManager = getManager(NProjectManager.class); prjManager.forceDropProject(project); + UnitOfWork.get().doAfterUpdate(() -> deleteProjectRelatedMeta(project)); UnitOfWork.get().doAfterUnit(() -> new ProjectDropListener().onDelete(project, clusterManager, headers)); EventBusFactory.getInstance().postAsync(new SourceUsageUpdateNotifier()); } + private void deleteProjectRelatedMeta(String project) { + // delete query history id offset + QueryHistoryIdOffsetManager.getInstance(project).delete(); + } + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#project, 'ADMINISTRATION')") @Transaction(project = 0) public void updateDefaultDatabase(String project, String defaultDatabase) { diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java index 80322c83ff..1f911f2fdd 100644 --- a/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java +++ b/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java @@ -124,8 +124,8 @@ public class MetricsRegistryTest extends NLocalFileMetadataTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index e7fc037491..04a39029ff 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1265,7 +1265,7 @@ public abstract class KylinConfigBase implements Serializable { } public Integer getSchedulerPollIntervalSecond() { - return Integer.parseInt(getOptional("kylin.job.scheduler.poll-interval-second", "30")); + return Integer.parseInt(getOptional("kylin.job.scheduler.poll-interval-second", "10")); } public boolean isFlatTableJoinWithoutLookup() { @@ -2349,7 +2349,7 @@ public abstract class KylinConfigBase implements Serializable { // ============================================================================ private boolean isMicroService() { - return Boolean.parseBoolean(this.getOptional("kylin.micro.service", TRUE)); + return Boolean.parseBoolean(this.getOptional("kylin.micro.service", FALSE)); } public String getServerMode() { @@ -3956,11 +3956,11 @@ public abstract class KylinConfigBase implements Serializable { } public long getJobSchedulerMasterPollIntervalSec() { - return Long.parseLong(this.getOptional("kylin.job.master-poll-interval-second", "30")); + return Long.parseLong(this.getOptional("kylin.job.master-poll-interval-second", "10")); } public int getJobSchedulerMasterPollBatchSize() { - return Integer.parseInt(this.getOptional("kylin.job.master-pull-batch-size", "10")); + return Integer.parseInt(this.getOptional("kylin.job.master-pull-batch-size", "30")); } public long getJobSchedulerJobRenewalSec() { @@ -3972,7 +3972,7 @@ public abstract class KylinConfigBase implements Serializable { } public int getJobSchedulerSlavePollBatchSize() { - return Integer.parseInt(this.getOptional("kylin.job.slave-pull-batch-size", "5")); + return Integer.parseInt(this.getOptional("kylin.job.slave-pull-batch-size", "20")); } public int getParallelJobCountThreshold() { diff --git a/src/core-common/src/main/resources/metadata-jdbc-h2.properties b/src/core-common/src/main/resources/metadata-jdbc-h2.properties index dd8d7dc3c2..83edd636ef 100644 --- a/src/core-common/src/main/resources/metadata-jdbc-h2.properties +++ b/src/core-common/src/main/resources/metadata-jdbc-h2.properties @@ -157,7 +157,7 @@ create.job.info.table=CREATE TABLE IF NOT EXISTS `%s` ( \ `job_status` varchar(50) NOT NULL, \ `project` varchar(100) NOT NULL, \ `subject` varchar(200) NOT NULL, \ - `model_id` varchar(100) NOT NULL, \ + `model_id` varchar(200) NOT NULL, \ `priority` integer DEFAULT 3, \ `job_content` longblob NOT NULL, \ `create_time` bigint, \ @@ -169,6 +169,7 @@ create.job.info.table=CREATE TABLE IF NOT EXISTS `%s` ( \ create.job.lock.table=CREATE TABLE IF NOT EXISTS `%s` ( \ `id` bigint(10) NOT NULL AUTO_INCREMENT,\ + `project` varchar(100) NOT NULL,\ `lock_id` varchar(100) NOT NULL COMMENT 'what is locked', \ `lock_node` varchar(50) DEFAULT NULL COMMENT 'who locked it', \ `lock_expire_time` datetime DEFAULT NULL COMMENT 'when does the lock expire', \ diff --git a/src/core-job/src/main/java/org/apache/kylin/job/config/JobMybatisConfig.java b/src/core-job/src/main/java/org/apache/kylin/job/config/JobMybatisConfig.java index a92b71ae26..de1e1a8140 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/config/JobMybatisConfig.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/config/JobMybatisConfig.java @@ -37,6 +37,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.metadata.JdbcDataSource; import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil; +import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.job.condition.JobModeCondition; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.annotation.Conditional; @@ -59,8 +60,8 @@ public class JobMybatisConfig implements InitializingBean { private String database; - public static String JOB_INFO_TABLE = "job_info"; - public static String JOB_LOCK_TABLE = "job_lock"; + private String jobInfoTableName = "job_info"; + private String jobLockTableName = "job_lock"; public String getDatabase() { return database; @@ -71,6 +72,14 @@ public class JobMybatisConfig implements InitializingBean { setupJobTables(); } + public String getJobInfoTableName() { + return jobInfoTableName; + } + + public String getJobLockTableName() { + return jobLockTableName; + } + public void setupJobTables() throws Exception { val url = KylinConfig.getInstanceFromEnv().getMetadataUrl(); val props = JdbcUtil.datasourceParameters(url); @@ -80,9 +89,13 @@ public class JobMybatisConfig implements InitializingBean { if (StringUtils.isEmpty(url.getScheme())) { log.info("metadata from file"); keIdentified = "file"; + if (KylinConfig.getInstanceFromEnv().isUTEnv()) { + String uuid = RandomUtil.randomUUIDStr().replace("-", "_"); + keIdentified = "UT_" + uuid; + } } - JOB_INFO_TABLE = keIdentified + "_job_info"; - JOB_LOCK_TABLE = keIdentified + "_job_lock"; + jobInfoTableName = keIdentified + "_job_info"; + jobLockTableName = keIdentified + "_job_lock"; database = Database.MYSQL.databaseId; String jobInfoFile = "script/schema_job_info_mysql.sql"; @@ -117,11 +130,11 @@ public class JobMybatisConfig implements InitializingBean { } } try { - if (!isTableExists(dataSource.getConnection(), JOB_INFO_TABLE)) { + if (!isTableExists(dataSource.getConnection(), jobInfoTableName)) { createTableIfNotExist(keIdentified, jobInfoFile); } - if (!isTableExists(dataSource.getConnection(), JOB_LOCK_TABLE)) { + if (!isTableExists(dataSource.getConnection(), jobLockTableName)) { createTableIfNotExist(keIdentified, jobLockFile); } } catch (SQLException | IOException e) { diff --git a/src/core-job/src/main/java/org/apache/kylin/job/config/JobTableInterceptor.java b/src/core-job/src/main/java/org/apache/kylin/job/config/JobTableInterceptor.java index 4dfdb6f25e..5beb14bc39 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/config/JobTableInterceptor.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/config/JobTableInterceptor.java @@ -18,7 +18,6 @@ package org.apache.kylin.job.config; -import java.lang.reflect.Method; import java.util.List; import java.util.Objects; @@ -69,11 +68,11 @@ public class JobTableInterceptor implements Interceptor { @Override public Object intercept(Invocation invocation) throws Throwable { - Object target = invocation.getTarget(); - Method method = invocation.getMethod(); Object[] args = invocation.getArgs(); - if (JobMybatisConfig.JOB_INFO_TABLE == null || JobMybatisConfig.JOB_LOCK_TABLE == null) { + String jobInfoTableName = jobMybatisConfig.getJobInfoTableName(); + String jobLockTableName = jobMybatisConfig.getJobLockTableName(); + if (jobInfoTableName == null || jobLockTableName == null) { logger.info("mybatis table not init, skip"); return null; } @@ -95,24 +94,24 @@ public class JobTableInterceptor implements Interceptor { } if (args[1] == null) { MapperMethod.ParamMap map = new MapperMethod.ParamMap(); - map.put("jobLockTable", JobMybatisConfig.JOB_LOCK_TABLE); - map.put("jobInfoTable", JobMybatisConfig.JOB_INFO_TABLE); + map.put("jobLockTable", jobLockTableName); + map.put("jobInfoTable", jobInfoTableName); map.put("database", database); invocation.getArgs()[1] = map; - } else if (args[1].getClass() == MapperMethod.ParamMap.class) { + } else if (args[1] instanceof MapperMethod.ParamMap) { MapperMethod.ParamMap map = (MapperMethod.ParamMap) args[1]; - map.put("jobLockTable", JobMybatisConfig.JOB_LOCK_TABLE); - map.put("jobInfoTable", JobMybatisConfig.JOB_INFO_TABLE); + map.put("jobLockTable", jobLockTableName); + map.put("jobInfoTable", jobInfoTableName); map.put("database", database); - } else if (args[1].getClass() == JobMapperFilter.class) { + } else if (args[1] instanceof JobMapperFilter) { JobMapperFilter mapperFilter = (JobMapperFilter) args[1]; - mapperFilter.setJobInfoTable(JobMybatisConfig.JOB_INFO_TABLE); - } else if (args[1].getClass() == JobInfo.class) { + mapperFilter.setJobInfoTable(jobInfoTableName); + } else if (args[1] instanceof JobInfo) { JobInfo jobInfo = (JobInfo) args[1]; - jobInfo.setJobInfoTable(JobMybatisConfig.JOB_INFO_TABLE); - } else if (args[1].getClass() == JobLock.class) { + jobInfo.setJobInfoTable(jobInfoTableName); + } else if (args[1] instanceof JobLock) { JobLock jobLock = (JobLock) args[1]; - jobLock.setJobLockTable(JobMybatisConfig.JOB_LOCK_TABLE); + jobLock.setJobLockTable(jobLockTableName); jobLock.setDatabase(database); } else { logger.error("miss type of param {}", args[1].getClass()); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java index 94b53737df..dce995d971 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java @@ -19,6 +19,7 @@ package org.apache.kylin.job.dao; import static org.apache.kylin.job.util.JobInfoUtil.JOB_SERIALIZER; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -29,11 +30,12 @@ import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil; +import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.guava30.shaded.common.base.Preconditions; -import org.apache.kylin.job.config.JobMybatisConfig; import org.apache.kylin.job.domain.JobInfo; import org.apache.kylin.job.domain.JobLock; +import org.apache.kylin.job.exception.ExecuteRuntimeException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; @@ -196,7 +198,7 @@ public class JobInfoDao { jobInfo.setModelId(executablePO.getTargetModel()); jobInfo.setCreateTime(executablePO.getCreateTime()); jobInfo.setUpdateTime(executablePO.getLastModified()); - jobInfo.setJobContent(JobInfoUtil.serializeExecutablePO(executablePO)); + jobInfo.setJobContent(checkAndCompressJobContent(JobInfoUtil.serializeExecutablePO(executablePO))); ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), executablePO.getProject()); @@ -207,6 +209,14 @@ public class JobInfoDao { return jobInfo; } + private byte[] checkAndCompressJobContent(byte[] jobContent) { + try { + return CompressionUtils.compress(jobContent); + } catch (IOException e) { + throw new ExecuteRuntimeException("Compress job content failed.", e); + } + } + public void deleteJobsByProject(String project) { int count = jobInfoMapper.deleteByProject(project); logger.info("delete {} jobs for project {}", count, project); @@ -222,8 +232,8 @@ public class JobInfoDao { jobInfoMapper.deleteByProject(project); } for (JobInfo jobInfo : jobInfos) { + jobInfo.setJobContent(checkAndCompressJobContent(jobInfo.getJobContent())); JobInfo currentJobInfo = jobInfoMapper.selectByJobId(jobInfo.getJobId()); - jobInfo.setJobInfoTable(JobMybatisConfig.JOB_INFO_TABLE); if (currentJobInfo == null) { jobInfoMapper.insert(jobInfo); } else { diff --git a/src/core-job/src/main/java/org/apache/kylin/job/domain/JobLock.java b/src/core-job/src/main/java/org/apache/kylin/job/domain/JobLock.java index 7083c50159..1d41510ba3 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/domain/JobLock.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/domain/JobLock.java @@ -34,6 +34,8 @@ public class JobLock { private String lockId; + private String project; + private String lockNode; private Date lockExpireTime; @@ -49,8 +51,9 @@ public class JobLock { private String database; - public JobLock(String lockId, int priority) { + public JobLock(String lockId, String project, int priority) { this.lockId = lockId; + this.project = project; this.priority = priority; this.createTime = System.currentTimeMillis(); this.updateTime = System.currentTimeMillis(); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobLockMapper.java b/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobLockMapper.java index c0b3c4f785..89dba3ebe7 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobLockMapper.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobLockMapper.java @@ -51,7 +51,8 @@ public interface JobLockMapper { int batchRemoveLock(@Param("jobIdList") List<String> jobIdList); - List<PriorityFistRandomOrderJob> findNonLockIdList(@Param("batchSize") int batchSize); + List<PriorityFistRandomOrderJob> findNonLockIdList(@Param("batchSize") int batchSize, + @Param("projects") List<String> projects); List<String> findExpiredORNonLockIdList(@Param("batchSize") int batchSize); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java index 0d453c2844..f4940f68ec 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java @@ -32,6 +32,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.constant.LogConstant; import org.apache.kylin.common.logging.SetLogCategory; +import org.apache.kylin.common.util.ExecutorServiceUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.ThreadUtils; import org.apache.kylin.guava30.shaded.common.collect.Lists; @@ -53,6 +54,7 @@ import org.apache.kylin.job.runners.JobCheckUtil; import org.apache.kylin.job.util.JobContextUtil; import org.apache.kylin.job.util.JobInfoUtil; import org.apache.kylin.metadata.cube.utils.StreamingUtils; +import org.apache.kylin.metadata.epoch.EpochManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -157,7 +159,7 @@ public class JdbcJobScheduler implements JobScheduler { } if (Objects.nonNull(executorPool)) { - executorPool.shutdownNow(); + ExecutorServiceUtil.shutdownGracefully(executorPool, 60); } } @@ -165,7 +167,7 @@ public class JdbcJobScheduler implements JobScheduler { // init master lock try { if (jobContext.getJobLockMapper().selectByJobId(JobScheduler.MASTER_SCHEDULER) == null) { - jobContext.getJobLockMapper().insertSelective(new JobLock(JobScheduler.MASTER_SCHEDULER, 0)); + jobContext.getJobLockMapper().insertSelective(new JobLock(JobScheduler.MASTER_SCHEDULER, "_global", 0)); } } catch (Exception e) { logger.error("Try insert 'master_scheduler' failed.", e); @@ -218,7 +220,7 @@ public class JdbcJobScheduler implements JobScheduler { JobLock lock = jobContext.getJobLockMapper().selectByJobId(jobId); JobInfo jobInfo = jobContext.getJobInfoMapper().selectByJobId(jobId); if (lock == null && jobContext.getJobLockMapper() - .insertSelective(new JobLock(jobId, jobInfo.getPriority())) == 0) { + .insertSelective(new JobLock(jobId, jobInfo.getProject(), jobInfo.getPriority())) == 0) { logger.error("Create job lock for [{}] failed!", jobId); return null; } @@ -276,7 +278,8 @@ public class JdbcJobScheduler implements JobScheduler { if (exeFreeSlots < batchSize) { batchSize = exeFreeSlots; } - List<String> jobIdList = findNonLockIdListInOrder(batchSize); + List<String> projects = EpochManager.getInstance().listProjectWithPermissionForScheduler(); + List<String> jobIdList = findNonLockIdListInOrder(batchSize, projects); if (CollectionUtils.isEmpty(jobIdList)) { return; @@ -313,8 +316,9 @@ public class JdbcJobScheduler implements JobScheduler { } } - public List<String> findNonLockIdListInOrder(int batchSize) { - List<PriorityFistRandomOrderJob> jobIdList = jobContext.getJobLockMapper().findNonLockIdList(batchSize); + public List<String> findNonLockIdListInOrder(int batchSize, List<String> projects) { + List<PriorityFistRandomOrderJob> jobIdList = jobContext.getJobLockMapper().findNonLockIdList(batchSize, + projects); // Shuffle jobs avoiding jobLock conflict. // At the same time, we should ensure the overall order. if (hasRunningJob()) { diff --git a/src/core-job/src/main/java/org/apache/kylin/job/util/JobContextUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/util/JobContextUtil.java index 94b3966d92..c1aadc1dca 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/util/JobContextUtil.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/util/JobContextUtil.java @@ -18,28 +18,19 @@ package org.apache.kylin.job.util; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.nio.charset.Charset; -import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Properties; import javax.sql.DataSource; -import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.lang3.StringUtils; import org.apache.ibatis.builder.xml.XMLMapperBuilder; -import org.apache.ibatis.jdbc.ScriptRunner; import org.apache.ibatis.mapping.Environment; import org.apache.ibatis.plugin.Interceptor; import org.apache.ibatis.session.Configuration; @@ -49,7 +40,6 @@ import org.apache.ibatis.transaction.TransactionFactory; import org.apache.ibatis.type.JdbcType; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.StorageURL; -import org.apache.kylin.common.logging.LogOutputStream; import org.apache.kylin.common.persistence.metadata.JdbcDataSource; import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil; import org.apache.kylin.common.util.AddressUtil; @@ -68,6 +58,7 @@ import org.mybatis.spring.transaction.SpringManagedTransactionFactory; import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.io.support.ResourcePatternResolver; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import lombok.SneakyThrows; @@ -80,12 +71,6 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class JobContextUtil { - private static final Charset DEFAULT_CHARSET = Charset.defaultCharset(); - - private static final String CREATE_JOB_INFO_TABLE = "create.job.info.table"; - - private static final String CREATE_JOB_LOCK_TABLE = "create.job.lock.table"; - private static JobInfoMapper jobInfoMapper; private static JobLockMapper jobLockMapper; @@ -135,43 +120,30 @@ public class JobContextUtil { if (null != jobInfoMapper && null != jobLockMapper) { return; } - boolean isUTEnv = config.isUTEnv(); - StorageURL url = config.getMetadataUrl(); Properties props = JdbcUtil.datasourceParameters(url); - DataSource dataSource = null; try { - dataSource = JdbcDataSource.getDataSource(props); - if (!isUTEnv) { - // setup job table names - jobMybatisConfig.setDataSource(dataSource); - jobMybatisConfig.setupJobTables(); - jobTableInterceptor.setJobMybatisConfig(jobMybatisConfig); - } + DataSource dataSource = JdbcDataSource.getDataSource(props); transactionManager = JdbcDataSource.getTransactionManager(dataSource); SqlSessionFactory sqlSessionFactory = getSqlSessionFactory(dataSource); - addPluginForSqlSessionManager(sqlSessionFactory); + addPluginForSqlSessionManager(dataSource, sqlSessionFactory); sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory); jobInfoMapper = sqlSessionTemplate.getMapper(JobInfoMapper.class); jobLockMapper = sqlSessionTemplate.getMapper(JobLockMapper.class); - if (isUTEnv) { - createTableIfNotExist((BasicDataSource) dataSource, "job_info"); - createTableIfNotExist((BasicDataSource) dataSource, "job_lock"); - jobInfoMapper.deleteAllJob(); - jobLockMapper.deleteAllJobLock(); - } } catch (Exception e) { throw new RuntimeException("initialize mybatis mappers failed", e); } } - private static void addPluginForSqlSessionManager(SqlSessionFactory sqlSessionFactory) { + private static void addPluginForSqlSessionManager(DataSource dataSource, SqlSessionFactory sqlSessionFactory) + throws Exception { + jobMybatisConfig.setDataSource(dataSource); + jobMybatisConfig.setupJobTables(); + jobTableInterceptor.setJobMybatisConfig(jobMybatisConfig); List<Interceptor> interceptors = sqlSessionFactory.getConfiguration().getInterceptors(); - if (!interceptors.contains(jobTableInterceptor)) { sqlSessionFactory.getConfiguration().addInterceptor(jobTableInterceptor); } - } public static SqlSessionFactory getSqlSessionFactory(DataSource dataSource) throws SQLException, IOException { @@ -199,39 +171,6 @@ public class JobContextUtil { } } - private static void createTableIfNotExist(BasicDataSource dataSource, String tableName) - throws IOException, SQLException { - if (JdbcUtil.isTableExists(dataSource.getConnection(), tableName)) { - log.info("{} already existed in database", tableName); - return; - } - - String createTableStmtProp = ""; - if ("job_info".equals(tableName)) { - createTableStmtProp = CREATE_JOB_INFO_TABLE; - } else { - createTableStmtProp = CREATE_JOB_LOCK_TABLE; - } - - Properties properties = JdbcUtil.getProperties(dataSource); - String createTableStmt = String.format(Locale.ROOT, properties.getProperty(createTableStmtProp), tableName); - try (Connection connection = dataSource.getConnection()) { - ScriptRunner sr = new ScriptRunner(connection); - sr.setLogWriter(new PrintWriter(new OutputStreamWriter(new LogOutputStream(log), DEFAULT_CHARSET))); - log.debug("start to create table({})", tableName); - sr.runScript(new InputStreamReader(new ByteArrayInputStream(createTableStmt.getBytes(DEFAULT_CHARSET)), - DEFAULT_CHARSET)); - log.debug("create table finished"); - } - - if (!JdbcUtil.isTableExists(dataSource.getConnection(), tableName)) { - log.debug("failed to create table({})", tableName); - throw new IllegalStateException(String.format(Locale.ROOT, "create table(%s) failed", tableName)); - } else { - log.debug("table({}) already exists.", tableName); - } - } - public static JobInfoDao getJobInfoDao(KylinConfig config) { if (config.isUTEnv() || isNoSpringContext()) { return getJobInfoDaoForTestOrTool(config); @@ -269,6 +208,7 @@ public class JobContextUtil { if (null != jobContext) { jobContext.destroy(); } + dropUTJobTable(); jobInfoMapper = null; jobLockMapper = null; jobInfoDao = null; @@ -281,6 +221,17 @@ public class JobContextUtil { } } + private static void dropUTJobTable() { + try { + if (null != transactionManager) { + JdbcTemplate jdbcTemplate = new JdbcTemplate(transactionManager.getDataSource()); + jdbcTemplate.execute("DROP ALL OBJECTS"); + } + } catch (Exception e) { + log.error("Drop UT job table failed.", e); + } + } + // for test only public static boolean hasStarted() { return jobContext != null; diff --git a/src/core-job/src/main/java/org/apache/kylin/job/util/JobInfoUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/util/JobInfoUtil.java index 3b84a0284e..76b03350c9 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/util/JobInfoUtil.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/util/JobInfoUtil.java @@ -22,9 +22,11 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.zip.DataFormatException; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.guava30.shaded.common.io.ByteSource; import org.apache.kylin.job.dao.ExecutablePO; import org.apache.kylin.job.domain.JobInfo; @@ -48,10 +50,10 @@ public class JobInfoUtil { } public static ExecutablePO deserializeExecutablePO(JobInfo jobInfo) { - ByteSource byteSource = ByteSource.wrap(jobInfo.getJobContent()); try { + ByteSource byteSource = ByteSource.wrap(CompressionUtils.decompress(jobInfo.getJobContent())); return deserializeExecutablePO(byteSource, jobInfo.getUpdateTime(), jobInfo.getProject()); - } catch (IOException e) { + } catch (IOException | DataFormatException e) { log.warn("Error when deserializing jobInfo, id: {} " + jobInfo.getJobId(), e); return null; } diff --git a/src/core-job/src/main/resources/mybatis-mapper/JobLockMapper.xml b/src/core-job/src/main/resources/mybatis-mapper/JobLockMapper.xml index 528f7ea425..95a6cd77ba 100644 --- a/src/core-job/src/main/resources/mybatis-mapper/JobLockMapper.xml +++ b/src/core-job/src/main/resources/mybatis-mapper/JobLockMapper.xml @@ -4,6 +4,7 @@ <resultMap id="BaseResultMap" type="org.apache.kylin.job.domain.JobLock"> <id column="id" jdbcType="BIGINT" property="id" /> <result column="lock_id" jdbcType="VARCHAR" property="lockId" /> + <result column="project" jdbcType="VARCHAR" property="project" /> <result column="lock_node" jdbcType="VARCHAR" property="lockNode" /> <result column="lock_expire_time" jdbcType="TIMESTAMP" property="lockExpireTime" /> <result column="priority" jdbcType="INTEGER" property="priority" /> @@ -37,10 +38,10 @@ delete from ${jobLockTable} </delete> <insert id="insert" parameterType="org.apache.kylin.job.domain.JobLock"> - insert into ${jobLockTable} (id, lock_id, lock_node, + insert into ${jobLockTable} (id, lock_id, project, lock_node, lock_expire_time, priority, create_time, update_time ) - values (#{id,jdbcType=BIGINT}, #{lockId,jdbcType=VARCHAR}, #{lockNode,jdbcType=VARCHAR}, + values (#{id,jdbcType=BIGINT}, #{lockId,jdbcType=VARCHAR}, #{project,jdbcType=VARCHAR}, #{lockNode,jdbcType=VARCHAR}, #{lockExpireTime,jdbcType=TIMESTAMP}, #{priority,jdbcType=INTEGER}, #{createTime,jdbcType=BIGINT}, #{updateTime,jdbcType=BIGINT} ) </insert> @@ -53,6 +54,9 @@ <if test="lockId != null"> lock_id, </if> + <if test="project != null"> + project, + </if> <if test="lockNode != null"> lock_node, </if> @@ -76,6 +80,9 @@ <if test="lockId != null"> #{lockId,jdbcType=VARCHAR}, </if> + <if test="project != null"> + #{project,jdbcType=VARCHAR}, + </if> <if test="lockNode != null"> #{lockNode,jdbcType=VARCHAR}, </if> @@ -134,7 +141,14 @@ <select id="findNonLockIdList" resultMap="PriorityFistRandomOrderJob"> SELECT lock_id, priority FROM ${jobLockTable} - WHERE lock_node IS NULL OR lock_expire_time <![CDATA[<]]> CURRENT_TIMESTAMP + WHERE + <if test="projects != null"> + <foreach close=")" collection="projects" index="index" item="item" open="project in (" separator=","> + #{item} + </foreach> + AND + </if> + (lock_node IS NULL OR lock_expire_time <![CDATA[<]]> CURRENT_TIMESTAMP) ORDER BY priority ASC <if test="batchSize>=0"> LIMIT #{batchSize,jdbcType=INTEGER} diff --git a/src/core-job/src/main/resources/script/schema_job_info_h2.sql b/src/core-job/src/main/resources/script/schema_job_info_h2.sql index 9335bd3502..2cbcdcce25 100644 --- a/src/core-job/src/main/resources/script/schema_job_info_h2.sql +++ b/src/core-job/src/main/resources/script/schema_job_info_h2.sql @@ -23,7 +23,7 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_info ( job_status varchar(50) NOT NULL, project varchar(100) NOT NULL, subject varchar(200) NOT NULL, - model_id varchar(100), + model_id varchar(200), priority integer DEFAULT 3, job_content longblob NOT NULL, mvcc bigint(10), @@ -32,4 +32,4 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_info ( job_duration_millis bigint(10) NOT NULL DEFAULT '0' COMMENT 'total duration milliseconds', PRIMARY KEY (id), UNIQUE KEY uk_job_id (job_id) -) DEFAULT CHARSET=utf8; \ No newline at end of file +); \ No newline at end of file diff --git a/src/core-job/src/main/resources/script/schema_job_info_mysql.sql b/src/core-job/src/main/resources/script/schema_job_info_mysql.sql index 63187e29b2..af439d41d6 100644 --- a/src/core-job/src/main/resources/script/schema_job_info_mysql.sql +++ b/src/core-job/src/main/resources/script/schema_job_info_mysql.sql @@ -24,7 +24,7 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_info ( job_status varchar(50) NOT NULL, project varchar(100) NOT NULL, subject varchar(200) NOT NULL, - model_id varchar(100), + model_id varchar(200), priority integer DEFAULT 3, mvcc bigint(10), job_content longblob NOT NULL, @@ -33,7 +33,7 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_info ( job_duration_millis bigint(10) NOT NULL DEFAULT '0' COMMENT 'total duration milliseconds', PRIMARY KEY (id), UNIQUE KEY uk_job_id (job_id) -) AUTO_INCREMENT=10000 ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +) AUTO_INCREMENT=10000 ENGINE=InnoDB; create index KE_IDENTIFIED_job_info_ix on KE_IDENTIFIED_job_info (project, job_status, job_type, subject); diff --git a/src/core-job/src/main/resources/script/schema_job_info_postgresql.sql b/src/core-job/src/main/resources/script/schema_job_info_postgresql.sql index 604a5d2f68..aee6329ede 100644 --- a/src/core-job/src/main/resources/script/schema_job_info_postgresql.sql +++ b/src/core-job/src/main/resources/script/schema_job_info_postgresql.sql @@ -24,7 +24,7 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_info ( job_status varchar(50) NOT NULL, project varchar(100) NOT NULL, subject varchar(200) NOT NULL, - model_id varchar(100), + model_id varchar(200), priority integer DEFAULT 3, mvcc bigint, job_content bytea NOT NULL, diff --git a/src/core-job/src/main/resources/script/schema_job_lock_h2.sql b/src/core-job/src/main/resources/script/schema_job_lock_h2.sql index 7f886da928..df854c3e03 100644 --- a/src/core-job/src/main/resources/script/schema_job_lock_h2.sql +++ b/src/core-job/src/main/resources/script/schema_job_lock_h2.sql @@ -19,6 +19,7 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_lock ( id bigint(10) NOT NULL AUTO_INCREMENT, + project varchar(100) NOT NULL, lock_id varchar(100) NOT NULL COMMENT 'what is locked', lock_node varchar(50) DEFAULT NULL COMMENT 'who locked it', lock_expire_time timestamp COMMENT 'when does the lock expire', @@ -27,4 +28,4 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_lock ( update_time bigint, PRIMARY KEY (id), UNIQUE KEY uk_lock_id (lock_id) -) DEFAULT CHARSET=utf8; +); diff --git a/src/core-job/src/main/resources/script/schema_job_lock_mysql.sql b/src/core-job/src/main/resources/script/schema_job_lock_mysql.sql index bd59896a94..7cd16c56c3 100644 --- a/src/core-job/src/main/resources/script/schema_job_lock_mysql.sql +++ b/src/core-job/src/main/resources/script/schema_job_lock_mysql.sql @@ -19,6 +19,7 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_lock ( id bigint(20) NOT NULL AUTO_INCREMENT, + project varchar(100) NOT NULL, lock_id varchar(100) NOT NULL COMMENT 'what is locked', lock_node varchar(50) DEFAULT NULL COMMENT 'who locked it', lock_expire_time timestamp COMMENT 'when does the lock expire', @@ -27,4 +28,4 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_lock ( update_time bigint, PRIMARY KEY (id), UNIQUE KEY uk_lock_id (lock_id) -) AUTO_INCREMENT=10000 ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +) AUTO_INCREMENT=10000 ENGINE=InnoDB; diff --git a/src/core-job/src/main/resources/script/schema_job_lock_postgresql.sql b/src/core-job/src/main/resources/script/schema_job_lock_postgresql.sql index 4142a73fdf..004616961e 100644 --- a/src/core-job/src/main/resources/script/schema_job_lock_postgresql.sql +++ b/src/core-job/src/main/resources/script/schema_job_lock_postgresql.sql @@ -18,6 +18,7 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_lock ( id SERIAL PRIMARY KEY, + project varchar(100) NOT NULL, lock_id varchar(100) UNIQUE NOT NULL, lock_node varchar(50) DEFAULT NULL, lock_expire_time timestamptz DEFAULT NULL, diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/JobMailUtilTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/JobMailUtilTest.java index 86fd300783..9ead83b465 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/execution/JobMailUtilTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/JobMailUtilTest.java @@ -50,8 +50,8 @@ public class JobMailUtilTest extends NLocalFileMetadataTestCase { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java index 449efd4d4c..7191e29735 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java @@ -75,8 +75,8 @@ public abstract class BaseSchedulerTest extends NLocalFileMetadataTestCase { @After public void after() throws Exception { JobContext jobContext = JobContextUtil.getJobContext(KylinConfig.getInstanceFromEnv()); - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); await().atMost(30, TimeUnit.SECONDS).until(() -> jobContext.getJobScheduler().getRunningJob().size() == 0); } diff --git a/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java index ea2fbb89ab..daa5de0348 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java @@ -20,6 +20,7 @@ package org.apache.kylin.job.scheduler; import static org.apache.kylin.common.util.TestUtils.getTestConfig; import static org.awaitility.Awaitility.await; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; @@ -117,7 +118,7 @@ class JdbcJobSchedulerTest { @Test void testLockExpiredAndJobNotFinal() { String jobId = mockJob(); - JobLock lock = new JobLock(jobId, 1); + JobLock lock = new JobLock(jobId, PROJECT, 1); lock.setLockNode("mock_node"); lock.setLockExpireTime(new Date()); int expect = jobContext.getJobLockMapper().insert(lock); @@ -168,6 +169,7 @@ class JdbcJobSchedulerTest { JobLock lock = new JobLock(); String id = "mock_lock_id_" + i; lock.setLockId(id); + lock.setProject(PROJECT); lock.setLockNode("mock_node"); lock.setPriority(p); lock.setLockExpireTime(new Date()); @@ -179,8 +181,10 @@ class JdbcJobSchedulerTest { JdbcJobScheduler jobScheduler = Mockito.spy(originScheduler); Mockito.when(jobScheduler.hasRunningJob()).thenReturn(true); jobContext.setJobScheduler(jobScheduler); - List<String> order1 = jobContext.getJobScheduler().findNonLockIdListInOrder(20); - List<String> order2 = jobContext.getJobScheduler().findNonLockIdListInOrder(20); + List<String> order1 = jobContext.getJobScheduler().findNonLockIdListInOrder(20, + Collections.singletonList(PROJECT)); + List<String> order2 = jobContext.getJobScheduler().findNonLockIdListInOrder(20, + Collections.singletonList(PROJECT)); boolean hasDiff = false; int currentPriority = 0; for (int i = 0; i < order1.size(); i++) { @@ -202,7 +206,7 @@ class JdbcJobSchedulerTest { AbstractExecutable job = mockExecutable(); // insert job lock, without lock node String jobId = job.getJobId(); - JobLock lock = new JobLock(jobId, 1); + JobLock lock = new JobLock(jobId, PROJECT, 1); int expect = jobContext.getJobLockMapper().insert(lock); Assertions.assertEquals(1, expect); await().atMost(60, TimeUnit.SECONDS).until(() -> jobContext.getJobLockMapper().selectByJobId(jobId) == null); @@ -225,7 +229,7 @@ class JdbcJobSchedulerTest { job.getTasks().forEach(task -> task.getOutput().setStatus(ExecutableState.PENDING.name())); return true; }); - mapper.insertSelective(new JobLock(jobId, 3)); + mapper.insertSelective(new JobLock(jobId, PROJECT, 3)); // init schedule JobContextUtil.getJobContext(config); diff --git a/src/core-job/src/test/java/org/apache/kylin/mapper/JobLockMapperTest.java b/src/core-job/src/test/java/org/apache/kylin/mapper/JobLockMapperTest.java index ceed191e89..7645b8d1f8 100644 --- a/src/core-job/src/test/java/org/apache/kylin/mapper/JobLockMapperTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/mapper/JobLockMapperTest.java @@ -21,6 +21,7 @@ package org.apache.kylin.mapper; import static org.apache.kylin.common.util.TestUtils.getTestConfig; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.concurrent.TimeUnit; @@ -56,6 +57,7 @@ public class JobLockMapperTest { private JobLock generateJobLock() { JobLock jobLock = new JobLock(); + jobLock.setProject("default"); jobLock.setLockId("mock_lock_id"); jobLock.setLockNode("mock_lock_node"); long renewalSec = getTestConfig().getJobSchedulerJobRenewalSec(); @@ -80,7 +82,7 @@ public class JobLockMapperTest { long renewSec = getTestConfig().getJobSchedulerJobRenewalSec(); Awaitility.await().atMost(renewSec + 1, TimeUnit.SECONDS) - .until(() -> jobLockMapper.findNonLockIdList(10).size() > 0); + .until(() -> jobLockMapper.findNonLockIdList(10, Collections.singletonList("default")).size() > 0); // update (h2 no support mysql-dialect) // int updateAffect = jobLockMapper.upsertLock("mock_job_id", "mock_node_id", 10000L); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java index 5f8e868886..9ea32d0cb8 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java @@ -404,12 +404,25 @@ public class EpochManager { } private List<String> listProjectWithPermission() { - List<String> projects = epochCheckEnabled ? getProjectsToMarkOwner() - : NProjectManager.getInstance(config).listAllProjects().stream().map(ProjectInstance::getName) - .collect(Collectors.toList()); + List<String> projects = listRealProjectWithPermission(); projects.add(GLOBAL); return projects; } + + public List<String> listProjectWithPermissionForScheduler() { + List<String> projects = listRealProjectWithPermission(); + if (projects.size() == NProjectManager.getInstance(config).listAllProjects().size()) { + // Returning null indicates that filtering items is not required during scheduling. + return null; + } + return projects; + } + + private List<String> listRealProjectWithPermission() { + return epochCheckEnabled ? getProjectsToMarkOwner() + : NProjectManager.getInstance(config).listAllProjects().stream().map(ProjectInstance::getName) + .collect(Collectors.toList()); + } //for test public Epoch getGlobalEpoch() { diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/favorite/FavoriteRuleManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/favorite/FavoriteRuleManager.java index 2dbc143ab1..cb72f63e39 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/favorite/FavoriteRuleManager.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/favorite/FavoriteRuleManager.java @@ -129,6 +129,10 @@ public class FavoriteRuleManager { favoriteRuleStore.deleteByName(project, favoriteRule.getName()); } + public void deleteByProject() { + favoriteRuleStore.deleteByProject(project); + } + @VisibleForTesting public void createRule(final FavoriteRule rule) { FavoriteRule copy = copyForWrite(rule); diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java index c9c62aeb34..c41ed73a39 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java @@ -22,6 +22,7 @@ import static org.apache.kylin.common.util.TestUtils.getTestConfig; import static org.awaitility.Awaitility.await; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -32,11 +33,17 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.metadata.Epoch; import org.apache.kylin.common.persistence.metadata.EpochStore; import org.apache.kylin.common.persistence.transaction.UnitOfWork; +import org.apache.kylin.common.util.AddressUtil; import org.apache.kylin.junit.annotation.MetadataInfo; import org.apache.kylin.junit.annotation.OverwriteProp; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.resourcegroup.KylinInstance; +import org.apache.kylin.metadata.resourcegroup.RequestTypeEnum; +import org.apache.kylin.metadata.resourcegroup.ResourceGroupEntity; import org.apache.kylin.metadata.resourcegroup.ResourceGroupManager; +import org.apache.kylin.metadata.resourcegroup.ResourceGroupMappingInfo; +import org.junit.Assert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.test.util.ReflectionTestUtils; @@ -577,4 +584,38 @@ class EpochManagerTest { Assertions.assertTrue(isEpochLegal); } + @Test + @MetadataInfo + void testListProjectForScheduling() { + KylinConfig config = getTestConfig(); + List<ProjectInstance> allProjects = NProjectManager.getInstance(config).listAllProjects(); + EpochManager epochManager = EpochManager.getInstance(); + ResourceGroupManager resourceGroupManager = ResourceGroupManager.getInstance(config); + + // Resource is disabled. + Assert.assertNull(epochManager.listProjectWithPermissionForScheduler()); + + // ResourceGroup is enabled, but no projects are bound. + resourceGroupManager.updateResourceGroup(copyForWrite -> copyForWrite.setResourceGroupEnabled(true)); + Assert.assertEquals(0, epochManager.listProjectWithPermissionForScheduler().size()); + + // ResourceGroup is enabled, and project 'default' is bound. + resourceGroupManager.updateResourceGroup(copyForWrite -> { + ResourceGroupEntity group = new ResourceGroupEntity(); + ReflectionTestUtils.setField(group, "id", "rg_001"); + KylinInstance kylinInstance = new KylinInstance(); + ReflectionTestUtils.setField(kylinInstance, "resourceGroupId", "rg_001"); + ReflectionTestUtils.setField(kylinInstance, "instance", AddressUtil.getLocalInstance()); + ResourceGroupMappingInfo resourceGroupMappingInfo = new ResourceGroupMappingInfo(); + ReflectionTestUtils.setField(resourceGroupMappingInfo, "project", "default"); + ReflectionTestUtils.setField(resourceGroupMappingInfo, "resourceGroupId", "rg_001"); + ReflectionTestUtils.setField(resourceGroupMappingInfo, "requestType", RequestTypeEnum.BUILD); + + copyForWrite.setResourceGroupEntities(Collections.singletonList(group)); + copyForWrite.setKylinInstances(Collections.singletonList(kylinInstance)); + copyForWrite.setResourceGroupMappingInfoList(Collections.singletonList(resourceGroupMappingInfo)); + }); + Assert.assertEquals(1, epochManager.listProjectWithPermissionForScheduler().size()); + Assert.assertEquals("default", epochManager.listProjectWithPermissionForScheduler().get(0)); + } } diff --git a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java index 10b9202092..9620de66ab 100644 --- a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java +++ b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java @@ -115,8 +115,8 @@ public class JobControllerTest extends NLocalFileMetadataTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/job/util/JobFilterUtil.java b/src/data-loading-service/src/main/java/org/apache/kylin/job/util/JobFilterUtil.java index 0d0adad48f..7cbecb7ae4 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/job/util/JobFilterUtil.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/job/util/JobFilterUtil.java @@ -36,7 +36,6 @@ import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.msg.Message; import org.apache.kylin.common.msg.MsgPicker; import org.apache.kylin.guava30.shaded.common.base.Preconditions; -import org.apache.kylin.job.config.JobMybatisConfig; import org.apache.kylin.job.constant.JobStatusUtil; import org.apache.kylin.job.constant.JobTimeFilterEnum; import org.apache.kylin.job.execution.ExecutableState; @@ -90,12 +89,10 @@ public class JobFilterUtil { jobFilter.getStatuses() .forEach(jobStatus -> scheduleStates.addAll(JobStatusUtil.mapJobStatusToScheduleState(jobStatus))); } - List<String> scheduleStateNames = scheduleStates.stream().map(executableState -> executableState.name()) - .collect(Collectors.toList()); return new JobMapperFilter(scheduleStates, jobFilter.getJobNames(), queryStartTime.getTime(), Lists.newArrayList(subjects), null, jobId, null, jobFilter.getProject(), orderByField, orderType, - offset, limit, JobMybatisConfig.JOB_INFO_TABLE, null); + offset, limit, null, null); } private static Date getQueryStartTime(int timeFilter) { diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/job/service/JobInfoServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/job/service/JobInfoServiceTest.java index 91274a76d2..f97bdfe600 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/job/service/JobInfoServiceTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/job/service/JobInfoServiceTest.java @@ -163,8 +163,8 @@ public class JobInfoServiceTest extends LogOutputTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceBuildTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceBuildTest.java index c22d02bda5..a42f6b98c3 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceBuildTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceBuildTest.java @@ -122,8 +122,8 @@ public class FusionModelServiceBuildTest extends SourceTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java index e4853174e2..06eee4367d 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java @@ -129,8 +129,8 @@ public class JobErrorTest extends NLocalFileMetadataTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } private String getProject() { diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobResourceServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobResourceServiceTest.java index 5446304f68..ab96972e4a 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobResourceServiceTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobResourceServiceTest.java @@ -65,8 +65,8 @@ public class JobResourceServiceTest extends NLocalFileMetadataTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java index 107e997bd3..0411300f20 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java @@ -248,8 +248,8 @@ public class ModelServiceBuildTest extends SourceTestCase { EventBusFactory.getInstance().unregister(eventListener); EventBusFactory.getInstance().unregister(modelBrokenListener); EventBusFactory.getInstance().restart(); - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); if (!TimeZone.getDefault().equals(defaultTimeZone)) { TimeZone.setDefault(defaultTimeZone); diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/SnapshotServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/SnapshotServiceTest.java index f8b76530cb..3e3f4dbabb 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/SnapshotServiceTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/SnapshotServiceTest.java @@ -128,8 +128,8 @@ public class SnapshotServiceTest extends NLocalFileMetadataTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java index ceca44bcaa..e9e9a6e0f5 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java @@ -121,8 +121,8 @@ public class StageTest extends NLocalFileMetadataTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } private String getProject() { diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/TableSamplingServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/TableSamplingServiceTest.java index 6336fc954f..db5a80447d 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/TableSamplingServiceTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/TableSamplingServiceTest.java @@ -95,8 +95,8 @@ public class TableSamplingServiceTest extends NLocalFileMetadataTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/BuildAndQueryEmptySegmentsTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/BuildAndQueryEmptySegmentsTest.java index 5d094fb07b..9034add0f4 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/BuildAndQueryEmptySegmentsTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/BuildAndQueryEmptySegmentsTest.java @@ -92,8 +92,8 @@ public class BuildAndQueryEmptySegmentsTest extends NLocalWithSparkSessionTest { @After public void cleanup() throws Exception { - super.cleanupTestMetadata(); JobContextUtil.cleanUp(); + super.cleanupTestMetadata(); } @Test diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/CharNColumnTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/CharNColumnTest.java index d6a7796c79..27d6ebf753 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/CharNColumnTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/CharNColumnTest.java @@ -44,8 +44,8 @@ public class CharNColumnTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java index 9ed827a094..2ee6a3cddd 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java @@ -46,8 +46,8 @@ public class EnhancedAggPushDownTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/ExactlyMatchTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/ExactlyMatchTest.java index c09a45fd99..6be3462460 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/ExactlyMatchTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/ExactlyMatchTest.java @@ -56,8 +56,8 @@ public class ExactlyMatchTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/MultiPartitionPruningTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/MultiPartitionPruningTest.java index 8702bb0922..b428794708 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/MultiPartitionPruningTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/MultiPartitionPruningTest.java @@ -89,8 +89,8 @@ public class MultiPartitionPruningTest extends NLocalWithSparkSessionTest implem @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NAggPushDownTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NAggPushDownTest.java index b74de3f3ff..60f8f60bad 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NAggPushDownTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NAggPushDownTest.java @@ -52,8 +52,8 @@ public class NAggPushDownTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NBitmapFunctionTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NBitmapFunctionTest.java index 1ca7d24369..762ad6591b 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NBitmapFunctionTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NBitmapFunctionTest.java @@ -48,9 +48,9 @@ public class NBitmapFunctionTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { + JobContextUtil.cleanUp(); cleanupTestMetadata(); FileUtils.deleteQuietly(new File("../kylin-it/metastore_db")); - JobContextUtil.cleanUp(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NBuildAndQuerySnapshotTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NBuildAndQuerySnapshotTest.java index e7f99d748f..b95e35d12d 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NBuildAndQuerySnapshotTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NBuildAndQuerySnapshotTest.java @@ -69,8 +69,8 @@ public class NBuildAndQuerySnapshotTest extends NLocalWithSparkSessionTest { @After public void cleanup() throws Exception { - super.cleanupTestMetadata(); JobContextUtil.cleanUp(); + super.cleanupTestMetadata(); } @Test diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NComputedColumnTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NComputedColumnTest.java index b2ae9784b8..c05d1e8044 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NComputedColumnTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NComputedColumnTest.java @@ -51,8 +51,8 @@ public class NComputedColumnTest extends NLocalWithSparkSessionTest { @After public void after() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NCountDistinctWithoutEncodeTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NCountDistinctWithoutEncodeTest.java index 83811aee3b..cc8413ac6d 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NCountDistinctWithoutEncodeTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NCountDistinctWithoutEncodeTest.java @@ -48,8 +48,8 @@ public class NCountDistinctWithoutEncodeTest extends NLocalWithSparkSessionTest @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningTest.java index 260fbed2ef..4954ad1f59 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningTest.java @@ -103,8 +103,8 @@ public class NFilePruningTest extends NLocalWithSparkSessionTest implements Adap @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningV2Test.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningV2Test.java index ca1035728f..3c903a101a 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningV2Test.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningV2Test.java @@ -98,8 +98,8 @@ public class NFilePruningV2Test extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NFlattableJoinWithoutLookupTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NFlattableJoinWithoutLookupTest.java index efed13b498..6988121c8c 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NFlattableJoinWithoutLookupTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NFlattableJoinWithoutLookupTest.java @@ -58,9 +58,9 @@ public class NFlattableJoinWithoutLookupTest extends NLocalWithSparkSessionTest @After public void after() throws Exception { + JobContextUtil.cleanUp(); cleanupTestMetadata(); FileUtils.deleteQuietly(new File("../kap-it/metastore_db")); - JobContextUtil.cleanUp(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NJoinOptTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NJoinOptTest.java index 96a19b9d99..da5a1b25b0 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NJoinOptTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NJoinOptTest.java @@ -85,8 +85,8 @@ public class NJoinOptTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Ignore("KE-30387") diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NManualBuildAndQueryCuboidTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NManualBuildAndQueryCuboidTest.java index b00559d438..0626143412 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NManualBuildAndQueryCuboidTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NManualBuildAndQueryCuboidTest.java @@ -80,8 +80,8 @@ public class NManualBuildAndQueryCuboidTest extends NManualBuildAndQueryTest { @After public void after() throws Exception { - super.cleanupTestMetadata(); JobContextUtil.cleanUp(); + super.cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NMatchingTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NMatchingTest.java index 6d0de04dbe..324b16caae 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NMatchingTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NMatchingTest.java @@ -50,8 +50,8 @@ public class NMatchingTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultiPartitionJobTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultiPartitionJobTest.java index e16dc83990..c6bfa7855f 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultiPartitionJobTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultiPartitionJobTest.java @@ -52,8 +52,8 @@ public class NMultiPartitionJobTest extends NLocalWithSparkSessionTest { @After public void after() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultipleColumnsInTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultipleColumnsInTest.java index bf941a2815..821861cd25 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultipleColumnsInTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultipleColumnsInTest.java @@ -47,8 +47,8 @@ public class NMultipleColumnsInTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NOptIntersectCountTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NOptIntersectCountTest.java index dd01b2438c..35c92ac856 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NOptIntersectCountTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NOptIntersectCountTest.java @@ -57,8 +57,8 @@ public class NOptIntersectCountTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NPartitionColumnTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NPartitionColumnTest.java index 80293e1c6a..9960e77396 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NPartitionColumnTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NPartitionColumnTest.java @@ -47,8 +47,8 @@ public class NPartitionColumnTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNResultTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNResultTest.java index 520df87766..7b3e8543f5 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNResultTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNResultTest.java @@ -47,8 +47,8 @@ public class NTopNResultTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNWithChineseTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNWithChineseTest.java index 2fb3487828..14ac5caf43 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNWithChineseTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNWithChineseTest.java @@ -47,8 +47,8 @@ public class NTopNWithChineseTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/ReuseFlatTableTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/ReuseFlatTableTest.java index 90da5edf0e..35d180c662 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/ReuseFlatTableTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/ReuseFlatTableTest.java @@ -50,8 +50,8 @@ public class ReuseFlatTableTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/SimilarToEscapeFunctionTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/SimilarToEscapeFunctionTest.java index 4bffd9acee..de53bb4fa6 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/SimilarToEscapeFunctionTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/SimilarToEscapeFunctionTest.java @@ -67,8 +67,8 @@ public class SimilarToEscapeFunctionTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/SumLCResultTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/SumLCResultTest.java index 7cdfd073a0..db42a556f9 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/SumLCResultTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/SumLCResultTest.java @@ -53,8 +53,8 @@ public class SumLCResultTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/TableIndexTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/TableIndexTest.java index c3a91c4ed7..4a3bf6fa7a 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/TableIndexTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/TableIndexTest.java @@ -46,9 +46,9 @@ public class TableIndexTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { + JobContextUtil.cleanUp(); cleanupTestMetadata(); FileUtils.deleteQuietly(new File("../kylin-it/metastore_db")); - JobContextUtil.cleanUp(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/TimeZoneQueryTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/TimeZoneQueryTest.java index eceb39be21..8cfbdad70d 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/TimeZoneQueryTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/TimeZoneQueryTest.java @@ -99,8 +99,8 @@ public class TimeZoneQueryTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Override diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/QueryLayoutFilterTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/QueryLayoutFilterTest.java index 868b6db57b..f9620c3e89 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/QueryLayoutFilterTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/QueryLayoutFilterTest.java @@ -86,8 +86,8 @@ public class QueryLayoutFilterTest extends NLocalWithSparkSessionTest { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/kylin-server-it/src/test/java/org/apache/kylin/event/ITStorageCleanerTest.java b/src/kylin-server-it/src/test/java/org/apache/kylin/event/ITStorageCleanerTest.java index 2ea193d385..177290cb03 100644 --- a/src/kylin-server-it/src/test/java/org/apache/kylin/event/ITStorageCleanerTest.java +++ b/src/kylin-server-it/src/test/java/org/apache/kylin/event/ITStorageCleanerTest.java @@ -79,8 +79,8 @@ public class ITStorageCleanerTest extends NLocalWithSparkSessionTest { @After public void tearDown() throws Exception { - this.cleanupTestMetadata(); JobContextUtil.cleanUp(); + this.cleanupTestMetadata(); } @Test diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/config/initialize/ModelBrokenListenerTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/config/initialize/ModelBrokenListenerTest.java index 73d6b16b9b..7fefc33c60 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/config/initialize/ModelBrokenListenerTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/config/initialize/ModelBrokenListenerTest.java @@ -102,9 +102,8 @@ public class ModelBrokenListenerTest extends SourceTestCase { logger.info("ModelBrokenListenerTest cleanup"); EventBusFactory.getInstance().unregister(modelBrokenListener); EventBusFactory.getInstance().restart(); - super.cleanup(); - JobContextUtil.cleanUp(); + super.cleanup(); } private void generateJob(String modelId, String project) { diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/BaseIndexTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/BaseIndexTest.java index 9b5ee39054..9574fb444c 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/BaseIndexTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/BaseIndexTest.java @@ -104,8 +104,8 @@ public class BaseIndexTest extends SourceTestCase { @After public void tearDown() { getTestConfig().setProperty("kylin.metadata.semi-automatic-mode", "false"); - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionIndexServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionIndexServiceTest.java index 9a95f4c00f..75761b6732 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionIndexServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionIndexServiceTest.java @@ -111,8 +111,8 @@ public class FusionIndexServiceTest extends SourceTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } private UpdateRuleBasedCuboidRequest createUpdateRuleRequest(String project, String modelId, diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceTest.java index a3dc78b8d2..b3e4b39145 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceTest.java @@ -136,8 +136,8 @@ public class FusionModelServiceTest extends SourceTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java index 9cb9d72138..2b298b1191 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java @@ -115,8 +115,8 @@ public class IndexPlanServiceTest extends SourceTestCase { @After public void tearDown() { getTestConfig().setProperty("kylin.metadata.semi-automatic-mode", "false"); - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Before diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java index 8b42b5725b..85df728cda 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java @@ -166,8 +166,8 @@ public class ModelServiceSemanticUpdateTest extends NLocalFileMetadataTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java index 4c43db6a55..d7f635697c 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java @@ -316,8 +316,8 @@ public class ModelServiceTest extends SourceTestCase { EventBusFactory.getInstance().unregister(eventListener); EventBusFactory.getInstance().unregister(modelBrokenListener); EventBusFactory.getInstance().restart(); - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java index 226eb7a9b5..a0f7244eb2 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java @@ -171,8 +171,8 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java index fe4e053312..a8ec2a6e4b 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java @@ -158,8 +158,8 @@ public class TableReloadServiceTest extends CSVSourceTestCase { } EventBusFactory.getInstance().unregister(modelBrokenListener); EventBusFactory.getInstance().restart(); - super.cleanup(); JobContextUtil.cleanUp(); + super.cleanup(); } @Test diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java index c8773e1c2e..3075b4e94d 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java @@ -217,9 +217,9 @@ public class TableServiceTest extends CSVSourceTestCase { @After public void tearDown() { EventBusFactory.getInstance().unregister(eventListener); + JobContextUtil.cleanUp(); cleanupTestMetadata(); FileUtils.deleteQuietly(new File("metastore_db")); - JobContextUtil.cleanUp(); FileUtils.deleteQuietly(new File("../modeling-service/metastore_db")); } diff --git a/src/second-storage/core/src/test/java/io/kyligence/kap/secondstorage/util/SecondStorageJobUtilTest.java b/src/second-storage/core/src/test/java/io/kyligence/kap/secondstorage/util/SecondStorageJobUtilTest.java new file mode 100644 index 0000000000..c10fd27789 --- /dev/null +++ b/src/second-storage/core/src/test/java/io/kyligence/kap/secondstorage/util/SecondStorageJobUtilTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.kyligence.kap.secondstorage.util; + +import static org.junit.Assert.assertThrows; + +import java.util.Collections; +import java.util.List; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.msg.MsgPicker; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.execution.JobTypeEnum; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ ExecutableManager.class }) +@PowerMockIgnore({ "com.sun.security.*", "org.w3c.*", "javax.xml.*", "org.xml.*", "org.w3c.dom.*", "org.apache.cxf.*", + "javax.management.*", "javax.script.*", "org.apache.hadoop.*", "javax.security.*", "java.security.*", + "javax.crypto.*", "javax.net.ssl.*", "org.apache.kylin.common.asyncprofiler.AsyncProfiler" }) +public class SecondStorageJobUtilTest extends NLocalFileMetadataTestCase { + private ExecutableManager executableManager = Mockito.mock(ExecutableManager.class); + + @Before + public void setUp() throws Exception { + createTestMetadata(); + } + + @After + public void tearDown() throws Exception { + cleanupTestMetadata(); + } + + private void prepareManger() { + PowerMockito.stub(PowerMockito.method(ExecutableManager.class, "getInstance", KylinConfig.class, String.class)) + .toReturn(executableManager); + } + + @Test + public void testValidateModel() { + prepareManger(); + PowerMockito.stub(PowerMockito.method(ExecutableManager.class, "getInstance", KylinConfig.class, String.class)) + .toReturn(executableManager); + + List<String> jobs = Collections.singletonList("job1"); + AbstractExecutable job1 = PowerMockito.mock(AbstractExecutable.class); + PowerMockito.when(job1.getStatus()).thenReturn(ExecutableState.READY, ExecutableState.PENDING, + ExecutableState.RUNNING, ExecutableState.PAUSED, ExecutableState.SUCCEED); + + PowerMockito.when(job1.getJobType()).thenReturn(JobTypeEnum.EXPORT_TO_SECOND_STORAGE); + Mockito.when(job1.getProject()).thenReturn("project"); + Mockito.when(job1.getTargetModelId()).thenReturn("modelId"); + Mockito.when(executableManager.getJobs()).thenReturn(jobs); + Mockito.when(executableManager.getJob("job1")).thenReturn(job1); + + //job status: Ready + assertThrows(MsgPicker.getMsg().getSecondStorageConcurrentOperate(), KylinException.class, + () -> SecondStorageJobUtil.validateModel("project", "modelId")); + //job status: Pending + assertThrows(MsgPicker.getMsg().getSecondStorageConcurrentOperate(), KylinException.class, + () -> SecondStorageJobUtil.validateModel("project", "modelId")); + //job status: Running + assertThrows(MsgPicker.getMsg().getSecondStorageConcurrentOperate(), KylinException.class, + () -> SecondStorageJobUtil.validateModel("project", "modelId")); + //job status: Paused + assertThrows(MsgPicker.getMsg().getSecondStorageConcurrentOperate(), KylinException.class, + () -> SecondStorageJobUtil.validateModel("project", "modelId")); + //job status: Succeed + SecondStorageJobUtil.validateModel("project", "modelId"); + } +} diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobOnYarnTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobOnYarnTest.java index 653e9e0a1f..9d149deb4b 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobOnYarnTest.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobOnYarnTest.java @@ -54,8 +54,8 @@ public class NSparkCubingJobOnYarnTest extends NLocalFileMetadataTestCase { @After public void after() throws Exception { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/it/TestModelViewQuery.scala b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/it/TestModelViewQuery.scala index a5a7d969c0..48058bcf89 100644 --- a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/it/TestModelViewQuery.scala +++ b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/it/TestModelViewQuery.scala @@ -21,10 +21,7 @@ import java.sql.SQLException import java.util.TimeZone import org.apache.kylin.common._ -import org.apache.commons.io.IOUtils -import org.apache.commons.lang3.StringUtils -import org.apache.kylin.common.KylinConfig -import org.apache.kylin.common.persistence.{JsonSerializer, RootPersistentEntity} +import org.apache.kylin.common.util.TimeZoneUtils import org.apache.kylin.engine.spark.utils.LogEx import org.apache.kylin.metadata.realization.NoRealizationFoundException import org.apache.kylin.query.QueryFetcher @@ -67,6 +64,7 @@ class TestModelViewQuery overwriteSystemProp("calcite.keep-in-clause", "true") overwriteSystemProp("kylin.dictionary.null-encoding-opt-threshold", "1") overwriteSystemProp("kylin.web.timezone", "GMT+8") + TimeZoneUtils.setDefaultTimeZone(KylinConfig.getInstanceFromEnv) overwriteSystemProp("kylin.query.pushdown.runner-class-name", "") overwriteSystemProp("kylin.query.pushdown-enabled", "false") overwriteSystemProp("kylin.snapshot.parallel-build-enabled", "true") diff --git a/src/tool/src/test/java/org/apache/kylin/tool/AuditLogToolTest.java b/src/tool/src/test/java/org/apache/kylin/tool/AuditLogToolTest.java index 6a092af254..d37f3ac2d6 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/AuditLogToolTest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/AuditLogToolTest.java @@ -108,8 +108,8 @@ public class AuditLogToolTest extends NLocalFileMetadataTestCase { public void teardown() { val jdbcTemplate = getJdbcTemplate(); jdbcTemplate.batchUpdate("DROP ALL OBJECTS"); - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/tool/src/test/java/org/apache/kylin/tool/JobDiagInfoToolTest.java b/src/tool/src/test/java/org/apache/kylin/tool/JobDiagInfoToolTest.java index 76b7e331d6..3c1d72ad1b 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/JobDiagInfoToolTest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/JobDiagInfoToolTest.java @@ -83,8 +83,8 @@ public class JobDiagInfoToolTest extends NLocalFileMetadataTestCase { @After public void tearDown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @After diff --git a/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java b/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java index 417b4bc272..720eef3719 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java @@ -84,8 +84,8 @@ public class StorageCleanerTest extends NLocalFileMetadataTestCase { @After public void teardown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/tool/src/test/java/org/apache/kylin/tool/SystemUsageToolTest.java b/src/tool/src/test/java/org/apache/kylin/tool/SystemUsageToolTest.java index e018f375f9..8390bdc1f8 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/SystemUsageToolTest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/SystemUsageToolTest.java @@ -51,18 +51,18 @@ public class SystemUsageToolTest extends NLocalFileMetadataTestCase { @Before public void setup() throws Exception { + JobContextUtil.cleanUp(); createTestMetadata(); queryHistoryDAO = RDBMSQueryHistoryDAO.getInstance(); - JobContextUtil.cleanUp(); JobContextUtil.getJobInfoDao(getTestConfig()); } @After public void teardown() { queryHistoryDAO.deleteAllQueryHistory(); - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/tool/src/test/java/org/apache/kylin/tool/YarnApplicationToolTest.java b/src/tool/src/test/java/org/apache/kylin/tool/YarnApplicationToolTest.java index 1651fe3a21..5031867c2e 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/YarnApplicationToolTest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/YarnApplicationToolTest.java @@ -69,8 +69,8 @@ public class YarnApplicationToolTest extends NLocalFileMetadataTestCase { @After public void teardown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java b/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java index 9a8bb67bdc..29a57a9355 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java @@ -48,8 +48,8 @@ public class ExecutableCleanerTest extends NLocalFileMetadataTestCase { @After public void destroy() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test diff --git a/src/tool/src/test/java/org/apache/kylin/tool/upgrade/MigrateJobToolTest.java b/src/tool/src/test/java/org/apache/kylin/tool/upgrade/MigrateJobToolTest.java index 54d4546767..1db75d7455 100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/upgrade/MigrateJobToolTest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/upgrade/MigrateJobToolTest.java @@ -47,8 +47,8 @@ public class MigrateJobToolTest extends NLocalFileMetadataTestCase { @After public void teardown() { - cleanupTestMetadata(); JobContextUtil.cleanUp(); + cleanupTestMetadata(); } @Test