This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2f57e05b81 [Improvement-16979] Unify PriorityDelayQueue with 
AbstractDelayEventBus (#17155)
2f57e05b81 is described below

commit 2f57e05b81573ff55eae6c05c6b00cf21e69f4dc
Author: lile <[email protected]>
AuthorDate: Thu Aug 7 11:26:20 2025 +0800

    [Improvement-16979] Unify PriorityDelayQueue with AbstractDelayEventBus 
(#17155)
---
 .../eventbus/AbstractDelayEventBus.java            |  5 ++
 .../dolphinscheduler/eventbus/IEventBus.java       |  6 ++
 .../task/dispatcher/TaskDispatchableEventBus.java} | 21 ++---
 .../task/dispatcher/WorkerGroupDispatcher.java     | 19 ++---
 .../dispatcher/event/TaskDispatchableEvent.java    | 57 ++++++++++++++
 .../server/master/runner/queue/DelayEntry.java     | 82 -------------------
 .../queue/PriorityAndDelayBasedTaskEntry.java      | 50 ------------
 ...eBasedTaskExecutionRunnableComparableEntry.java | 77 ------------------
 .../dispatcher/TaskDispatchableEventBusTest.java}  | 23 +++---
 .../event/TaskDispatchableEventTest.java}          | 14 +++-
 ...edTaskExecutionRunnableComparableEntryTest.java | 91 ----------------------
 11 files changed, 110 insertions(+), 335 deletions(-)

diff --git 
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEventBus.java
 
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEventBus.java
index 81809c546c..cdb359e7e2 100644
--- 
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEventBus.java
+++ 
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEventBus.java
@@ -42,6 +42,11 @@ public abstract class AbstractDelayEventBus<T extends 
AbstractDelayEvent> implem
         return Optional.ofNullable(delayEventQueue.peek());
     }
 
