This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 2f57e05b81 [Improvement-16979] Unify PriorityDelayQueue with
AbstractDelayEventBus (#17155)
2f57e05b81 is described below
commit 2f57e05b81573ff55eae6c05c6b00cf21e69f4dc
Author: lile <[email protected]>
AuthorDate: Thu Aug 7 11:26:20 2025 +0800
[Improvement-16979] Unify PriorityDelayQueue with AbstractDelayEventBus
(#17155)
---
.../eventbus/AbstractDelayEventBus.java | 5 ++
.../dolphinscheduler/eventbus/IEventBus.java | 6 ++
.../task/dispatcher/TaskDispatchableEventBus.java} | 21 ++---
.../task/dispatcher/WorkerGroupDispatcher.java | 19 ++---
.../dispatcher/event/TaskDispatchableEvent.java | 57 ++++++++++++++
.../server/master/runner/queue/DelayEntry.java | 82 -------------------
.../queue/PriorityAndDelayBasedTaskEntry.java | 50 ------------
...eBasedTaskExecutionRunnableComparableEntry.java | 77 ------------------
.../dispatcher/TaskDispatchableEventBusTest.java} | 23 +++---
.../event/TaskDispatchableEventTest.java} | 14 +++-
...edTaskExecutionRunnableComparableEntryTest.java | 91 ----------------------
11 files changed, 110 insertions(+), 335 deletions(-)
diff --git
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEventBus.java
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEventBus.java
index 81809c546c..cdb359e7e2 100644
---
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEventBus.java
+++
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEventBus.java
@@ -42,6 +42,11 @@ public abstract class AbstractDelayEventBus<T extends
AbstractDelayEvent> implem
return Optional.ofNullable(delayEventQueue.peek());
}
+ @Override
+ public T take() throws InterruptedException {
+ return delayEventQueue.take();
+ }
+
@Override
public Optional<T> remove() {
return Optional.ofNullable(delayEventQueue.remove());
diff --git
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/IEventBus.java
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/IEventBus.java
index b2f6577a41..cd0b5d541b 100644
---
a/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/IEventBus.java
+++
b/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/IEventBus.java
@@ -54,6 +54,12 @@ public interface IEventBus<T extends IEvent> {
*/
Optional<T> poll() throws InterruptedException;
+ /**
+ * Remove the head event from the bus. This method will block if the event
bus is empty.
+ * <p> If the thread is interrupted, an {@link InterruptedException} will
be thrown.
+ */
+ T take() throws InterruptedException;
+
/**
* peek the head event from the bus. This method will not block if the
event bus is empty will return empty optional.
*/
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBus.java
similarity index 63%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBus.java
index cf67281323..f6fdb119b0 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBus.java
@@ -15,30 +15,33 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner.queue;
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
-import java.util.concurrent.DelayQueue;
+import org.apache.dolphinscheduler.eventbus.AbstractDelayEventBus;
+import
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
import lombok.SneakyThrows;
-public class PriorityDelayQueue<V extends DelayEntry> {
-
- private final DelayQueue<V> queue = new DelayQueue<>();
+public class TaskDispatchableEventBus<V extends TaskDispatchableEvent<T>, T
extends Comparable<T>>
+ extends
+ AbstractDelayEventBus<V> {
public void add(V v) {
- queue.put(v);
+ super.publish(v);
}
@SneakyThrows
public V take() {
- return queue.take();
+ return super.take();
}
+ // Only use in test
public int size() {
- return queue.size();
+ return delayEventQueue.size();
}
+ // Only use in test
public void clear() {
- queue.clear();
+ delayEventQueue.clear();
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
index ef909ed7f0..7cb5f8d472 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java
@@ -20,9 +20,8 @@ package
org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import
org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
+import
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
-import
org.apache.dolphinscheduler.server.master.runner.queue.PriorityAndDelayBasedTaskEntry;
-import
org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue;
import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
import java.util.Set;
@@ -34,7 +33,7 @@ import lombok.extern.slf4j.Slf4j;
/**
* WorkerGroupTaskDispatcher is responsible for dispatching tasks from the
task queue.
* The main responsibilities include:
- * 1. Continuously fetching tasks from the {@link PriorityDelayQueue} for
dispatch.
+ * 1. Continuously fetching tasks from the {@link TaskDispatchableEvent} for
dispatch.
* 2. Re-queuing tasks that fail to dispatch according to retry logic.
* 3. Ensuring thread safety and correct state transitions during task
processing.
*/
@@ -43,11 +42,7 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
private final ITaskExecutorClient taskExecutorClient;
- // TODO The current queue is flawed. When a high-priority task fails,
- // it will be delayed and will not return to the first or second position.
- // Tasks with the same priority will preempt its position.
- // If it needs to be placed at the front of the queue, the queue needs to
be re-implemented.
- private final
PriorityDelayQueue<PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable>>
workerGroupQueue;
+ private final
TaskDispatchableEventBus<TaskDispatchableEvent<ITaskExecutionRunnable>,
ITaskExecutionRunnable> workerGroupEventBus;
private final Set<Integer> waitingDispatchTaskIds;
@@ -56,7 +51,7 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient
taskExecutorClient) {
super("WorkerGroupTaskDispatcher-" + workerGroupName);
this.taskExecutorClient = taskExecutorClient;
- this.workerGroupQueue = new PriorityDelayQueue<>();
+ this.workerGroupEventBus = new TaskDispatchableEventBus<>();
this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
log.info("Initialize WorkerGroupDispatcher: {}", this.getName());
}
@@ -75,7 +70,7 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
@Override
public void run() {
while (runningFlag.get()) {
- PriorityAndDelayBasedTaskEntry<ITaskExecutionRunnable> taskEntry =
workerGroupQueue.take();
+ TaskDispatchableEvent<ITaskExecutionRunnable> taskEntry =
workerGroupEventBus.take();
ITaskExecutionRunnable taskExecutionRunnable = taskEntry.getData();
try (
TaskExecutorMDCUtils.MDCAutoClosable ignore =
@@ -120,7 +115,7 @@ public class WorkerGroupDispatcher extends BaseDaemonThread
{
*/
public void dispatchTask(final ITaskExecutionRunnable
taskExecutionRunnable, final long delayTimeMills) {
waitingDispatchTaskIds.add(taskExecutionRunnable.getId());
- workerGroupQueue.add(new
PriorityAndDelayBasedTaskEntry<>(delayTimeMills, taskExecutionRunnable));
+ workerGroupEventBus.add(new TaskDispatchableEvent<>(delayTimeMills,
taskExecutionRunnable));
}
public boolean removeTask(ITaskExecutionRunnable taskExecutionRunnable) {
@@ -141,6 +136,6 @@ public class WorkerGroupDispatcher extends BaseDaemonThread
{
}
int queueSize() {
- return this.workerGroupQueue.size();
+ return this.workerGroupEventBus.size();
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchableEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchableEvent.java
new file mode 100644
index 0000000000..9d99e4b958
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchableEvent.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.eventbus.AbstractDelayEvent;
+
+import java.util.concurrent.Delayed;
+
+import lombok.Getter;
+
+@Getter
+public class TaskDispatchableEvent<V extends Comparable<V>> extends
AbstractDelayEvent {
+
+ protected final V data;
+
+ public TaskDispatchableEvent(long delayTimeMills, V data) {
+ super(delayTimeMills);
+ this.data = checkNotNull(data, "data is null");
+ }
+
+ @Override
+ public int compareTo(Delayed other) {
+ if (!(other instanceof TaskDispatchableEvent)) {
+ throw new RuntimeException("The object being compared is not a
TaskReadyForDispatchEvent.");
+ }
+
+ @SuppressWarnings("unchecked")
+ final TaskDispatchableEvent<V> otherEvent = (TaskDispatchableEvent<V>)
other;
+
+ // there should compare data first for priority
+ if (data != null && otherEvent.data != null) {
+ final int compareResult = data.compareTo(otherEvent.data);
+ if (compareResult != 0) {
+ return compareResult;
+ }
+ }
+
+ return super.compareTo(other);
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java
deleted file mode 100644
index da6a750261..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.queue;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Objects;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import lombok.Getter;
-
-import org.jetbrains.annotations.NotNull;
-
-public class DelayEntry<V extends Comparable<V>> implements Delayed {
-
- private final long delayTimeMills;
-
- private final long triggerTimeMills;
-
- @Getter
- private final V data;
-
- public DelayEntry(long delayTimeMills, V data) {
- this.delayTimeMills = delayTimeMills;
- this.triggerTimeMills = System.currentTimeMillis() + delayTimeMills;
- this.data = checkNotNull(data, "data is null");
- }
-
- @Override
- public long getDelay(@NotNull TimeUnit unit) {
- long remainTimeMills = triggerTimeMills - System.currentTimeMillis();
- if (TimeUnit.MILLISECONDS.equals(unit)) {
- return remainTimeMills;
- }
- return unit.convert(remainTimeMills, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public int compareTo(@NotNull Delayed o) {
- DelayEntry<V> other = (DelayEntry<V>) o;
- int delayTimeMillsCompareResult = Long.compare(delayTimeMills,
other.delayTimeMills);
- if (delayTimeMillsCompareResult != 0) {
- return delayTimeMillsCompareResult;
- }
-
- if (data == null || other.data == null) {
- return 0;
- }
- return data.compareTo(other.data);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
- DelayEntry<?> that = (DelayEntry<?>) o;
- return delayTimeMills == that.delayTimeMills && Objects.equals(data,
that.data);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(delayTimeMills, data);
- }
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java
deleted file mode 100644
index bf762afb3e..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityAndDelayBasedTaskEntry.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.queue;
-
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import org.jetbrains.annotations.NotNull;
-
-public class PriorityAndDelayBasedTaskEntry<V extends Comparable<V>> extends
DelayEntry<V> {
-
- public PriorityAndDelayBasedTaskEntry(long delayTimeMills, V data) {
- super(delayTimeMills, data);
- }
-
- @Override
- public long getDelay(@NotNull TimeUnit unit) {
- return super.getDelay(unit);
- }
-
- @Override
- public int compareTo(@NotNull Delayed o) {
- return super.compareTo(o);
- }
-
- @Override
- public boolean equals(Object o) {
- return super.equals(o);
- }
-
- @Override
- public int hashCode() {
- return super.hashCode();
- }
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntry.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntry.java
deleted file mode 100644
index 9bb47667e7..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntry.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.queue;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Objects;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import lombok.Getter;
-
-import org.jetbrains.annotations.NotNull;
-
-public class
TimeBasedTaskExecutionRunnableComparableEntry<ITaskExecutionRunnable>
implements Delayed {
-
- private final long triggerTimeMills;
- private final long delayTimeMills;
-
- @Getter
- private final ITaskExecutionRunnable data;
- public TimeBasedTaskExecutionRunnableComparableEntry(long delayTimeMills,
ITaskExecutionRunnable data) {
- this.delayTimeMills = delayTimeMills;
- this.triggerTimeMills = System.currentTimeMillis() + delayTimeMills;
- this.data = checkNotNull(data, "data is null");
- }
-
- @Override
- public long getDelay(@NotNull TimeUnit unit) {
- long remainTimeMills = triggerTimeMills - System.currentTimeMillis();
- if (TimeUnit.MILLISECONDS.equals(unit)) {
- return remainTimeMills;
- }
- return unit.convert(remainTimeMills, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public int compareTo(@NotNull Delayed delayed) {
- if (this == delayed) {
- return 0;
- }
- return Long.compare(this.getDelay(TimeUnit.MILLISECONDS),
delayed.getDelay(TimeUnit.MILLISECONDS));
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TimeBasedTaskExecutionRunnableComparableEntry<?> that =
(TimeBasedTaskExecutionRunnableComparableEntry<?>) o;
- return this.getDelay(TimeUnit.MILLISECONDS) ==
that.getDelay(TimeUnit.MILLISECONDS)
- && Objects.equals(data, that.getData());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(data);
- }
-}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueueTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBusTest.java
similarity index 66%
rename from
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueueTest.java
rename to
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBusTest.java
index 32aeda7f64..2b2206b5fa 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueueTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBusTest.java
@@ -15,41 +15,42 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner.queue;
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.mock;
+import
org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-public class PriorityDelayQueueTest {
+public class TaskDispatchableEventBusTest {
- private PriorityDelayQueue<PriorityAndDelayBasedTaskEntry> queue;
+ private
TaskDispatchableEventBus<TaskDispatchableEvent<ITaskExecutionRunnable>,
ITaskExecutionRunnable> queue;
private ITaskExecutionRunnable taskExecutionRunnable;
@BeforeEach
public void setUp() {
- queue = new PriorityDelayQueue<>();
+ queue = new TaskDispatchableEventBus<>();
taskExecutionRunnable = mock(ITaskExecutionRunnable.class);
}
@Test
public void testAdd() {
- queue.add(new PriorityAndDelayBasedTaskEntry(1000,
taskExecutionRunnable));
+ queue.add(new TaskDispatchableEvent<>(1000, taskExecutionRunnable));
assertEquals(1, queue.size());
- queue.add(new PriorityAndDelayBasedTaskEntry(2000,
taskExecutionRunnable));
+ queue.add(new TaskDispatchableEvent<>(2000, taskExecutionRunnable));
assertEquals(2, queue.size());
}
@Test
public void testTake() throws InterruptedException {
- queue.add(new PriorityAndDelayBasedTaskEntry(1000,
taskExecutionRunnable));
- PriorityAndDelayBasedTaskEntry entry = queue.take();
+ queue.add(new TaskDispatchableEvent<>(1000, taskExecutionRunnable));
+ TaskDispatchableEvent<ITaskExecutionRunnable> entry = queue.take();
assertNotNull(entry);
assertEquals(0, queue.size());
@@ -59,14 +60,14 @@ public class PriorityDelayQueueTest {
public void testSize() {
assertEquals(0, queue.size());
- queue.add(new PriorityAndDelayBasedTaskEntry(1000,
taskExecutionRunnable));
+ queue.add(new TaskDispatchableEvent<>(1000, taskExecutionRunnable));
assertEquals(1, queue.size());
}
@Test
public void testClear() {
- queue.add(new PriorityAndDelayBasedTaskEntry(1000,
taskExecutionRunnable));
- queue.add(new PriorityAndDelayBasedTaskEntry(2000,
taskExecutionRunnable));
+ queue.add(new TaskDispatchableEvent<>(1000, taskExecutionRunnable));
+ queue.add(new TaskDispatchableEvent<>(2000, taskExecutionRunnable));
assertEquals(2, queue.size());
queue.clear();
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchableEventTest.java
similarity index 68%
rename from
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java
rename to
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchableEventTest.java
index 00cf782e18..315ab0df30 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/event/TaskDispatchableEventTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner.queue;
+package org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event;
import static com.google.common.truth.Truth.assertThat;
@@ -23,13 +23,21 @@ import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
-class DelayEntryTest {
+class TaskDispatchableEventTest {
@Test
void getDelay() {
- DelayEntry<String> delayEntry = new DelayEntry<>(5_000L, "Item");
+ TaskDispatchableEvent<String> delayEntry = new
TaskDispatchableEvent<>(5_000L, "Item");
assertThat(delayEntry.getDelay(TimeUnit.NANOSECONDS))
.isWithin(TimeUnit.NANOSECONDS.convert(500,
TimeUnit.MILLISECONDS))
.of(TimeUnit.NANOSECONDS.convert(5_000L,
TimeUnit.MILLISECONDS));
}
+
+ @Test
+ void priorityCompare() {
+ TaskDispatchableEvent<String> highPriorityEntry =
+ new TaskDispatchableEvent<>(15_000L, "1_HIGH");
+ TaskDispatchableEvent<String> lowPriorityEntry = new
TaskDispatchableEvent<>(5_000L, "3_LOW");
+ assertThat(highPriorityEntry.compareTo(lowPriorityEntry) < 0).isTrue();
+ }
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntryTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntryTest.java
deleted file mode 100644
index 2d4292137b..0000000000
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/TimeBasedTaskExecutionRunnableComparableEntryTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.queue;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
-import java.util.concurrent.TimeUnit;
-
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-class TimeBasedTaskExecutionRunnableComparableEntryTest {
-
- private static final long TEST_DELAY_MILLS = 1000L;
- private final String testData = "testData";
- private TimeBasedTaskExecutionRunnableComparableEntry<String> entry;
-
- @BeforeEach
- public void setUp() {
- entry = new
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
- }
-
- @Test
- void constructor_NullData_ThrowsNullPointerException() {
- try {
- new
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, null);
- fail("Expected NullPointerException to be thrown");
- } catch (NullPointerException e) {
- assertEquals("data is null", e.getMessage());
- }
- }
-
- @Test
- void getDelay_BeforeTriggerTime_ReturnsPositive() {
- entry = new
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
- Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(
- () -> assertTrue(entry.getDelay(TimeUnit.MILLISECONDS) > 0));
- }
-
- @Test
- void getDelay_AtTriggerTime_ReturnsZero() {
- entry = new
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
- Awaitility.await().atLeast(1000, TimeUnit.MILLISECONDS)
- .with().pollInterval(1000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () -> {
- long remainTime =
entry.getDelay(TimeUnit.MILLISECONDS);
- // The allowable error is +-200
- System.out.println("remainTime:" + remainTime);
- assertTrue(Math.abs(remainTime) <= 200);
- });
- }
-
- @Test
- void getDelay_AfterTriggerTime_ReturnsNegative() {
- entry = new
TimeBasedTaskExecutionRunnableComparableEntry<>(TEST_DELAY_MILLS, testData);
- Awaitility.await().atMost(1500, TimeUnit.MILLISECONDS).untilAsserted(
- () -> assertTrue(entry.getDelay(TimeUnit.MILLISECONDS) < 0));
- }
-
- @Test
- void getDelay_DifferentTimeUnits_ReturnsCorrectValues() {
- long remainTimeMillis = entry.getDelay(TimeUnit.MILLISECONDS);
- long remainTimeSeconds = entry.getDelay(TimeUnit.SECONDS);
-
- assertTrue(remainTimeSeconds <= remainTimeMillis / 1000);
- }
-
- @Test
- void compareTo_SameObject_ReturnsZero() {
- assertEquals(0, entry.compareTo(entry));
- }
-}