This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5_beta
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1ee47662e0374e33782b2c3a7ecca606d1474cfc
Author: Yinghao Lin <39019287+yhca...@users.noreply.github.com>
AuthorDate: Fri Apr 28 19:29:56 2023 +0800

    KYLIN-5652 Fix out-of-order invocation of listeners on project epoch
---
 .../apache/kylin/rest/config/AppInitializer.java   |   3 +
 .../config/initialize/EpochChangedListener.java    | 166 ++++++++++++---------
 .../common/scheduler/ProjectSerialEventBus.java    | 124 +++++++++++++++
 .../common/scheduler/SchedulerEventNotifier.java   |  11 +-
 .../scheduler/ProjectSerialEventBusTest.java       | 112 ++++++++++++++
 .../apache/kylin/metadata/epoch/EpochManager.java  |   9 +-
 6 files changed, 351 insertions(+), 74 deletions(-)

diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
index 6fc2ff9e5d..b34dcb6b2b 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
@@ -33,6 +33,7 @@ import 
org.apache.kylin.common.persistence.metadata.EpochStore;
 import org.apache.kylin.common.persistence.metadata.JdbcAuditLogStore;
 import org.apache.kylin.common.persistence.transaction.EventListenerRegistry;
 import org.apache.kylin.common.scheduler.EventBusFactory;
+import org.apache.kylin.common.scheduler.ProjectSerialEventBus;
 import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.common.util.HostInfoFetcher;
 import org.apache.kylin.engine.spark.filter.QueryFiltersCollector;
@@ -185,6 +186,8 @@ public class AppInitializer {
                     new Date(System.currentTimeMillis() + 
kylinConfig.getGuardianHACheckInitDelay() * Constant.SECOND),
                     kylinConfig.getGuardianHACheckInterval() * 
Constant.SECOND);
         }