+    @Override
+    public T take() throws InterruptedException {
+        return delayEventQueue.take();
+    }
+
     @Override
     public Optional<T> remove() {
         return Optional.ofNullable(delayEventQueue.remove());
diff --git 
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/IEventBus.java
 
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/IEventBus.java
index b2f6577a41..cd0b5d541b 100644
--- 
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/IEventBus.java
+++ 
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/IEventBus.java
@@ -54,6 +54,12 @@ public interface IEventBus<T extends IEvent> {
      */
     Optional<T> poll() throws InterruptedException;
 
+    /**
+     * Remove the head event from the bus. This method will block if the event 
bus is empty.
+     * <p> If the thread is interrupted, an {@link InterruptedException} will 
be thrown.
+     */
+    T take() throws InterruptedException;
+
     /**
      * peek the head event from the bus. This method will not block if the 
event bus is empty will return empty optional.
      */
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBus.java
similarity index 63%
rename from 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
rename to 
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBus.java
index cf67281323..f6fdb119b0 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBus.java
@@ -15,30 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner.queue;
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
 
-import java.util.concurrent.DelayQueue;
+import org.apache.dolphinscheduler.eventbus.AbstractDelayEventBus;
+import 
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
 
 import lombok.SneakyThrows;
 
-public class PriorityDelayQueue<V extends DelayEntry> {
-
-    private final DelayQueue<V> queue = new DelayQueue<>();
+public class TaskDispatchableEventBus<V extends TaskDispatchableEvent<T>, T 
extends Comparable<T>>
+        extends
+            AbstractDelayEventBus<V> {
 
     public void add(V v) {
-        queue.put(v);
+        super.publish(v);
     }
 
     @SneakyThrows
     public V take() {
-        return queue.take();
+        return super.take();
     }
 
+    // Only used in tests
     public int size() {
-        return queue.size();
+        return delayEventQueue.size();
     }
 
+    // Only used in tests
     public void clear() {
-        queue.clear();
+        delayEventQueue.clear();
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
index ef909ed7f0..7cb5f8d472 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
@@ -20,9 +20,8 @@ package 
org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import 
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import 
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import 
org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
-import 
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
 import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
 
 import java.util.Set;
@@ -34,7 +33,7 @@ import lombok.extern.slf4j.Slf4j;
 /**
  * WorkerGroupTaskDispatcher is responsible for dispatching tasks from the 
task queue.
  * The main responsibilities include:
- * 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for 
dispatch.
+ * 1. Continuously fetching tasks from the {@link TaskDispatchableEventBus} for 
dispatch.
  * 2. Re-queuing tasks that fail to dispatch according to retry logic.
  * 3. Ensuring thread safety and correct state transitions during task 
processing.
  */
@@ -43,11 +42,7 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
 
     private final ITaskExecutorClient taskExecutorClient;
 
-    // TODO The current queue is flawed. When a high-priority task fails,
-    // it will be delayed and will not return to the first or second position.
-    // Tasks with the same priority will preempt its position.
-    // If it needs to be placed at the front of the queue, the queue needs to 
be re-implemented.
-    private final 
PriorityDelayQueue<PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable>> 
workerGroupQueue;
+    private final 
TaskDispatchableEventBus<TaskDispatchableEvent<ITaskExecutionRunnable>, 
ITaskExecutionRunnable> workerGroupEventBus;
 
     private final Set<Integer> waitingDispatchTaskIds;
 
@@ -56,7 +51,7 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
     public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient 
taskExecutorClient) {
         super("WorkerGroupTaskDispatcher-" + workerGroupName);
         this.taskExecutorClient = taskExecutorClient;
-        this.workerGroupQueue = new PriorityDelayQueue<>();
+        this.workerGroupEventBus = new TaskDispatchableEventBus<>();
         this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
         log.info("Initialize WorkerGroupDispatcher: {}", this.getName());
     }
@@ -75,7 +70,7 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
     @Override
     public void run() {
         while (runningFlag.get()) {
-            PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable> taskEntry = 
workerGroupQueue.take();
+            TaskDispatchableEvent<ITaskExecutionRunnable> taskEntry = 
workerGroupEventBus.take();
             ITaskExecutionRunnable taskExecutionRunnable = taskEntry.getData();
             try (
                     TaskExecutorMDCUtils.MDCAutoClosable ignore =
@@ -120,7 +115,7 @@ public class WorkerGroupDispatcher extends BaseDaemonThread 
{
      */
     public void dispatchTask(final ITaskExecutionRunnable 
taskExecutionRunnable, final long delayTimeMills) {
         waitingDispatchTaskIds.add(taskExecutionRunnable.getId());
-        workerGroupQueue.add(new 
PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
+        workerGroupEventBus.add(new TaskDispatchableEvent<>(delayTimeMills, 
taskExecutionRunnable));
     }
 
     public boolean removeTask(ITaskExecutionRunnable taskExecutionRunnable) {
@@ -141,6 +136,6 @@ public class WorkerGroupDispatcher extends BaseDaemonThread 
{
     }
 
     int queueSize() {
-        return this.workerGroupQueue.size();
+        return this.workerGroupEventBus.size();
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchableEvent.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchableEvent.java
new file mode 100644
index 0000000000..9d99e4b958
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchableEvent.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.eventbus.AbstractDelayEvent;
+
+import java.util.concurrent.Delayed;
+
+import lombok.Getter;
+
+@Getter
+public class TaskDispatchableEvent<V extends Comparable<V>> extends 
AbstractDelayEvent {
+
+    protected final V data;
+
+    public TaskDispatchableEvent(long delayTimeMills, V data) {
+        super(delayTimeMills);
+        this.data = checkNotNull(data, "data is null");
+    }
+
+    @Override
+    public int compareTo(Delayed other) {
+        if (!(other instanceof TaskDispatchableEvent)) {
+            throw new RuntimeException("The object being compared is not a 
TaskReadyForDispatchEvent.");
+        }
+
+        @SuppressWarnings("unchecked")
+        final TaskDispatchableEvent<V> otherEvent = (TaskDispatchableEvent<V>) 
other;
+
+        // Compare data first so that priority wins; ties fall back to the superclass ordering.
+        if (data != null && otherEvent.data != null) {
+            final int compareResult = data.compareTo(otherEvent.data);
+            if (compareResult != 0) {
+                return compareResult;
+            }
+        }
+
+        return super.compareTo(other);
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java
deleted file mode 100644
index da6a750261..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.queue;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Objects;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import lombok.Getter;
-
-import org.jetbrains.annotations.NotNull;
-
-public class DelayEntry<V extends Comparable<V>> implements Delayed {
-
-    private final long delayTimeMills;
-
-    private final long triggerTimeMills;
-
-    @Getter
-    private final V data;
-
-    public DelayEntry(long delayTimeMills, V data) {
-        this.delayTimeMills = delayTimeMills;
-        this.triggerTimeMills = System.currentTimeMillis() + delayTimeMills;
-        this.data = checkNotNull(data, "data is null");
-    }
-
-    @Override
-    public long getDelay(@NotNull TimeUnit unit) {
-        long remainTimeMills = triggerTimeMills - System.currentTimeMillis();
-        if (TimeUnit.MILLISECONDS.equals(unit)) {
-            return remainTimeMills;
-        }
-        return unit.convert(remainTimeMills, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public int compareTo(@NotNull Delayed o) {
-        DelayEntry<V> other = (DelayEntry<V>) o;
-        int delayTimeMillsCompareResult = Long.compare(delayTimeMills, 
other.delayTimeMills);
-        if (delayTimeMillsCompareResult != 0) {
-            return delayTimeMillsCompareResult;
-        }
-
-        if (data == null || other.data == null) {
-            return 0;
-        }
-        return data.compareTo(other.data);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-        DelayEntry<?> that = (DelayEntry<?>) o;
-        return delayTimeMills == that.delayTimeMills && Objects.equals(data, 
that.data);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(delayTimeMills, data);
-    }
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java
deleted file mode 100644
index bf762afb3e..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.queue;
-
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import org.jetbrains.annotations.NotNull;
-
-public class PriorityAndDelayBasedTaskEntry<V extends Comparable<V>> extends 
DelayEntry<V> {
-
-    public PriorityAndDelayBasedTaskEntry(long delayTimeMills, V data) {
-        super(delayTimeMills, data);
-    }
-
-    @Override
-    public long getDelay(@NotNull TimeUnit unit) {
-        return super.getDelay(unit);
-    }
-
-    @Override
-    public int compareTo(@NotNull Delayed o) {
-        return super.compareTo(o);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        return super.equals(o);
-    }
-
-    @Override
-    public int hashCode() {
-        return super.hashCode();
-    }
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntry.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntry.java
deleted file mode 100644
index 9bb47667e7..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntry.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.queue;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Objects;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import lombok.Getter;
-
-import org.jetbrains.annotations.NotNull;
-
-public class 
TimeBasedTaskExecutionRunnableComparableEntry<ITaskExecutionRunnable> 
implements Delayed {
-
-    private final long triggerTimeMills;
-    private final long delayTimeMills;
-
-    @Getter
-    private final ITaskExecutionRunnable data;
-    public TimeBasedTaskExecutionRunnableComparableEntry(long delayTimeMills, 
ITaskExecutionRunnable data) {
-        this.delayTimeMills = delayTimeMills;
-        this.triggerTimeMills = System.currentTimeMillis() + delayTimeMills;
-        this.data = checkNotNull(data, "data is null");
-    }
-
-    @Override
-    public long getDelay(@NotNull TimeUnit unit) {
-        long remainTimeMills = triggerTimeMills - System.currentTimeMillis();
-        if (TimeUnit.MILLISECONDS.equals(unit)) {
-            return remainTimeMills;
-        }
-        return unit.convert(remainTimeMills, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public int compareTo(@NotNull Delayed delayed) {
-        if (this == delayed) {
-            return 0;
-        }
-        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), 
delayed.getDelay(TimeUnit.MILLISECONDS));
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        TimeBasedTaskExecutionRunnableComparableEntry<?> that = 
(TimeBasedTaskExecutionRunnableComparableEntry<?>) o;
-        return this.getDelay(TimeUnit.MILLISECONDS) == 
that.getDelay(TimeUnit.MILLISECONDS)
-                && Objects.equals(data, that.getData());
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(data);
-    }
-}
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueueTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBusTest.java
similarity index 66%
rename from 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueueTest.java
rename to 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBusTest.java
index 32aeda7f64..2b2206b5fa 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueueTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBusTest.java
@@ -15,41 +15,42 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner.queue;
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.Mockito.mock;
 
+import 
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
 import 
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-public class PriorityDelayQueueTest {
+public class TaskDispatchableEventBusTest {
 
-    private PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> queue;
+    private 
TaskDispatchableEventBus<TaskDispatchableEvent<ITaskExecutionRunnable>, 
ITaskExecutionRunnable> queue;
     private ITaskExecutionRunnable taskExecutionRunnable;
 
     @BeforeEach
     public void setUp() {
-        queue = new PriorityDelayQueue<>();
+        queue = new TaskDispatchableEventBus<>();
         taskExecutionRunnable = mock(ITaskExecutionRunnable.class);
     }
 
     @Test
     public void testAdd() {
-        queue.add(new PriorityAndDelayBasedTaskEntry(1000, 
taskExecutionRunnable));
+        queue.add(new TaskDispatchableEvent<>(1000, taskExecutionRunnable));
         assertEquals(1, queue.size());
 
-        queue.add(new PriorityAndDelayBasedTaskEntry(2000, 
taskExecutionRunnable));
+        queue.add(new TaskDispatchableEvent<>(2000, taskExecutionRunnable));
         assertEquals(2, queue.size());
     }
 
     @Test
     public void testTake() throws InterruptedException {
-        queue.add(new PriorityAndDelayBasedTaskEntry(1000, 
taskExecutionRunnable));
-        PriorityAndDelayBasedTaskEntry entry = queue.take();
+        queue.add(new TaskDispatchableEvent<>(1000, taskExecutionRunnable));
+        TaskDispatchableEvent<ITaskExecutionRunnable> entry = queue.take();
         assertNotNull(entry);
         assertEquals(0, queue.size());
 
@@ -59,14 +60,14 @@ public class PriorityDelayQueueTest {
     public void testSize() {
         assertEquals(0, queue.size());
 
-        queue.add(new PriorityAndDelayBasedTaskEntry(1000, 
taskExecutionRunnable));
+        queue.add(new TaskDispatchableEvent<>(1000, taskExecutionRunnable));
         assertEquals(1, queue.size());
     }
 
     @Test
     public void testClear() {
-        queue.add(new PriorityAndDelayBasedTaskEntry(1000, 
taskExecutionRunnable));
-        queue.add(new PriorityAndDelayBasedTaskEntry(2000, 
taskExecutionRunnable));
+        queue.add(new TaskDispatchableEvent<>(1000, taskExecutionRunnable));
+        queue.add(new TaskDispatchableEvent<>(2000, taskExecutionRunnable));
         assertEquals(2, queue.size());
 
         queue.clear();
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchableEventTest.java
similarity index 68%
rename from 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java
rename to 
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchableEventTest.java
index 00cf782e18..315ab0df30 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchableEventTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.runner.queue;
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event;
 
 import static com.google.common.truth.Truth.assertThat;
 
@@ -23,13 +23,21 @@ import java.util.concurrent.TimeUnit;
 
 import org.junit.jupiter.api.Test;
 
-class DelayEntryTest {
+class TaskDispatchableEventTest {
 
     @Test
     void getDelay() {
-        DelayEntry<String> delayEntry = new DelayEntry<>(5_000L, "Item");
+        TaskDispatchableEvent<String> delayEntry = new 
TaskDispatchableEvent<>(5_000L, "Item");
         assertThat(delayEntry.getDelay(TimeUnit.NANOSECONDS))
                 .isWithin(TimeUnit.NANOSECONDS.convert(500, 
TimeUnit.MILLISECONDS))
                 .of(TimeUnit.NANOSECONDS.convert(5_000L, 
TimeUnit.MILLISECONDS));
     }
+
+    @Test
+    void priorityCompare() {
+        TaskDispatchableEvent<String> highPriorityEntry =
+                new TaskDispatchableEvent<>(15_000L, "1_HIGH");
+        TaskDispatchableEvent<String> lowPriorityEntry = new 
TaskDispatchableEvent<>(5_000L, "3_LOW");
+        assertThat(highPriorityEntry.compareTo(lowPriorityEntry) < 0).isTrue();
+    }
 }
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntryTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntryTest.java
deleted file mode 100644
index 2d4292137b..0000000000
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntryTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.queue;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
-import java.util.concurrent.TimeUnit;
-
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-class TimeBasedTaskExecutionRunnableComparableEntryTest {
-
-    private static final long TEST_DELAY_MILLS = 1000L;
-    private final String testData = "testData";
-    private TimeBasedTaskExecutionRunnableComparableEntry<String> entry;
-
-    @BeforeEach
-    public void setUp() {
-        entry = new 
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
-    }
-
-    @Test
-    void constructor_NullData_ThrowsNullPointerException() {
-        try {
-            new 
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, null);
-            fail("Expected NullPointerException to be thrown");
-        } catch (NullPointerException e) {
-            assertEquals("data is null", e.getMessage());
-        }
-    }
-
-    @Test
-    void getDelay_BeforeTriggerTime_ReturnsPositive() {
-        entry = new 
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
-        Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(
-                () -> assertTrue(entry.getDelay(TimeUnit.MILLISECONDS) > 0));
-    }
-
-    @Test
-    void getDelay_AtTriggerTime_ReturnsZero() {
-        entry = new 
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
-        Awaitility.await().atLeast(1000, TimeUnit.MILLISECONDS)
-                .with().pollInterval(1000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> {
-                            long remainTime = 
entry.getDelay(TimeUnit.MILLISECONDS);
-                            // The allowable error is +-200
-                            System.out.println("remainTime:" + remainTime);
-                            assertTrue(Math.abs(remainTime) <= 200);
-                        });
-    }
-
-    @Test
-    void getDelay_AfterTriggerTime_ReturnsNegative() {
-        entry = new 
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
-        Awaitility.await().atMost(1500, TimeUnit.MILLISECONDS).untilAsserted(
-                () -> assertTrue(entry.getDelay(TimeUnit.MILLISECONDS) < 0));
-    }
-
-    @Test
-    void getDelay_DifferentTimeUnits_ReturnsCorrectValues() {
-        long remainTimeMillis = entry.getDelay(TimeUnit.MILLISECONDS);
-        long remainTimeSeconds = entry.getDelay(TimeUnit.SECONDS);
-
-        assertTrue(remainTimeSeconds <= remainTimeMillis / 1000);
-    }
-
-    @Test
-    void compareTo_SameObject_ReturnsZero() {
-        assertEquals(0, entry.compareTo(entry));
-    }
-}

Reply via email to