This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 363e4ae6dd4 KAFKA-20297 move Scheduler from client common to trogdor
(#21753)
363e4ae6dd4 is described below
commit 363e4ae6dd43c98810425bd0e4f791aee9325563
Author: Chia-Yi Chiu <[email protected]>
AuthorDate: Wed Mar 18 20:24:52 2026 +0900
KAFKA-20297 move Scheduler from client common to trogdor (#21753)
**Summary** This PR refactors the Scheduler utility and its
implementations by moving them from the `clients` module to the
`trogdor` module. It also ensures that client-side tests that previously
relied on MockScheduler remain functional by implementing a local
version where needed.
**Key Changes** - Package Relocation: Moved `Scheduler` and
`MockScheduler` from `org.apache.kafka.common.utils` to
`org.apache.kafka.trogdor.common`. `SystemScheduler` was deleted; its
logic is now inlined as the anonymous `Scheduler.SYSTEM`
implementation. - Trogdor Updates: Updated all
references in the trogdor module (including `Agent`, `Coordinator`,
`TaskManager`, and `WorkerManager`) to point to the new location
of the Scheduler interface and its implementations. - Client Test
Stability: Added a private static nested `MockScheduler` class to
`ExpiringCredentialRefreshingLoginTest.java`. This replaces the
dependency on the global MockScheduler that was moved to Trogdor,
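keeping the clients module tests decoupled from trogdor. - Cleaned
Up Imports: Removed unused imports and updated imports and package
declarations across the 13 changed files to reflect the architectural
shift. - Platform Cleanup: Removed the
`TROGDOR_COORDINATOR_HEARTBEAT_MS` and
`TROGDOR_COORDINATOR_HEARTBEAT_MS_DEFAULT` constants from `Platform`.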
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../apache/kafka/common/utils/SystemScheduler.java | 43 ----------------------
.../ExpiringCredentialRefreshingLoginTest.java | 36 +++++++++++++++++-
.../java/org/apache/kafka/trogdor/agent/Agent.java | 2 +-
.../apache/kafka/trogdor/agent/WorkerManager.java | 2 +-
.../apache/kafka/trogdor/basic/BasicPlatform.java | 2 +-
.../org/apache/kafka/trogdor/common/Platform.java | 6 ---
.../apache/kafka/trogdor/common}/Scheduler.java | 17 ++++++++-
.../kafka/trogdor/coordinator/Coordinator.java | 2 +-
.../kafka/trogdor/coordinator/TaskManager.java | 2 +-
.../org/apache/kafka/trogdor/agent/AgentTest.java | 4 +-
.../kafka/trogdor/common/MiniTrogdorCluster.java | 1 -
.../kafka/trogdor/common}/MockScheduler.java | 4 +-
.../kafka/trogdor/coordinator/CoordinatorTest.java | 4 +-
13 files changed, 62 insertions(+), 63 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/SystemScheduler.java
b/clients/src/main/java/org/apache/kafka/common/utils/SystemScheduler.java
deleted file mode 100644
index c8c1148376a..00000000000
--- a/clients/src/main/java/org/apache/kafka/common/utils/SystemScheduler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.utils;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A scheduler implementation that uses the system clock.
- *
- * Use Scheduler.SYSTEM instead of constructing an instance of this class.
- */
-public class SystemScheduler implements Scheduler {
- SystemScheduler() {
- }
-
- @Override
- public Time time() {
- return Time.SYSTEM;
- }
-
- @Override
- public <T> Future<T> schedule(final ScheduledExecutorService executor,
- final Callable<T> callable, long delayMs) {
- return executor.schedule(callable, delayMs, TimeUnit.MILLISECONDS);
- }
-}
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
index dc94181468c..e8716935059 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import
org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.LoginContextFactory;
-import org.apache.kafka.common.utils.MockScheduler;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -35,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.TreeMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -284,6 +284,40 @@ public class ExpiringCredentialRefreshingLoginTest {
}
}
+ /*
+ * */
+ private static class MockScheduler implements MockTime.Listener {
+
+ private final MockTime time;
+ private final Map<Long, List<KafkaFutureImpl<Long>>> waiters = new
TreeMap<>();
+
+ public MockScheduler(MockTime time) {
+ this.time = time;
+ time.addListener(this);
+ }
+
+ @Override
+ public synchronized void onTimeUpdated() {
+ long timeMs = time.milliseconds();
+ var iterator = waiters.entrySet().iterator();
+ while (iterator.hasNext()) {
+ var entry = iterator.next();
+ if (entry.getKey() > timeMs) break;
+ entry.getValue().forEach(future -> future.complete(timeMs));
+ iterator.remove();
+ }
+ }
+
+ public synchronized void addWaiter(long delayMs, KafkaFutureImpl<Long>
waiter) {
+ long timeMs = time.milliseconds();
+ if (delayMs <= 0) {
+ waiter.complete(timeMs);
+ } else {
+ waiters.computeIfAbsent(timeMs + delayMs, k -> new
ArrayList<>()).add(waiter);
+ }
+ }
+ }
+
@Test
public void testRefresh() throws Exception {
for (int numExpectedRefreshes : new int[] {0, 1, 2}) {
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
index 0ef3c5e5745..c2d13e63308 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -19,11 +19,11 @@ package org.apache.kafka.trogdor.agent;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Scheduler;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index b3ba8263db9..4c7fdd6b874 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -20,11 +20,11 @@ package org.apache.kafka.trogdor.agent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
-import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Scheduler;
import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
index 1968be27096..cd0d39dabef 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
@@ -17,10 +17,10 @@
package org.apache.kafka.trogdor.basic;
-import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Shell;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Scheduler;
import org.apache.kafka.trogdor.common.Topology;
import com.fasterxml.jackson.databind.JsonNode;
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/common/Platform.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/common/Platform.java
index defed06c323..1df018ee7ea 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/common/Platform.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/common/Platform.java
@@ -17,7 +17,6 @@
package org.apache.kafka.trogdor.common;
-import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Utils;
import com.fasterxml.jackson.databind.JsonNode;
@@ -34,11 +33,6 @@ public interface Platform {
public static final String TROGDOR_COORDINATOR_PORT =
"trogdor.coordinator.port";
- public static final String TROGDOR_COORDINATOR_HEARTBEAT_MS =
- "trogdor.coordinator.heartbeat.ms";
-
- public static final int TROGDOR_COORDINATOR_HEARTBEAT_MS_DEFAULT =
60000;
-
public static Platform parse(String curNodeName, String path) throws
Exception {
JsonNode root = JsonUtil.JSON_SERDE.readTree(new File(path));
JsonNode platformNode = root.get("platform");
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Scheduler.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/common/Scheduler.java
similarity index 79%
rename from clients/src/main/java/org/apache/kafka/common/utils/Scheduler.java
rename to trogdor/src/main/java/org/apache/kafka/trogdor/common/Scheduler.java
index a8ada655b3e..e90782f4a9e 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Scheduler.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/common/Scheduler.java
@@ -14,11 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.utils;
+package org.apache.kafka.trogdor.common;
+
+import org.apache.kafka.common.utils.Time;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* An interface for scheduling tasks for the future.
@@ -26,7 +29,17 @@ import java.util.concurrent.ScheduledExecutorService;
* Implementations of this class should be thread-safe.
*/
public interface Scheduler {
- Scheduler SYSTEM = new SystemScheduler();
+ Scheduler SYSTEM = new Scheduler() {
+ @Override
+ public Time time() {
+ return Time.SYSTEM;
+ }
+
+ @Override
+ public <T> Future<T> schedule(ScheduledExecutorService executor,
Callable<T> callable, long delayMs) {
+ return executor.schedule(callable, delayMs, TimeUnit.MILLISECONDS);
+ }
+ };
/**
* Get the timekeeper associated with this scheduler.
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index b39969e7a97..80eb929052c 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -18,10 +18,10 @@
package org.apache.kafka.trogdor.coordinator;
import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Scheduler;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateTaskRequest;
import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 6b10de71d48..d8e8627a9ee 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -19,13 +19,13 @@ package org.apache.kafka.trogdor.coordinator;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InvalidRequestException;
-import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Scheduler;
import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending;
diff --git
a/trogdor/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
b/trogdor/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index e21059823f3..3780a8fd36d 100644
--- a/trogdor/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/trogdor/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -17,9 +17,7 @@
package org.apache.kafka.trogdor.agent;
-import org.apache.kafka.common.utils.MockScheduler;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.trogdor.basic.BasicNode;
@@ -28,8 +26,10 @@ import org.apache.kafka.trogdor.basic.BasicTopology;
import org.apache.kafka.trogdor.common.ExpectedTasks;
import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder;
import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.MockScheduler;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Scheduler;
import org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec;
import org.apache.kafka.trogdor.fault.Kibosh;
import org.apache.kafka.trogdor.fault.Kibosh.KiboshControlFile;
diff --git
a/trogdor/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
b/trogdor/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
index e3cc7884d68..ec5f7b68cdf 100644
---
a/trogdor/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
+++
b/trogdor/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
@@ -17,7 +17,6 @@
package org.apache.kafka.trogdor.common;
-import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.trogdor.agent.Agent;
import org.apache.kafka.trogdor.agent.AgentClient;
diff --git
a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
b/trogdor/src/test/java/org/apache/kafka/trogdor/common/MockScheduler.java
similarity index 96%
rename from
clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
rename to
trogdor/src/test/java/org/apache/kafka/trogdor/common/MockScheduler.java
index 882318ec003..c7ad383e064 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
+++ b/trogdor/src/test/java/org/apache/kafka/trogdor/common/MockScheduler.java
@@ -14,9 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.utils;
+package org.apache.kafka.trogdor.common;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
b/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index ee5f143cd36..866c8cb48c0 100644
---
a/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++
b/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -17,15 +17,15 @@
package org.apache.kafka.trogdor.coordinator;
-import org.apache.kafka.common.utils.MockScheduler;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.common.CapturingCommandRunner;
import org.apache.kafka.trogdor.common.ExpectedTasks;
import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder;
import org.apache.kafka.trogdor.common.MiniTrogdorCluster;
+import org.apache.kafka.trogdor.common.MockScheduler;
+import org.apache.kafka.trogdor.common.Scheduler;
import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateTaskRequest;