This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c9a5f8603b6 MINOR: Migrate controllerSocketTimeoutMs from KafkaConfig
to AbstractKafkaConfig (#22188)
c9a5f8603b6 is described below
commit c9a5f8603b66812e3ff2590413d9597949bb61cc
Author: Yunchi Pang <[email protected]>
AuthorDate: Fri May 8 13:29:34 2026 -0400
MINOR: Migrate controllerSocketTimeoutMs from KafkaConfig to
AbstractKafkaConfig (#22188)
- Migrate `controllerSocketTimeoutMs` to `AbstractKafkaConfig`
- Update usage in `NodeToControllerChannelManagerImpl`
Reviewers: Murali Basani <[email protected]>, Ken Huang
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 1 -
.../server/NodeToControllerChannelManagerImpl.java | 5 +-
.../server/NodeToControllerRequestThread.java | 6 +-
.../kafka/server/config/AbstractKafkaConfig.java | 4 ++
.../server/NodeToControllerRequestThreadTest.java | 83 ++++++++++------------
5 files changed, 44 insertions(+), 55 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6f8781b72e0..c8196a17d3a 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -236,7 +236,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
val replicaSelectorClassName =
Option(getString(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG))
/** ********* Replication configuration ***********/
- val controllerSocketTimeoutMs: Int =
getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG)
val defaultReplicationFactor: Int =
getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)
val replicaLagTimeMaxMs =
getLong(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG)
val replicaSocketTimeoutMs =
getInt(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG)
diff --git
a/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
index 1306da48c5a..42394a9074f 100644
---
a/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
+++
b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java
@@ -38,7 +38,6 @@ import org.apache.kafka.raft.KRaftConfigs;
import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
import org.apache.kafka.server.common.NodeToControllerChannelManager;
import org.apache.kafka.server.config.AbstractKafkaConfig;
-import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.slf4j.Logger;
@@ -82,7 +81,7 @@ public class NodeToControllerChannelManagerImpl implements
NodeToControllerChann
buildNetworkClient(controllerInformation),
manualMetadataUpdater,
controllerNodeProvider,
- config,
+ config.controllerSocketTimeoutMs(),
time,
threadName,
retryTimeoutMs
@@ -122,7 +121,7 @@ public class NodeToControllerChannelManagerImpl implements
NodeToControllerChann
50,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
- Math.min(Integer.MAX_VALUE, (int)
Math.min(config.getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG),
retryTimeoutMs)), // request timeout should not exceed the provided retry
timeout
+ Math.min(Integer.MAX_VALUE, (int)
Math.min(config.controllerSocketTimeoutMs(), retryTimeoutMs)), // request
timeout should not exceed the provided retry timeout
config.getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
config.getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
time,
diff --git
a/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
b/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
index e3d870999b2..6f4a85b3aa3 100644
---
a/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
+++
b/server/src/main/java/org/apache/kafka/server/NodeToControllerRequestThread.java
@@ -21,10 +21,8 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
@@ -67,11 +65,11 @@ public class NodeToControllerRequestThread extends
InterBrokerSendThread {
public NodeToControllerRequestThread(KafkaClient initialNetworkClient,
ManualMetadataUpdater metadataUpdater,
Supplier<ControllerInformation>
controllerNodeProvider,
- AbstractConfig config,
+ int controllerSocketTimeoutMs,
Time time,
String threadName,
Long retryTimeoutMs) {
- super(threadName, initialNetworkClient, Math.min(Integer.MAX_VALUE,
(int)
Math.min(config.getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG),
retryTimeoutMs)), time, false);
+ super(threadName, initialNetworkClient, Math.min(Integer.MAX_VALUE,
(int) Math.min(controllerSocketTimeoutMs, retryTimeoutMs)), time, false);
this.time = time;
this.controllerNodeProvider = controllerNodeProvider;
this.metadataUpdater = metadataUpdater;
diff --git
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index d129fdcd65b..ac5e33dc70b 100644
---
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -113,6 +113,10 @@ public abstract class AbstractKafkaConfig extends
AbstractConfig {
return getInt(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG);
}
+ public int controllerSocketTimeoutMs() {
+ return getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG);
+ }
+
public int numRecoveryThreadsPerDataDir() {
return
getInt(ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG);
}
diff --git
a/server/src/test/java/org/apache/kafka/server/NodeToControllerRequestThreadTest.java
b/server/src/test/java/org/apache/kafka/server/NodeToControllerRequestThreadTest.java
index 5341db3c12d..2814c343dbc 100644
---
a/server/src/test/java/org/apache/kafka/server/NodeToControllerRequestThreadTest.java
+++
b/server/src/test/java/org/apache/kafka/server/NodeToControllerRequestThreadTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.message.EnvelopeResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.network.ListenerName;
@@ -60,10 +59,6 @@ import static org.mockito.Mockito.mock;
class NodeToControllerRequestThreadTest {
- private static AbstractConfig createConfig() {
- return new AbstractConfig(ReplicationConfigs.CONFIG_DEF, Map.of());
- }
-
private static ControllerInformation controllerInfo(Optional<Node> node) {
return new ControllerInformation(node, new ListenerName(""),
SecurityProtocol.PLAINTEXT, "");
}
@@ -86,20 +81,36 @@ class NodeToControllerRequestThreadTest {
return () -> ref.getAndSet(second);
}
+ private static NodeToControllerRequestThread createAndStartRequestThread(
+ MockClient mockClient,
+ Supplier<ControllerInformation> controllerNodeProvider,
+ MockTime time,
+ long retryTimeoutMs) {
+ NodeToControllerRequestThread thread = new
NodeToControllerRequestThread(
+ mockClient, new ManualMetadataUpdater(),
+ controllerNodeProvider,
ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, time, "",
retryTimeoutMs);
+ thread.setStarted(true);
+ return thread;
+ }
+
+ private static NodeToControllerRequestThread createAndStartRequestThread(
+ MockClient mockClient,
+ Supplier<ControllerInformation> controllerNodeProvider,
+ MockTime time) {
+ return createAndStartRequestThread(mockClient, controllerNodeProvider,
time, Long.MAX_VALUE);
+ }
+
@Test
void testRetryTimeoutWhileControllerNotAvailable() {
MockTime time = new MockTime();
- AbstractConfig config = createConfig();
Metadata metadata = mock(Metadata.class);
MockClient mockClient = new MockClient(time, metadata);
Supplier<ControllerInformation> controllerNodeProvider =
NodeToControllerRequestThreadTest::emptyControllerInfo;
long retryTimeoutMs = 30000;
- NodeToControllerRequestThread testRequestThread = new
NodeToControllerRequestThread(
- mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", retryTimeoutMs);
- testRequestThread.setStarted(true);
+ NodeToControllerRequestThread testRequestThread =
createAndStartRequestThread(
+ mockClient, controllerNodeProvider, time, retryTimeoutMs);
TestControllerRequestCompletionHandler completionHandler =
new TestControllerRequestCompletionHandler(null);
@@ -123,7 +134,6 @@ class NodeToControllerRequestThreadTest {
void testRequestsSent() {
// just a simple test that tests whether the request from 1 -> 2 is
sent and the response callback is called
MockTime time = new MockTime();
- AbstractConfig config = createConfig();
int controllerId = 2;
Metadata metadata = mock(Metadata.class);
@@ -134,10 +144,8 @@ class NodeToControllerRequestThreadTest {
() -> controllerInfo(Optional.of(activeController));
MetadataResponse expectedResponse =
RequestTestUtils.metadataUpdateWith(2, Map.of("a", 2));
- NodeToControllerRequestThread testRequestThread = new
NodeToControllerRequestThread(
- mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", Long.MAX_VALUE);
- testRequestThread.setStarted(true);
+ NodeToControllerRequestThread testRequestThread =
createAndStartRequestThread(
+ mockClient, controllerNodeProvider, time);
mockClient.prepareResponse(expectedResponse);
TestControllerRequestCompletionHandler completionHandler =
@@ -164,7 +172,6 @@ class NodeToControllerRequestThreadTest {
void testControllerChanged() {
// in this test the controller changes from node 1 -> node 2
MockTime time = new MockTime();
- AbstractConfig config = createConfig();
int oldControllerId = 1;
int newControllerId = 2;
@@ -178,10 +185,8 @@ class NodeToControllerRequestThreadTest {
controllerInfo(Optional.of(newController)));
MetadataResponse expectedResponse =
RequestTestUtils.metadataUpdateWith(3, Map.of("a", 2));
- NodeToControllerRequestThread testRequestThread = new
NodeToControllerRequestThread(
- mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", Long.MAX_VALUE);
- testRequestThread.setStarted(true);
+ NodeToControllerRequestThread testRequestThread =
createAndStartRequestThread(
+ mockClient, controllerNodeProvider, time);
TestControllerRequestCompletionHandler completionHandler =
new TestControllerRequestCompletionHandler(expectedResponse);
@@ -212,7 +217,6 @@ class NodeToControllerRequestThreadTest {
@Test
void testNotController() {
MockTime time = new MockTime();
- AbstractConfig config = createConfig();
int oldControllerId = 1;
int newControllerId = 2;
@@ -230,10 +234,8 @@ class NodeToControllerRequestThreadTest {
Map.of("a", Errors.NOT_CONTROLLER),
Map.of("a", 2));
MetadataResponse expectedResponse =
RequestTestUtils.metadataUpdateWith(3, Map.of("a", 2));
- NodeToControllerRequestThread testRequestThread = new
NodeToControllerRequestThread(
- mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", Long.MAX_VALUE);
- testRequestThread.setStarted(true);
+ NodeToControllerRequestThread testRequestThread =
createAndStartRequestThread(
+ mockClient, controllerNodeProvider, time);
TestControllerRequestCompletionHandler completionHandler =
new TestControllerRequestCompletionHandler(expectedResponse);
@@ -271,7 +273,6 @@ class NodeToControllerRequestThreadTest {
@Test
void testEnvelopeResponseWithNotControllerError() {
MockTime time = new MockTime();
- AbstractConfig config = createConfig();
int oldControllerId = 1;
int newControllerId = 2;
@@ -294,10 +295,8 @@ class NodeToControllerRequestThreadTest {
// response for retry request after receiving NOT_CONTROLLER error
MetadataResponse expectedResponse =
RequestTestUtils.metadataUpdateWith(3, Map.of("a", 2));
- NodeToControllerRequestThread testRequestThread = new
NodeToControllerRequestThread(
- mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", Long.MAX_VALUE);
- testRequestThread.setStarted(true);
+ NodeToControllerRequestThread testRequestThread =
createAndStartRequestThread(
+ mockClient, controllerNodeProvider, time);
TestControllerRequestCompletionHandler completionHandler =
new TestControllerRequestCompletionHandler(expectedResponse);
@@ -343,7 +342,6 @@ class NodeToControllerRequestThreadTest {
@Test
void testRetryTimeout() {
MockTime time = new MockTime();
- AbstractConfig config = createConfig();
int controllerId = 1;
Metadata metadata = mock(Metadata.class);
@@ -357,10 +355,8 @@ class NodeToControllerRequestThreadTest {
MetadataResponse responseWithNotControllerError =
RequestTestUtils.metadataUpdateWith("cluster1", 2,
Map.of("a", Errors.NOT_CONTROLLER),
Map.of("a", 2));
- NodeToControllerRequestThread testRequestThread = new
NodeToControllerRequestThread(
- mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", retryTimeoutMs);
- testRequestThread.setStarted(true);
+ NodeToControllerRequestThread testRequestThread =
createAndStartRequestThread(
+ mockClient, controllerNodeProvider, time, retryTimeoutMs);
TestControllerRequestCompletionHandler completionHandler =
new TestControllerRequestCompletionHandler();
@@ -391,7 +387,6 @@ class NodeToControllerRequestThreadTest {
@Test
void testUnsupportedVersionHandling() {
MockTime time = new MockTime();
- AbstractConfig config = createConfig();
int controllerId = 2;
Metadata metadata = mock(Metadata.class);
@@ -422,10 +417,8 @@ class NodeToControllerRequestThreadTest {
mockClient.prepareUnsupportedVersionResponse(request ->
request.apiKey() == ApiKeys.METADATA);
- NodeToControllerRequestThread testRequestThread = new
NodeToControllerRequestThread(
- mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", Long.MAX_VALUE);
- testRequestThread.setStarted(true);
+ NodeToControllerRequestThread testRequestThread =
createAndStartRequestThread(
+ mockClient, controllerNodeProvider, time);
testRequestThread.enqueue(queueItem);
pollUntil(testRequestThread, () -> callbackResponse.get() != null);
@@ -435,7 +428,6 @@ class NodeToControllerRequestThreadTest {
@Test
void testAuthenticationExceptionHandling() {
MockTime time = new MockTime();
- AbstractConfig config = createConfig();
int controllerId = 2;
Metadata metadata = mock(Metadata.class);
@@ -466,10 +458,8 @@ class NodeToControllerRequestThreadTest {
mockClient.createPendingAuthenticationError(activeController, 50);
- NodeToControllerRequestThread testRequestThread = new
NodeToControllerRequestThread(
- mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", Long.MAX_VALUE);
- testRequestThread.setStarted(true);
+ NodeToControllerRequestThread testRequestThread =
createAndStartRequestThread(
+ mockClient, controllerNodeProvider, time);
testRequestThread.enqueue(queueItem);
pollUntil(testRequestThread, () -> callbackResponse.get() != null);
@@ -481,7 +471,6 @@ class NodeToControllerRequestThreadTest {
void testThreadNotStarted() {
// Make sure we throw if we enqueue anything while the thread is not
running
MockTime time = new MockTime();
- AbstractConfig config = createConfig();
Metadata metadata = mock(Metadata.class);
MockClient mockClient = new MockClient(time, metadata);
@@ -490,7 +479,7 @@ class NodeToControllerRequestThreadTest {
NodeToControllerRequestThread testRequestThread = new
NodeToControllerRequestThread(
mockClient, new ManualMetadataUpdater(),
- controllerNodeProvider, config, time, "", Long.MAX_VALUE);
+ controllerNodeProvider,
ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, time, "",
Long.MAX_VALUE);
TestControllerRequestCompletionHandler completionHandler =
new TestControllerRequestCompletionHandler(null);