This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 132f4367b1220dac0206e0054f4c4bf9c52dbca4
Author: Zhiting Guo <35057824+fre...@users.noreply.github.com>
AuthorDate: Mon Oct 30 15:01:10 2023 +0800

    KYLIN-5862 Fix jobScheduler when resourceGroup is enabled
    
    Co-authored-by: Xuecheng Shan <xuecheng.s...@kyligence.io>
    Co-authored-by: Zhiting Guo <zhiting....@kyligence.io>
---
 .../kap/secondstorage/SecondStorageUtil.java       |  2 +-
 .../apache/kylin/rest/service/ProjectService.java  |  9 +-
 .../config/initialize/MetricsRegistryTest.java     |  2 +-
 .../org/apache/kylin/common/KylinConfigBase.java   | 10 +--
 .../src/main/resources/metadata-jdbc-h2.properties |  3 +-
 .../apache/kylin/job/config/JobMybatisConfig.java  | 25 ++++--
 .../kylin/job/config/JobTableInterceptor.java      | 29 +++----
 .../java/org/apache/kylin/job/dao/JobInfoDao.java  | 16 +++-
 .../java/org/apache/kylin/job/domain/JobLock.java  |  5 +-
 .../org/apache/kylin/job/mapper/JobLockMapper.java |  3 +-
 .../kylin/job/scheduler/JdbcJobScheduler.java      | 16 ++--
 .../org/apache/kylin/job/util/JobContextUtil.java  | 89 +++++--------------
 .../org/apache/kylin/job/util/JobInfoUtil.java     |  6 +-
 .../resources/mybatis-mapper/JobLockMapper.xml     | 20 ++++-
 .../main/resources/script/schema_job_info_h2.sql   |  4 +-
 .../resources/script/schema_job_info_mysql.sql     |  4 +-
 .../script/schema_job_info_postgresql.sql          |  2 +-
 .../main/resources/script/schema_job_lock_h2.sql   |  3 +-
 .../resources/script/schema_job_lock_mysql.sql     |  3 +-
 .../script/schema_job_lock_postgresql.sql          |  1 +
 .../kylin/job/execution/JobMailUtilTest.java       |  2 +-
 .../job/impl/threadpool/BaseSchedulerTest.java     |  2 +-
 .../kylin/job/scheduler/JdbcJobSchedulerTest.java  | 14 +--
 .../org/apache/kylin/mapper/JobLockMapperTest.java |  4 +-
 .../apache/kylin/metadata/epoch/EpochManager.java  | 19 ++++-
 .../metadata/favorite/FavoriteRuleManager.java     |  4 +
 .../kylin/metadata/epoch/EpochManagerTest.java     | 41 +++++++++
 .../kylin/rest/controller/JobControllerTest.java   |  2 +-
 .../org/apache/kylin/job/util/JobFilterUtil.java   |  5 +-
 .../kylin/job/service/JobInfoServiceTest.java      |  2 +-
 .../rest/service/FusionModelServiceBuildTest.java  |  2 +-
 .../apache/kylin/rest/service/JobErrorTest.java    |  2 +-
 .../kylin/rest/service/JobResourceServiceTest.java |  2 +-
 .../kylin/rest/service/ModelServiceBuildTest.java  |  2 +-
 .../kylin/rest/service/SnapshotServiceTest.java    |  2 +-
 .../org/apache/kylin/rest/service/StageTest.java   |  2 +-
 .../rest/service/TableSamplingServiceTest.java     |  2 +-
 .../newten/BuildAndQueryEmptySegmentsTest.java     |  2 +-
 .../org/apache/kylin/newten/CharNColumnTest.java   |  2 +-
 .../kylin/newten/EnhancedAggPushDownTest.java      |  2 +-
 .../org/apache/kylin/newten/ExactlyMatchTest.java  |  2 +-
 .../kylin/newten/MultiPartitionPruningTest.java    |  2 +-
 .../org/apache/kylin/newten/NAggPushDownTest.java  |  2 +-
 .../apache/kylin/newten/NBitmapFunctionTest.java   |  2 +-
 .../kylin/newten/NBuildAndQuerySnapshotTest.java   |  2 +-
 .../apache/kylin/newten/NComputedColumnTest.java   |  2 +-
 .../newten/NCountDistinctWithoutEncodeTest.java    |  2 +-
 .../org/apache/kylin/newten/NFilePruningTest.java  |  2 +-
 .../apache/kylin/newten/NFilePruningV2Test.java    |  2 +-
 .../newten/NFlattableJoinWithoutLookupTest.java    |  2 +-
 .../java/org/apache/kylin/newten/NJoinOptTest.java |  2 +-
 .../newten/NManualBuildAndQueryCuboidTest.java     |  2 +-
 .../org/apache/kylin/newten/NMatchingTest.java     |  2 +-
 .../kylin/newten/NMultiPartitionJobTest.java       |  2 +-
 .../kylin/newten/NMultipleColumnsInTest.java       |  2 +-
 .../kylin/newten/NOptIntersectCountTest.java       |  2 +-
 .../apache/kylin/newten/NPartitionColumnTest.java  |  2 +-
 .../org/apache/kylin/newten/NTopNResultTest.java   |  2 +-
 .../apache/kylin/newten/NTopNWithChineseTest.java  |  2 +-
 .../apache/kylin/newten/ReuseFlatTableTest.java    |  2 +-
 .../kylin/newten/SimilarToEscapeFunctionTest.java  |  2 +-
 .../org/apache/kylin/newten/SumLCResultTest.java   |  2 +-
 .../org/apache/kylin/newten/TableIndexTest.java    |  2 +-
 .../org/apache/kylin/newten/TimeZoneQueryTest.java |  2 +-
 .../kylin/query/routing/QueryLayoutFilterTest.java |  2 +-
 .../apache/kylin/event/ITStorageCleanerTest.java   |  2 +-
 .../config/initialize/ModelBrokenListenerTest.java |  3 +-
 .../apache/kylin/rest/service/BaseIndexTest.java   |  2 +-
 .../kylin/rest/service/FusionIndexServiceTest.java |  2 +-
 .../kylin/rest/service/FusionModelServiceTest.java |  2 +-
 .../kylin/rest/service/IndexPlanServiceTest.java   |  2 +-
 .../service/ModelServiceSemanticUpdateTest.java    |  2 +-
 .../kylin/rest/service/ModelServiceTest.java       |  2 +-
 .../kylin/rest/service/ProjectServiceTest.java     |  2 +-
 .../kylin/rest/service/TableReloadServiceTest.java |  2 +-
 .../kylin/rest/service/TableServiceTest.java       |  2 +-
 .../util/SecondStorageJobUtilTest.java             | 99 ++++++++++++++++++++++
 .../spark/job/NSparkCubingJobOnYarnTest.java       |  2 +-
 .../org/apache/kylin/it/TestModelViewQuery.scala   |  6 +-
 .../org/apache/kylin/tool/AuditLogToolTest.java    |  2 +-
 .../org/apache/kylin/tool/JobDiagInfoToolTest.java |  2 +-
 .../org/apache/kylin/tool/StorageCleanerTest.java  |  2 +-
 .../org/apache/kylin/tool/SystemUsageToolTest.java |  4 +-
 .../apache/kylin/tool/YarnApplicationToolTest.java |  2 +-
 .../kylin/tool/garbage/ExecutableCleanerTest.java  |  2 +-
 .../kylin/tool/upgrade/MigrateJobToolTest.java     |  2 +-
 86 files changed, 364 insertions(+), 199 deletions(-)

diff --git a/outdated/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/SecondStorageUtil.java b/outdated/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/SecondStorageUtil.java
index ce3eb656b2..5b92f57d36 100644
--- a/outdated/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/SecondStorageUtil.java
+++ b/outdated/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/SecondStorageUtil.java
@@ -91,7 +91,7 @@ import 
io.kyligence.kap.secondstorage.response.SecondStorageNode;
 // CALL FROM CORE
 public class SecondStorageUtil {
     public static final Set<ExecutableState> RUNNING_STATE = 
ImmutableSet.of(ExecutableState.RUNNING,
-            ExecutableState.READY, ExecutableState.PAUSED);
+            ExecutableState.READY, ExecutableState.PAUSED, 
ExecutableState.PENDING);
     public static final Set<JobTypeEnum> RELATED_JOBS = 
ImmutableSet.of(JobTypeEnum.INDEX_BUILD,
             JobTypeEnum.INDEX_REFRESH, JobTypeEnum.INC_BUILD, 
JobTypeEnum.INDEX_MERGE,
             JobTypeEnum.EXPORT_TO_SECOND_STORAGE, 
JobTypeEnum.SECOND_STORAGE_REFRESH_SECONDARY_INDEXES);
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
index 1b00f17b8f..151fc595bd 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
@@ -96,6 +96,7 @@ import 
org.apache.kylin.metadata.cube.storage.ProjectStorageInfoCollector;
 import org.apache.kylin.metadata.cube.storage.StorageInfoEnum;
 import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.kylin.metadata.favorite.FavoriteRuleManager;
+import org.apache.kylin.metadata.favorite.QueryHistoryIdOffsetManager;
 import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
