This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d34d6761c718bd34fa83bfc7f00038aacaf5bc22 Author: Xuecheng Shan <shanxuech...@gmail.com> AuthorDate: Wed Oct 18 10:04:46 2023 +0800 KYLIN-5857 Fix jobScheduler-related problems Stop the job check before shutting down the system. Read the latest user metadata before updating it. Mark jobs in ERROR or PAUSED state for suicide. Allow setting the maximum number of retries for job transactions to avoid MVCC conflicts. Co-authored-by: Xuecheng Shan <xuecheng.s...@kyligence.io> Co-authored-by: Zhiting Guo <zhiting....@kyligence.io> --- .../security/LimitLoginAuthenticationProvider.java | 12 ++++++ .../LimitLoginAuthenticationProviderTest.java | 21 +++++++++ .../org/apache/kylin/common/KylinConfigBase.java | 4 ++ .../persistence/metadata/FileMetadataStore.java | 7 ++- .../common/persistence/metadata/jdbc/JdbcUtil.java | 12 +++--- .../main/java/org/apache/kylin/job/JobContext.java | 2 + .../kylin/job/execution/ExecutableManager.java | 30 +++++++++---- .../kylin/job/execution/ExecutableThread.java | 3 +- .../apache/kylin/job/runners/JobCheckRunner.java | 36 ++++++++++++++++ .../org/apache/kylin/job/runners/JobCheckUtil.java | 6 ++- .../kylin/job/scheduler/JdbcJobScheduler.java | 37 ++++++++-------- .../apache/kylin/job/scheduler/JobExecutor.java | 4 +- .../org/apache/kylin/job/util/JobContextUtil.java | 4 +- .../kylin/job/execution/DagExecutableTest.java | 3 +- .../job/impl/threadpool/BaseSchedulerTest.java | 6 ++- .../job/impl/threadpool/NDefaultSchedulerTest.java | 2 - .../kylin/job/scheduler/JdbcJobSchedulerTest.java | 37 +++++++++++++++- .../kylin/rest/controller/JobControllerTest.java | 50 ++++++++++++++++++++++ .../apache/kylin/job/service/JobInfoService.java | 8 +++- .../org/apache/kylin/rest/QueryNodeFilter.java | 2 + .../java/org/apache/kylin/tool/KylinLogTool.java | 4 +- .../org/apache/kylin/tool/KylinLogToolTest.java | 21 +++++++++ 22 files changed, 264 insertions(+), 47 deletions(-) diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/security/LimitLoginAuthenticationProvider.java b/src/common-service/src/main/java/org/apache/kylin/rest/security/LimitLoginAuthenticationProvider.java index d614a8d9b7..39d19d533f 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/security/LimitLoginAuthenticationProvider.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/security/LimitLoginAuthenticationProvider.java @@ -95,6 +95,8 @@ public class LimitLoginAuthenticationProvider extends DaoAuthenticationProvider updateUserLockStatus(managedUser, userName); Authentication auth = super.authenticate(authentication); + // Metadata modifications need to be based on the latest metadata copy. + managedUser = getUser(userName); if (managedUser != null && managedUser.getWrongTime() > 0 && !maintenanceModeService.isMaintenanceMode()) { managedUser.clearAuthenticateFailedRecord(); updateUser(managedUser); @@ -104,6 +106,7 @@ public class LimitLoginAuthenticationProvider extends DaoAuthenticationProvider return auth; } catch (BadCredentialsException e) { + managedUser = getUser(userName); authenticateFail(managedUser, userName); if (managedUser != null && managedUser.isLocked()) { if (UserLockRuleUtil.isLockedPermanently(managedUser)) { @@ -123,6 +126,15 @@ public class LimitLoginAuthenticationProvider extends DaoAuthenticationProvider } } + private ManagedUser getUser(String userName) { + NKylinUserManager userManager = NKylinUserManager.getInstance(KylinConfig.getInstanceFromEnv()); + ManagedUser managedUser = userManager.get(userName); + if (managedUser != null) { + return managedUser; + } + return (ManagedUser) userService.loadUserByUsername(userName); + } + private void buildBadCredentialsException(String userName, BadCredentialsException e) { String msg = String.format(Locale.ROOT, MsgPicker.getMsg().getUserInPermanentlyLockedStatus(), userName); limitLoginLogger.error(msg, new KylinException(USER_LOCKED, e)); diff --git 
a/src/common-service/src/test/java/org/apache/kylin/rest/security/LimitLoginAuthenticationProviderTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/security/LimitLoginAuthenticationProviderTest.java index 5e3dc081e7..dc2d748078 100644 --- a/src/common-service/src/test/java/org/apache/kylin/rest/security/LimitLoginAuthenticationProviderTest.java +++ b/src/common-service/src/test/java/org/apache/kylin/rest/security/LimitLoginAuthenticationProviderTest.java @@ -20,9 +20,11 @@ package org.apache.kylin.rest.security; import static org.apache.kylin.common.exception.code.ErrorCodeServer.USER_LOGIN_FAILED; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.kylin.metadata.user.ManagedUser; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.service.KylinUserService; +import org.apache.kylin.rest.service.MaintenanceModeService; import org.apache.kylin.rest.service.UserAclService; import org.junit.After; import org.junit.Assert; @@ -156,6 +158,25 @@ public class LimitLoginAuthenticationProviderTest extends NLocalFileMetadataTest } } + @Test + public void testAuthenticate_Unlocked() { + EpochManager.getInstance().tryUpdateEpoch(EpochManager.GLOBAL, true); + ReflectionTestUtils.setField(limitLoginAuthenticationProvider, "maintenanceModeService", + new MaintenanceModeService()); + + userAdmin.setLocked(true); + userAdmin.setLockedTime(System.currentTimeMillis() - 60 * 1000); + userAdmin.setWrongTime(3); + kylinUserService.updateUser(userAdmin); + UsernamePasswordAuthenticationToken token = new UsernamePasswordAuthenticationToken("ADMIN", "KYLIN", + userAdmin.getAuthorities()); + try { + limitLoginAuthenticationProvider.authenticate(token); + } catch (Exception e) { + Assert.fail(); + } + } + @Test public void testAuthenticate_Disabled_Exception() { userAdmin.setDisabled(true); diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 9ff94e764e..9285486698 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1154,6 +1154,10 @@ public abstract class KylinConfigBase implements Serializable { return getOptional("kylin.job.remote-cli-working-dir"); } + public int getMaxTransactionRetry() { + return Integer.parseInt(getOptional("kylin.job.max-transaction-retry", "3")); + } + public int getMaxConcurrentJobLimit() { return Integer.parseInt(getOptional("kylin.job.max-concurrent-jobs", "20")); } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileMetadataStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileMetadataStore.java index ded2707454..055b2ee0db 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileMetadataStore.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileMetadataStore.java @@ -88,7 +88,12 @@ public class FileMetadataStore extends MetadataStore { @Override public NavigableSet<String> list(String subPath) { TreeSet<String> result = Sets.newTreeSet(); - val scanFolder = new File(root, subPath); + File scanFolder; + if (File.separator.equals(subPath)) { + scanFolder = root; + } else { + scanFolder = new File(root, subPath); + } if (!scanFolder.exists()) { return result; } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java index 295a357ef9..3531b6d950 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java +++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java @@ -29,7 +29,7 @@ import java.util.Properties; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.BasicDataSourceFactory; -import org.apache.hadoop.util.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.exception.KylinException; @@ -55,15 +55,17 @@ public class JdbcUtil { private static final ThreadLocal txThreadLocal = new ThreadLocal(); - public static <T> T withTxAndRetry(DataSourceTransactionManager transactionManager, Callback<T> consumer){ + public static <T> T withTxAndRetry(DataSourceTransactionManager transactionManager, Callback<T> consumer) { return withTxAndRetry(transactionManager, consumer, TransactionDefinition.ISOLATION_REPEATABLE_READ, 3); } - public static <T> T withTxAndRetry(DataSourceTransactionManager transactionManager, Callback<T> consumer, int retryLimit){ - return withTxAndRetry(transactionManager, consumer, TransactionDefinition.ISOLATION_REPEATABLE_READ, retryLimit); + public static <T> T withTxAndRetry(DataSourceTransactionManager transactionManager, Callback<T> consumer, + int retryLimit) { + return withTxAndRetry(transactionManager, consumer, TransactionDefinition.ISOLATION_REPEATABLE_READ, + retryLimit); } - public static boolean isInExistingTx(){ + public static boolean isInExistingTx() { return txThreadLocal.get() != null; } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/JobContext.java b/src/core-job/src/main/java/org/apache/kylin/job/JobContext.java index 2a4889b60e..fdc66ac4d2 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/JobContext.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/JobContext.java @@ -116,6 +116,8 @@ public class JobContext implements InitializingBean, DisposableBean, ISmartAppli lockClient.destroy(); } + 
JobCheckUtil.stopJobCheckScheduler(); + } // for ut only diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index ece1ea7ea7..802191013d 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -83,6 +83,7 @@ import org.apache.kylin.guava30.shaded.common.base.Preconditions; import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.guava30.shaded.common.collect.Maps; import org.apache.kylin.guava30.shaded.common.collect.Sets; +import org.apache.kylin.job.JobContext; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.dao.ExecutableOutputPO; import org.apache.kylin.job.dao.ExecutablePO; @@ -90,6 +91,7 @@ import org.apache.kylin.job.dao.JobInfoDao; import org.apache.kylin.job.domain.JobInfo; import org.apache.kylin.job.rest.JobMapperFilter; import org.apache.kylin.job.runners.JobCheckUtil; +import org.apache.kylin.job.scheduler.JdbcJobScheduler; import org.apache.kylin.job.util.JobContextUtil; import org.apache.kylin.job.util.JobInfoUtil; import org.apache.kylin.metadata.cube.model.NBatchConstants; @@ -288,15 +290,18 @@ public class ExecutableManager { if (ExecutableState.READY == newStatus) { Optional.ofNullable(REMOVE_INFO).ifPresent(set -> set.forEach(info::remove)); } - String oldNodeInfo = info.get("node_info"); - String newNodeInfo = config.getServerAddress(); - if (Objects.nonNull(oldNodeInfo) && !Objects.equals(oldNodeInfo, newNodeInfo) - && !Objects.equals(taskOrJobId, jobId)) { - logger.info("The node running job has changed. 
Job id: {}, Step name: {}, Switch from {} to {}.", jobId, - taskOrJob.getName(), oldNodeInfo, newNodeInfo); + // check if job is running on current node + if (hasRunningJob(jobId)) { + String oldNodeInfo = info.get("node_info"); + String newNodeInfo = config.getServerAddress(); + if (Objects.nonNull(oldNodeInfo) && !Objects.equals(oldNodeInfo, newNodeInfo) + && !Objects.equals(taskOrJobId, jobId)) { + logger.info("The node running job has changed. Job id: {}, Step name: {}, Switch from {} to {}.", + jobId, taskOrJob.getName(), oldNodeInfo, newNodeInfo); + } + info.put("node_info", newNodeInfo); + info.put("host_name", AddressUtil.getHostName()); } - info.put("node_info", newNodeInfo); - info.put("host_name", AddressUtil.getHostName()); jobOutput.setInfo(info); String appId = info.get(ExecutableConstants.YARN_APP_ID); if (StringUtils.isNotEmpty(appId)) { @@ -328,6 +333,15 @@ public class ExecutableManager { } } + + private boolean hasRunningJob(String jobId) { + JobContext jobContext = JobContextUtil.getJobContext(config); + JdbcJobScheduler jobScheduler = jobContext.getJobScheduler(); + if (null != jobScheduler && jobScheduler.getRunningJob().containsKey(jobId)) { + return true; + } + return false; + } public static String extractJobId(String taskOrJobId) { val jobIdPair = taskOrJobId.split("_"); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java index 40ec936a25..85a2ca0065 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java @@ -28,6 +28,7 @@ import org.apache.kylin.job.JobContext; import lombok.val; public class ExecutableThread extends Thread { + public static final String JOB_THREAD_NAME_PATTERN = "JobWorker(project:%s,jobid:%s)"; private Map<String, Executable> dagExecutablesMap; private JobContext context; 
private DefaultExecutable dagExecutable; @@ -49,7 +50,7 @@ public class ExecutableThread extends Thread { //only the first 8 chars of the job uuid val jobIdSimple = dagExecutable.getId().split("-")[0]; val project = dagExecutable.getProject(); - try (SetThreadName ignored = new SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple); + try (SetThreadName ignored = new SetThreadName(JOB_THREAD_NAME_PATTERN, project, jobIdSimple); SetLogCategory ignore = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { // context.addRunningJob(executable); dagExecutable.executeDagExecutable(dagExecutablesMap, executable, context); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java index cac6b8529a..85363f3101 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java @@ -17,15 +17,22 @@ */ package org.apache.kylin.job.runners; +import java.util.Collections; +import java.util.List; import java.util.Map; +import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.job.JobContext; import org.apache.kylin.job.core.AbstractJobExecutable; import org.apache.kylin.job.dao.ExecutablePO; +import org.apache.kylin.job.domain.JobInfo; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.rest.JobMapperFilter; import org.apache.kylin.job.scheduler.JdbcJobScheduler; import org.apache.kylin.job.util.JobContextUtil; import org.slf4j.Logger; @@ -79,8 +86,13 @@ public class JobCheckRunner implements Runnable { public void run() { logger.info("Start check 
job pool."); JdbcJobScheduler jdbcJobScheduler = jobContext.getJobScheduler(); + // for jobs running on current node Map<String, Pair<AbstractJobExecutable, Long>> runningJobs = jdbcJobScheduler.getRunningJob(); for (Map.Entry<String, Pair<AbstractJobExecutable, Long>> entry : runningJobs.entrySet()) { + if (Thread.currentThread().isInterrupted()) { + logger.warn("Job check thread {} is interrupted.", Thread.currentThread().getName()); + return; + } String jobId = entry.getKey(); AbstractJobExecutable jobExecutable = entry.getValue().getFirst(); long startTime = entry.getValue().getSecond(); @@ -98,6 +110,30 @@ public class JobCheckRunner implements Runnable { continue; } } + // for error or paused jobs + markSuicideForErrorOrPausedJobs(); + } + + private void markSuicideForErrorOrPausedJobs() { + JobMapperFilter jobMapperFilter = new JobMapperFilter(); + jobMapperFilter.setStatuses(Lists.newArrayList(ExecutableState.ERROR, ExecutableState.PAUSED)); + jobMapperFilter.setLimit(10); + jobMapperFilter.setOffset(0); + List<JobInfo> jobInfoList = jobContext.getJobInfoMapper().selectByJobFilter(jobMapperFilter); + if (CollectionUtils.isEmpty(jobInfoList)) { + return; + } + Collections.shuffle(jobInfoList); + for (JobInfo jobInfo : jobInfoList) { + if (Thread.currentThread().isInterrupted()) { + logger.warn("Job check thread {} is interrupted.", Thread.currentThread().getName()); + return; + } + if (JobCheckUtil.markSuicideJob(jobInfo.getJobId(), jobContext)) { + logger.info("suicide job = {} on checker runner", jobInfo.getJobId()); + continue; + } + } } private boolean stopJobIfStorageQuotaLimitReached(JobContext jobContext, String jobId, String project) { diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckUtil.java index ff3dc4cab8..d10f8b1ad6 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckUtil.java +++ 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckUtil.java @@ -42,12 +42,16 @@ public class JobCheckUtil { private static ScheduledExecutorService jobCheckThreadPool; private static synchronized ScheduledExecutorService getJobCheckThreadPool() { - if (null == jobCheckThreadPool) { + if (null == jobCheckThreadPool || jobCheckThreadPool.isShutdown()) { jobCheckThreadPool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("JobCheckThreadPool"); } return jobCheckThreadPool; } + public static void stopJobCheckScheduler() { + getJobCheckThreadPool().shutdownNow(); + } + public static void startQuotaStorageCheckRunner(QuotaStorageCheckRunner quotaStorageCheckRunner) { if (!KylinConfig.getInstanceFromEnv().isStorageQuotaEnabled()) { return; diff --git a/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java index e2813cc0d8..52f4cd7f48 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java @@ -31,6 +31,7 @@ import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.constant.LogConstant; +import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.ThreadUtils; import org.apache.kylin.guava30.shaded.common.collect.Lists; @@ -358,9 +359,23 @@ public class JdbcJobScheduler implements JobScheduler { private void executeJob(AbstractJobExecutable jobExecutable, JobInfo jobInfo) { JdbcJobLock jobLock = null; - try (JobExecutor jobExecutor = new JobExecutor(jobContext, jobExecutable)) { - // Must do this check before tryJobLock - if (!checkJobStatusBeforeExecute(jobExecutable)) { + try (JobExecutor jobExecutor = new JobExecutor(jobContext, jobExecutable); + 
SetLogCategory ignore = new SetLogCategory(LogConstant.BUILD_CATEGORY)) { + // Check job status + AbstractExecutable executable = (AbstractExecutable) jobExecutable; + ExecutableState jobStatus = executable.getStatus(); + if (ExecutableState.PENDING != jobStatus) { + logger.warn("Unexpected status for {} <{}>, should not execute job", jobExecutable.getJobId(), + jobStatus); + if (ExecutableState.RUNNING == jobStatus) { + jobLock = tryJobLock(jobExecutable); + if (jobLock != null) { + //Maybe other node crashed during job execution, resume job status from running to ready. + logger.warn("Resume <RUNNING> job {}", jobExecutable.getJobId()); + ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), executable.getProject()) + .resumeJob(jobExecutable.getJobId(), true); + } + } return; } @@ -396,22 +411,6 @@ public class JdbcJobScheduler implements JobScheduler { return jobLock; } - private boolean checkJobStatusBeforeExecute(AbstractJobExecutable jobExecutable) { - AbstractExecutable executable = (AbstractExecutable) jobExecutable; - ExecutableState jobStatus = executable.getStatus(); - if (ExecutableState.PENDING == jobStatus) { - return true; - } - logger.warn("Unexpected status for {} <{}>, should not execute job", jobExecutable.getJobId(), jobStatus); - if (ExecutableState.RUNNING == jobStatus) { - // there should be other nodes crashed during job execution, resume job status from running to ready - logger.warn("Resume <RUNNING> job {}", jobExecutable.getJobId()); - ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), executable.getProject()) - .resumeJob(jobExecutable.getJobId(), true); - } - return false; - } - private void stopJobLockRenewAfterExecute(JdbcJobLock jobLock) { try { String jobId = jobLock.getLockId(); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JobExecutor.java b/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JobExecutor.java index 186d5732f6..0bf8b9647d 100644 --- 
a/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JobExecutor.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JobExecutor.java @@ -18,6 +18,8 @@ package org.apache.kylin.job.scheduler; +import static org.apache.kylin.job.execution.ExecutableThread.JOB_THREAD_NAME_PATTERN; + import java.util.Locale; import org.apache.kylin.job.JobContext; @@ -56,7 +58,7 @@ public class JobExecutor implements AutoCloseable { private void setThreadName() { String project = jobExecutable.getProject(); String jobFlag = jobExecutable.getJobId().split("-")[0]; - Thread.currentThread().setName(String.format(Locale.ROOT, "JobExecutor(project:%s,job:%s)", project, jobFlag)); + Thread.currentThread().setName(String.format(Locale.ROOT, JOB_THREAD_NAME_PATTERN, project, jobFlag)); } private void setbackThreadName() { diff --git a/src/core-job/src/main/java/org/apache/kylin/job/util/JobContextUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/util/JobContextUtil.java index 4c53ac2f5a..bd6bc7f548 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/util/JobContextUtil.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/util/JobContextUtil.java @@ -117,6 +117,8 @@ public class JobContextUtil { if (config.isUTEnv()) { config.setProperty("kylin.job.master-poll-interval-second", "1"); config.setProperty("kylin.job.scheduler.poll-interval-second", "1"); + config.setProperty("kylin.job.slave-lock-renew-sec", "5"); + config.setProperty("kylin.job.slave-lock-renew-ratio", "0.4"); } if (null == jobContext) { jobContext = new JobContext(); @@ -324,7 +326,7 @@ public class JobContextUtil { } public static <T> T withTxAndRetry(JdbcUtil.Callback<T> consumer) { - return withTxAndRetry(consumer, 3); + return withTxAndRetry(consumer, KylinConfig.getInstanceFromEnv().getMaxTransactionRetry()); } @SneakyThrows diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java 
b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java index 560366f92a..b7c2ef7c23 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java @@ -47,7 +47,6 @@ import org.apache.kylin.metadata.project.EnhancedUnitOfWork; import org.awaitility.Duration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import lombok.val; @@ -258,8 +257,8 @@ class DagExecutableTest { } @Test - @Disabled("Fixed at KE-42833") void dagExecute() throws ExecuteException { + KylinConfig.getInstanceFromEnv().setProperty("kylin.job.max-transaction-retry", "23"); val job = new DefaultExecutable(); job.setProject(DEFAULT_PROJECT); val executable1 = new SucceedDagTestExecutable(); diff --git a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java index 478c873d5f..449efd4d4c 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java @@ -18,6 +18,7 @@ package org.apache.kylin.job.impl.threadpool; +import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.with; import java.util.concurrent.TimeUnit; @@ -25,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.job.JobContext; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; @@ -72,8 +74,10 @@ public abstract class BaseSchedulerTest extends 
NLocalFileMetadataTestCase { @After public void after() throws Exception { - JobContextUtil.cleanUp(); + JobContext jobContext = JobContextUtil.getJobContext(KylinConfig.getInstanceFromEnv()); cleanupTestMetadata(); + JobContextUtil.cleanUp(); + await().atMost(30, TimeUnit.SECONDS).until(() -> jobContext.getJobScheduler().getRunningJob().size() == 0); } protected void waitForJobFinish(String jobId) { diff --git a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java index 8c98536533..1970736bdb 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java @@ -553,7 +553,6 @@ public class NDefaultSchedulerTest extends BaseSchedulerTest { } @Test - @Ignore("Fixed at KE-41797") public void testDiscardErrorJobBeforeSchedule() { val currMem = ResourceAcquirer.currentAvailableMem(); val dfMgr = NDataflowManager.getInstance(getTestConfig(), project); @@ -590,7 +589,6 @@ public class NDefaultSchedulerTest extends BaseSchedulerTest { } @Test - @Ignore("Fixed at KE-41797") public void testDiscardPausedJobBeforeSchedule() { val currMem = ResourceAcquirer.currentAvailableMem(); val dfMgr = NDataflowManager.getInstance(getTestConfig(), project); diff --git a/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java index fe132ed534..ea2fbb89ab 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.guava30.shaded.common.collect.Maps; import 
org.apache.kylin.job.JobContext; +import org.apache.kylin.job.dao.ExecutableOutputPO; import org.apache.kylin.job.dao.JobInfoDao; import org.apache.kylin.job.domain.JobLock; import org.apache.kylin.job.execution.AbstractExecutable; @@ -35,6 +36,7 @@ import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.JobTypeEnum; import org.apache.kylin.job.execution.SucceedChainedTestExecutable; +import org.apache.kylin.job.mapper.JobLockMapper; import org.apache.kylin.job.rest.JobMapperFilter; import org.apache.kylin.job.util.JobContextUtil; import org.apache.kylin.junit.annotation.MetadataInfo; @@ -55,7 +57,7 @@ class JdbcJobSchedulerTest { @BeforeEach public void setup() { KylinConfig config = getTestConfig(); - config.setProperty("kylin.job.slave-pull-batch-size", "1"); + config.setProperty("kylin.job.max-concurrent-jobs", "2"); config.setProperty("kylin.job.slave-lock-renew-sec", "3"); jobContext = JobContextUtil.getJobContext(config); jobInfoDao = JobContextUtil.getJobInfoDao(config); @@ -206,6 +208,39 @@ class JdbcJobSchedulerTest { await().atMost(60, TimeUnit.SECONDS).until(() -> jobContext.getJobLockMapper().selectByJobId(jobId) == null); } + @Test + void testResumeRunningJobs() { + KylinConfig config = getTestConfig(); + // Stop schedule + JobContextUtil.cleanUp(); + // Init job mappers without schedule + JobInfoDao dao = JobContextUtil.getJobInfoDao(config); + JobLockMapper mapper = (JobLockMapper) ReflectionTestUtils.getField(dao, "jobLockMapper"); + + String jobId = mockJob(); + jobInfoDao.updateJob(jobId, job -> { + ExecutableOutputPO jobOutput = job.getOutput(); + jobOutput.setStatus(ExecutableState.RUNNING.name()); + jobOutput.addStartTime(System.currentTimeMillis()); + job.getTasks().forEach(task -> task.getOutput().setStatus(ExecutableState.PENDING.name())); + return true; + }); + mapper.insertSelective(new JobLock(jobId, 3)); + // init schedule + 
JobContextUtil.getJobContext(config); + + await().atMost(2, TimeUnit.SECONDS).until(() -> jobInfoDao.getExecutablePOByUuid(jobId).getOutput().getStatus() + .equals(ExecutableState.READY.name())); + await().atMost(2, TimeUnit.SECONDS).until(() -> jobInfoDao.getExecutablePOByUuid(jobId).getOutput().getStatus() + .equals(ExecutableState.PENDING.name())); + await().atMost(5, TimeUnit.SECONDS).until(() -> jobInfoDao.getExecutablePOByUuid(jobId).getOutput().getStatus() + .equals(ExecutableState.RUNNING.name())); + await().atMost(2, TimeUnit.SECONDS).until(() -> jobInfoDao.getExecutablePOByUuid(jobId).getOutput().getStatus() + .equals(ExecutableState.SUCCEED.name())); + //release lock + await().atMost(5, TimeUnit.SECONDS).until(() -> jobContext.getJobLockMapper().selectByJobId(jobId) == null); + } + private String mockJob() { ExecutableManager manager = ExecutableManager.getInstance(getTestConfig(), PROJECT); AbstractExecutable job = mockExecutable(); diff --git a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java index ba4a702f02..10b9202092 100644 --- a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java +++ b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java @@ -23,8 +23,10 @@ import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLI import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; @@ -437,6 +439,54 @@ public class JobControllerTest extends NLocalFileMetadataTestCase { 
Mockito.verify(jobController).updateStageStatus(request); } + @Test + public void testUpdateStageStatusConcurrently() { + KylinConfig.getInstanceFromEnv().setProperty("kylin.job.max-transaction-retry", "10"); + ExecutablePO job = mockJob(ExecutableState.RUNNING); + StageRequest request = new StageRequest(); + request.setProject(job.getProject()); + request.setSegmentId(job.getTargetSegments().get(0)); + request.setTaskId(job.getId() + "_01_01"); + request.setStatus("RUNNING"); + request.setJobLastRunningStartTime(String.valueOf(job.getOutput().getLastRunningStartTime())); + + // call real methods of joInfoService + ReflectionTestUtils.setField(jobController, "jobInfoService", Mockito.spy(JobInfoService.class)); + + AtomicInteger failedCount = new AtomicInteger(); + Runnable runnable = () -> { + try { + MvcResult result = mockMvc.perform(MockMvcRequestBuilders.put("/api/jobs/stage/status") // + .contentType(MediaType.APPLICATION_JSON).content(JsonUtil.writeValueAsString(request)) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().isOk()).andReturn(); + Map<String, String> response = JsonUtil.readValueAsMap(result.getResponse().getContentAsString()); + if (!response.get("code").equals(KylinException.CODE_SUCCESS)) { + failedCount.incrementAndGet(); + } + } catch (Exception exception) { + failedCount.incrementAndGet(); + } + }; + + int repeatTime = 10; + List<Thread> threads = new ArrayList<>(); + for (int i = 0; i < repeatTime; i++) { + threads.add(new Thread(runnable)); + } + threads.forEach(Thread::start); + threads.forEach(thread -> { + try { + thread.join(); + } catch (InterruptedException e) { + failedCount.incrementAndGet(); + } + }); + + Mockito.verify(jobController, Mockito.times(repeatTime)).updateStageStatus(request); + Assert.assertEquals(0, failedCount.get()); + } + @Test public void testUpdateSparkJobTime() throws Exception { ExecutablePO job = mockJob(ExecutableState.RUNNING); diff --git 
a/src/data-loading-service/src/main/java/org/apache/kylin/job/service/JobInfoService.java b/src/data-loading-service/src/main/java/org/apache/kylin/job/service/JobInfoService.java index 1e2dab1a1c..ca9bcef301 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/job/service/JobInfoService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/job/service/JobInfoService.java @@ -226,7 +226,9 @@ public class JobInfoService extends BasicService implements JobSupporter { public ExecutableResponse getJobInstance(String jobId) { Preconditions.checkNotNull(jobId); ExecutablePO executablePO = jobInfoDao.getExecutablePOByUuid(jobId); - Preconditions.checkNotNull(executablePO, "Can not find the job: {}", jobId); + if (executablePO == null) { + throw new KylinException(JOB_NOT_EXIST, jobId); + } ExecutableManager executableManager = getManager(ExecutableManager.class, executablePO.getProject()); AbstractExecutable executable = executableManager.fromPO(executablePO); return convert(executable, executablePO); @@ -274,7 +276,9 @@ public class JobInfoService extends BasicService implements JobSupporter { AbstractExecutable executable = getManager(ExecutableManager.class, executablePO.getProject()) .fromPO(executablePO); val convert = this.convert(executable, executablePO); - val segments = getSegments(executable); + val segments = convert.isTargetSubjectError() + ? 
Lists.<ExecutableResponse.SegmentResponse> newArrayList() + : getSegments(executable); convert.setSegments(segments); return convert; }).collect(Collectors.toList()); diff --git a/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java b/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java index d7c19b6be0..1f9ab4db76 100644 --- a/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java +++ b/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java @@ -154,6 +154,8 @@ public class QueryNodeFilter extends BaseFilter { // spark report job stage status notRoutePutApiSet.add("/kylin/api/jobs/stage/status"); + notRoutePutApiSet.add("/kylin/api/jobs/spark"); + notRoutePutApiSet.add("/kylin/api/jobs/wait_and_run_time"); } @Autowired diff --git a/src/tool/src/main/java/org/apache/kylin/tool/KylinLogTool.java b/src/tool/src/main/java/org/apache/kylin/tool/KylinLogTool.java index 039bb7256b..032b70bb7a 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/KylinLogTool.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/KylinLogTool.java @@ -120,7 +120,7 @@ public class KylinLogTool { "kylin.smart.log", "kylin.build.log", "kylin.security.log"); private static final ExtractLogByRangeTool DEFAULT_EXTRACT_LOG_BY_RANGE = new ExtractLogByRangeTool(LOG_PATTERN, - LOG_TIME_PATTERN, SECOND_DATE_FORMAT); + LOG_TIME_PATTERN_WITH_TRACE_ID, SECOND_DATE_FORMAT); // 2019-11-11 03:24:52,342 DEBUG [JobWorker(prj:doc_smart,jobid:8a13964c)-965] // // job.NSparkExecutable : Copied metadata to the target metaUrl, // @@ -356,7 +356,7 @@ public class KylinLogTool { while ((log = br.readLine()) != null) { Matcher matcher = pattern.matcher(log); if (matcher.find()) { - return matcher.group(1); + return matcher.group(matcher.groupCount()); } } } catch (Exception e) { diff --git a/src/tool/src/test/java/org/apache/kylin/tool/KylinLogToolTest.java b/src/tool/src/test/java/org/apache/kylin/tool/KylinLogToolTest.java index db99b0b2f4..1da733439b 
100644 --- a/src/tool/src/test/java/org/apache/kylin/tool/KylinLogToolTest.java +++ b/src/tool/src/test/java/org/apache/kylin/tool/KylinLogToolTest.java @@ -45,6 +45,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; +import org.springframework.test.util.ReflectionTestUtils; import lombok.val; @@ -685,4 +686,24 @@ public class KylinLogToolTest extends NLocalFileMetadataTestCase { Assert.assertTrue(new File(logDir, "instance_4").exists()); Assert.assertEquals(2, new File(logDir, "instance_4").listFiles().length); } + + @Test + public void testGetFirstLogTime() throws IOException { // the default extractor must report the first log time (to the second) with or without a leading traceId + File mainDir = new File(temporaryFolder.getRoot(), testName.getMethodName()); + FileUtils.forceMkdir(mainDir); + + String log1 = "traceId: eba55ae6-0936-9d42-25d2-adf00a55f06d 2023-10-25T18:16:44,803 INFO [expansion_rate] " + "[Transaction-Thread-467] handler.AbstractJobHandler : Job JobParam"; // timestamp preceded by a "traceId: <id>" prefix + String log2 = "2023-10-25T18:17:35,792 INFO [JobWorker(project:expansion_rate,jobid:d9694dc4)] " + "execution.AbstractExecutable : Execute in CHAIN mode."; // timestamp at the start of the line + File logFile1 = new File(mainDir, "log1.log"); + File logFile2 = new File(mainDir, "log2.log"); + FileUtils.writeStringToFile(logFile1, log1); + FileUtils.writeStringToFile(logFile2, log2); // NOTE(review): no charset given; if this is commons-io FileUtils, this overload is deprecated and uses the platform default + + KylinLogTool.ExtractLogByRangeTool tool = (KylinLogTool.ExtractLogByRangeTool) ReflectionTestUtils + .getField(KylinLogTool.class, "DEFAULT_EXTRACT_LOG_BY_RANGE"); // private static field, read via reflection + Assert.assertEquals("2023-10-25T18:16:44", tool.getFirstTimeByLogFile(logFile1)); + Assert.assertEquals("2023-10-25T18:17:35", tool.getFirstTimeByLogFile(logFile2)); + } }