This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5_beta in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 1ee47662e0374e33782b2c3a7ecca606d1474cfc Author: Yinghao Lin <39019287+yhca...@users.noreply.github.com> AuthorDate: Fri Apr 28 19:29:56 2023 +0800 KYLIN-5652 Fix out-of-order invocation of listeners on project epoch --- .../apache/kylin/rest/config/AppInitializer.java | 3 + .../config/initialize/EpochChangedListener.java | 166 ++++++++++++--------- .../common/scheduler/ProjectSerialEventBus.java | 124 +++++++++++++++ .../common/scheduler/SchedulerEventNotifier.java | 11 +- .../scheduler/ProjectSerialEventBusTest.java | 112 ++++++++++++++ .../apache/kylin/metadata/epoch/EpochManager.java | 9 +- 6 files changed, 351 insertions(+), 74 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java index 6fc2ff9e5d..b34dcb6b2b 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java @@ -33,6 +33,7 @@ import org.apache.kylin.common.persistence.metadata.EpochStore; import org.apache.kylin.common.persistence.metadata.JdbcAuditLogStore; import org.apache.kylin.common.persistence.transaction.EventListenerRegistry; import org.apache.kylin.common.scheduler.EventBusFactory; +import org.apache.kylin.common.scheduler.ProjectSerialEventBus; import org.apache.kylin.common.util.AddressUtil; import org.apache.kylin.common.util.HostInfoFetcher; import org.apache.kylin.engine.spark.filter.QueryFiltersCollector; @@ -185,6 +186,8 @@ public class AppInitializer { new Date(System.currentTimeMillis() + kylinConfig.getGuardianHACheckInitDelay() * Constant.SECOND), kylinConfig.getGuardianHACheckInterval() * Constant.SECOND); } + + taskScheduler.scheduleAtFixedRate(new ProjectSerialEventBus.TimingDispatcher(), ProjectSerialEventBus.TimingDispatcher.INTERVAL); } private void postInit() { diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java index 788360f95c..8b7246806a 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java @@ -19,13 +19,17 @@ package org.apache.kylin.rest.config.initialize; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.exception.KylinRuntimeException; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.scheduler.EpochStartedNotifier; import org.apache.kylin.common.scheduler.ProjectControlledNotifier; import org.apache.kylin.common.scheduler.ProjectEscapedNotifier; +import org.apache.kylin.common.scheduler.SchedulerEventNotifier; +import org.apache.kylin.guava30.shaded.common.eventbus.Subscribe; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.NExecutableManager; import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; @@ -44,7 +48,6 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; -import org.apache.kylin.guava30.shaded.common.eventbus.Subscribe; import lombok.val; import lombok.extern.slf4j.Slf4j; @@ -70,81 +73,37 @@ public class EpochChangedListener { RecommendationTopNUpdateScheduler recommendationUpdateScheduler; @Subscribe - public void onProjectControlled(ProjectControlledNotifier notifier) throws IOException { - String project = notifier.getProject(); - val kylinConfig = KylinConfig.getInstanceFromEnv(); - val epochManager = EpochManager.getInstance(); - if (!GLOBAL.equals(project)) { - - if (!EpochManager.getInstance().checkEpochValid(project)) { - log.warn("epoch:{} is invalid in project controlled", project); - return; - } - - val oldScheduler = NDefaultScheduler.getInstance(project); - - if (oldScheduler.hasStarted() - && epochManager.checkEpochId(oldScheduler.getContext().getEpochId(), project)) { - return; + public void onProjectControlled(ProjectControlledNotifier notifier) { + wrapForCallbackInvocation(notifier, eventNotifier -> { + String project = notifier.getProject(); + val kylinConfig = KylinConfig.getInstanceFromEnv(); + val epochManager = EpochManager.getInstance(); + if (!GLOBAL.equals(project)) { + doOnProjectControlled(project, kylinConfig, epochManager); + } else { + doOnGlobalControlled(); } - - // if epoch id check failed, shutdown first - if (oldScheduler.hasStarted()) { - oldScheduler.forceShutdown(); - } - - log.info("start thread of project: {}", project); - NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project); - EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { - scheduler.init(new JobEngineConfig(kylinConfig)); - if (!scheduler.hasStarted()) { - throw new RuntimeException("Scheduler for " + project + " has not been started"); - } - StreamingScheduler ss = StreamingScheduler.getInstance(project); - ss.init(); - if (!ss.getHasStarted().get()) { - throw new RuntimeException("Streaming Scheduler for " + project + " has not been started"); - } - QueryHistoryTaskScheduler qhAccelerateScheduler = QueryHistoryTaskScheduler.getInstance(project); - qhAccelerateScheduler.init(); - - if (!qhAccelerateScheduler.hasStarted()) { - throw new RuntimeException( - "Query history accelerate scheduler for " + project + " has not been started"); - } - recommendationUpdateScheduler.addProject(project); - return 0; - }, project, 1); - scheduler.setHasFinishedTransactions(new AtomicBoolean(true)); - } else { - //TODO need global leader - CreateAdminUserUtils.createAllAdmins(userService, env); - InitUserGroupUtils.initUserGroups(env); - UnitOfWork.doInTransactionWithRetry(() -> { - ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).createMetaStoreUuidIfNotExist(); - return null; - }, "", 1); - InitResourceGroupUtils.initResourceGroup(); - userAclService.syncAdminUserAcl(); - } + }); } @Subscribe public void onProjectEscaped(ProjectEscapedNotifier notifier) { - String project = notifier.getProject(); - val kylinConfig = KylinConfig.getInstanceFromEnv(); - if (!GLOBAL.equals(project)) { - log.info("Shutdown related thread: {}", project); - try { - NExecutableManager.getInstance(kylinConfig, project).destoryAllProcess(); - QueryHistoryTaskScheduler.shutdownByProject(project); - NDefaultScheduler.shutdownByProject(project); - StreamingScheduler.shutdownByProject(project); - recommendationUpdateScheduler.removeProject(project); - } catch (Exception e) { - log.warn("error when shutdown " + project + " thread", e); + wrapForCallbackInvocation(notifier, eventNotifier -> { + String project = eventNotifier.getProject(); + val kylinConfig = KylinConfig.getInstanceFromEnv(); + if (!GLOBAL.equals(project)) { + log.info("Shutdown related thread: {}", project); + try { + NExecutableManager.getInstance(kylinConfig, project).destoryAllProcess(); + QueryHistoryTaskScheduler.shutdownByProject(project); + NDefaultScheduler.shutdownByProject(project); + StreamingScheduler.shutdownByProject(project); + recommendationUpdateScheduler.removeProject(project); + } catch (Exception e) { + log.warn("error when shutdown " + project + " thread", e); + } } - } + }); } @Subscribe @@ -153,4 +112,71 @@ public class EpochChangedListener { val resourceStore = ResourceStore.getKylinMetaStore(kylinConfig); resourceStore.leaderCatchup(); } + + private void wrapForCallbackInvocation(SchedulerEventNotifier notifier, Consumer<SchedulerEventNotifier> consumer) { + try { + consumer.accept(notifier); + } finally { + notifier.invokeCallbackIfExists(); + } + } + + private void doOnGlobalControlled() { + //TODO need global leader + try { + CreateAdminUserUtils.createAllAdmins(userService, env); + } catch (IOException e) { + throw new KylinRuntimeException(e); + } + InitUserGroupUtils.initUserGroups(env); + UnitOfWork.doInTransactionWithRetry(() -> { + ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).createMetaStoreUuidIfNotExist(); + return null; + }, "", 1); + InitResourceGroupUtils.initResourceGroup(); + userAclService.syncAdminUserAcl(); + } + + private void doOnProjectControlled(String project, KylinConfig kylinConfig, EpochManager epochManager) { + if (!EpochManager.getInstance().checkEpochValid(project)) { + log.warn("epoch:{} is invalid in project controlled", project); + return; + } + + val oldScheduler = NDefaultScheduler.getInstance(project); + + if (oldScheduler.hasStarted() + && epochManager.checkEpochId(oldScheduler.getContext().getEpochId(), project)) { + return; + } + + // if epoch id check failed, shutdown first + if (oldScheduler.hasStarted()) { + oldScheduler.forceShutdown(); + } + + log.info("start thread of project: {}", project); + NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project); + EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { + scheduler.init(new JobEngineConfig(kylinConfig)); + if (!scheduler.hasStarted()) { + throw new KylinRuntimeException("Scheduler for " + project + " has not been started"); + } + StreamingScheduler ss = StreamingScheduler.getInstance(project); + ss.init(); + if (!ss.getHasStarted().get()) { + throw new KylinRuntimeException("Streaming Scheduler for " + project + " has not been started"); + } + QueryHistoryTaskScheduler qhAccelerateScheduler = QueryHistoryTaskScheduler.getInstance(project); + qhAccelerateScheduler.init(); + + if (!qhAccelerateScheduler.hasStarted()) { + throw new KylinRuntimeException( + "Query history accelerate scheduler for " + project + " has not been started"); + } + recommendationUpdateScheduler.addProject(project); + return 0; + }, project, 1); + scheduler.setHasFinishedTransactions(new AtomicBoolean(true)); + } } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/scheduler/ProjectSerialEventBus.java b/src/core-common/src/main/java/org/apache/kylin/common/scheduler/ProjectSerialEventBus.java new file mode 100644 index 0000000000..5a2d27913d --- /dev/null +++ b/src/core-common/src/main/java/org/apache/kylin/common/scheduler/ProjectSerialEventBus.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.scheduler; + +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.apache.kylin.common.Singletons; + +import java.time.Duration; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; +import java.util.function.Consumer; + +@Slf4j +public class ProjectSerialEventBus { + + public static class TimingDispatcher implements Runnable { + + public static final Duration INTERVAL = Duration.ofMinutes(10L); + + @Override + public void run() { + log.info("ProjectSerialEventBus.TimingDispatcher invokes dispatch"); + ProjectSerialEventBus.getInstance().dispatch(); + } + } + + // timeout 30 minutes + private static final long TIMEOUT_MILLISECONDS = 30L * 60L * 1000L; + + public static ProjectSerialEventBus getInstance() { + return Singletons.getInstance(ProjectSerialEventBus.class); + } + + private final EventBusFactory eventBus = EventBusFactory.getInstance(); + private final Queue<SchedulerEventNotifier> eventsQueue = new LinkedList<>(); + private final Set<RunningProject> runningProjects = new HashSet<>(); + private final Consumer<SchedulerEventNotifier> finishProjectCallback = event -> finishProjectAndDispatch(event.getProject()); + + private ProjectSerialEventBus() {} + + public synchronized void postAsync(SchedulerEventNotifier event) { + log.info("Post event {} on ProjectSerialEventBus", event); + event.setCallback(finishProjectCallback); + eventsQueue.add(event); + if (!runningProjects.contains(RunningProject.wrapForComparison(event.getProject()))) { + dispatch(); + } + } + + public synchronized void dispatch() { + // Remove expired running projects at first + runningProjects.removeIf(RunningProject::isExpired); + // Try dispatch events + Iterator<SchedulerEventNotifier> it = eventsQueue.iterator(); + while (it.hasNext()) { + SchedulerEventNotifier e = it.next(); + String project = e.getProject(); + if (!runningProjects.contains(RunningProject.wrapForComparison(project))) { + log.info("ProjectSerialEventBus dispatch event: {}", e); + eventBus.postAsync(e); + runningProjects.add(RunningProject.newInstance(project)); + it.remove(); + } + } + } + + public synchronized void finishProjectAndDispatch(String project) { + log.info("ProjectSerialEventBus project({}) event finished", project); + runningProjects.remove(RunningProject.wrapForComparison(project)); + dispatch(); + } + + @EqualsAndHashCode + @ToString + static class RunningProject { + + static RunningProject newInstance(String project) { + return new RunningProject(project, System.currentTimeMillis()); + } + + static RunningProject wrapForComparison(String project) { + return new RunningProject(project, -1L); + } + + private final String project; + + @EqualsAndHashCode.Exclude + private final long beginTime; + + private RunningProject(String project, long beginTime) { + this.project = project; + this.beginTime = beginTime; + } + + boolean isExpired() { + if (System.currentTimeMillis() - beginTime > TIMEOUT_MILLISECONDS) { + log.warn("ProjectSerialEventBus RunningProject expired: {}", this); + return true; + } + return false; + } + } +} diff --git a/src/core-common/src/main/java/org/apache/kylin/common/scheduler/SchedulerEventNotifier.java b/src/core-common/src/main/java/org/apache/kylin/common/scheduler/SchedulerEventNotifier.java index 9fe90833d8..ecc9c8de86 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/scheduler/SchedulerEventNotifier.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/scheduler/SchedulerEventNotifier.java @@ -19,6 +19,7 @@ package org.apache.kylin.common.scheduler; import java.util.Locale; +import java.util.function.Consumer; import lombok.AllArgsConstructor; import lombok.Getter; @@ -33,13 +34,21 @@ public class SchedulerEventNotifier { protected String project; protected String subject; + protected Consumer<SchedulerEventNotifier> callback; public String getEventType() { return this.getClass().getSimpleName(); } + public void invokeCallbackIfExists() { + if (callback != null) { + callback.accept(this); + } + } + @Override public String toString() { - return String.format(Locale.ROOT, "%s {project=%s, subject=%s}", getEventType(), project, subject); + return String.format(Locale.ROOT, "%s {project=%s, subject=%s, callback=%s}", getEventType(), project, + subject, callback); } } diff --git a/src/core-common/src/test/java/org/apache/kylin/common/scheduler/ProjectSerialEventBusTest.java b/src/core-common/src/test/java/org/apache/kylin/common/scheduler/ProjectSerialEventBusTest.java new file mode 100644 index 0000000000..77cdd62adb --- /dev/null +++ b/src/core-common/src/test/java/org/apache/kylin/common/scheduler/ProjectSerialEventBusTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.scheduler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.LinkedList; +import java.util.Set; + +import org.apache.kylin.junit.annotation.MetadataInfo; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.test.util.ReflectionTestUtils; + +@MetadataInfo +public class ProjectSerialEventBusTest { + + private ProjectSerialEventBus projectSerialEventBus; + + private EventBusFactory eventBusFactory; + + @BeforeEach + public void setUp() throws Exception { + projectSerialEventBus = spy(ProjectSerialEventBus.getInstance()); + eventBusFactory = mock(EventBusFactory.class); + ReflectionTestUtils.setField(projectSerialEventBus, "eventBus", eventBusFactory); + } + + @Test + @SuppressWarnings("unchecked") + void testInnerQueueStatus() throws Exception { + LinkedList<SchedulerEventNotifier> innerEventsQueue = (LinkedList<SchedulerEventNotifier>) ReflectionTestUtils + .getField(projectSerialEventBus, "eventsQueue"); + Set<ProjectSerialEventBus.RunningProject> runningProjects = (Set<ProjectSerialEventBus.RunningProject>) ReflectionTestUtils + .getField(projectSerialEventBus, "runningProjects"); + + projectSerialEventBus.postAsync(new ProjectEscapedNotifier("project1")); + assertEquals(1, runningProjects.size()); + projectSerialEventBus.postAsync(new ProjectControlledNotifier("project1")); + assertEquals(1, runningProjects.size()); + projectSerialEventBus.postAsync(new ProjectEscapedNotifier("project2")); + assertEquals(2, runningProjects.size()); + projectSerialEventBus.postAsync(new ProjectControlledNotifier("project2")); + assertEquals(2, runningProjects.size()); + + verify(projectSerialEventBus, times(2)).dispatch(); + assertEquals(2, innerEventsQueue.size()); + assertEquals("project1", innerEventsQueue.get(0).getProject()); + assertTrue(innerEventsQueue.get(0) instanceof ProjectControlledNotifier); + assertEquals("project2", innerEventsQueue.get(1).getProject()); + assertTrue(innerEventsQueue.get(1) instanceof ProjectControlledNotifier); + + projectSerialEventBus.finishProjectAndDispatch("project1"); + assertEquals(2, runningProjects.size()); + projectSerialEventBus.finishProjectAndDispatch("project2"); + assertEquals(2, runningProjects.size()); + + assertEquals(0, innerEventsQueue.size()); + + projectSerialEventBus.finishProjectAndDispatch("project1"); + assertEquals(1, runningProjects.size()); + projectSerialEventBus.finishProjectAndDispatch("project2"); + assertEquals(0, runningProjects.size()); + } + + @Test + @SuppressWarnings("unchecked") + void testTimingDispatcherRun() { + LinkedList<SchedulerEventNotifier> innerEventsQueue = (LinkedList<SchedulerEventNotifier>) ReflectionTestUtils + .getField(projectSerialEventBus, "eventsQueue"); + Set<ProjectSerialEventBus.RunningProject> runningProjects = (Set<ProjectSerialEventBus.RunningProject>) ReflectionTestUtils + .getField(projectSerialEventBus, "runningProjects"); + + innerEventsQueue.add(new ProjectEscapedNotifier("project1")); + assertEquals(1, innerEventsQueue.size()); + assertEquals(0, runningProjects.size()); + + new ProjectSerialEventBus.TimingDispatcher().run(); + + assertEquals(0, innerEventsQueue.size()); + assertEquals(1, runningProjects.size()); + } + + @Test + void testRunningProjectExpired() { + ProjectSerialEventBus.RunningProject rp = ProjectSerialEventBus.RunningProject.newInstance("project1"); + final long DURATION_31_MINUTES_MILLIS = 31L * 60L * 1000L; + ReflectionTestUtils.setField(rp, "beginTime", System.currentTimeMillis() - DURATION_31_MINUTES_MILLIS); + assertTrue(rp.isExpired()); + } +} diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java index 623cabbc41..5f8e868886 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java @@ -54,6 +54,7 @@ import org.apache.kylin.common.scheduler.EpochStartedNotifier; import org.apache.kylin.common.scheduler.EventBusFactory; import org.apache.kylin.common.scheduler.ProjectControlledNotifier; import org.apache.kylin.common.scheduler.ProjectEscapedNotifier; +import org.apache.kylin.common.scheduler.ProjectSerialEventBus; import org.apache.kylin.common.scheduler.SourceUsageVerifyNotifier; import org.apache.kylin.common.util.AddressUtil; import org.apache.kylin.common.util.NamedThreadFactory; @@ -95,6 +96,7 @@ public class EpochManager { private final KylinConfig config; private String identity; private final EventBusFactory eventBusFactory; + private final ProjectSerialEventBus projectSerialEventBus; private final String serverMode; private final boolean epochCheckEnabled; private final long epochExpiredTime; @@ -111,6 +113,7 @@ public class EpochManager { this.config = KylinConfig.readSystemKylinConfig(); this.identity = EpochOrchestrator.getOwnerIdentity(); this.eventBusFactory = EventBusFactory.getInstance(); + this.projectSerialEventBus = ProjectSerialEventBus.getInstance(); this.epochStore = EpochStore.getEpochStore(config); this.serverMode = config.getServerMode(); this.epochCheckEnabled = config.getEpochCheckerEnabled(); @@ -358,7 +361,7 @@ public class EpochManager { } for (String project : escapedProjects) { - eventBusFactory.postAsync(new ProjectEscapedNotifier(project)); + projectSerialEventBus.postAsync(new ProjectEscapedNotifier(project)); } try (SetLogCategory ignored = new SetLogCategory(LogConstant.METADATA_CATEGORY)) { @@ -379,7 +382,7 @@ public class EpochManager { logger.debug("after {} controlled projects: {}", updateTypeName, String.join(",", newControlledProjects)); } - newControlledProjects.forEach(p -> eventBusFactory.postAsync(new ProjectControlledNotifier(p))); + newControlledProjects.forEach(p -> projectSerialEventBus.postAsync(new ProjectControlledNotifier(p))); } } @@ -680,7 +683,7 @@ public class EpochManager { public void updateEpochWithNotifier(String epochTarget, boolean force) { EpochUpdateLockManager.executeEpochWithLock(epochTarget, () -> { if (tryUpdateEpoch(epochTarget, force)) { - eventBusFactory.postAsync(new ProjectControlledNotifier(epochTarget)); + projectSerialEventBus.postAsync(new ProjectControlledNotifier(epochTarget)); } return null; });