This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 363e4ae6dd4 KAFKA-20297 move Scheduler from client common to trogdor 
(#21753)
363e4ae6dd4 is described below

commit 363e4ae6dd43c98810425bd0e4f791aee9325563
Author: Chia-Yi Chiu <[email protected]>
AuthorDate: Wed Mar 18 20:24:52 2026 +0900

    KAFKA-20297 move Scheduler from client common to trogdor (#21753)
    
    **Summary** This PR refactors the Scheduler utility and its
    implementations by moving them from the `clients` module to the
    `trogdor` module. It also ensures that client-side tests that previously
    relied on MockScheduler remain functional by implementing a local
    version where needed.
    
    **Key Changes**
    - Package Relocation: Moved `Scheduler`, `SystemScheduler`, and
      `MockScheduler` from `org.apache.kafka.common.utils` to
      `org.apache.kafka.trogdor.common`.
    - Trogdor Updates: Updated all references in the trogdor module
      (including `Agent`, `Coordinator`, `TaskManager`, and `WorkerManager`)
      to point to the new location of the Scheduler interface and its
      implementations.
    - Client Test Stability: Added a private `MockScheduler` inner class to
      `ExpiringCredentialRefreshingLoginTest.java`. This replaces the
      dependency on the global MockScheduler that was moved to Trogdor,
      keeping the clients module tests decoupled from trogdor.
    - Cleaned Up Imports: Removed unused imports and updated package
      declarations across 13 files to reflect the architectural shift.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/common/utils/SystemScheduler.java | 43 ----------------------
 .../ExpiringCredentialRefreshingLoginTest.java     | 36 +++++++++++++++++-
 .../java/org/apache/kafka/trogdor/agent/Agent.java |  2 +-
 .../apache/kafka/trogdor/agent/WorkerManager.java  |  2 +-
 .../apache/kafka/trogdor/basic/BasicPlatform.java  |  2 +-
 .../org/apache/kafka/trogdor/common/Platform.java  |  6 ---
 .../apache/kafka/trogdor/common}/Scheduler.java    | 17 ++++++++-
 .../kafka/trogdor/coordinator/Coordinator.java     |  2 +-
 .../kafka/trogdor/coordinator/TaskManager.java     |  2 +-
 .../org/apache/kafka/trogdor/agent/AgentTest.java  |  4 +-
 .../kafka/trogdor/common/MiniTrogdorCluster.java   |  1 -
 .../kafka/trogdor/common}/MockScheduler.java       |  4 +-
 .../kafka/trogdor/coordinator/CoordinatorTest.java |  4 +-
 13 files changed, 62 insertions(+), 63 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/SystemScheduler.java 
b/clients/src/main/java/org/apache/kafka/common/utils/SystemScheduler.java
deleted file mode 100644
index c8c1148376a..00000000000
--- a/clients/src/main/java/org/apache/kafka/common/utils/SystemScheduler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.utils;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A scheduler implementation that uses the system clock.
- *
- * Use Scheduler.SYSTEM instead of constructing an instance of this class.
- */
-public class SystemScheduler implements Scheduler {
-    SystemScheduler() {
-    }
-
-    @Override
-    public Time time() {
-        return Time.SYSTEM;
-    }
-
-    @Override
-    public <T> Future<T> schedule(final ScheduledExecutorService executor,
-                                  final Callable<T> callable, long delayMs) {
-        return executor.schedule(callable, delayMs, TimeUnit.MILLISECONDS);
-    }
-}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
index dc94181468c..e8716935059 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import 
org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.LoginContextFactory;
-import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 
@@ -35,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.TreeMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -284,6 +284,40 @@ public class ExpiringCredentialRefreshingLoginTest {
         }
     }
 
