This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 17fa123deee KAFKA-20247; Controller registration needs to retry after 
request timeout (#21619)
17fa123deee is described below

commit 17fa123deee35cfef69ac109383a154ebf7c5b0b
Author: Kevin Wu <[email protected]>
AuthorDate: Mon Mar 16 11:17:26 2026 -0500

    KAFKA-20247; Controller registration needs to retry after request timeout 
(#21619)
    
    Set pendingRpc to false when controller registration times out. If we do
    not set this, controllers cannot send ControllerRegistrationRequests
    thereafter. Instead, subsequent calls to
    maybeSendControllerRegistration() will always log:
    "maybeSendControllerRegistration: waiting for the previous RPC to
    complete." The asserts on L294 and L300 fail when pendingRpc is not
    reset to false in onTimeout.
    
    Previously, RegistrationResponseHandler callbacks were invoked from the
    NodeToControllerRequestThread. Instead, these callbacks should append an
    event to the ControllerRegistrationManager event queue.
    
    Added testRetransmitRegistrationAfterTimeout. This test times out a
    controller registration and checks that the state of the registration
    manager and of the channel manager's request queue is as expected.
    
    Reviewers: José Armando García Sancio <[email protected]>
---
 .../server/ControllerRegistrationManager.scala     | 15 ++++-
 .../server/ControllerRegistrationManagerTest.scala | 65 +++++++++++++++++++---
 2 files changed, 72 insertions(+), 8 deletions(-)

diff --git 
a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala 
b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
index 9ee73e1c509..b5174cdfea0 100644
--- a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
@@ -238,6 +238,16 @@ class ControllerRegistrationManager(
 
   private class RegistrationResponseHandler extends 
ControllerRequestCompletionHandler {
     override def onComplete(response: ClientResponse): Unit = {
+      eventQueue.append(new RequestCompleteEvent(response))
+    }
+
+    override def onTimeout(): Unit = {
+      eventQueue.append(new RequestTimeoutEvent())
+    }
+  }
+
+  private class RequestCompleteEvent(response: ClientResponse) extends 
EventQueue.Event {
+    override def run(): Unit = {
       pendingRpc = false
       if (response.authenticationException() != null) {
         error(s"RegistrationResponseHandler: authentication error", 
response.authenticationException())
@@ -265,8 +275,11 @@ class ControllerRegistrationManager(
         }
       }
     }
+  }
 
-    override def onTimeout(): Unit = {
+  private class RequestTimeoutEvent extends EventQueue.Event {
+    override def run(): Unit = {
+      pendingRpc = false
       error(s"RegistrationResponseHandler: channel manager timed out before 
sending the request.")
       scheduleNextCommunicationAfterFailure()
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
index 46ea20758e2..a57111f481e 100644
--- 
a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
@@ -22,7 +22,7 @@ import 
org.apache.kafka.common.message.ControllerRegistrationResponseData
 import org.apache.kafka.common.metadata.{FeatureLevelRecord, 
RegisterControllerRecord}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ControllerRegistrationResponse
-import org.apache.kafka.common.utils.{ExponentialBackoff, Time}
+import org.apache.kafka.common.utils.ExponentialBackoff
 import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
 import org.apache.kafka.metadata.{ListenerInfo, RecordTestUtils, VersionRange}
@@ -72,12 +72,15 @@ class ControllerRegistrationManagerTest {
   ): ControllerRegistrationManager = {
     new ControllerRegistrationManager(context.config.nodeId,
       context.clusterId,
-      Time.SYSTEM,
+      context.time,
       "controller-registration-manager-test-",
       createSupportedFeatures(MetadataVersion.IBP_3_7_IV0),
       RecordTestUtils.createTestControllerRegistration(1, 
false).incarnationId(),
       ListenerInfo.create(context.config.controllerListeners.asJava),
-      new ExponentialBackoff(1, 2, 100, 0.02))
+      new ExponentialBackoff(1, 2, 100, 0)
+      // Use a backoff with no jitter so we can reliably observe the 
intermediate
+      // state after receiving error responses and before the scheduled 
retries.
+    )
   }
 
   private def registeredInLog(manager: ControllerRegistrationManager): Boolean 
= {
@@ -249,20 +252,68 @@ class ControllerRegistrationManagerTest {
     try {
       context.controllerNodeProvider.node.set(controller1)
       manager.start(context.mockChannelManager)
+
+      // send a ControllerRegistrationRequest after learning the MV
+      doMetadataUpdate(MetadataImage.EMPTY,
+        manager,
+        MetadataVersion.IBP_3_7_IV0,
+        r => if (r.controllerId() == 1) None else Some(r))
+      assertEquals((true, 0, 0), rpcStats(manager))
+
+      // the first response will trigger a retry
       context.mockClient.prepareResponseFrom(new 
ControllerRegistrationResponse(
         new ControllerRegistrationResponseData().
           setErrorCode(Errors.UNKNOWN_CONTROLLER_ID.code()).
           setErrorMessage("Unknown controller 1")), controller1)
+      context.mockChannelManager.poll()
+      assertEquals((false, 0, 1), rpcStats(manager))
+
+      // the retried request will be sent after retryBackoffMs
+      context.time.sleep(1)
+      assertEquals((true, 0, 1), rpcStats(manager))
+
+      // the second response will complete the RPC successfully
       context.mockClient.prepareResponseFrom(new 
ControllerRegistrationResponse(
         new ControllerRegistrationResponseData()), controller1)
+      context.mockChannelManager.poll()
+      assertEquals((false, 1, 0), rpcStats(manager))
+    } finally {
+      manager.close()
+    }
+  }
+
+  @Test
+  def testRetransmitRegistrationAfterTimeout(): Unit = {
+    val context = new RegistrationTestContext(configProperties)
+    val manager = newControllerRegistrationManager(context)
+    try {
+      context.controllerNodeProvider.node.set(controller1)
+
+      // send a ControllerRegistrationRequest after learning the MV
+      manager.start(context.mockChannelManager)
+      assertFalse(registeredInLog(manager))
+      assertEquals((false, 0, 0), rpcStats(manager))
       doMetadataUpdate(MetadataImage.EMPTY,
         manager,
         MetadataVersion.IBP_3_7_IV0,
         r => if (r.controllerId() == 1) None else Some(r))
-      TestUtils.retryOnExceptionWithTimeout(30000, () => {
-        context.mockChannelManager.poll()
-        assertEquals((false, 1, 0), rpcStats(manager))
-      })
+      // pendingRpc = true, successfulRpcs = 0, failedRpcs = 0
+      assertEquals((true, 0, 0), rpcStats(manager))
+      assertEquals(1, context.mockChannelManager.unsentQueue.size())
+
+      // time out the request before polling
+      // this will call the timeout callback
+      context.time.sleep(context.mockChannelManager.getTimeoutMs)
+      context.mockChannelManager.poll()
+      // pendingRpc = false, successfulRpcs = 0, failedRpcs = 1
+      assertEquals((false, 0, 1), rpcStats(manager))
+      assertEquals(0, context.mockChannelManager.unsentQueue.size())
+
+      // the retried request will be sent after retryBackoffMs
+      context.time.sleep(1)
+      // pendingRpc = true, successfulRpcs = 0, failedRpcs = 1
+      assertEquals((true, 0, 1), rpcStats(manager))
+      assertEquals(1, context.mockChannelManager.unsentQueue.size())
     } finally {
       manager.close()
     }

Reply via email to