@@ -650,7 +651,7 @@ public class ProjectService extends BasicService {
         response.setJdbcSourceDriver(config.getJdbcDriver());
 
         
response.setOverrideKylinProps(projectInstance.getOverrideKylinProps());
-        
+
         Pair<String, String> infos = KylinVersion.getGitCommitInfo();
         response.setGitCommit(infos.getFirst());
         
response.setPackageVersion(KylinVersion.getCurrentVersion().toString());
@@ -968,10 +969,16 @@ public class ProjectService extends BasicService {
 
         NProjectManager prjManager = getManager(NProjectManager.class);
         prjManager.forceDropProject(project);
+        UnitOfWork.get().doAfterUpdate(() -> 
deleteProjectRelatedMeta(project));
         UnitOfWork.get().doAfterUnit(() -> new 
ProjectDropListener().onDelete(project, clusterManager, headers));
         EventBusFactory.getInstance().postAsync(new 
SourceUsageUpdateNotifier());
     }
 
+    private void deleteProjectRelatedMeta(String project) {
+        // delete query history id offset
+        QueryHistoryIdOffsetManager.getInstance(project).delete();
+    }
+
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or 
hasPermission(#project, 'ADMINISTRATION')")
     @Transaction(project = 0)
     public void updateDefaultDatabase(String project, String defaultDatabase) {
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java
index 80322c83ff..1f911f2fdd 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java
@@ -124,8 +124,8 @@ public class MetricsRegistryTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e7fc037491..04a39029ff 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1265,7 +1265,7 @@ public abstract class KylinConfigBase implements 
Serializable {
     }
 
     public Integer getSchedulerPollIntervalSecond() {
-        return 
Integer.parseInt(getOptional("kylin.job.scheduler.poll-interval-second", "30"));
+        return 
Integer.parseInt(getOptional("kylin.job.scheduler.poll-interval-second", "10"));
     }
 
     public boolean isFlatTableJoinWithoutLookup() {
@@ -2349,7 +2349,7 @@ public abstract class KylinConfigBase implements 
Serializable {
     // 
============================================================================
 
     private boolean isMicroService() {
-        return Boolean.parseBoolean(this.getOptional("kylin.micro.service", 
TRUE));
+        return Boolean.parseBoolean(this.getOptional("kylin.micro.service", 
FALSE));
     }
 
     public String getServerMode() {
@@ -3956,11 +3956,11 @@ public abstract class KylinConfigBase implements 
Serializable {
     }
 
     public long getJobSchedulerMasterPollIntervalSec() {
-        return 
Long.parseLong(this.getOptional("kylin.job.master-poll-interval-second", "30"));
+        return 
Long.parseLong(this.getOptional("kylin.job.master-poll-interval-second", "10"));
     }
 
     public int getJobSchedulerMasterPollBatchSize() {
-        return 
Integer.parseInt(this.getOptional("kylin.job.master-pull-batch-size", "10"));
+        return 
Integer.parseInt(this.getOptional("kylin.job.master-pull-batch-size", "30"));
     }
 
     public long getJobSchedulerJobRenewalSec() {
@@ -3972,7 +3972,7 @@ public abstract class KylinConfigBase implements 
Serializable {
     }
 
     public int getJobSchedulerSlavePollBatchSize() {
-        return 
Integer.parseInt(this.getOptional("kylin.job.slave-pull-batch-size", "5"));
+        return 
Integer.parseInt(this.getOptional("kylin.job.slave-pull-batch-size", "20"));
     }
 
     public int getParallelJobCountThreshold() {
diff --git a/src/core-common/src/main/resources/metadata-jdbc-h2.properties b/src/core-common/src/main/resources/metadata-jdbc-h2.properties
index dd8d7dc3c2..83edd636ef 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-h2.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-h2.properties
@@ -157,7 +157,7 @@ create.job.info.table=CREATE TABLE IF NOT EXISTS `%s` ( \
   `job_status` varchar(50) NOT NULL, \
   `project` varchar(100) NOT NULL, \
   `subject` varchar(200) NOT NULL, \
-  `model_id` varchar(100) NOT NULL, \
+  `model_id` varchar(200) NOT NULL, \
   `priority` integer DEFAULT 3, \
   `job_content` longblob NOT NULL, \
   `create_time` bigint, \
@@ -169,6 +169,7 @@ create.job.info.table=CREATE TABLE IF NOT EXISTS `%s` ( \
 
 create.job.lock.table=CREATE TABLE IF NOT EXISTS `%s` ( \
   `id` bigint(10) NOT NULL AUTO_INCREMENT,\
+  `project` varchar(100) NOT NULL,\
   `lock_id` varchar(100) NOT NULL COMMENT 'what is locked', \
   `lock_node` varchar(50) DEFAULT NULL COMMENT 'who locked it', \
   `lock_expire_time` datetime DEFAULT NULL COMMENT 'when does the lock 
expire', \
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/config/JobMybatisConfig.java b/src/core-job/src/main/java/org/apache/kylin/job/config/JobMybatisConfig.java
index a92b71ae26..de1e1a8140 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/config/JobMybatisConfig.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/config/JobMybatisConfig.java
@@ -37,6 +37,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.metadata.JdbcDataSource;
 import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil;
+import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.job.condition.JobModeCondition;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.context.annotation.Conditional;
@@ -59,8 +60,8 @@ public class JobMybatisConfig implements InitializingBean {
 
     private String database;
 
-    public static String JOB_INFO_TABLE = "job_info";
-    public static String JOB_LOCK_TABLE = "job_lock";
+    private String jobInfoTableName = "job_info";
+    private String jobLockTableName = "job_lock";
 
     public String getDatabase() {
         return database;
@@ -71,6 +72,14 @@ public class JobMybatisConfig implements InitializingBean {
         setupJobTables();
     }
 
+    public String getJobInfoTableName() {
+        return jobInfoTableName;
+    }
+
+    public String getJobLockTableName() {
+        return jobLockTableName;
+    }
+
     public void setupJobTables() throws Exception {
         val url = KylinConfig.getInstanceFromEnv().getMetadataUrl();
         val props = JdbcUtil.datasourceParameters(url);
@@ -80,9 +89,13 @@ public class JobMybatisConfig implements InitializingBean {
         if (StringUtils.isEmpty(url.getScheme())) {
             log.info("metadata from file");
             keIdentified = "file";
+            if (KylinConfig.getInstanceFromEnv().isUTEnv()) {
+                String uuid = RandomUtil.randomUUIDStr().replace("-", "_");
+                keIdentified = "UT_" + uuid;
+            }
         }
-        JOB_INFO_TABLE = keIdentified + "_job_info";
-        JOB_LOCK_TABLE = keIdentified + "_job_lock";
+        jobInfoTableName = keIdentified + "_job_info";
+        jobLockTableName = keIdentified + "_job_lock";
         database = Database.MYSQL.databaseId;
 
         String jobInfoFile = "script/schema_job_info_mysql.sql";
@@ -117,11 +130,11 @@ public class JobMybatisConfig implements InitializingBean 
{
             }
         }
         try {
-            if (!isTableExists(dataSource.getConnection(), JOB_INFO_TABLE)) {
+            if (!isTableExists(dataSource.getConnection(), jobInfoTableName)) {
                 createTableIfNotExist(keIdentified, jobInfoFile);
             }
 
-            if (!isTableExists(dataSource.getConnection(), JOB_LOCK_TABLE)) {
+            if (!isTableExists(dataSource.getConnection(), jobLockTableName)) {
                 createTableIfNotExist(keIdentified, jobLockFile);
             }
         } catch (SQLException | IOException e) {
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/config/JobTableInterceptor.java b/src/core-job/src/main/java/org/apache/kylin/job/config/JobTableInterceptor.java
index 4dfdb6f25e..5beb14bc39 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/config/JobTableInterceptor.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/config/JobTableInterceptor.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.job.config;
 
-import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Objects;
 
@@ -69,11 +68,11 @@ public class JobTableInterceptor implements Interceptor {
     @Override
     public Object intercept(Invocation invocation) throws Throwable {
 
-        Object target = invocation.getTarget();
-        Method method = invocation.getMethod();
         Object[] args = invocation.getArgs();
 
-        if (JobMybatisConfig.JOB_INFO_TABLE == null || 
JobMybatisConfig.JOB_LOCK_TABLE == null) {
+        String jobInfoTableName = jobMybatisConfig.getJobInfoTableName();
+        String jobLockTableName = jobMybatisConfig.getJobLockTableName();
+        if (jobInfoTableName == null || jobLockTableName == null) {
             logger.info("mybatis table not init, skip");
             return null;
         }
@@ -95,24 +94,24 @@ public class JobTableInterceptor implements Interceptor {
         }
         if (args[1] == null) {
             MapperMethod.ParamMap map = new MapperMethod.ParamMap();
-            map.put("jobLockTable", JobMybatisConfig.JOB_LOCK_TABLE);
-            map.put("jobInfoTable", JobMybatisConfig.JOB_INFO_TABLE);
+            map.put("jobLockTable", jobLockTableName);
+            map.put("jobInfoTable", jobInfoTableName);
             map.put("database", database);
             invocation.getArgs()[1] = map;
-        } else if (args[1].getClass() == MapperMethod.ParamMap.class) {
+        } else if (args[1] instanceof MapperMethod.ParamMap) {
             MapperMethod.ParamMap map = (MapperMethod.ParamMap) args[1];
-            map.put("jobLockTable", JobMybatisConfig.JOB_LOCK_TABLE);
-            map.put("jobInfoTable", JobMybatisConfig.JOB_INFO_TABLE);
+            map.put("jobLockTable", jobLockTableName);
+            map.put("jobInfoTable", jobInfoTableName);
             map.put("database", database);
-        } else if (args[1].getClass() == JobMapperFilter.class) {
+        } else if (args[1] instanceof JobMapperFilter) {
             JobMapperFilter mapperFilter = (JobMapperFilter) args[1];
-            mapperFilter.setJobInfoTable(JobMybatisConfig.JOB_INFO_TABLE);
-        } else if (args[1].getClass() == JobInfo.class) {
+            mapperFilter.setJobInfoTable(jobInfoTableName);
+        } else if (args[1] instanceof JobInfo) {
             JobInfo jobInfo = (JobInfo) args[1];
-            jobInfo.setJobInfoTable(JobMybatisConfig.JOB_INFO_TABLE);
-        } else if (args[1].getClass() == JobLock.class) {
+            jobInfo.setJobInfoTable(jobInfoTableName);
+        } else if (args[1] instanceof JobLock) {
             JobLock jobLock = (JobLock) args[1];
-            jobLock.setJobLockTable(JobMybatisConfig.JOB_LOCK_TABLE);
+            jobLock.setJobLockTable(jobLockTableName);
             jobLock.setDatabase(database);
         } else {
             logger.error("miss type of param {}", args[1].getClass());
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java
index 94b53737df..dce995d971 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java
@@ -19,6 +19,7 @@ package org.apache.kylin.job.dao;
 
 import static org.apache.kylin.job.util.JobInfoUtil.JOB_SERIALIZER;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -29,11 +30,12 @@ import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil;
+import org.apache.kylin.common.util.CompressionUtils;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.guava30.shaded.common.base.Preconditions;
-import org.apache.kylin.job.config.JobMybatisConfig;
 import org.apache.kylin.job.domain.JobInfo;
 import org.apache.kylin.job.domain.JobLock;
+import org.apache.kylin.job.exception.ExecuteRuntimeException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -196,7 +198,7 @@ public class JobInfoDao {
         jobInfo.setModelId(executablePO.getTargetModel());
         jobInfo.setCreateTime(executablePO.getCreateTime());
         jobInfo.setUpdateTime(executablePO.getLastModified());
-        jobInfo.setJobContent(JobInfoUtil.serializeExecutablePO(executablePO));
+        
jobInfo.setJobContent(checkAndCompressJobContent(JobInfoUtil.serializeExecutablePO(executablePO)));
 
         ExecutableManager executableManager = 
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(),
                 executablePO.getProject());
@@ -207,6 +209,14 @@ public class JobInfoDao {
         return jobInfo;
     }
 
+    private byte[] checkAndCompressJobContent(byte[] jobContent) {
+        try {
+            return CompressionUtils.compress(jobContent);
+        } catch (IOException e) {
+            throw new ExecuteRuntimeException("Compress job content failed.", 
e);
+        }
+    }
+
     public void deleteJobsByProject(String project) {
         int count = jobInfoMapper.deleteByProject(project);
         logger.info("delete {} jobs for project {}", count, project);
@@ -222,8 +232,8 @@ public class JobInfoDao {
                 jobInfoMapper.deleteByProject(project);
             }
             for (JobInfo jobInfo : jobInfos) {
+                
jobInfo.setJobContent(checkAndCompressJobContent(jobInfo.getJobContent()));
                 JobInfo currentJobInfo = 
jobInfoMapper.selectByJobId(jobInfo.getJobId());
-                jobInfo.setJobInfoTable(JobMybatisConfig.JOB_INFO_TABLE);
                 if (currentJobInfo == null) {
                     jobInfoMapper.insert(jobInfo);
                 } else {
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/domain/JobLock.java b/src/core-job/src/main/java/org/apache/kylin/job/domain/JobLock.java
index 7083c50159..1d41510ba3 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/domain/JobLock.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/domain/JobLock.java
@@ -34,6 +34,8 @@ public class JobLock {
 
     private String lockId;
 
+    private String project;
+
     private String lockNode;
 
     private Date lockExpireTime;
@@ -49,8 +51,9 @@ public class JobLock {
 
     private String database;
 
-    public JobLock(String lockId, int priority) {
+    public JobLock(String lockId, String project, int priority) {
         this.lockId = lockId;
+        this.project = project;
         this.priority = priority;
         this.createTime = System.currentTimeMillis();
         this.updateTime = System.currentTimeMillis();
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobLockMapper.java b/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobLockMapper.java
index c0b3c4f785..89dba3ebe7 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobLockMapper.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/mapper/JobLockMapper.java
@@ -51,7 +51,8 @@ public interface JobLockMapper {
 
     int batchRemoveLock(@Param("jobIdList") List<String> jobIdList);
 
-    List<PriorityFistRandomOrderJob> findNonLockIdList(@Param("batchSize") int 
batchSize);
+    List<PriorityFistRandomOrderJob> findNonLockIdList(@Param("batchSize") int 
batchSize,
+            @Param("projects") List<String> projects);
 
     List<String> findExpiredORNonLockIdList(@Param("batchSize") int batchSize);
 
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java
index 0d453c2844..f4940f68ec 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java
@@ -32,6 +32,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.constant.LogConstant;
 import org.apache.kylin.common.logging.SetLogCategory;
+import org.apache.kylin.common.util.ExecutorServiceUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.ThreadUtils;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
@@ -53,6 +54,7 @@ import org.apache.kylin.job.runners.JobCheckUtil;
 import org.apache.kylin.job.util.JobContextUtil;
 import org.apache.kylin.job.util.JobInfoUtil;
 import org.apache.kylin.metadata.cube.utils.StreamingUtils;
+import org.apache.kylin.metadata.epoch.EpochManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -157,7 +159,7 @@ public class JdbcJobScheduler implements JobScheduler {
         }
 
         if (Objects.nonNull(executorPool)) {
-            executorPool.shutdownNow();
+            ExecutorServiceUtil.shutdownGracefully(executorPool, 60);
         }
     }
 
@@ -165,7 +167,7 @@ public class JdbcJobScheduler implements JobScheduler {
         // init master lock
         try {
             if 
(jobContext.getJobLockMapper().selectByJobId(JobScheduler.MASTER_SCHEDULER) == 
null) {
-                jobContext.getJobLockMapper().insertSelective(new 
JobLock(JobScheduler.MASTER_SCHEDULER, 0));
+                jobContext.getJobLockMapper().insertSelective(new 
JobLock(JobScheduler.MASTER_SCHEDULER, "_global", 0));
             }
         } catch (Exception e) {
             logger.error("Try insert 'master_scheduler' failed.", e);
@@ -218,7 +220,7 @@ public class JdbcJobScheduler implements JobScheduler {
                     JobLock lock = 
jobContext.getJobLockMapper().selectByJobId(jobId);
                     JobInfo jobInfo = 
jobContext.getJobInfoMapper().selectByJobId(jobId);
                     if (lock == null && jobContext.getJobLockMapper()
-                            .insertSelective(new JobLock(jobId, 
jobInfo.getPriority())) == 0) {
+                            .insertSelective(new JobLock(jobId, 
jobInfo.getProject(), jobInfo.getPriority())) == 0) {
                         logger.error("Create job lock for [{}] failed!", 
jobId);
                         return null;
                     }
@@ -276,7 +278,8 @@ public class JdbcJobScheduler implements JobScheduler {
             if (exeFreeSlots < batchSize) {
                 batchSize = exeFreeSlots;
             }
-            List<String> jobIdList = findNonLockIdListInOrder(batchSize);
+            List<String> projects = 
EpochManager.getInstance().listProjectWithPermissionForScheduler();
+            List<String> jobIdList = findNonLockIdListInOrder(batchSize, 
projects);
 
             if (CollectionUtils.isEmpty(jobIdList)) {
                 return;
@@ -313,8 +316,9 @@ public class JdbcJobScheduler implements JobScheduler {
         }
     }
 
-    public List<String> findNonLockIdListInOrder(int batchSize) {
-        List<PriorityFistRandomOrderJob> jobIdList = 
jobContext.getJobLockMapper().findNonLockIdList(batchSize);
+    public List<String> findNonLockIdListInOrder(int batchSize, List<String> 
projects) {
+        List<PriorityFistRandomOrderJob> jobIdList = 
jobContext.getJobLockMapper().findNonLockIdList(batchSize,
+                projects);
         // Shuffle jobs avoiding jobLock conflict.
         // At the same time, we should ensure the overall order.
         if (hasRunningJob()) {
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/util/JobContextUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/util/JobContextUtil.java
index 94b3966d92..c1aadc1dca 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/util/JobContextUtil.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/util/JobContextUtil.java
@@ -18,28 +18,19 @@
 
 package org.apache.kylin.job.util;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.nio.charset.Charset;
-import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 
 import javax.sql.DataSource;
 
-import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.ibatis.builder.xml.XMLMapperBuilder;
-import org.apache.ibatis.jdbc.ScriptRunner;
 import org.apache.ibatis.mapping.Environment;
 import org.apache.ibatis.plugin.Interceptor;
 import org.apache.ibatis.session.Configuration;
@@ -49,7 +40,6 @@ import org.apache.ibatis.transaction.TransactionFactory;
 import org.apache.ibatis.type.JdbcType;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
-import org.apache.kylin.common.logging.LogOutputStream;
 import org.apache.kylin.common.persistence.metadata.JdbcDataSource;
 import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil;
 import org.apache.kylin.common.util.AddressUtil;
@@ -68,6 +58,7 @@ import 
org.mybatis.spring.transaction.SpringManagedTransactionFactory;
 import org.springframework.core.io.Resource;
 import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
 import org.springframework.core.io.support.ResourcePatternResolver;
+import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.datasource.DataSourceTransactionManager;
 
 import lombok.SneakyThrows;
@@ -80,12 +71,6 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class JobContextUtil {
 
-    private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();
-
-    private static final String CREATE_JOB_INFO_TABLE = 
"create.job.info.table";
-
-    private static final String CREATE_JOB_LOCK_TABLE = 
"create.job.lock.table";
-
     private static JobInfoMapper jobInfoMapper;
 
     private static JobLockMapper jobLockMapper;
@@ -135,43 +120,30 @@ public class JobContextUtil {
         if (null != jobInfoMapper && null != jobLockMapper) {
             return;
         }
-        boolean isUTEnv = config.isUTEnv();
-
         StorageURL url = config.getMetadataUrl();
         Properties props = JdbcUtil.datasourceParameters(url);
-        DataSource dataSource = null;
         try {
-            dataSource = JdbcDataSource.getDataSource(props);
-            if (!isUTEnv) {
-                // setup job table names
-                jobMybatisConfig.setDataSource(dataSource);
-                jobMybatisConfig.setupJobTables();
-                jobTableInterceptor.setJobMybatisConfig(jobMybatisConfig);
-            }
+            DataSource dataSource = JdbcDataSource.getDataSource(props);
             transactionManager = 
JdbcDataSource.getTransactionManager(dataSource);
             SqlSessionFactory sqlSessionFactory = 
getSqlSessionFactory(dataSource);
-            addPluginForSqlSessionManager(sqlSessionFactory);
+            addPluginForSqlSessionManager(dataSource, sqlSessionFactory);
             sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory);
             jobInfoMapper = sqlSessionTemplate.getMapper(JobInfoMapper.class);
             jobLockMapper = sqlSessionTemplate.getMapper(JobLockMapper.class);
-            if (isUTEnv) {
-                createTableIfNotExist((BasicDataSource) dataSource, 
"job_info");
-                createTableIfNotExist((BasicDataSource) dataSource, 
"job_lock");
-                jobInfoMapper.deleteAllJob();
-                jobLockMapper.deleteAllJobLock();
-            }
         } catch (Exception e) {
             throw new RuntimeException("initialize mybatis mappers failed", e);
         }
     }
 
-    private static void addPluginForSqlSessionManager(SqlSessionFactory 
sqlSessionFactory) {
+    private static void addPluginForSqlSessionManager(DataSource dataSource, 
SqlSessionFactory sqlSessionFactory)
+            throws Exception {
+        jobMybatisConfig.setDataSource(dataSource);
+        jobMybatisConfig.setupJobTables();
+        jobTableInterceptor.setJobMybatisConfig(jobMybatisConfig);
         List<Interceptor> interceptors = 
sqlSessionFactory.getConfiguration().getInterceptors();
-
         if (!interceptors.contains(jobTableInterceptor)) {
             
sqlSessionFactory.getConfiguration().addInterceptor(jobTableInterceptor);
         }
-
     }
 
     public static SqlSessionFactory getSqlSessionFactory(DataSource 
dataSource) throws SQLException, IOException {
@@ -199,39 +171,6 @@ public class JobContextUtil {
         }
     }
 
-    private static void createTableIfNotExist(BasicDataSource dataSource, 
String tableName)
-            throws IOException, SQLException {
-        if (JdbcUtil.isTableExists(dataSource.getConnection(), tableName)) {
-            log.info("{} already existed in database", tableName);
-            return;
-        }
-
-        String createTableStmtProp = "";
-        if ("job_info".equals(tableName)) {
-            createTableStmtProp = CREATE_JOB_INFO_TABLE;
-        } else {
-            createTableStmtProp = CREATE_JOB_LOCK_TABLE;
-        }
-
-        Properties properties = JdbcUtil.getProperties(dataSource);
-        String createTableStmt = String.format(Locale.ROOT, 
properties.getProperty(createTableStmtProp), tableName);
-        try (Connection connection = dataSource.getConnection()) {
-            ScriptRunner sr = new ScriptRunner(connection);
-            sr.setLogWriter(new PrintWriter(new OutputStreamWriter(new 
LogOutputStream(log), DEFAULT_CHARSET)));
-            log.debug("start to create table({})", tableName);
-            sr.runScript(new InputStreamReader(new 
ByteArrayInputStream(createTableStmt.getBytes(DEFAULT_CHARSET)),
-                    DEFAULT_CHARSET));
-            log.debug("create table finished");
-        }
-
-        if (!JdbcUtil.isTableExists(dataSource.getConnection(), tableName)) {
-            log.debug("failed to create table({})", tableName);
-            throw new IllegalStateException(String.format(Locale.ROOT, "create 
table(%s) failed", tableName));
-        } else {
-            log.debug("table({}) already exists.", tableName);
-        }
-    }
-
     public static JobInfoDao getJobInfoDao(KylinConfig config) {
         if (config.isUTEnv() || isNoSpringContext()) {
             return getJobInfoDaoForTestOrTool(config);
@@ -269,6 +208,7 @@ public class JobContextUtil {
             if (null != jobContext) {
                 jobContext.destroy();
             }
+            dropUTJobTable();
             jobInfoMapper = null;
             jobLockMapper = null;
             jobInfoDao = null;
@@ -281,6 +221,17 @@ public class JobContextUtil {
         }
     }
 
+    private static void dropUTJobTable() {
+        try {
+            if (null != transactionManager) {
+                JdbcTemplate jdbcTemplate = new 
JdbcTemplate(transactionManager.getDataSource());
+                jdbcTemplate.execute("DROP ALL OBJECTS");
+            }
+        } catch (Exception e) {
+            log.error("Drop UT job table failed.", e);
+        }
+    }
+
     // for test only
     public static boolean hasStarted() {
         return jobContext != null;
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/util/JobInfoUtil.java 
b/src/core-job/src/main/java/org/apache/kylin/job/util/JobInfoUtil.java
index 3b84a0284e..76b03350c9 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/util/JobInfoUtil.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/util/JobInfoUtil.java
@@ -22,9 +22,11 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.zip.DataFormatException;
 
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.CompressionUtils;
 import org.apache.kylin.guava30.shaded.common.io.ByteSource;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.domain.JobInfo;
@@ -48,10 +50,10 @@ public class JobInfoUtil {
     }
 
     public static ExecutablePO deserializeExecutablePO(JobInfo jobInfo) {
-        ByteSource byteSource = ByteSource.wrap(jobInfo.getJobContent());
         try {
+            ByteSource byteSource = 
ByteSource.wrap(CompressionUtils.decompress(jobInfo.getJobContent()));
             return deserializeExecutablePO(byteSource, 
jobInfo.getUpdateTime(), jobInfo.getProject());
-        } catch (IOException e) {
+        } catch (IOException | DataFormatException e) {
             log.warn("Error when deserializing jobInfo, id: {} " + 
jobInfo.getJobId(), e);
             return null;
         }
diff --git a/src/core-job/src/main/resources/mybatis-mapper/JobLockMapper.xml 
b/src/core-job/src/main/resources/mybatis-mapper/JobLockMapper.xml
index 528f7ea425..95a6cd77ba 100644
--- a/src/core-job/src/main/resources/mybatis-mapper/JobLockMapper.xml
+++ b/src/core-job/src/main/resources/mybatis-mapper/JobLockMapper.xml
@@ -4,6 +4,7 @@
   <resultMap id="BaseResultMap" type="org.apache.kylin.job.domain.JobLock">
     <id column="id" jdbcType="BIGINT" property="id" />
     <result column="lock_id" jdbcType="VARCHAR" property="lockId" />
+    <result column="project" jdbcType="VARCHAR" property="project" />
     <result column="lock_node" jdbcType="VARCHAR" property="lockNode" />
     <result column="lock_expire_time" jdbcType="TIMESTAMP" 
property="lockExpireTime" />
     <result column="priority" jdbcType="INTEGER" property="priority" />
@@ -37,10 +38,10 @@
     delete from ${jobLockTable}
   </delete>
   <insert id="insert" parameterType="org.apache.kylin.job.domain.JobLock">
-    insert into ${jobLockTable} (id, lock_id, lock_node, 
+    insert into ${jobLockTable} (id, lock_id, project, lock_node,
       lock_expire_time, priority, create_time, update_time
       )
-    values (#{id,jdbcType=BIGINT}, #{lockId,jdbcType=VARCHAR}, 
#{lockNode,jdbcType=VARCHAR}, 
+    values (#{id,jdbcType=BIGINT}, #{lockId,jdbcType=VARCHAR}, 
#{project,jdbcType=VARCHAR}, #{lockNode,jdbcType=VARCHAR},
       #{lockExpireTime,jdbcType=TIMESTAMP}, #{priority,jdbcType=INTEGER}, 
#{createTime,jdbcType=BIGINT}, #{updateTime,jdbcType=BIGINT}
       )
   </insert>
@@ -53,6 +54,9 @@
       <if test="lockId != null">
         lock_id,
       </if>
+      <if test="project != null">
+        project,
+      </if>
       <if test="lockNode != null">
         lock_node,
       </if>
@@ -76,6 +80,9 @@
       <if test="lockId != null">
         #{lockId,jdbcType=VARCHAR},
       </if>
+      <if test="project != null">
+        #{project,jdbcType=VARCHAR},
+      </if>
       <if test="lockNode != null">
         #{lockNode,jdbcType=VARCHAR},
       </if>
@@ -134,7 +141,14 @@
   <select id="findNonLockIdList" resultMap="PriorityFistRandomOrderJob">
     SELECT lock_id, priority
     FROM ${jobLockTable}
-    WHERE lock_node IS NULL OR lock_expire_time <![CDATA[<]]> CURRENT_TIMESTAMP
+    WHERE
+    <if test="projects != null">
+      <foreach close=")" collection="projects" index="index" item="item" 
open="project in (" separator=",">
+        #{item}
+      </foreach>
+      AND
+    </if>
+    (lock_node IS NULL OR lock_expire_time <![CDATA[<]]> CURRENT_TIMESTAMP)
     ORDER BY priority ASC
     <if test="batchSize&gt;=0">
       LIMIT #{batchSize,jdbcType=INTEGER}
diff --git a/src/core-job/src/main/resources/script/schema_job_info_h2.sql 
b/src/core-job/src/main/resources/script/schema_job_info_h2.sql
index 9335bd3502..2cbcdcce25 100644
--- a/src/core-job/src/main/resources/script/schema_job_info_h2.sql
+++ b/src/core-job/src/main/resources/script/schema_job_info_h2.sql
@@ -23,7 +23,7 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_info (
   job_status varchar(50) NOT NULL,
   project varchar(100) NOT NULL,
   subject varchar(200) NOT NULL,
-  model_id varchar(100),
+  model_id varchar(200),
   priority integer DEFAULT 3,
   job_content longblob NOT NULL,
   mvcc bigint(10),
@@ -32,4 +32,4 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_info (
   job_duration_millis bigint(10) NOT NULL DEFAULT '0' COMMENT 'total duration 
milliseconds',
    PRIMARY KEY (id),
    UNIQUE KEY uk_job_id (job_id)
-) DEFAULT CHARSET=utf8;
\ No newline at end of file
+);
\ No newline at end of file
diff --git a/src/core-job/src/main/resources/script/schema_job_info_mysql.sql 
b/src/core-job/src/main/resources/script/schema_job_info_mysql.sql
index 63187e29b2..af439d41d6 100644
--- a/src/core-job/src/main/resources/script/schema_job_info_mysql.sql
+++ b/src/core-job/src/main/resources/script/schema_job_info_mysql.sql
@@ -24,7 +24,7 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_info (
   job_status varchar(50) NOT NULL,
   project varchar(100) NOT NULL,
   subject varchar(200) NOT NULL,
-  model_id varchar(100),
+  model_id varchar(200),
   priority integer DEFAULT 3,
   mvcc bigint(10),
   job_content longblob NOT NULL,
@@ -33,7 +33,7 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_info (
   job_duration_millis bigint(10) NOT NULL DEFAULT '0' COMMENT 'total duration 
milliseconds',
   PRIMARY KEY (id),
   UNIQUE KEY uk_job_id (job_id)
-) AUTO_INCREMENT=10000 ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+) AUTO_INCREMENT=10000 ENGINE=InnoDB;
 
 create index KE_IDENTIFIED_job_info_ix
     on KE_IDENTIFIED_job_info (project, job_status, job_type, subject);
diff --git 
a/src/core-job/src/main/resources/script/schema_job_info_postgresql.sql 
b/src/core-job/src/main/resources/script/schema_job_info_postgresql.sql
index 604a5d2f68..aee6329ede 100644
--- a/src/core-job/src/main/resources/script/schema_job_info_postgresql.sql
+++ b/src/core-job/src/main/resources/script/schema_job_info_postgresql.sql
@@ -24,7 +24,7 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_info (
   job_status varchar(50) NOT NULL,
   project varchar(100) NOT NULL,
   subject varchar(200) NOT NULL,
-  model_id varchar(100),
+  model_id varchar(200),
   priority integer DEFAULT 3,
   mvcc bigint,
   job_content bytea NOT NULL,
diff --git a/src/core-job/src/main/resources/script/schema_job_lock_h2.sql 
b/src/core-job/src/main/resources/script/schema_job_lock_h2.sql
index 7f886da928..df854c3e03 100644
--- a/src/core-job/src/main/resources/script/schema_job_lock_h2.sql
+++ b/src/core-job/src/main/resources/script/schema_job_lock_h2.sql
@@ -19,6 +19,7 @@
 
 CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_lock (
   id bigint(10) NOT NULL AUTO_INCREMENT,
+  project varchar(100) NOT NULL,
   lock_id varchar(100) NOT NULL COMMENT 'what is locked',
   lock_node varchar(50) DEFAULT NULL COMMENT 'who locked it',
   lock_expire_time timestamp COMMENT 'when does the lock expire',
@@ -27,4 +28,4 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_lock (
   update_time bigint,
   PRIMARY KEY (id),
   UNIQUE KEY uk_lock_id (lock_id)
-) DEFAULT CHARSET=utf8;
+);
diff --git a/src/core-job/src/main/resources/script/schema_job_lock_mysql.sql 
b/src/core-job/src/main/resources/script/schema_job_lock_mysql.sql
index bd59896a94..7cd16c56c3 100644
--- a/src/core-job/src/main/resources/script/schema_job_lock_mysql.sql
+++ b/src/core-job/src/main/resources/script/schema_job_lock_mysql.sql
@@ -19,6 +19,7 @@
 
 CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_lock (
   id bigint(20) NOT NULL AUTO_INCREMENT,
+  project varchar(100) NOT NULL,
   lock_id varchar(100) NOT NULL COMMENT 'what is locked',
   lock_node varchar(50) DEFAULT NULL COMMENT 'who locked it',
   lock_expire_time timestamp COMMENT 'when does the lock expire',
@@ -27,4 +28,4 @@ CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_lock (
   update_time bigint,
   PRIMARY KEY (id),
   UNIQUE KEY uk_lock_id (lock_id)
-) AUTO_INCREMENT=10000 ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+) AUTO_INCREMENT=10000 ENGINE=InnoDB;
diff --git 
a/src/core-job/src/main/resources/script/schema_job_lock_postgresql.sql 
b/src/core-job/src/main/resources/script/schema_job_lock_postgresql.sql
index 4142a73fdf..004616961e 100644
--- a/src/core-job/src/main/resources/script/schema_job_lock_postgresql.sql
+++ b/src/core-job/src/main/resources/script/schema_job_lock_postgresql.sql
@@ -18,6 +18,7 @@
 
 CREATE TABLE IF NOT EXISTS KE_IDENTIFIED_job_lock (
   id SERIAL PRIMARY KEY,
+  project varchar(100) NOT NULL,
   lock_id varchar(100) UNIQUE NOT NULL,
   lock_node varchar(50) DEFAULT NULL,
   lock_expire_time timestamptz DEFAULT NULL,
diff --git 
a/src/core-job/src/test/java/org/apache/kylin/job/execution/JobMailUtilTest.java
 
b/src/core-job/src/test/java/org/apache/kylin/job/execution/JobMailUtilTest.java
index 86fd300783..9ead83b465 100644
--- 
a/src/core-job/src/test/java/org/apache/kylin/job/execution/JobMailUtilTest.java
+++ 
b/src/core-job/src/test/java/org/apache/kylin/job/execution/JobMailUtilTest.java
@@ -50,8 +50,8 @@ public class JobMailUtilTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
 
b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 449efd4d4c..7191e29735 100644
--- 
a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ 
b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -75,8 +75,8 @@ public abstract class BaseSchedulerTest extends 
NLocalFileMetadataTestCase {
     @After
     public void after() throws Exception {
         JobContext jobContext = 
JobContextUtil.getJobContext(KylinConfig.getInstanceFromEnv());
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
         await().atMost(30, TimeUnit.SECONDS).until(() -> 
jobContext.getJobScheduler().getRunningJob().size() == 0);
     }
 
diff --git 
a/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java
 
b/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java
index ea2fbb89ab..daa5de0348 100644
--- 
a/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java
+++ 
b/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java
@@ -20,6 +20,7 @@ package org.apache.kylin.job.scheduler;
 import static org.apache.kylin.common.util.TestUtils.getTestConfig;
 import static org.awaitility.Awaitility.await;
 
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -117,7 +118,7 @@ class JdbcJobSchedulerTest {
     @Test
     void testLockExpiredAndJobNotFinal() {
         String jobId = mockJob();
-        JobLock lock = new JobLock(jobId, 1);
+        JobLock lock = new JobLock(jobId, PROJECT, 1);
         lock.setLockNode("mock_node");
         lock.setLockExpireTime(new Date());
         int expect = jobContext.getJobLockMapper().insert(lock);
@@ -168,6 +169,7 @@ class JdbcJobSchedulerTest {
             JobLock lock = new JobLock();
             String id = "mock_lock_id_" + i;
             lock.setLockId(id);
+            lock.setProject(PROJECT);
             lock.setLockNode("mock_node");
             lock.setPriority(p);
             lock.setLockExpireTime(new Date());
@@ -179,8 +181,10 @@ class JdbcJobSchedulerTest {
         JdbcJobScheduler jobScheduler = Mockito.spy(originScheduler);
         Mockito.when(jobScheduler.hasRunningJob()).thenReturn(true);
         jobContext.setJobScheduler(jobScheduler);
-        List<String> order1 = 
jobContext.getJobScheduler().findNonLockIdListInOrder(20);
-        List<String> order2 = 
jobContext.getJobScheduler().findNonLockIdListInOrder(20);
+        List<String> order1 = 
jobContext.getJobScheduler().findNonLockIdListInOrder(20,
+                Collections.singletonList(PROJECT));
+        List<String> order2 = 
jobContext.getJobScheduler().findNonLockIdListInOrder(20,
+                Collections.singletonList(PROJECT));
         boolean hasDiff = false;
         int currentPriority = 0;
         for (int i = 0; i < order1.size(); i++) {
@@ -202,7 +206,7 @@ class JdbcJobSchedulerTest {
         AbstractExecutable job = mockExecutable();
         // insert job lock, without lock node
         String jobId = job.getJobId();
-        JobLock lock = new JobLock(jobId, 1);
+        JobLock lock = new JobLock(jobId, PROJECT, 1);
         int expect = jobContext.getJobLockMapper().insert(lock);
         Assertions.assertEquals(1, expect);
         await().atMost(60, TimeUnit.SECONDS).until(() -> 
jobContext.getJobLockMapper().selectByJobId(jobId) == null);
@@ -225,7 +229,7 @@ class JdbcJobSchedulerTest {
             job.getTasks().forEach(task -> 
task.getOutput().setStatus(ExecutableState.PENDING.name()));
             return true;
         });
-        mapper.insertSelective(new JobLock(jobId, 3));
+        mapper.insertSelective(new JobLock(jobId, PROJECT, 3));
         // init schedule
         JobContextUtil.getJobContext(config);
 
diff --git 
a/src/core-job/src/test/java/org/apache/kylin/mapper/JobLockMapperTest.java 
b/src/core-job/src/test/java/org/apache/kylin/mapper/JobLockMapperTest.java
index ceed191e89..7645b8d1f8 100644
--- a/src/core-job/src/test/java/org/apache/kylin/mapper/JobLockMapperTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/mapper/JobLockMapperTest.java
@@ -21,6 +21,7 @@ package org.apache.kylin.mapper;
 import static org.apache.kylin.common.util.TestUtils.getTestConfig;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.concurrent.TimeUnit;
 
@@ -56,6 +57,7 @@ public class JobLockMapperTest {
     private JobLock generateJobLock() {
 
         JobLock jobLock = new JobLock();
+        jobLock.setProject("default");
         jobLock.setLockId("mock_lock_id");
         jobLock.setLockNode("mock_lock_node");
         long renewalSec = getTestConfig().getJobSchedulerJobRenewalSec();
@@ -80,7 +82,7 @@ public class JobLockMapperTest {
 
         long renewSec = getTestConfig().getJobSchedulerJobRenewalSec();
         Awaitility.await().atMost(renewSec + 1, TimeUnit.SECONDS)
-                .until(() -> jobLockMapper.findNonLockIdList(10).size() > 0);
+                .until(() -> jobLockMapper.findNonLockIdList(10, 
Collections.singletonList("default")).size() > 0);
 
         // update (h2 no support mysql-dialect)
         //        int updateAffect = jobLockMapper.upsertLock("mock_job_id", 
"mock_node_id", 10000L);
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
index 5f8e868886..9ea32d0cb8 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
@@ -404,12 +404,25 @@ public class EpochManager {
     }
 
     private List<String> listProjectWithPermission() {
-        List<String> projects = epochCheckEnabled ? getProjectsToMarkOwner()
-                : 
NProjectManager.getInstance(config).listAllProjects().stream().map(ProjectInstance::getName)
-                        .collect(Collectors.toList());
+        List<String> projects = listRealProjectWithPermission();
         projects.add(GLOBAL);
         return projects;
     }
+    
+    public List<String> listProjectWithPermissionForScheduler() {
+        List<String> projects = listRealProjectWithPermission();
+        if (projects.size() == 
NProjectManager.getInstance(config).listAllProjects().size()) {
+            // Returning null indicates that filtering items is not required during scheduling.
+            return null;
+        }
+        return projects;
+    }
+
+    private List<String> listRealProjectWithPermission() {
+        return epochCheckEnabled ? getProjectsToMarkOwner()
+                : 
NProjectManager.getInstance(config).listAllProjects().stream().map(ProjectInstance::getName)
+                        .collect(Collectors.toList());
+    }
 
     //for test
     public Epoch getGlobalEpoch() {
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/favorite/FavoriteRuleManager.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/favorite/FavoriteRuleManager.java
index 2dbc143ab1..cb72f63e39 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/favorite/FavoriteRuleManager.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/favorite/FavoriteRuleManager.java
@@ -129,6 +129,10 @@ public class FavoriteRuleManager {
         favoriteRuleStore.deleteByName(project, favoriteRule.getName());
     }
 
+    public void deleteByProject() {
+        favoriteRuleStore.deleteByProject(project);
+    }
+
     @VisibleForTesting
     public void createRule(final FavoriteRule rule) {
         FavoriteRule copy = copyForWrite(rule);
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
index c9c62aeb34..c41ed73a39 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.kylin.common.util.TestUtils.getTestConfig;
 import static org.awaitility.Awaitility.await;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -32,11 +33,17 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.metadata.Epoch;
 import org.apache.kylin.common.persistence.metadata.EpochStore;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.junit.annotation.MetadataInfo;
 import org.apache.kylin.junit.annotation.OverwriteProp;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.resourcegroup.KylinInstance;
+import org.apache.kylin.metadata.resourcegroup.RequestTypeEnum;
+import org.apache.kylin.metadata.resourcegroup.ResourceGroupEntity;
 import org.apache.kylin.metadata.resourcegroup.ResourceGroupManager;
+import org.apache.kylin.metadata.resourcegroup.ResourceGroupMappingInfo;
+import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.test.util.ReflectionTestUtils;
@@ -577,4 +584,38 @@ class EpochManagerTest {
         Assertions.assertTrue(isEpochLegal);
     }
 
+    @Test
+    @MetadataInfo
+    void testListProjectForScheduling() {
+        KylinConfig config = getTestConfig();
+        List<ProjectInstance> allProjects = 
NProjectManager.getInstance(config).listAllProjects();
+        EpochManager epochManager = EpochManager.getInstance();
+        ResourceGroupManager resourceGroupManager = 
ResourceGroupManager.getInstance(config);
+
+        // ResourceGroup is disabled.
+        
Assert.assertNull(epochManager.listProjectWithPermissionForScheduler());
+
+        // ResourceGroup is enabled, but no projects are bound.
+        resourceGroupManager.updateResourceGroup(copyForWrite -> 
copyForWrite.setResourceGroupEnabled(true));
+        Assert.assertEquals(0, 
epochManager.listProjectWithPermissionForScheduler().size());
+
+        // ResourceGroup is enabled, and project 'default' is bound.
+        resourceGroupManager.updateResourceGroup(copyForWrite -> {
+            ResourceGroupEntity group = new ResourceGroupEntity();
+            ReflectionTestUtils.setField(group, "id", "rg_001");
+            KylinInstance kylinInstance = new KylinInstance();
+            ReflectionTestUtils.setField(kylinInstance, "resourceGroupId", 
"rg_001");
+            ReflectionTestUtils.setField(kylinInstance, "instance", 
AddressUtil.getLocalInstance());
+            ResourceGroupMappingInfo resourceGroupMappingInfo = new 
ResourceGroupMappingInfo();
+            ReflectionTestUtils.setField(resourceGroupMappingInfo, "project", 
"default");
+            ReflectionTestUtils.setField(resourceGroupMappingInfo, 
"resourceGroupId", "rg_001");
+            ReflectionTestUtils.setField(resourceGroupMappingInfo, 
"requestType", RequestTypeEnum.BUILD);
+
+            
copyForWrite.setResourceGroupEntities(Collections.singletonList(group));
+            
copyForWrite.setKylinInstances(Collections.singletonList(kylinInstance));
+            
copyForWrite.setResourceGroupMappingInfoList(Collections.singletonList(resourceGroupMappingInfo));
+        });
+        Assert.assertEquals(1, 
epochManager.listProjectWithPermissionForScheduler().size());
+        Assert.assertEquals("default", 
epochManager.listProjectWithPermissionForScheduler().get(0));
+    }
 }
diff --git 
a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java
 
b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java
index 10b9202092..9620de66ab 100644
--- 
a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java
+++ 
b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java
@@ -115,8 +115,8 @@ public class JobControllerTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/data-loading-service/src/main/java/org/apache/kylin/job/util/JobFilterUtil.java
 
b/src/data-loading-service/src/main/java/org/apache/kylin/job/util/JobFilterUtil.java
index 0d0adad48f..7cbecb7ae4 100644
--- 
a/src/data-loading-service/src/main/java/org/apache/kylin/job/util/JobFilterUtil.java
+++ 
b/src/data-loading-service/src/main/java/org/apache/kylin/job/util/JobFilterUtil.java
@@ -36,7 +36,6 @@ import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.msg.Message;
 import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.guava30.shaded.common.base.Preconditions;
-import org.apache.kylin.job.config.JobMybatisConfig;
 import org.apache.kylin.job.constant.JobStatusUtil;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -90,12 +89,10 @@ public class JobFilterUtil {
             jobFilter.getStatuses()
                     .forEach(jobStatus -> 
scheduleStates.addAll(JobStatusUtil.mapJobStatusToScheduleState(jobStatus)));
         }
-        List<String> scheduleStateNames = 
scheduleStates.stream().map(executableState -> executableState.name())
-                .collect(Collectors.toList());
 
         return new JobMapperFilter(scheduleStates, jobFilter.getJobNames(), 
queryStartTime.getTime(),
                 Lists.newArrayList(subjects), null, jobId, null, 
jobFilter.getProject(), orderByField, orderType,
-                offset, limit, JobMybatisConfig.JOB_INFO_TABLE, null);
+                offset, limit, null, null);
     }
 
     private static Date getQueryStartTime(int timeFilter) {
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/job/service/JobInfoServiceTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/job/service/JobInfoServiceTest.java
index 91274a76d2..f97bdfe600 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/job/service/JobInfoServiceTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/job/service/JobInfoServiceTest.java
@@ -163,8 +163,8 @@ public class JobInfoServiceTest extends LogOutputTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceBuildTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceBuildTest.java
index c22d02bda5..a42f6b98c3 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceBuildTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceBuildTest.java
@@ -122,8 +122,8 @@ public class FusionModelServiceBuildTest extends 
SourceTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java
index e4853174e2..06eee4367d 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java
@@ -129,8 +129,8 @@ public class JobErrorTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     private String getProject() {
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobResourceServiceTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobResourceServiceTest.java
index 5446304f68..ab96972e4a 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobResourceServiceTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobResourceServiceTest.java
@@ -65,8 +65,8 @@ public class JobResourceServiceTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
index 107e997bd3..0411300f20 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
@@ -248,8 +248,8 @@ public class ModelServiceBuildTest extends SourceTestCase {
         EventBusFactory.getInstance().unregister(eventListener);
         EventBusFactory.getInstance().unregister(modelBrokenListener);
         EventBusFactory.getInstance().restart();
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
 
         if (!TimeZone.getDefault().equals(defaultTimeZone)) {
             TimeZone.setDefault(defaultTimeZone);
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/SnapshotServiceTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/SnapshotServiceTest.java
index f8b76530cb..3e3f4dbabb 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/SnapshotServiceTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/SnapshotServiceTest.java
@@ -128,8 +128,8 @@ public class SnapshotServiceTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
index ceca44bcaa..e9e9a6e0f5 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/StageTest.java
@@ -121,8 +121,8 @@ public class StageTest extends NLocalFileMetadataTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     private String getProject() {
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/TableSamplingServiceTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/TableSamplingServiceTest.java
index 6336fc954f..db5a80447d 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/TableSamplingServiceTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/TableSamplingServiceTest.java
@@ -95,8 +95,8 @@ public class TableSamplingServiceTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/BuildAndQueryEmptySegmentsTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/BuildAndQueryEmptySegmentsTest.java
index 5d094fb07b..9034add0f4 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/BuildAndQueryEmptySegmentsTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/BuildAndQueryEmptySegmentsTest.java
@@ -92,8 +92,8 @@ public class BuildAndQueryEmptySegmentsTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void cleanup() throws Exception {
-        super.cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        super.cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/CharNColumnTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/CharNColumnTest.java
index d6a7796c79..27d6ebf753 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/CharNColumnTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/CharNColumnTest.java
@@ -44,8 +44,8 @@ public class CharNColumnTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java
index 9ed827a094..2ee6a3cddd 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java
@@ -46,8 +46,8 @@ public class EnhancedAggPushDownTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/ExactlyMatchTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/ExactlyMatchTest.java
index c09a45fd99..6be3462460 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/ExactlyMatchTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/ExactlyMatchTest.java
@@ -56,8 +56,8 @@ public class ExactlyMatchTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/MultiPartitionPruningTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/MultiPartitionPruningTest.java
index 8702bb0922..b428794708 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/MultiPartitionPruningTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/MultiPartitionPruningTest.java
@@ -89,8 +89,8 @@ public class MultiPartitionPruningTest extends 
NLocalWithSparkSessionTest implem
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NAggPushDownTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NAggPushDownTest.java
index b74de3f3ff..60f8f60bad 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NAggPushDownTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NAggPushDownTest.java
@@ -52,8 +52,8 @@ public class NAggPushDownTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NBitmapFunctionTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NBitmapFunctionTest.java
index 1ca7d24369..762ad6591b 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NBitmapFunctionTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NBitmapFunctionTest.java
@@ -48,9 +48,9 @@ public class NBitmapFunctionTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
+        JobContextUtil.cleanUp();
         cleanupTestMetadata();
         FileUtils.deleteQuietly(new File("../kylin-it/metastore_db"));
-        JobContextUtil.cleanUp();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NBuildAndQuerySnapshotTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NBuildAndQuerySnapshotTest.java
index e7f99d748f..b95e35d12d 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NBuildAndQuerySnapshotTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NBuildAndQuerySnapshotTest.java
@@ -69,8 +69,8 @@ public class NBuildAndQuerySnapshotTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void cleanup() throws Exception {
-        super.cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        super.cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NComputedColumnTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NComputedColumnTest.java
index b2ae9784b8..c05d1e8044 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NComputedColumnTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NComputedColumnTest.java
@@ -51,8 +51,8 @@ public class NComputedColumnTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NCountDistinctWithoutEncodeTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NCountDistinctWithoutEncodeTest.java
index 83811aee3b..cc8413ac6d 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NCountDistinctWithoutEncodeTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NCountDistinctWithoutEncodeTest.java
@@ -48,8 +48,8 @@ public class NCountDistinctWithoutEncodeTest extends 
NLocalWithSparkSessionTest
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningTest.java
index 260fbed2ef..4954ad1f59 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningTest.java
@@ -103,8 +103,8 @@ public class NFilePruningTest extends 
NLocalWithSparkSessionTest implements Adap
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningV2Test.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningV2Test.java
index ca1035728f..3c903a101a 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningV2Test.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NFilePruningV2Test.java
@@ -98,8 +98,8 @@ public class NFilePruningV2Test extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NFlattableJoinWithoutLookupTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NFlattableJoinWithoutLookupTest.java
index efed13b498..6988121c8c 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NFlattableJoinWithoutLookupTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NFlattableJoinWithoutLookupTest.java
@@ -58,9 +58,9 @@ public class NFlattableJoinWithoutLookupTest extends 
NLocalWithSparkSessionTest
 
     @After
     public void after() throws Exception {
+        JobContextUtil.cleanUp();
         cleanupTestMetadata();
         FileUtils.deleteQuietly(new File("../kap-it/metastore_db"));
-        JobContextUtil.cleanUp();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NJoinOptTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NJoinOptTest.java
index 96a19b9d99..da5a1b25b0 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NJoinOptTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NJoinOptTest.java
@@ -85,8 +85,8 @@ public class NJoinOptTest extends NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Ignore("KE-30387")
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NManualBuildAndQueryCuboidTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NManualBuildAndQueryCuboidTest.java
index b00559d438..0626143412 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NManualBuildAndQueryCuboidTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NManualBuildAndQueryCuboidTest.java
@@ -80,8 +80,8 @@ public class NManualBuildAndQueryCuboidTest extends 
NManualBuildAndQueryTest {
 
     @After
     public void after() throws Exception {
-        super.cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        super.cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NMatchingTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NMatchingTest.java
index 6d0de04dbe..324b16caae 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NMatchingTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NMatchingTest.java
@@ -50,8 +50,8 @@ public class NMatchingTest extends NLocalWithSparkSessionTest 
{
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultiPartitionJobTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultiPartitionJobTest.java
index e16dc83990..c6bfa7855f 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultiPartitionJobTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultiPartitionJobTest.java
@@ -52,8 +52,8 @@ public class NMultiPartitionJobTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultipleColumnsInTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultipleColumnsInTest.java
index bf941a2815..821861cd25 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultipleColumnsInTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NMultipleColumnsInTest.java
@@ -47,8 +47,8 @@ public class NMultipleColumnsInTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NOptIntersectCountTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NOptIntersectCountTest.java
index dd01b2438c..35c92ac856 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NOptIntersectCountTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NOptIntersectCountTest.java
@@ -57,8 +57,8 @@ public class NOptIntersectCountTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NPartitionColumnTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NPartitionColumnTest.java
index 80293e1c6a..9960e77396 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NPartitionColumnTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NPartitionColumnTest.java
@@ -47,8 +47,8 @@ public class NPartitionColumnTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNResultTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNResultTest.java
index 520df87766..7b3e8543f5 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNResultTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNResultTest.java
@@ -47,8 +47,8 @@ public class NTopNResultTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNWithChineseTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNWithChineseTest.java
index 2fb3487828..14ac5caf43 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNWithChineseTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/NTopNWithChineseTest.java
@@ -47,8 +47,8 @@ public class NTopNWithChineseTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/ReuseFlatTableTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/ReuseFlatTableTest.java
index 90da5edf0e..35d180c662 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/ReuseFlatTableTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/ReuseFlatTableTest.java
@@ -50,8 +50,8 @@ public class ReuseFlatTableTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/SimilarToEscapeFunctionTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/SimilarToEscapeFunctionTest.java
index 4bffd9acee..de53bb4fa6 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/SimilarToEscapeFunctionTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/SimilarToEscapeFunctionTest.java
@@ -67,8 +67,8 @@ public class SimilarToEscapeFunctionTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/SumLCResultTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/SumLCResultTest.java
index 7cdfd073a0..db42a556f9 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/SumLCResultTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/SumLCResultTest.java
@@ -53,8 +53,8 @@ public class SumLCResultTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/TableIndexTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/TableIndexTest.java
index c3a91c4ed7..4a3bf6fa7a 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/TableIndexTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/TableIndexTest.java
@@ -46,9 +46,9 @@ public class TableIndexTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
+        JobContextUtil.cleanUp();
         cleanupTestMetadata();
         FileUtils.deleteQuietly(new File("../kylin-it/metastore_db"));
-        JobContextUtil.cleanUp();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/TimeZoneQueryTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/TimeZoneQueryTest.java
index eceb39be21..8cfbdad70d 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/TimeZoneQueryTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/TimeZoneQueryTest.java
@@ -99,8 +99,8 @@ public class TimeZoneQueryTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Override
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/QueryLayoutFilterTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/QueryLayoutFilterTest.java
index 868b6db57b..f9620c3e89 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/QueryLayoutFilterTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/QueryLayoutFilterTest.java
@@ -86,8 +86,8 @@ public class QueryLayoutFilterTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/kylin-server-it/src/test/java/org/apache/kylin/event/ITStorageCleanerTest.java
 
b/src/kylin-server-it/src/test/java/org/apache/kylin/event/ITStorageCleanerTest.java
index 2ea193d385..177290cb03 100644
--- 
a/src/kylin-server-it/src/test/java/org/apache/kylin/event/ITStorageCleanerTest.java
+++ 
b/src/kylin-server-it/src/test/java/org/apache/kylin/event/ITStorageCleanerTest.java
@@ -79,8 +79,8 @@ public class ITStorageCleanerTest extends 
NLocalWithSparkSessionTest {
 
     @After
     public void tearDown() throws Exception {
-        this.cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        this.cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/config/initialize/ModelBrokenListenerTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/config/initialize/ModelBrokenListenerTest.java
index 73d6b16b9b..7fefc33c60 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/config/initialize/ModelBrokenListenerTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/config/initialize/ModelBrokenListenerTest.java
@@ -102,9 +102,8 @@ public class ModelBrokenListenerTest extends SourceTestCase 
{
         logger.info("ModelBrokenListenerTest cleanup");
         EventBusFactory.getInstance().unregister(modelBrokenListener);
         EventBusFactory.getInstance().restart();
-        super.cleanup();
-
         JobContextUtil.cleanUp();
+        super.cleanup();
     }
 
     private void generateJob(String modelId, String project) {
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/BaseIndexTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/BaseIndexTest.java
index 9b5ee39054..9574fb444c 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/BaseIndexTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/BaseIndexTest.java
@@ -104,8 +104,8 @@ public class BaseIndexTest extends SourceTestCase {
     @After
     public void tearDown() {
         getTestConfig().setProperty("kylin.metadata.semi-automatic-mode", "false");
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionIndexServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionIndexServiceTest.java
index 9a95f4c00f..75761b6732 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionIndexServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionIndexServiceTest.java
@@ -111,8 +111,8 @@ public class FusionIndexServiceTest extends SourceTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     private UpdateRuleBasedCuboidRequest createUpdateRuleRequest(String project, String modelId,
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceTest.java
index a3dc78b8d2..b3e4b39145 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionModelServiceTest.java
@@ -136,8 +136,8 @@ public class FusionModelServiceTest extends SourceTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java
index 9cb9d72138..2b298b1191 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java
@@ -115,8 +115,8 @@ public class IndexPlanServiceTest extends SourceTestCase {
     @After
     public void tearDown() {
         getTestConfig().setProperty("kylin.metadata.semi-automatic-mode", "false");
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Before
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java
index 8b42b5725b..85df728cda 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceSemanticUpdateTest.java
@@ -166,8 +166,8 @@ public class ModelServiceSemanticUpdateTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
index 4c43db6a55..d7f635697c 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
@@ -316,8 +316,8 @@ public class ModelServiceTest extends SourceTestCase {
         EventBusFactory.getInstance().unregister(eventListener);
         EventBusFactory.getInstance().unregister(modelBrokenListener);
         EventBusFactory.getInstance().restart();
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
index 226eb7a9b5..a0f7244eb2 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
@@ -171,8 +171,8 @@ public class ProjectServiceTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java
index fe4e053312..a8ec2a6e4b 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableReloadServiceTest.java
@@ -158,8 +158,8 @@ public class TableReloadServiceTest extends 
CSVSourceTestCase {
         }
         EventBusFactory.getInstance().unregister(modelBrokenListener);
         EventBusFactory.getInstance().restart();
-        super.cleanup();
         JobContextUtil.cleanUp();
+        super.cleanup();
     }
 
     @Test
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
index c8773e1c2e..3075b4e94d 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
@@ -217,9 +217,9 @@ public class TableServiceTest extends CSVSourceTestCase {
     @After
     public void tearDown() {
         EventBusFactory.getInstance().unregister(eventListener);
+        JobContextUtil.cleanUp();
         cleanupTestMetadata();
         FileUtils.deleteQuietly(new File("metastore_db"));
-        JobContextUtil.cleanUp();
         FileUtils.deleteQuietly(new File("../modeling-service/metastore_db"));
     }
 
diff --git 
a/src/second-storage/core/src/test/java/io/kyligence/kap/secondstorage/util/SecondStorageJobUtilTest.java
 
b/src/second-storage/core/src/test/java/io/kyligence/kap/secondstorage/util/SecondStorageJobUtilTest.java
new file mode 100644
index 0000000000..c10fd27789
--- /dev/null
+++ 
b/src/second-storage/core/src/test/java/io/kyligence/kap/secondstorage/util/SecondStorageJobUtilTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.kyligence.kap.secondstorage.util;
+
+import static org.junit.Assert.assertThrows;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.JobTypeEnum;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ ExecutableManager.class })
+@PowerMockIgnore({ "com.sun.security.*", "org.w3c.*", "javax.xml.*", 
"org.xml.*", "org.w3c.dom.*", "org.apache.cxf.*",
+        "javax.management.*", "javax.script.*", "org.apache.hadoop.*", 
"javax.security.*", "java.security.*",
+        "javax.crypto.*", "javax.net.ssl.*", 
"org.apache.kylin.common.asyncprofiler.AsyncProfiler" })
+public class SecondStorageJobUtilTest extends NLocalFileMetadataTestCase {
+    private ExecutableManager executableManager = 
Mockito.mock(ExecutableManager.class);
+
+    @Before
+    public void setUp() throws Exception {
+        createTestMetadata();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    private void prepareManger() {
+        PowerMockito.stub(PowerMockito.method(ExecutableManager.class, 
"getInstance", KylinConfig.class, String.class))
+                .toReturn(executableManager);
+    }
+
+    @Test
+    public void testValidateModel() {
+        prepareManger();
+        PowerMockito.stub(PowerMockito.method(ExecutableManager.class, 
"getInstance", KylinConfig.class, String.class))
+                .toReturn(executableManager);
+
+        List<String> jobs = Collections.singletonList("job1");
+        AbstractExecutable job1 = PowerMockito.mock(AbstractExecutable.class);
+        PowerMockito.when(job1.getStatus()).thenReturn(ExecutableState.READY, 
ExecutableState.PENDING,
+                ExecutableState.RUNNING, ExecutableState.PAUSED, 
ExecutableState.SUCCEED);
+
+        
PowerMockito.when(job1.getJobType()).thenReturn(JobTypeEnum.EXPORT_TO_SECOND_STORAGE);
+        Mockito.when(job1.getProject()).thenReturn("project");
+        Mockito.when(job1.getTargetModelId()).thenReturn("modelId");
+        Mockito.when(executableManager.getJobs()).thenReturn(jobs);
+        Mockito.when(executableManager.getJob("job1")).thenReturn(job1);
+
+        //job status: Ready
+        assertThrows(MsgPicker.getMsg().getSecondStorageConcurrentOperate(), 
KylinException.class,
+                () -> SecondStorageJobUtil.validateModel("project", 
"modelId"));
+        //job status: Pending
+        assertThrows(MsgPicker.getMsg().getSecondStorageConcurrentOperate(), 
KylinException.class,
+                () -> SecondStorageJobUtil.validateModel("project", 
"modelId"));
+        //job status: Running
+        assertThrows(MsgPicker.getMsg().getSecondStorageConcurrentOperate(), 
KylinException.class,
+                () -> SecondStorageJobUtil.validateModel("project", 
"modelId"));
+        //job status: Paused
+        assertThrows(MsgPicker.getMsg().getSecondStorageConcurrentOperate(), 
KylinException.class,
+                () -> SecondStorageJobUtil.validateModel("project", 
"modelId"));
+        //job status: Succeed
+        SecondStorageJobUtil.validateModel("project", "modelId");
+    }
+}
diff --git 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobOnYarnTest.java
 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobOnYarnTest.java
index 653e9e0a1f..9d149deb4b 100644
--- 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobOnYarnTest.java
+++ 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobOnYarnTest.java
@@ -54,8 +54,8 @@ public class NSparkCubingJobOnYarnTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void after() throws Exception {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/it/TestModelViewQuery.scala
 
b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/it/TestModelViewQuery.scala
index a5a7d969c0..48058bcf89 100644
--- 
a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/it/TestModelViewQuery.scala
+++ 
b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/it/TestModelViewQuery.scala
@@ -21,10 +21,7 @@ import java.sql.SQLException
 import java.util.TimeZone
 
 import org.apache.kylin.common._
-import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.StringUtils
-import org.apache.kylin.common.KylinConfig
-import org.apache.kylin.common.persistence.{JsonSerializer, 
RootPersistentEntity}
+import org.apache.kylin.common.util.TimeZoneUtils
 import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.kylin.metadata.realization.NoRealizationFoundException
 import org.apache.kylin.query.QueryFetcher
@@ -67,6 +64,7 @@ class TestModelViewQuery
     overwriteSystemProp("calcite.keep-in-clause", "true")
     overwriteSystemProp("kylin.dictionary.null-encoding-opt-threshold", "1")
     overwriteSystemProp("kylin.web.timezone", "GMT+8")
+    TimeZoneUtils.setDefaultTimeZone(KylinConfig.getInstanceFromEnv)
     overwriteSystemProp("kylin.query.pushdown.runner-class-name", "")
     overwriteSystemProp("kylin.query.pushdown-enabled", "false")
     overwriteSystemProp("kylin.snapshot.parallel-build-enabled", "true")
diff --git a/src/tool/src/test/java/org/apache/kylin/tool/AuditLogToolTest.java 
b/src/tool/src/test/java/org/apache/kylin/tool/AuditLogToolTest.java
index 6a092af254..d37f3ac2d6 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/AuditLogToolTest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/AuditLogToolTest.java
@@ -108,8 +108,8 @@ public class AuditLogToolTest extends 
NLocalFileMetadataTestCase {
     public void teardown() {
         val jdbcTemplate = getJdbcTemplate();
         jdbcTemplate.batchUpdate("DROP ALL OBJECTS");
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/tool/src/test/java/org/apache/kylin/tool/JobDiagInfoToolTest.java 
b/src/tool/src/test/java/org/apache/kylin/tool/JobDiagInfoToolTest.java
index 76b7e331d6..3c1d72ad1b 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/JobDiagInfoToolTest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/JobDiagInfoToolTest.java
@@ -83,8 +83,8 @@ public class JobDiagInfoToolTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void tearDown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @After
diff --git 
a/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java 
b/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java
index 417b4bc272..720eef3719 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/StorageCleanerTest.java
@@ -84,8 +84,8 @@ public class StorageCleanerTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void teardown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/tool/src/test/java/org/apache/kylin/tool/SystemUsageToolTest.java 
b/src/tool/src/test/java/org/apache/kylin/tool/SystemUsageToolTest.java
index e018f375f9..8390bdc1f8 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/SystemUsageToolTest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/SystemUsageToolTest.java
@@ -51,18 +51,18 @@ public class SystemUsageToolTest extends 
NLocalFileMetadataTestCase {
 
     @Before
     public void setup() throws Exception {
+        JobContextUtil.cleanUp();
         createTestMetadata();
         queryHistoryDAO = RDBMSQueryHistoryDAO.getInstance();
 
-        JobContextUtil.cleanUp();
         JobContextUtil.getJobInfoDao(getTestConfig());
     }
 
     @After
     public void teardown() {
         queryHistoryDAO.deleteAllQueryHistory();
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/tool/src/test/java/org/apache/kylin/tool/YarnApplicationToolTest.java 
b/src/tool/src/test/java/org/apache/kylin/tool/YarnApplicationToolTest.java
index 1651fe3a21..5031867c2e 100644
--- a/src/tool/src/test/java/org/apache/kylin/tool/YarnApplicationToolTest.java
+++ b/src/tool/src/test/java/org/apache/kylin/tool/YarnApplicationToolTest.java
@@ -69,8 +69,8 @@ public class YarnApplicationToolTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void teardown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java
 
b/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java
index 9a8bb67bdc..29a57a9355 100644
--- 
a/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java
+++ 
b/src/tool/src/test/java/org/apache/kylin/tool/garbage/ExecutableCleanerTest.java
@@ -48,8 +48,8 @@ public class ExecutableCleanerTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void destroy() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test
diff --git 
a/src/tool/src/test/java/org/apache/kylin/tool/upgrade/MigrateJobToolTest.java 
b/src/tool/src/test/java/org/apache/kylin/tool/upgrade/MigrateJobToolTest.java
index 54d4546767..1db75d7455 100644
--- 
a/src/tool/src/test/java/org/apache/kylin/tool/upgrade/MigrateJobToolTest.java
+++ 
b/src/tool/src/test/java/org/apache/kylin/tool/upgrade/MigrateJobToolTest.java
@@ -47,8 +47,8 @@ public class MigrateJobToolTest extends 
NLocalFileMetadataTestCase {
 
     @After
     public void teardown() {
-        cleanupTestMetadata();
         JobContextUtil.cleanUp();
+        cleanupTestMetadata();
     }
 
     @Test

Reply via email to