This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 046c97f54c5fa582f8983c794bf0d5a4a0cfd15e Author: jlf <longfei.ji...@kyligence.io> AuthorDate: Thu Jul 6 16:10:53 2023 +0800 KYLIN-5759 Log governance of job building log 1. move build log in kylin.build.log 2. add LogConstant.SMART_CATEGORY --- .../apache/kylin/common/constant/LogConstant.java | 1 + .../kylin/common/logging/SetLogCategory.java | 22 +++- .../kylin/common/logging/SetLogCategoryTest.java | 131 +++++++++++++++++++++ .../kylin/job/execution/ExecutableThread.java | 3 +- .../org/apache/kylin/job/manager/JobManager.java | 30 +++-- .../runners/AbstractDefaultSchedulerRunner.java | 3 +- .../org/apache/kylin/job/runners/JobRunner.java | 3 +- .../rest/scheduler/AbstractSchedulerRunnable.java | 3 +- .../scheduler/AbstractSchedulerRunnableTest.java | 3 +- .../kylin/query/routing/RealizationChooser.java | 3 +- 10 files changed, 180 insertions(+), 22 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/constant/LogConstant.java b/src/core-common/src/main/java/org/apache/kylin/common/constant/LogConstant.java index 4a5293dffa..5fd9aae424 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/constant/LogConstant.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/constant/LogConstant.java @@ -27,4 +27,5 @@ public class LogConstant { public static final String METADATA_CATEGORY = "metadata"; public static final String QUERY_CATEGORY = "query"; public static final String BUILD_CATEGORY = "build"; + public static final String SMART_CATEGORY = "smart"; } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/logging/SetLogCategory.java b/src/core-common/src/main/java/org/apache/kylin/common/logging/SetLogCategory.java index eb04aa9414..284380c8c4 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/logging/SetLogCategory.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/logging/SetLogCategory.java @@ -18,19 +18,35 @@ package org.apache.kylin.common.logging; import java.io.Closeable; +import java.util.Deque; +import java.util.LinkedList; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.ThreadContext; +import org.springframework.util.CollectionUtils; public class SetLogCategory implements Closeable { - private static final String KEY = "logCategory"; + private static final String LOG_CATEGORY = "logCategory"; + private static final ThreadLocal<Deque<String>> categoryThreadLocal = ThreadLocal.withInitial(LinkedList::new); + // when category exist, and use new category to logger message, will push oldSetLogCategory public SetLogCategory(String category) { - ThreadContext.put(KEY, category); + String oldCategory = ThreadContext.get(LOG_CATEGORY); + ThreadContext.put(LOG_CATEGORY, category); + if (StringUtils.isNotBlank(oldCategory)) { + categoryThreadLocal.get().offerFirst(oldCategory); + } } @Override public void close() { - ThreadContext.remove(KEY); + ThreadContext.remove(LOG_CATEGORY); + if (!CollectionUtils.isEmpty(categoryThreadLocal.get())) { + String oldCategory = categoryThreadLocal.get().pollFirst(); + if (StringUtils.isNotBlank(oldCategory)) { + ThreadContext.put(LOG_CATEGORY, oldCategory); + } + } } } diff --git a/src/core-common/src/test/java/org/apache/kylin/common/logging/SetLogCategoryTest.java b/src/core-common/src/test/java/org/apache/kylin/common/logging/SetLogCategoryTest.java new file mode 100644 index 0000000000..b447cf65a8 --- /dev/null +++ b/src/core-common/src/test/java/org/apache/kylin/common/logging/SetLogCategoryTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.logging; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.Deque; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.kylin.common.KylinConfigMultithreadingTest; +import org.apache.kylin.common.util.RandomUtil; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.logging.log4j.ThreadContext; +import org.awaitility.Duration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.test.util.ReflectionTestUtils; + +import lombok.val; + +class SetLogCategoryTest { + @AfterEach + @BeforeEach + void beforeAndAfter() { + ThreadContext.remove("logCategory"); + } + + @Test + void testMulti() throws Exception { + val callables = Lists.<Callable<String>> newArrayList(); + for (int i = 0; i < 20; i++) { + Callable<String> callable = () -> { + checkSetLogCategory(); + return "ok"; + }; + callables.add(callable); + } + KylinConfigMultithreadingTest.concurrentTest(100, 10, callables); + } + + private void checkSetLogCategory() { + checkResult(null, Lists.newArrayList(), 0); + val category0 = RandomUtil.randomUUIDStr(); + try (SetLogCategory logCategory = new SetLogCategory(category0)) { + checkResult(category0, Lists.newArrayList(logCategory), 0); + } + checkResult(null, Lists.newArrayList(), 0); + + val category1 = RandomUtil.randomUUIDStr(); + val category2 = RandomUtil.randomUUIDStr(); + val category3 = RandomUtil.randomUUIDStr(); + val category4 = RandomUtil.randomUUIDStr(); + val category5 = RandomUtil.randomUUIDStr(); + try (SetLogCategory logCategory1 = new SetLogCategory(category1)) { + checkResult(category1, Lists.newArrayList(logCategory1), 0); + + try (SetLogCategory logCategory2 = new SetLogCategory(category2)) { + checkResult(category2, Lists.newArrayList(logCategory1, logCategory2), 1); + } + checkResult(category1, Lists.newArrayList(logCategory1), 0); + + try (SetLogCategory logCategory3 = new SetLogCategory(category3)) { + checkResult(category3, Lists.newArrayList(logCategory1, logCategory3), 1); + + try (SetLogCategory logCategory4 = new SetLogCategory(category4)) { + checkResult(category4, Lists.newArrayList(logCategory1, logCategory3, logCategory4), 2); + + try (SetLogCategory logCategory5 = new SetLogCategory(category5)) { + checkResult(category5, + Lists.newArrayList(logCategory1, logCategory3, logCategory4, logCategory5), 3); + } + + checkResult(category4, Lists.newArrayList(logCategory1, logCategory3, logCategory4), 2); + } + + checkResult(category3, Lists.newArrayList(logCategory1, logCategory3), 1); + } + + checkResult(category1, Lists.newArrayList(logCategory1), 0); + + } + + checkResult(null, Lists.newArrayList(), 0); + } + + private void checkResult(String category, List<SetLogCategory> logCategories, int size) { + checkCategory(category); + checkCategoryThreadLocal(logCategories, size); + } + + private void checkCategory(String category) { + await().pollDelay(new Duration(RandomUtil.nextInt(90, 100), TimeUnit.MILLISECONDS)).until(() -> true); + assertEquals(category, ThreadContext.get("logCategory")); + } + + private void checkCategoryThreadLocal(List<SetLogCategory> logCategories, int size) { + for (SetLogCategory logCategory : logCategories) { + val categoryThreadLocal = (ThreadLocal<Deque<String>>) ReflectionTestUtils.getField(logCategory, + "categoryThreadLocal"); + assertNotNull(categoryThreadLocal); + assertEquals(size, categoryThreadLocal.get().size()); + + } + + val categoryThreadLocalStatic = (ThreadLocal<Deque<String>>) ReflectionTestUtils.getField(SetLogCategory.class, + "categoryThreadLocal"); + assertNotNull(categoryThreadLocalStatic); + assertEquals(size, categoryThreadLocalStatic.get().size()); + } +} \ No newline at end of file diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java index 162eed1daa..a885a5a072 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableThread.java @@ -20,6 +20,7 @@ package org.apache.kylin.job.execution; import java.util.Map; +import org.apache.kylin.common.constant.LogConstant; import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.common.util.SetThreadName; @@ -48,7 +49,7 @@ public class ExecutableThread extends Thread { val jobIdSimple = dagExecutable.getId().split("-")[0]; val project = dagExecutable.getProject(); try (SetThreadName ignored = new SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple); - SetLogCategory logCategory = new SetLogCategory("schedule")) { + SetLogCategory ignore = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { context.addRunningJob(executable); context.addRunningJobThread(executable); dagExecutable.executeDagExecutable(dagExecutablesMap, executable, context); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/manager/JobManager.java b/src/core-job/src/main/java/org/apache/kylin/job/manager/JobManager.java index a1426c8625..e3538da977 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/manager/JobManager.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/manager/JobManager.java @@ -26,7 +26,9 @@ import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.constant.LogConstant; import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.job.common.ExecutableUtil; import org.apache.kylin.job.common.SegmentUtil; import org.apache.kylin.job.execution.JobTypeEnum; @@ -119,20 +121,22 @@ public class JobManager { } public String addJob(JobParam jobParam, AbstractJobHandler handler) { - if (!config.isJobNode() && !config.isUTEnv()) { - throw new KylinException(JOB_CREATE_ABANDON); + try(SetLogCategory ignored = new SetLogCategory(LogConstant.BUILD_CATEGORY)){ + if (!config.isJobNode() && !config.isUTEnv()) { + throw new KylinException(JOB_CREATE_ABANDON); + } + checkNotNull(project); + checkStorageQuota(project); + jobParam.setProject(project); + ExecutableUtil.computeParams(jobParam); + + AbstractJobHandler localHandler = handler != null ? handler : createJobHandler(jobParam); + if (localHandler == null) + return null; + + localHandler.handle(jobParam); + return jobParam.getJobId(); } - checkNotNull(project); - checkStorageQuota(project); - jobParam.setProject(project); - ExecutableUtil.computeParams(jobParam); - - AbstractJobHandler localHandler = handler != null ? handler : createJobHandler(jobParam); - if (localHandler == null) - return null; - - localHandler.handle(jobParam); - return jobParam.getJobId(); } private AbstractJobHandler createJobHandler(JobParam jobParam) { diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java index 047db1f31c..c8d6d38f2f 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java @@ -18,6 +18,7 @@ package org.apache.kylin.job.runners; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.constant.LogConstant; import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.job.execution.ExecutableContext; @@ -81,7 +82,7 @@ public abstract class AbstractDefaultSchedulerRunner implements Runnable { @Override public void run() { - try (SetLogCategory ignored = new SetLogCategory("schedule")) { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { if (checkEpochIdFailed()) { return; } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java index b8cd19bed0..48b0638c8f 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java @@ -18,6 +18,7 @@ package org.apache.kylin.job.runners; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.constant.LogConstant; import org.apache.kylin.common.logging.SetLogCategory; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.job.exception.ExecuteException; @@ -49,7 +50,7 @@ public class JobRunner extends AbstractDefaultSchedulerRunner { //only the first 8 chars of the job uuid val jobIdSimple = executable.getId().substring(0, 8); try (SetThreadName ignored = new SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple); - SetLogCategory logCategory = new SetLogCategory("schedule")) { + SetLogCategory ignore = new SetLogCategory(LogConstant.BUILD_CATEGORY)) { context.addRunningJobThread(executable); executable.execute(context); // trigger the next step asap diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AbstractSchedulerRunnable.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AbstractSchedulerRunnable.java index fbe04fae0a..27b4b9ea07 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AbstractSchedulerRunnable.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/scheduler/AbstractSchedulerRunnable.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.constant.LogConstant; import org.apache.kylin.common.logging.SetLogCategory; import org.springframework.web.client.RestTemplate; @@ -46,7 +47,7 @@ public abstract class AbstractSchedulerRunnable implements Runnable { @Override public void run() { - try (SetLogCategory ignored = new SetLogCategory("schedule")) { + try (SetLogCategory ignored = new SetLogCategory(LogConstant.SCHEDULE_CATEGORY)) { execute(); } catch (Exception e) { log.error(e.getMessage(), e); diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AbstractSchedulerRunnableTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AbstractSchedulerRunnableTest.java index 26ee319ef0..ed1aeaa525 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AbstractSchedulerRunnableTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/scheduler/AbstractSchedulerRunnableTest.java @@ -20,6 +20,7 @@ package org.apache.kylin.rest.scheduler; import static org.junit.jupiter.api.Assertions.assertEquals; +import org.apache.kylin.common.constant.LogConstant; import org.apache.logging.log4j.ThreadContext; import org.junit.jupiter.api.Test; @@ -33,7 +34,7 @@ class AbstractSchedulerRunnableTest { @Override protected void execute() { val logCategory = ThreadContext.get("logCategory"); - assertEquals("schedule", logCategory); + assertEquals(LogConstant.SCHEDULE_CATEGORY, logCategory); } }; abstractSchedulerRunnable.run(); diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java index 7ae0238f04..768edd91ff 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java @@ -58,6 +58,7 @@ import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.constant.LogConstant; import org.apache.kylin.common.exception.KylinTimeoutException; import org.apache.kylin.common.exception.ServerErrorCode; import org.apache.kylin.common.logging.SetLogCategory; @@ -175,7 +176,7 @@ public class RealizationChooser { String queryId = QueryContext.current().getQueryId(); try (KylinConfig.SetAndUnsetThreadLocalConfig ignored0 = KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig); SetThreadName ignored1 = new SetThreadName(Thread.currentThread().getName() + " QueryId %s", queryId); - SetLogCategory ignored2 = new SetLogCategory("query")) { + SetLogCategory ignored2 = new SetLogCategory(LogConstant.QUERY_CATEGORY)) { String project = ctx.olapSchema.getProjectName(); NTableMetadataManager.getInstance(kylinConfig, project);