+    /*
+    * */
+    private static class MockScheduler implements MockTime.Listener {
+
+        private final MockTime time;
+        private final Map<Long, List<KafkaFutureImpl<Long>>> waiters = new 
TreeMap<>();
+
+        public MockScheduler(MockTime time) {
+            this.time = time;
+            time.addListener(this);
+        }
+
+        @Override
+        public synchronized void onTimeUpdated() {
+            long timeMs = time.milliseconds();
+            var iterator = waiters.entrySet().iterator();
+            while (iterator.hasNext()) {
+                var entry = iterator.next();
+                if (entry.getKey() > timeMs) break;
+                entry.getValue().forEach(future -> future.complete(timeMs));
+                iterator.remove();
+            }
+        }
+
+        public synchronized void addWaiter(long delayMs, KafkaFutureImpl<Long> 
waiter) {
+            long timeMs = time.milliseconds();
+            if (delayMs <= 0) {
+                waiter.complete(timeMs);
+            } else {
+                waiters.computeIfAbsent(timeMs + delayMs, k -> new 
ArrayList<>()).add(waiter);
+            }
+        }
+    }
+
     @Test
     public void testRefresh() throws Exception {
         for (int numExpectedRefreshes : new int[] {0, 1, 2}) {
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
index 0ef3c5e5745..c2d13e63308 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -19,11 +19,11 @@ package org.apache.kafka.trogdor.agent;
 
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Scheduler;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
 import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index b3ba8263db9..4c7fdd6b874 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -20,11 +20,11 @@ package org.apache.kafka.trogdor.agent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
-import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Scheduler;
 import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
index 1968be27096..cd0d39dabef 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
@@ -17,10 +17,10 @@
 
 package org.apache.kafka.trogdor.basic;
 
-import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Shell;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Scheduler;
 import org.apache.kafka.trogdor.common.Topology;
 
 import com.fasterxml.jackson.databind.JsonNode;
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/common/Platform.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/common/Platform.java
index defed06c323..1df018ee7ea 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/common/Platform.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/common/Platform.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.trogdor.common;
 
-import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Utils;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -34,11 +33,6 @@ public interface Platform {
 
         public static final String TROGDOR_COORDINATOR_PORT = 
"trogdor.coordinator.port";
 
-        public static final String TROGDOR_COORDINATOR_HEARTBEAT_MS =
-            "trogdor.coordinator.heartbeat.ms";
-
-        public static final int TROGDOR_COORDINATOR_HEARTBEAT_MS_DEFAULT = 
60000;
-
         public static Platform parse(String curNodeName, String path) throws 
Exception {
             JsonNode root = JsonUtil.JSON_SERDE.readTree(new File(path));
             JsonNode platformNode = root.get("platform");
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Scheduler.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/common/Scheduler.java
similarity index 79%
rename from clients/src/main/java/org/apache/kafka/common/utils/Scheduler.java
rename to trogdor/src/main/java/org/apache/kafka/trogdor/common/Scheduler.java
index a8ada655b3e..e90782f4a9e 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Scheduler.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/common/Scheduler.java
@@ -14,11 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.utils;
+package org.apache.kafka.trogdor.common;
+
+import org.apache.kafka.common.utils.Time;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * An interface for scheduling tasks for the future.
@@ -26,7 +29,17 @@ import java.util.concurrent.ScheduledExecutorService;
  * Implementations of this class should be thread-safe.
  */
 public interface Scheduler {
-    Scheduler SYSTEM = new SystemScheduler();
+    Scheduler SYSTEM = new Scheduler() {
+        @Override
+        public Time time() {
+            return Time.SYSTEM;
+        }
+
+        @Override
+        public <T> Future<T> schedule(ScheduledExecutorService executor, 
Callable<T> callable, long delayMs) {
+            return executor.schedule(callable, delayMs, TimeUnit.MILLISECONDS);
+        }
+    };
 
     /**
      * Get the timekeeper associated with this scheduler.
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index b39969e7a97..80eb929052c 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -18,10 +18,10 @@
 package org.apache.kafka.trogdor.coordinator;
 
 import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Scheduler;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateTaskRequest;
 import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 6b10de71d48..d8e8627a9ee 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -19,13 +19,13 @@ package org.apache.kafka.trogdor.coordinator;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.InvalidRequestException;
-import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Scheduler;
 import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.TaskDone;
 import org.apache.kafka.trogdor.rest.TaskPending;
diff --git 
a/trogdor/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java 
b/trogdor/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index e21059823f3..3780a8fd36d 100644
--- a/trogdor/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/trogdor/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -17,9 +17,7 @@
 
 package org.apache.kafka.trogdor.agent;
 
-import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.trogdor.basic.BasicNode;
@@ -28,8 +26,10 @@ import org.apache.kafka.trogdor.basic.BasicTopology;
 import org.apache.kafka.trogdor.common.ExpectedTasks;
 import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder;
 import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.MockScheduler;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Scheduler;
 import org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec;
 import org.apache.kafka.trogdor.fault.Kibosh;
 import org.apache.kafka.trogdor.fault.Kibosh.KiboshControlFile;
diff --git 
a/trogdor/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java 
b/trogdor/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
index e3cc7884d68..ec5f7b68cdf 100644
--- 
a/trogdor/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
+++ 
b/trogdor/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.trogdor.common;
 
-import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.trogdor.agent.Agent;
 import org.apache.kafka.trogdor.agent.AgentClient;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java 
b/trogdor/src/test/java/org/apache/kafka/trogdor/common/MockScheduler.java
similarity index 96%
rename from 
clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
rename to 
trogdor/src/test/java/org/apache/kafka/trogdor/common/MockScheduler.java
index 882318ec003..c7ad383e064 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
+++ b/trogdor/src/test/java/org/apache/kafka/trogdor/common/MockScheduler.java
@@ -14,9 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.utils;
+package org.apache.kafka.trogdor.common;
 
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
 
b/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index ee5f143cd36..866c8cb48c0 100644
--- 
a/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ 
b/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -17,15 +17,15 @@
 
 package org.apache.kafka.trogdor.coordinator;
 
-import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.trogdor.agent.AgentClient;
 import org.apache.kafka.trogdor.common.CapturingCommandRunner;
 import org.apache.kafka.trogdor.common.ExpectedTasks;
 import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder;
 import org.apache.kafka.trogdor.common.MiniTrogdorCluster;
+import org.apache.kafka.trogdor.common.MockScheduler;
+import org.apache.kafka.trogdor.common.Scheduler;
 import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateTaskRequest;

Reply via email to