This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a3fae9cbfa9 KAFKA-20348: Move ControllerRegistrationManager to server 
module (#21854)
a3fae9cbfa9 is described below

commit a3fae9cbfa9556efc3020ccf0bb6e0e1e6271496
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Mar 23 21:10:05 2026 +0100

    KAFKA-20348: Move ControllerRegistrationManager to server module (#21854)
    
    
    Reviewers: Christo Lolov <[email protected]>
---
 .../server/ControllerRegistrationManager.scala     | 301 -----------------
 .../main/scala/kafka/server/ControllerServer.scala |   2 +-
 .../server/ControllerRegistrationManagerTest.scala |   2 +-
 .../controller/ControllerRegistrationManager.java  | 375 +++++++++++++++++++++
 4 files changed, 377 insertions(+), 303 deletions(-)

diff --git 
a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala 
b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
deleted file mode 100644
index b5174cdfea0..00000000000
--- a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
+++ /dev/null
@@ -1,301 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.util
-import java.util.concurrent.TimeUnit.MILLISECONDS
-import kafka.utils.Logging
-import org.apache.kafka.clients.ClientResponse
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.message.ControllerRegistrationRequestData
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{ControllerRegistrationRequest, 
ControllerRegistrationResponse}
-import org.apache.kafka.metadata.{ListenerInfo, VersionRange}
-import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
-import org.apache.kafka.image.loader.LoaderManifest
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-import org.apache.kafka.image.publisher.MetadataPublisher
-import org.apache.kafka.queue.EventQueue.DeadlineFunction
-import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
-import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
MetadataVersion, NodeToControllerChannelManager}
-
-import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters._
-
-/**
- * The controller registration manager handles registering this controller 
with the controller
- * quorum. This support was added by KIP-919, and requires a metadata version 
of 3.7 or higher.
- *
- * This code uses an event queue paradigm. Modifications get translated into 
events, which
- * are placed on the queue to be processed sequentially. As described in the 
JavaDoc for
- * each variable, most mutable state can be accessed only from that event 
queue thread.
- */
-class ControllerRegistrationManager(
-  val nodeId: Int,
-  val clusterId: String,
-  val time: Time,
-  val threadNamePrefix: String,
-  val supportedFeatures: util.Map[String, VersionRange],
-  val incarnationId: Uuid,
-  val listenerInfo: ListenerInfo,
-  val resendExponentialBackoff: ExponentialBackoff = new 
ExponentialBackoff(100, 2, 120000L, 0.02)
-) extends Logging with MetadataPublisher {
-  override def name(): String = "ControllerRegistrationManager"
-
-  private def logPrefix(): String = {
-    val builder = new StringBuilder("[ControllerRegistrationManager")
-    builder.append(" id=").append(nodeId)
-    builder.append(" incarnation=").append(incarnationId)
-    builder.append("] ")
-    builder.toString()
-  }
-
-  val logContext = new LogContext(logPrefix())
-
-  this.logIdent = logContext.logPrefix()
-
-  /**
-   * True if there is a pending RPC. Only read or written from the event queue 
thread.
-   */
-  var pendingRpc = false
-
-  /**
-   * The number of RPCs that we successfully sent.
-   * Only read or written from the event queue thread.
-   */
-  var successfulRpcs = 0L
-
-  /**
-   * The number of RPCs that we failed to send, or got back a failure response 
for. This is
-   * cleared after a success. Only read or written from the event queue thread.
-   */
-  var failedRpcs = 0L
-
-  /**
-   * The current metadata version that is in effect. Only read or written from 
the event queue thread.
-   */
-  private var metadataVersion: Option[MetadataVersion] = None
-
-  /**
-   * True if we're registered. Only read or written from the event queue 
thread.
-   */
-  var registeredInLog: Boolean = false
-
-  /**
-   * The channel manager, or null if this manager has not been started yet.  
This variable
-   * can only be read or written from the event queue thread.
-   */
-  private var _channelManager: NodeToControllerChannelManager = _
-
-  /**
-   * The event queue.
-   */
-  private[server] val eventQueue = new KafkaEventQueue(time,
-    logContext,
-    threadNamePrefix + "registration-manager-",
-    new ShutdownEvent())
-
-  private class ShutdownEvent extends EventQueue.Event {
-    override def run(): Unit = {
-      try {
-        info(s"shutting down.")
-        if (_channelManager != null) {
-          _channelManager.shutdown()
-          _channelManager = null
-        }
-      } catch {
-        case t: Throwable => error("ControllerRegistrationManager.stop error", 
t)
-      }
-    }
-  }
-
-  /**
-   * Start the ControllerRegistrationManager.
-   *
-   * @param channelManager                The channel manager to use.
-   */
-  def start(channelManager: NodeToControllerChannelManager): Unit = {
-    eventQueue.append(() => {
-      try {
-        info(s"initialized channel manager.")
-        _channelManager = channelManager
-        maybeSendControllerRegistration()
-      } catch {
-        case t: Throwable => error("start error", t)
-      }
-    })
-  }
-
-  /**
-   * Start shutting down the ControllerRegistrationManager, but do not block.
-   */
-  def beginShutdown(): Unit = {
-    eventQueue.beginShutdown("beginShutdown")
-  }
-
-  /**
-   * Shut down the ControllerRegistrationManager and block until all threads 
are joined.
-   */
-  override def close(): Unit = {
-    beginShutdown()
-    eventQueue.close()
-  }
-
-  override def onMetadataUpdate(
-    delta: MetadataDelta,
-    newImage: MetadataImage,
-    manifest: LoaderManifest
-  ): Unit = {
-    if (delta.featuresDelta() != null ||
-        (delta.clusterDelta() != null && 
delta.clusterDelta().changedControllers().containsKey(nodeId))) {
-      eventQueue.append(new MetadataUpdateEvent(delta, newImage))
-    }
-  }
-
-  private class MetadataUpdateEvent(
-    delta: MetadataDelta,
-    newImage: MetadataImage
-  ) extends EventQueue.Event {
-    override def run(): Unit = {
-      try {
-        if (delta.featuresDelta() != null) {
-          metadataVersion = newImage.features().metadataVersion().toScala
-        }
-        if (delta.clusterDelta() != null) {
-          if (delta.clusterDelta().changedControllers().containsKey(nodeId)) {
-            val curRegistration = newImage.cluster().controllers().get(nodeId)
-            if (curRegistration == null) {
-              info(s"Registration removed for this node ID.")
-              registeredInLog = false
-            } else if (!curRegistration.incarnationId().equals(incarnationId)) 
{
-              info(s"Found registration for ${curRegistration.incarnationId()} 
instead of our incarnation.")
-              registeredInLog = false
-            } else {
-              info(s"Our registration has been persisted to the metadata log.")
-              registeredInLog = true
-            }
-          }
-        }
-        maybeSendControllerRegistration()
-      } catch {
-        case t: Throwable => error("onMetadataUpdate error", t)
-      }
-    }
-  }
-
-  private def maybeSendControllerRegistration(): Unit = {
-    val metadataVersion = this.metadataVersion
-    if (registeredInLog) {
-      debug("maybeSendControllerRegistration: controller is already 
registered.")
-    } else if (_channelManager == null) {
-      debug("maybeSendControllerRegistration: cannot register yet because the 
channel manager has not been initialized.")
-    } else if (metadataVersion.isEmpty) {
-      info("maybeSendControllerRegistration: cannot register yet because the 
metadata.version is not known yet.")
-    } else if (!metadataVersion.get.isControllerRegistrationSupported) {
-      info("maybeSendControllerRegistration: cannot register yet because the 
metadata.version is " +
-          s"still $metadataVersion, which does not support KIP-919 controller 
registration.")
-    } else if (pendingRpc) {
-      info("maybeSendControllerRegistration: waiting for the previous RPC to 
complete.")
-    } else {
-      sendControllerRegistration()
-    }
-  }
-
-  private def sendControllerRegistration(): Unit = {
-    val features = new ControllerRegistrationRequestData.FeatureCollection()
-    supportedFeatures.asScala.foreach {
-      case (name, range) => features.add(new 
ControllerRegistrationRequestData.Feature().
-        setName(name).
-        setMinSupportedVersion(range.min()).
-        setMaxSupportedVersion(range.max()))
-    }
-    val data = new ControllerRegistrationRequestData().
-      setControllerId(nodeId).
-      setFeatures(features).
-      setIncarnationId(incarnationId).
-      setListeners(listenerInfo.toControllerRegistrationRequest).
-      setZkMigrationReady(false)
-
-    info(s"sendControllerRegistration: attempting to send $data")
-    _channelManager.sendRequest(new 
ControllerRegistrationRequest.Builder(data),
-      new RegistrationResponseHandler())
-    pendingRpc = true
-  }
-
-  private class RegistrationResponseHandler extends 
ControllerRequestCompletionHandler {
-    override def onComplete(response: ClientResponse): Unit = {
-      eventQueue.append(new RequestCompleteEvent(response))
-    }
-
-    override def onTimeout(): Unit = {
-      eventQueue.append(new RequestTimeoutEvent())
-    }
-  }
-
-  private class RequestCompleteEvent(response: ClientResponse) extends 
EventQueue.Event {
-    override def run(): Unit = {
-      pendingRpc = false
-      if (response.authenticationException() != null) {
-        error(s"RegistrationResponseHandler: authentication error", 
response.authenticationException())
-        scheduleNextCommunicationAfterFailure()
-      } else if (response.versionMismatch() != null) {
-        error(s"RegistrationResponseHandler: unsupported API version error", 
response.versionMismatch())
-        scheduleNextCommunicationAfterFailure()
-      } else if (response.responseBody() == null) {
-        error(s"RegistrationResponseHandler: unknown error")
-        scheduleNextCommunicationAfterFailure()
-      } else if 
(!response.responseBody().isInstanceOf[ControllerRegistrationResponse]) {
-        error(s"RegistrationResponseHandler: invalid response type error")
-        scheduleNextCommunicationAfterFailure()
-      } else {
-        val message = 
response.responseBody().asInstanceOf[ControllerRegistrationResponse]
-        val errorCode = Errors.forCode(message.data().errorCode())
-        if (errorCode == Errors.NONE) {
-          successfulRpcs = successfulRpcs + 1
-          failedRpcs = 0
-          info(s"RegistrationResponseHandler: controller acknowledged 
ControllerRegistrationRequest.")
-        } else {
-          info(s"RegistrationResponseHandler: controller returned error 
$errorCode " +
-            s"(${message.data().errorMessage()})")
-          scheduleNextCommunicationAfterFailure()
-        }
-      }
-    }
-  }
-
-  private class RequestTimeoutEvent extends EventQueue.Event {
-    override def run(): Unit = {
-      pendingRpc = false
-      error(s"RegistrationResponseHandler: channel manager timed out before 
sending the request.")
-      scheduleNextCommunicationAfterFailure()
-    }
-  }
-
-  private def scheduleNextCommunicationAfterFailure(): Unit = {
-    val delayMs = resendExponentialBackoff.backoff(failedRpcs)
-    failedRpcs = failedRpcs + 1
-    scheduleNextCommunication(delayMs)
-  }
-
-  private def scheduleNextCommunication(intervalMs: Long): Unit = {
-    trace(s"Scheduling next communication at $intervalMs ms from now.")
-    val deadlineNs = time.nanoseconds() + MILLISECONDS.toNanos(intervalMs)
-    eventQueue.scheduleDeferred("communication",
-      new DeadlineFunction(deadlineNs),
-      () => maybeSendControllerRegistration())
-  }
-}
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 7b9291ce165..455b8cc8901 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -47,6 +47,7 @@ import 
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLAS
 import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.config.ConfigType
 import org.apache.kafka.server.config.DelegationTokenManagerConfigs
+import org.apache.kafka.server.controller.ControllerRegistrationManager
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, 
LinuxIoMetricsCollector}
 import org.apache.kafka.server.network.{EndpointReadyFutures, 
KafkaAuthorizerServerInfo}
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
@@ -308,7 +309,6 @@ class ControllerServer(
 
       // Create the registration manager, which handles sending KIP-919 
controller registrations.
       registrationManager = new ControllerRegistrationManager(config.nodeId,
-        clusterId,
         time,
         s"controller-${config.nodeId}-",
         
QuorumFeatures.defaultSupportedFeatureMap(config.unstableFeatureVersionsEnabled),
diff --git 
a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
index 299123f6489..c643e4738fc 100644
--- 
a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.{KRaftConfigs, LeaderAndEpoch, QuorumConfig}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.config.ServerLogConfigs
+import org.apache.kafka.server.controller.ControllerRegistrationManager
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.{Test, Timeout}
@@ -71,7 +72,6 @@ class ControllerRegistrationManagerTest {
     context: RegistrationTestContext,
   ): ControllerRegistrationManager = {
     new ControllerRegistrationManager(context.config.nodeId,
-      context.clusterId,
       context.time,
       "controller-registration-manager-test-",
       createSupportedFeatures(MetadataVersion.IBP_3_7_IV0),
diff --git 
a/server/src/main/java/org/apache/kafka/server/controller/ControllerRegistrationManager.java
 
b/server/src/main/java/org/apache/kafka/server/controller/ControllerRegistrationManager.java
new file mode 100644
index 00000000000..6fbc5dda059
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/controller/ControllerRegistrationManager.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.controller;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ControllerRegistrationRequest;
+import org.apache.kafka.common.requests.ControllerRegistrationResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.metadata.ControllerRegistration;
+import org.apache.kafka.metadata.ListenerInfo;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The controller registration manager handles registering this controller 
with the controller
+ * quorum. This support was added by KIP-919, and requires a metadata version 
of 3.7 or higher.
+ *
+ * This code uses an event queue paradigm. Modifications get translated into 
events, which
+ * are placed on the queue to be processed sequentially. As described in the 
JavaDoc for
+ * each variable, most mutable state can be accessed only from that event 
queue thread.
+ */
+public class ControllerRegistrationManager implements MetadataPublisher {
+    
+    private final Logger logger;
+    private final int nodeId;
+    private final Time time;
+    private final Map<String, VersionRange> supportedFeatures;
+    private final Uuid incarnationId;
+    private final ListenerInfo listenerInfo;
+    private final ExponentialBackoff resendExponentialBackoff;
+
+    /**
+     * The event queue.
+     */
+    private final KafkaEventQueue eventQueue;
+
+    /**
+     * True if there is a pending RPC. Only read or written from the event 
queue thread.
+     */
+    private boolean pendingRpc = false;
+
+    /**
+     * The number of RPCs that we successfully sent.
+     * Only read or written from the event queue thread.
+     */
+    private long successfulRpcs = 0L;
+
+    /**
+     * The number of RPCs that we failed to send, or got back a failure 
response for. This is
+     * cleared after a success. Only read or written from the event queue 
thread.
+     */
+    private long failedRpcs = 0L;
+
+    /**
+     * The current metadata version that is in effect. Only read or written 
from the event queue thread.
+     */
+    private Optional<MetadataVersion> metadataVersion = Optional.empty();
+
+    /**
+     * True if we're registered. Only read or written from the event queue 
thread.
+     */
+    private boolean registeredInLog = false;
+
+    /**
+     * The channel manager, or null if this manager has not been started yet. 
This variable
+     * can only be read or written from the event queue thread.
+     */
+    private NodeToControllerChannelManager channelManager;
+
+    public ControllerRegistrationManager(
+            int nodeId,
+            Time time,
+            String threadNamePrefix,
+            Map<String, VersionRange> supportedFeatures,
+            Uuid incarnationId,
+            ListenerInfo listenerInfo) {
+        this(nodeId, time, threadNamePrefix, supportedFeatures, incarnationId, 
listenerInfo, new ExponentialBackoff(100, 2, 120000L, 0.02));
+    }
+
+    public ControllerRegistrationManager(
+            int nodeId,
+            Time time,
+            String threadNamePrefix,
+            Map<String, VersionRange> supportedFeatures,
+            Uuid incarnationId,
+            ListenerInfo listenerInfo,
+            ExponentialBackoff resendExponentialBackoff
+    )  {
+        this.nodeId = nodeId;
+        this.time = time;
+        this.supportedFeatures = supportedFeatures;
+        this.incarnationId = incarnationId;
+        this.listenerInfo = listenerInfo;
+        this.resendExponentialBackoff = resendExponentialBackoff;
+        LogContext logContext = new 
LogContext("[ControllerRegistrationManager" +
+                " id=" + this.nodeId +
+                " incarnation=" + this.incarnationId +
+                "] ");
+        this.logger = logContext.logger(ControllerRegistrationManager.class);
+        this.eventQueue = new KafkaEventQueue(time,
+                logContext,
+                threadNamePrefix + "registration-manager-",
+                new ShutdownEvent());
+    }
+    
+    @Override 
+    public String name() {
+        return "ControllerRegistrationManager";
+    }
+
+    private class ShutdownEvent implements EventQueue.Event {
+        
+        @Override 
+        public void run() {
+            try {
+                logger.info("shutting down.");
+                if (channelManager != null) {
+                    channelManager.shutdown();
+                    channelManager = null;
+                }
+            } catch (Throwable t) {
+                logger.error("ControllerRegistrationManager.stop error", t);
+            }
+        }
+    }
+
+    /**
+     * Start the ControllerRegistrationManager.
+     *
+     * @param channelManager The channel manager to use.
+     */
+    public void start(NodeToControllerChannelManager channelManager) {
+        eventQueue.append(() -> {
+            try {
+                logger.info("initialized channel manager.");
+                this.channelManager = channelManager;
+                maybeSendControllerRegistration();
+            } catch (Throwable t) {
+                logger.error("start error", t);
+            }
+        });
+    }
+
+    /**
+     * Start shutting down the ControllerRegistrationManager, but do not block.
+     */
+    void beginShutdown() {
+        eventQueue.beginShutdown("beginShutdown");
+    }
+
+    /**
+     * Shut down the ControllerRegistrationManager and block until all threads 
are joined.
+     */
+    @Override 
+    public void close() throws Exception {
+        beginShutdown();
+        eventQueue.close();
+    }
+
+    @Override 
+    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, 
LoaderManifest manifest) {
+        if (delta.featuresDelta() != null ||
+                (delta.clusterDelta() != null && 
delta.clusterDelta().changedControllers().containsKey(nodeId))) {
+            eventQueue.append(new MetadataUpdateEvent(delta, newImage));
+        }
+    }
+
+    private class MetadataUpdateEvent implements EventQueue.Event {
+
+        private final MetadataDelta delta;
+        private final MetadataImage newImage;
+
+        MetadataUpdateEvent(MetadataDelta delta, MetadataImage newImage) {
+            this.delta = delta;
+            this.newImage = newImage;
+        }
+            
+        @Override 
+        public void run() {
+            try {
+                if (delta.featuresDelta() != null) {
+                    metadataVersion = newImage.features().metadataVersion();
+                }
+                if (delta.clusterDelta() != null) {
+                    if 
(delta.clusterDelta().changedControllers().containsKey(nodeId)) {
+                        ControllerRegistration curRegistration = 
newImage.cluster().controllers().get(nodeId);
+                        if (curRegistration == null) {
+                            logger.info("Registration removed for this node 
ID.");
+                            registeredInLog = false;
+                        } else if 
(!curRegistration.incarnationId().equals(incarnationId)) {
+                            logger.info("Found registration for {} instead of 
our incarnation.", curRegistration.incarnationId());
+                            registeredInLog = false;
+                        } else {
+                            logger.info("Our registration has been persisted 
to the metadata log.");
+                            registeredInLog = true;
+                        }
+                    }
+                }
+                maybeSendControllerRegistration();
+            } catch (Throwable t) {
+                logger.error("onMetadataUpdate error", t);
+            }
+        }
+    }
+
+    private void maybeSendControllerRegistration() {
+        Optional<MetadataVersion> metadataVersion = this.metadataVersion;
+        if (registeredInLog) {
+            logger.debug("maybeSendControllerRegistration: controller is 
already registered.");
+        } else if (channelManager == null) {
+            logger.debug("maybeSendControllerRegistration: cannot register yet 
because the channel manager has not been initialized.");
+        } else if (metadataVersion.isEmpty()) {
+            logger.info("maybeSendControllerRegistration: cannot register yet 
because the metadata.version is not known yet.");
+        } else if (!metadataVersion.get().isControllerRegistrationSupported()) 
{
+            logger.info("maybeSendControllerRegistration: cannot register yet 
because the metadata.version is " +
+                    "still {}, which does not support KIP-919 controller 
registration.", metadataVersion);
+        } else if (pendingRpc) {
+            logger.info("maybeSendControllerRegistration: waiting for the 
previous RPC to complete.");
+        } else {
+            sendControllerRegistration();
+        }
+    }
+
+    private void sendControllerRegistration() {
+        ControllerRegistrationRequestData.FeatureCollection features = new 
ControllerRegistrationRequestData.FeatureCollection();
+        supportedFeatures.forEach((name, range) -> features.add(new 
ControllerRegistrationRequestData.Feature().
+                setName(name).
+                setMinSupportedVersion(range.min()).
+                setMaxSupportedVersion(range.max())));
+        ControllerRegistrationRequestData data = new 
ControllerRegistrationRequestData().
+            setControllerId(nodeId).
+            setFeatures(features).
+            setIncarnationId(incarnationId).
+            setListeners(listenerInfo.toControllerRegistrationRequest()).
+            setZkMigrationReady(false);
+
+        logger.info("sendControllerRegistration: attempting to send {}", data);
+        channelManager.sendRequest(new 
ControllerRegistrationRequest.Builder(data),
+                new RegistrationResponseHandler());
+        pendingRpc = true;
+    }
+
+    private class RegistrationResponseHandler implements 
ControllerRequestCompletionHandler {
+        
+        @Override 
+        public void onComplete(ClientResponse response) {
+            eventQueue.append(new RequestCompleteEvent(response));
+        }
+
+        @Override
+        public void onTimeout() {
+            eventQueue.append(new RequestTimeoutEvent());
+        }
+    }
+
+    private class RequestCompleteEvent implements EventQueue.Event {
+
+        private final ClientResponse response;
+        
+        RequestCompleteEvent(ClientResponse response) {
+            this.response = response;
+        }
+        
+        @Override 
+        public void run() {
+            pendingRpc = false;
+            if (response.authenticationException() != null) {
+                logger.error("RegistrationResponseHandler: authentication 
error", response.authenticationException());
+                scheduleNextCommunicationAfterFailure();
+            } else if (response.versionMismatch() != null) {
+                logger.error("RegistrationResponseHandler: unsupported API 
version error", response.versionMismatch());
+                scheduleNextCommunicationAfterFailure();
+            } else if (response.responseBody() == null) {
+                logger.error("RegistrationResponseHandler: unknown error");
+                scheduleNextCommunicationAfterFailure();
+            } else if (!(response.responseBody() instanceof 
ControllerRegistrationResponse message)) {
+                logger.error("RegistrationResponseHandler: invalid response 
type error");
+                scheduleNextCommunicationAfterFailure();
+            } else {
+                Errors errorCode = Errors.forCode(message.data().errorCode());
+                if (errorCode == Errors.NONE) {
+                    successfulRpcs = successfulRpcs + 1;
+                    failedRpcs = 0;
+                    logger.info("RegistrationResponseHandler: controller 
acknowledged ControllerRegistrationRequest.");
+                } else {
+                    logger.info("RegistrationResponseHandler: controller 
returned error {} ({})", errorCode, message.data().errorMessage());
+                    scheduleNextCommunicationAfterFailure();
+                }
+            }
+        }
+    }
+
+    private class RequestTimeoutEvent implements EventQueue.Event {
+        
+        @Override 
+        public void run() {
+            pendingRpc = false;
+            logger.error("RegistrationResponseHandler: channel manager timed 
out before sending the request.");
+            scheduleNextCommunicationAfterFailure();
+        }
+    }
+
+    private void scheduleNextCommunicationAfterFailure() {
+        long delayMs = resendExponentialBackoff.backoff(failedRpcs);
+        failedRpcs = failedRpcs + 1;
+        scheduleNextCommunication(delayMs);
+    }
+
+    private void scheduleNextCommunication(long intervalMs) {
+        logger.trace("Scheduling next communication at {} ms from now.", 
intervalMs);
+        long deadlineNs = time.nanoseconds() + 
TimeUnit.MILLISECONDS.toNanos(intervalMs);
+        eventQueue.scheduleDeferred("communication",
+                new EventQueue.DeadlineFunction(deadlineNs),
+                this::maybeSendControllerRegistration);
+    }
+
+    // Only for testing
+    public KafkaEventQueue eventQueue() {
+        return eventQueue;
+    }
+
+    // Only for testing
+    public boolean registeredInLog() {
+        return registeredInLog;
+    }
+
+    // Only for testing
+    public boolean pendingRpc() {
+        return pendingRpc;
+    }
+
+    // Only for testing
+    public long successfulRpcs() {
+        return successfulRpcs;
+    }
+
+    // Only for testing
+    public long failedRpcs() {
+        return failedRpcs;
+    }
+
+}

Reply via email to