+
+        taskScheduler.scheduleAtFixedRate(new 
ProjectSerialEventBus.TimingDispatcher(), 
ProjectSerialEventBus.TimingDispatcher.INTERVAL);
     }
 
     private void postInit() {
diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java
index 788360f95c..8b7246806a 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java
@@ -19,13 +19,17 @@ package org.apache.kylin.rest.config.initialize;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinRuntimeException;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.scheduler.EpochStartedNotifier;
 import org.apache.kylin.common.scheduler.ProjectControlledNotifier;
 import org.apache.kylin.common.scheduler.ProjectEscapedNotifier;
+import org.apache.kylin.common.scheduler.SchedulerEventNotifier;
+import org.apache.kylin.guava30.shaded.common.eventbus.Subscribe;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.NExecutableManager;
 import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
@@ -44,7 +48,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 
-import org.apache.kylin.guava30.shaded.common.eventbus.Subscribe;
 import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
@@ -70,81 +73,37 @@ public class EpochChangedListener {
     RecommendationTopNUpdateScheduler recommendationUpdateScheduler;
 
     @Subscribe
-    public void onProjectControlled(ProjectControlledNotifier notifier) throws 
IOException {
-        String project = notifier.getProject();
-        val kylinConfig = KylinConfig.getInstanceFromEnv();
-        val epochManager = EpochManager.getInstance();
-        if (!GLOBAL.equals(project)) {
-
-            if (!EpochManager.getInstance().checkEpochValid(project)) {
-                log.warn("epoch:{} is invalid in project controlled", project);
-                return;
-            }
-
-            val oldScheduler = NDefaultScheduler.getInstance(project);
-
-            if (oldScheduler.hasStarted()
-                    && 
epochManager.checkEpochId(oldScheduler.getContext().getEpochId(), project)) {
-                return;
+    public void onProjectControlled(ProjectControlledNotifier notifier) {
+        wrapForCallbackInvocation(notifier, eventNotifier -> {
+            String project = notifier.getProject();
+            val kylinConfig = KylinConfig.getInstanceFromEnv();
+            val epochManager = EpochManager.getInstance();
+            if (!GLOBAL.equals(project)) {
+                doOnProjectControlled(project, kylinConfig, epochManager);
+            } else {
+                doOnGlobalControlled();
             }
-
-            // if epoch id check failed, shutdown first
-            if (oldScheduler.hasStarted()) {
-                oldScheduler.forceShutdown();
-            }
-
-            log.info("start thread of project: {}", project);
-            NDefaultScheduler scheduler = 
NDefaultScheduler.getInstance(project);
-            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-                scheduler.init(new JobEngineConfig(kylinConfig));
-                if (!scheduler.hasStarted()) {
-                    throw new RuntimeException("Scheduler for " + project + " 
has not been started");
-                }
-                StreamingScheduler ss = 
StreamingScheduler.getInstance(project);
-                ss.init();
-                if (!ss.getHasStarted().get()) {
-                    throw new RuntimeException("Streaming Scheduler for " + 
project + " has not been started");
-                }
-                QueryHistoryTaskScheduler qhAccelerateScheduler = 
QueryHistoryTaskScheduler.getInstance(project);
-                qhAccelerateScheduler.init();
-
-                if (!qhAccelerateScheduler.hasStarted()) {
-                    throw new RuntimeException(
-                            "Query history accelerate scheduler for " + 
project + " has not been started");
-                }
-                recommendationUpdateScheduler.addProject(project);
-                return 0;
-            }, project, 1);
-            scheduler.setHasFinishedTransactions(new AtomicBoolean(true));
-        } else {
-            //TODO need global leader
-            CreateAdminUserUtils.createAllAdmins(userService, env);
-            InitUserGroupUtils.initUserGroups(env);
-            UnitOfWork.doInTransactionWithRetry(() -> {
-                
ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).createMetaStoreUuidIfNotExist();
-                return null;
-            }, "", 1);
-            InitResourceGroupUtils.initResourceGroup();
-            userAclService.syncAdminUserAcl();
-        }
+        });
     }
 
     @Subscribe
     public void onProjectEscaped(ProjectEscapedNotifier notifier) {
-        String project = notifier.getProject();
-        val kylinConfig = KylinConfig.getInstanceFromEnv();
-        if (!GLOBAL.equals(project)) {
-            log.info("Shutdown related thread: {}", project);
-            try {
-                NExecutableManager.getInstance(kylinConfig, 
project).destoryAllProcess();
-                QueryHistoryTaskScheduler.shutdownByProject(project);
-                NDefaultScheduler.shutdownByProject(project);
-                StreamingScheduler.shutdownByProject(project);
-                recommendationUpdateScheduler.removeProject(project);
-            } catch (Exception e) {
-                log.warn("error when shutdown " + project + " thread", e);
+        wrapForCallbackInvocation(notifier, eventNotifier -> {
+            String project = eventNotifier.getProject();
+            val kylinConfig = KylinConfig.getInstanceFromEnv();
+            if (!GLOBAL.equals(project)) {
+                log.info("Shutdown related thread: {}", project);
+                try {
+                    NExecutableManager.getInstance(kylinConfig, 
project).destoryAllProcess();
+                    QueryHistoryTaskScheduler.shutdownByProject(project);
+                    NDefaultScheduler.shutdownByProject(project);
+                    StreamingScheduler.shutdownByProject(project);
+                    recommendationUpdateScheduler.removeProject(project);
+                } catch (Exception e) {
+                    log.warn("error when shutdown " + project + " thread", e);
+                }
             }
-        }
+        });
     }
 
     @Subscribe
@@ -153,4 +112,71 @@ public class EpochChangedListener {
         val resourceStore = ResourceStore.getKylinMetaStore(kylinConfig);
         resourceStore.leaderCatchup();
     }
+
+    private void wrapForCallbackInvocation(SchedulerEventNotifier notifier, 
Consumer<SchedulerEventNotifier> consumer) {
+        try {
+            consumer.accept(notifier);
+        } finally {
+            notifier.invokeCallbackIfExists();
+        }
+    }
+
+    private void doOnGlobalControlled() {
+        //TODO need global leader
+        try {
+            CreateAdminUserUtils.createAllAdmins(userService, env);
+        } catch (IOException e) {
+            throw new KylinRuntimeException(e);
+        }
+        InitUserGroupUtils.initUserGroups(env);
+        UnitOfWork.doInTransactionWithRetry(() -> {
+            
ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).createMetaStoreUuidIfNotExist();
+            return null;
+        }, "", 1);
+        InitResourceGroupUtils.initResourceGroup();
+        userAclService.syncAdminUserAcl();
+    }
+
+    private void doOnProjectControlled(String project, KylinConfig 
kylinConfig, EpochManager epochManager) {
+        if (!EpochManager.getInstance().checkEpochValid(project)) {
+            log.warn("epoch:{} is invalid in project controlled", project);
+            return;
+        }
+
+        val oldScheduler = NDefaultScheduler.getInstance(project);
+
+        if (oldScheduler.hasStarted()
+                && 
epochManager.checkEpochId(oldScheduler.getContext().getEpochId(), project)) {
+            return;
+        }
+
+        // if epoch id check failed, shutdown first
+        if (oldScheduler.hasStarted()) {
+            oldScheduler.forceShutdown();
+        }
+
+        log.info("start thread of project: {}", project);
+        NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project);
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            scheduler.init(new JobEngineConfig(kylinConfig));
+            if (!scheduler.hasStarted()) {
+                throw new KylinRuntimeException("Scheduler for " + project + " 
has not been started");
+            }
+            StreamingScheduler ss = StreamingScheduler.getInstance(project);
+            ss.init();
+            if (!ss.getHasStarted().get()) {
+                throw new KylinRuntimeException("Streaming Scheduler for " + 
project + " has not been started");
+            }
+            QueryHistoryTaskScheduler qhAccelerateScheduler = 
QueryHistoryTaskScheduler.getInstance(project);
+            qhAccelerateScheduler.init();
+
+            if (!qhAccelerateScheduler.hasStarted()) {
+                throw new KylinRuntimeException(
+                        "Query history accelerate scheduler for " + project + 
" has not been started");
+            }
+            recommendationUpdateScheduler.addProject(project);
+            return 0;
+        }, project, 1);
+        scheduler.setHasFinishedTransactions(new AtomicBoolean(true));
+    }
 }
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/scheduler/ProjectSerialEventBus.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/scheduler/ProjectSerialEventBus.java
new file mode 100644
index 0000000000..5a2d27913d
--- /dev/null
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/scheduler/ProjectSerialEventBus.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.scheduler;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kylin.common.Singletons;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+import java.util.function.Consumer;
+
+@Slf4j
+public class ProjectSerialEventBus {
+
+    public static class TimingDispatcher implements Runnable {
+
+        public static final Duration INTERVAL = Duration.ofMinutes(10L);
+
+        @Override
+        public void run() {
+            log.info("ProjectSerialEventBus.TimingDispatcher invokes 
dispatch");
+            ProjectSerialEventBus.getInstance().dispatch();
+        }
+    }
+
+    // timeout 30 minutes
+    private static final long TIMEOUT_MILLISECONDS = 30L * 60L * 1000L;
+
+    public static ProjectSerialEventBus getInstance() {
+        return Singletons.getInstance(ProjectSerialEventBus.class);
+    }
+
+    private final EventBusFactory eventBus = EventBusFactory.getInstance();
+    private final Queue<SchedulerEventNotifier> eventsQueue = new 
LinkedList<>();
+    private final Set<RunningProject> runningProjects = new HashSet<>();
+    private final Consumer<SchedulerEventNotifier> finishProjectCallback = 
event -> finishProjectAndDispatch(event.getProject());
+
+    private ProjectSerialEventBus() {}
+
+    public synchronized void postAsync(SchedulerEventNotifier event) {
+        log.info("Post event {} on ProjectSerialEventBus", event);
+        event.setCallback(finishProjectCallback);
+        eventsQueue.add(event);
+        if 
(!runningProjects.contains(RunningProject.wrapForComparison(event.getProject())))
 {
+            dispatch();
+        }
+    }
+
+    public synchronized void dispatch() {
+        // Remove expired running projects at first
+        runningProjects.removeIf(RunningProject::isExpired);
+        // Try dispatch events
+        Iterator<SchedulerEventNotifier> it = eventsQueue.iterator();
+        while (it.hasNext()) {
+            SchedulerEventNotifier e = it.next();
+            String project = e.getProject();
+            if 
(!runningProjects.contains(RunningProject.wrapForComparison(project))) {
+                log.info("ProjectSerialEventBus dispatch event: {}", e);
+                eventBus.postAsync(e);
+                runningProjects.add(RunningProject.newInstance(project));
+                it.remove();
+            }
+        }
+    }
+
+    public synchronized void finishProjectAndDispatch(String project) {
+        log.info("ProjectSerialEventBus project({}) event finished", project);
+        runningProjects.remove(RunningProject.wrapForComparison(project));
+        dispatch();
+    }
+
+    @EqualsAndHashCode
+    @ToString
+    static class RunningProject {
+
+        static RunningProject newInstance(String project) {
+            return new RunningProject(project, System.currentTimeMillis());
+        }
+
+        static RunningProject wrapForComparison(String project) {
+            return new RunningProject(project, -1L);
+        }
+
+        private final String project;
+
+        @EqualsAndHashCode.Exclude
+        private final long beginTime;
+
+        private RunningProject(String project, long beginTime) {
+            this.project = project;
+            this.beginTime = beginTime;
+        }
+
+        boolean isExpired() {
+            if (System.currentTimeMillis() - beginTime > TIMEOUT_MILLISECONDS) 
{
+                log.warn("ProjectSerialEventBus RunningProject expired: {}", 
this);
+                return true;
+            }
+            return false;
+        }
+    }
+}
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/scheduler/SchedulerEventNotifier.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/scheduler/SchedulerEventNotifier.java
index 9fe90833d8..ecc9c8de86 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/scheduler/SchedulerEventNotifier.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/scheduler/SchedulerEventNotifier.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.common.scheduler;
 
 import java.util.Locale;
+import java.util.function.Consumer;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -33,13 +34,21 @@ public class SchedulerEventNotifier {
 
     protected String project;
     protected String subject;
+    protected Consumer<SchedulerEventNotifier> callback;
 
     public String getEventType() {
         return this.getClass().getSimpleName();
     }
 
+    public void invokeCallbackIfExists() {
+        if (callback != null) {
+            callback.accept(this);
+        }
+    }
+
     @Override
     public String toString() {
-        return String.format(Locale.ROOT, "%s {project=%s, subject=%s}", 
getEventType(), project, subject);
+        return String.format(Locale.ROOT, "%s {project=%s, subject=%s, 
callback=%s}", getEventType(), project,
+                subject, callback);
     }
 }
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/scheduler/ProjectSerialEventBusTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/scheduler/ProjectSerialEventBusTest.java
new file mode 100644
index 0000000000..77cdd62adb
--- /dev/null
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/scheduler/ProjectSerialEventBusTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.scheduler;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.LinkedList;
+import java.util.Set;
+
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.test.util.ReflectionTestUtils;
+
+@MetadataInfo
+public class ProjectSerialEventBusTest {
+
+    private ProjectSerialEventBus projectSerialEventBus;
+
+    private EventBusFactory eventBusFactory;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        projectSerialEventBus = spy(ProjectSerialEventBus.getInstance());
+        eventBusFactory = mock(EventBusFactory.class);
+        ReflectionTestUtils.setField(projectSerialEventBus, "eventBus", 
eventBusFactory);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testInnerQueueStatus() throws Exception {
+        LinkedList<SchedulerEventNotifier> innerEventsQueue = 
(LinkedList<SchedulerEventNotifier>) ReflectionTestUtils
+                .getField(projectSerialEventBus, "eventsQueue");
+        Set<ProjectSerialEventBus.RunningProject> runningProjects = 
(Set<ProjectSerialEventBus.RunningProject>) ReflectionTestUtils
+                .getField(projectSerialEventBus, "runningProjects");
+
+        projectSerialEventBus.postAsync(new 
ProjectEscapedNotifier("project1"));
+        assertEquals(1, runningProjects.size());
+        projectSerialEventBus.postAsync(new 
ProjectControlledNotifier("project1"));
+        assertEquals(1, runningProjects.size());
+        projectSerialEventBus.postAsync(new 
ProjectEscapedNotifier("project2"));
+        assertEquals(2, runningProjects.size());
+        projectSerialEventBus.postAsync(new 
ProjectControlledNotifier("project2"));
+        assertEquals(2, runningProjects.size());
+
+        verify(projectSerialEventBus, times(2)).dispatch();
+        assertEquals(2, innerEventsQueue.size());
+        assertEquals("project1", innerEventsQueue.get(0).getProject());
+        assertTrue(innerEventsQueue.get(0) instanceof 
ProjectControlledNotifier);
+        assertEquals("project2", innerEventsQueue.get(1).getProject());
+        assertTrue(innerEventsQueue.get(1) instanceof 
ProjectControlledNotifier);
+
+        projectSerialEventBus.finishProjectAndDispatch("project1");
+        assertEquals(2, runningProjects.size());
+        projectSerialEventBus.finishProjectAndDispatch("project2");
+        assertEquals(2, runningProjects.size());
+
+        assertEquals(0, innerEventsQueue.size());
+
+        projectSerialEventBus.finishProjectAndDispatch("project1");
+        assertEquals(1, runningProjects.size());
+        projectSerialEventBus.finishProjectAndDispatch("project2");
+        assertEquals(0, runningProjects.size());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testTimingDispatcherRun() {
+        LinkedList<SchedulerEventNotifier> innerEventsQueue = 
(LinkedList<SchedulerEventNotifier>) ReflectionTestUtils
+                .getField(projectSerialEventBus, "eventsQueue");
+        Set<ProjectSerialEventBus.RunningProject> runningProjects = 
(Set<ProjectSerialEventBus.RunningProject>) ReflectionTestUtils
+                .getField(projectSerialEventBus, "runningProjects");
+
+        innerEventsQueue.add(new ProjectEscapedNotifier("project1"));
+        assertEquals(1, innerEventsQueue.size());
+        assertEquals(0, runningProjects.size());
+
+        new ProjectSerialEventBus.TimingDispatcher().run();
+
+        assertEquals(0, innerEventsQueue.size());
+        assertEquals(1, runningProjects.size());
+    }
+
+    @Test
+    void testRunningProjectExpired() {
+        ProjectSerialEventBus.RunningProject rp = 
ProjectSerialEventBus.RunningProject.newInstance("project1");
+        final long DURATION_31_MINUTES_MILLIS = 31L * 60L * 1000L;
+        ReflectionTestUtils.setField(rp, "beginTime", 
System.currentTimeMillis() - DURATION_31_MINUTES_MILLIS);
+        assertTrue(rp.isExpired());
+    }
+}
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
index 623cabbc41..5f8e868886 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
@@ -54,6 +54,7 @@ import org.apache.kylin.common.scheduler.EpochStartedNotifier;
 import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.common.scheduler.ProjectControlledNotifier;
 import org.apache.kylin.common.scheduler.ProjectEscapedNotifier;
+import org.apache.kylin.common.scheduler.ProjectSerialEventBus;
 import org.apache.kylin.common.scheduler.SourceUsageVerifyNotifier;
 import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.common.util.NamedThreadFactory;
@@ -95,6 +96,7 @@ public class EpochManager {
     private final KylinConfig config;
     private String identity;
     private final EventBusFactory eventBusFactory;
+    private final ProjectSerialEventBus projectSerialEventBus;
     private final String serverMode;
     private final boolean epochCheckEnabled;
     private final long epochExpiredTime;
@@ -111,6 +113,7 @@ public class EpochManager {
         this.config = KylinConfig.readSystemKylinConfig();
         this.identity = EpochOrchestrator.getOwnerIdentity();
         this.eventBusFactory = EventBusFactory.getInstance();
+        this.projectSerialEventBus = ProjectSerialEventBus.getInstance();
         this.epochStore = EpochStore.getEpochStore(config);
         this.serverMode = config.getServerMode();
         this.epochCheckEnabled = config.getEpochCheckerEnabled();
@@ -358,7 +361,7 @@ public class EpochManager {
             }
 
             for (String project : escapedProjects) {
-                eventBusFactory.postAsync(new ProjectEscapedNotifier(project));
+                projectSerialEventBus.postAsync(new 
ProjectEscapedNotifier(project));
             }
 
             try (SetLogCategory ignored = new 
SetLogCategory(LogConstant.METADATA_CATEGORY)) {
@@ -379,7 +382,7 @@ public class EpochManager {
                         logger.debug("after {} controlled projects: {}", 
updateTypeName,
                             String.join(",", newControlledProjects));
                     }
-                    newControlledProjects.forEach(p -> 
eventBusFactory.postAsync(new ProjectControlledNotifier(p)));
+                    newControlledProjects.forEach(p -> 
projectSerialEventBus.postAsync(new ProjectControlledNotifier(p)));
                 }
             }
 
@@ -680,7 +683,7 @@ public class EpochManager {
     public void updateEpochWithNotifier(String epochTarget, boolean force) {
         EpochUpdateLockManager.executeEpochWithLock(epochTarget, () -> {
             if (tryUpdateEpoch(epochTarget, force)) {
-                eventBusFactory.postAsync(new 
ProjectControlledNotifier(epochTarget));
+                projectSerialEventBus.postAsync(new 
ProjectControlledNotifier(epochTarget));
             }
             return null;
         });

Reply via email to