This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 32c0e1c4d0e KAFKA-20247; Controller registration needs to retry after
request timeout (#21619)
32c0e1c4d0e is described below
commit 32c0e1c4d0ed4b2483a15d2408b0c58b1f17f81d
Author: Kevin Wu <[email protected]>
AuthorDate: Mon Mar 16 11:17:26 2026 -0500
KAFKA-20247; Controller registration needs to retry after request timeout
(#21619)
Set pendingRpc to false when a controller registration request times out.
Without this reset, the controller can never send another
ControllerRegistrationRequest. Every subsequent call to
maybeSendControllerRegistration() only logs:
"maybeSendControllerRegistration: waiting for the previous RPC to
complete." The assertions at lines 294 and 300 of
ControllerRegistrationManagerTest.scala fail when pendingRpc is not
reset in onTimeout.
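Previously, the RegistrationResponseHandler callbacks ran their logic
directly on the NodeToControllerRequestThread. Now onComplete and
onTimeout only append an event (RequestCompleteEvent or
RequestTimeoutEvent) to the ControllerRegistrationManager event queue.
As a result, pendingRpc and the related registration state are updated
from the event queue rather than from the request thread.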
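Added testRetransmitRegistrationAfterTimeout. This test times out a
controller registration request. It then checks the manager's RPC
statistics and the size of the mock channel manager's unsent-request
queue twice: right after the timeout, and again after the retry backoff
elapses and the registration is re-sent.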
Reviewers: José Armando García Sancio <[email protected]>
---
.../server/ControllerRegistrationManager.scala | 15 ++++-
.../server/ControllerRegistrationManagerTest.scala | 65 +++++++++++++++++++---
2 files changed, 72 insertions(+), 8 deletions(-)
diff --git
a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
index 9ee73e1c509..b5174cdfea0 100644
--- a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
@@ -238,6 +238,16 @@ class ControllerRegistrationManager(
private class RegistrationResponseHandler extends
ControllerRequestCompletionHandler {
override def onComplete(response: ClientResponse): Unit = {
+ eventQueue.append(new RequestCompleteEvent(response))
+ }
+
+ override def onTimeout(): Unit = {
+ eventQueue.append(new RequestTimeoutEvent())
+ }
+ }
+
+ private class RequestCompleteEvent(response: ClientResponse) extends
EventQueue.Event {
+ override def run(): Unit = {
pendingRpc = false
if (response.authenticationException() != null) {
error(s"RegistrationResponseHandler: authentication error",
response.authenticationException())
@@ -265,8 +275,11 @@ class ControllerRegistrationManager(
}
}
}
+ }
- override def onTimeout(): Unit = {
+ private class RequestTimeoutEvent extends EventQueue.Event {
+ override def run(): Unit = {
+ pendingRpc = false
error(s"RegistrationResponseHandler: channel manager timed out before
sending the request.")
scheduleNextCommunicationAfterFailure()
}
diff --git
a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
index a3c39cb05c0..299123f6489 100644
---
a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
@@ -22,7 +22,7 @@ import
org.apache.kafka.common.message.ControllerRegistrationResponseData
import org.apache.kafka.common.metadata.{FeatureLevelRecord,
RegisterControllerRecord}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ControllerRegistrationResponse
-import org.apache.kafka.common.utils.{ExponentialBackoff, Time}
+import org.apache.kafka.common.utils.ExponentialBackoff
import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
import org.apache.kafka.metadata.{ListenerInfo, RecordTestUtils, VersionRange}
@@ -72,12 +72,15 @@ class ControllerRegistrationManagerTest {
): ControllerRegistrationManager = {
new ControllerRegistrationManager(context.config.nodeId,
context.clusterId,
- Time.SYSTEM,
+ context.time,
"controller-registration-manager-test-",
createSupportedFeatures(MetadataVersion.IBP_3_7_IV0),
RecordTestUtils.createTestControllerRegistration(1,
false).incarnationId(),
ListenerInfo.create(context.config.controllerListeners.asJava),
- new ExponentialBackoff(1, 2, 100, 0.02))
+ new ExponentialBackoff(1, 2, 100, 0)
+ // Use a backoff with no jitter so we can reliably observe the
intermediate
+ // state after receiving error responses and before the scheduled
retries.
+ )
}
private def registeredInLog(manager: ControllerRegistrationManager): Boolean
= {
@@ -249,20 +252,68 @@ class ControllerRegistrationManagerTest {
try {
context.controllerNodeProvider.node.set(controller1)
manager.start(context.mockChannelManager)
+
+ // send a ControllerRegistrationRequest after learning the MV
+ doMetadataUpdate(MetadataImage.EMPTY,
+ manager,
+ MetadataVersion.IBP_3_7_IV0,
+ r => if (r.controllerId() == 1) None else Some(r))
+ assertEquals((true, 0, 0), rpcStats(manager))
+
+ // the first response will trigger a retry
context.mockClient.prepareResponseFrom(new
ControllerRegistrationResponse(
new ControllerRegistrationResponseData().
setErrorCode(Errors.UNKNOWN_CONTROLLER_ID.code()).
setErrorMessage("Unknown controller 1")), controller1)
+ context.mockChannelManager.poll()
+ assertEquals((false, 0, 1), rpcStats(manager))
+
+ // the retried request will be sent after retryBackoffMs
+ context.time.sleep(1)
+ assertEquals((true, 0, 1), rpcStats(manager))
+
+ // the second response will complete the RPC successfully
context.mockClient.prepareResponseFrom(new
ControllerRegistrationResponse(
new ControllerRegistrationResponseData()), controller1)
+ context.mockChannelManager.poll()
+ assertEquals((false, 1, 0), rpcStats(manager))
+ } finally {
+ manager.close()
+ }
+ }
+
+ @Test
+ def testRetransmitRegistrationAfterTimeout(): Unit = {
+ val context = new RegistrationTestContext(configProperties)
+ val manager = newControllerRegistrationManager(context)
+ try {
+ context.controllerNodeProvider.node.set(controller1)
+
+ // send a ControllerRegistrationRequest after learning the MV
+ manager.start(context.mockChannelManager)
+ assertFalse(registeredInLog(manager))
+ assertEquals((false, 0, 0), rpcStats(manager))
doMetadataUpdate(MetadataImage.EMPTY,
manager,
MetadataVersion.IBP_3_7_IV0,
r => if (r.controllerId() == 1) None else Some(r))
- TestUtils.retryOnExceptionWithTimeout(30000, () => {
- context.mockChannelManager.poll()
- assertEquals((false, 1, 0), rpcStats(manager))
- })
+ // pendingRpc = true, successfulRpcs = 0, failedRpcs = 0
+ assertEquals((true, 0, 0), rpcStats(manager))
+ assertEquals(1, context.mockChannelManager.unsentQueue.size())
+
+ // time out the request before polling
+ // this will call the timeout callback
+ context.time.sleep(context.mockChannelManager.getTimeoutMs)
+ context.mockChannelManager.poll()
+ // pendingRpc = false, successfulRpcs = 0, failedRpcs = 1
+ assertEquals((false, 0, 1), rpcStats(manager))
+ assertEquals(0, context.mockChannelManager.unsentQueue.size())
+
+ // the retried request will be sent after retryBackoffMs
+ context.time.sleep(1)
+ // pendingRpc = true, successfulRpcs = 0, failedRpcs = 1
+ assertEquals((true, 0, 1), rpcStats(manager))
+ assertEquals(1, context.mockChannelManager.unsentQueue.size())
} finally {
manager.close()
}