This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 17fa123deee KAFKA-20247; Controller registration needs to retry after
request timeout (#21619)
17fa123deee is described below
commit 17fa123deee35cfef69ac109383a154ebf7c5b0b
Author: Kevin Wu <[email protected]>
AuthorDate: Mon Mar 16 11:17:26 2026 -0500
KAFKA-20247; Controller registration needs to retry after request timeout
(#21619)
Set pendingRpc to false when a controller registration request times out.
If it is not reset, the controller can never send another
ControllerRegistrationRequest: every subsequent call to
maybeSendControllerRegistration() only logs
"maybeSendControllerRegistration: waiting for the previous RPC to
complete." The asserts on L294 and L300 fail when pendingRpc is not
reset in onTimeout.
Previously, the RegistrationResponseHandler callbacks did their work directly
on the NodeToControllerRequestThread. Now these callbacks only append an
event to the ControllerRegistrationManager event queue.
Added testRetransmitRegistrationAfterTimeout. This test times out a
controller registration request and checks that both the registration
manager's RPC state and the channel manager's request queue are as expected.
Reviewers: José Armando García Sancio <[email protected]>
---
.../server/ControllerRegistrationManager.scala | 15 ++++-
.../server/ControllerRegistrationManagerTest.scala | 65 +++++++++++++++++++---
2 files changed, 72 insertions(+), 8 deletions(-)
diff --git
a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
index 9ee73e1c509..b5174cdfea0 100644
--- a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
@@ -238,6 +238,16 @@ class ControllerRegistrationManager(
private class RegistrationResponseHandler extends
ControllerRequestCompletionHandler {
override def onComplete(response: ClientResponse): Unit = {
+ eventQueue.append(new RequestCompleteEvent(response))
+ }
+
+ override def onTimeout(): Unit = {
+ eventQueue.append(new RequestTimeoutEvent())
+ }
+ }
+
+ private class RequestCompleteEvent(response: ClientResponse) extends
EventQueue.Event {
+ override def run(): Unit = {
pendingRpc = false
if (response.authenticationException() != null) {
error(s"RegistrationResponseHandler: authentication error",
response.authenticationException())
@@ -265,8 +275,11 @@ class ControllerRegistrationManager(
}
}
}
+ }
- override def onTimeout(): Unit = {
+ private class RequestTimeoutEvent extends EventQueue.Event {
+ override def run(): Unit = {
+ pendingRpc = false
error(s"RegistrationResponseHandler: channel manager timed out before
sending the request.")
scheduleNextCommunicationAfterFailure()
}
diff --git
a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
index 46ea20758e2..a57111f481e 100644
---
a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
@@ -22,7 +22,7 @@ import
org.apache.kafka.common.message.ControllerRegistrationResponseData
import org.apache.kafka.common.metadata.{FeatureLevelRecord,
RegisterControllerRecord}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ControllerRegistrationResponse
-import org.apache.kafka.common.utils.{ExponentialBackoff, Time}
+import org.apache.kafka.common.utils.ExponentialBackoff
import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
import org.apache.kafka.metadata.{ListenerInfo, RecordTestUtils, VersionRange}
@@ -72,12 +72,15 @@ class ControllerRegistrationManagerTest {
): ControllerRegistrationManager = {
new ControllerRegistrationManager(context.config.nodeId,
context.clusterId,
- Time.SYSTEM,
+ context.time,
"controller-registration-manager-test-",
createSupportedFeatures(MetadataVersion.IBP_3_7_IV0),
RecordTestUtils.createTestControllerRegistration(1,
false).incarnationId(),
ListenerInfo.create(context.config.controllerListeners.asJava),
- new ExponentialBackoff(1, 2, 100, 0.02))
+ new ExponentialBackoff(1, 2, 100, 0)
+ // Use a backoff with no jitter so we can reliably observe the
intermediate
+ // state after receiving error responses and before the scheduled
retries.
+ )
}
private def registeredInLog(manager: ControllerRegistrationManager): Boolean
= {
@@ -249,20 +252,68 @@ class ControllerRegistrationManagerTest {
try {
context.controllerNodeProvider.node.set(controller1)
manager.start(context.mockChannelManager)
+
+ // send a ControllerRegistrationRequest after learning the MV
+ doMetadataUpdate(MetadataImage.EMPTY,
+ manager,
+ MetadataVersion.IBP_3_7_IV0,
+ r => if (r.controllerId() == 1) None else Some(r))
+ assertEquals((true, 0, 0), rpcStats(manager))
+
+ // the first response will trigger a retry
context.mockClient.prepareResponseFrom(new
ControllerRegistrationResponse(
new ControllerRegistrationResponseData().
setErrorCode(Errors.UNKNOWN_CONTROLLER_ID.code()).
setErrorMessage("Unknown controller 1")), controller1)
+ context.mockChannelManager.poll()
+ assertEquals((false, 0, 1), rpcStats(manager))
+
+ // the retried request will be sent after retryBackoffMs
+ context.time.sleep(1)
+ assertEquals((true, 0, 1), rpcStats(manager))
+
+ // the second response will complete the RPC successfully
context.mockClient.prepareResponseFrom(new
ControllerRegistrationResponse(
new ControllerRegistrationResponseData()), controller1)
+ context.mockChannelManager.poll()
+ assertEquals((false, 1, 0), rpcStats(manager))
+ } finally {
+ manager.close()
+ }
+ }
+
+ @Test
+ def testRetransmitRegistrationAfterTimeout(): Unit = {
+ val context = new RegistrationTestContext(configProperties)
+ val manager = newControllerRegistrationManager(context)
+ try {
+ context.controllerNodeProvider.node.set(controller1)
+
+ // send a ControllerRegistrationRequest after learning the MV
+ manager.start(context.mockChannelManager)
+ assertFalse(registeredInLog(manager))
+ assertEquals((false, 0, 0), rpcStats(manager))
doMetadataUpdate(MetadataImage.EMPTY,
manager,
MetadataVersion.IBP_3_7_IV0,
r => if (r.controllerId() == 1) None else Some(r))
- TestUtils.retryOnExceptionWithTimeout(30000, () => {
- context.mockChannelManager.poll()
- assertEquals((false, 1, 0), rpcStats(manager))
- })
+ // pendingRpc = true, successfulRpcs = 0, failedRpcs = 0
+ assertEquals((true, 0, 0), rpcStats(manager))
+ assertEquals(1, context.mockChannelManager.unsentQueue.size())
+
+ // time out the request before polling
+ // this will call the timeout callback
+ context.time.sleep(context.mockChannelManager.getTimeoutMs)
+ context.mockChannelManager.poll()
+ // pendingRpc = false, successfulRpcs = 0, failedRpcs = 1
+ assertEquals((false, 0, 1), rpcStats(manager))
+ assertEquals(0, context.mockChannelManager.unsentQueue.size())
+
+ // the retried request will be sent after retryBackoffMs
+ context.time.sleep(1)
+ // pendingRpc = true, successfulRpcs = 0, failedRpcs = 1
+ assertEquals((true, 0, 1), rpcStats(manager))
+ assertEquals(1, context.mockChannelManager.unsentQueue.size())
} finally {
manager.close()
}