gaurav-narula commented on code in PR #15945:
URL: https://github.com/apache/kafka/pull/15945#discussion_r1600362319


##########
metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Tracks the registration of a specific broker, and executes a callback if it 
should be refreshed.
+ *
+ * This tracker handles cases where we might want to re-register the broker. 
The only such case
+ * right now is during the transition from non-JBOD mode, to JBOD mode. In 
other words, the
+ * transition from a MetadataVersion less than 3.7-IV2, to one greater than or 
equal to 3.7-IV2.
+ * In this case, the broker registration will start out containing no 
directories, and we need to
+ * resend the BrokerRegistrationRequest to fix that.
+ *
+ * As much as possible, the goal here is to keep things simple. We just 
compare the desired state
+ * with the actual state, and try to make changes only if necessary.
+ */
+public class BrokerRegistrationTracker implements MetadataPublisher {
+    private final Logger log;
+    private final int id;
+    private final Runnable refreshRegistrationCallback;
+
+    /**
+     * Create the tracker.
+     *
+     * @param id                            The ID of this broker.
+     * @param targetDirectories             The directories managed by this 
broker.
+     * @param refreshRegistrationCallback   Callback to run if we need to 
refresh the registration.
+     */
+    public BrokerRegistrationTracker(
+        int id,
+        List<Uuid> targetDirectories,
+        Runnable refreshRegistrationCallback
+    ) {
+        this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] 
").
+            logger(BrokerRegistrationTracker.class);
+        this.id = id;
+        this.refreshRegistrationCallback = refreshRegistrationCallback;
+    }
+
+    @Override
+    public String name() {
+        return "BrokerRegistrationTracker(id=" + id + ")";
+    }
+
+    @Override
+    public void onMetadataUpdate(
+        MetadataDelta delta,
+        MetadataImage newImage,
+        LoaderManifest manifest
+    ) {
+        boolean checkBrokerRegistration = false;
+        if (delta.featuresDelta() != null) {
+            if (delta.metadataVersionChanged().isPresent()) {

Review Comment:
   Is this only for the logging? Seems like we unconditionally pass 
`newImage.features().metadataVersion()`  in `brokerRegistrationNeedsRefresh` 
down below and it only gates on `registration == null` 🤔 
   
   I tried commenting lines 78-86 out and the tests still pass which makes me 
wonder if `checkBrokerRegistration` can be simplified or if there's a test 
we're missing.



##########
metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Tracks the registration of a specific broker, and executes a callback if it 
should be refreshed.
+ *
+ * This tracker handles cases where we might want to re-register the broker. 
The only such case
+ * right now is during the transition from non-JBOD mode, to JBOD mode. In 
other words, the
+ * transition from a MetadataVersion less than 3.7-IV2, to one greater than or 
equal to 3.7-IV2.
+ * In this case, the broker registration will start out containing no 
directories, and we need to
+ * resend the BrokerRegistrationRequest to fix that.
+ *
+ * As much as possible, the goal here is to keep things simple. We just 
compare the desired state
+ * with the actual state, and try to make changes only if necessary.
+ */
+public class BrokerRegistrationTracker implements MetadataPublisher {
+    private final Logger log;
+    private final int id;
+    private final Runnable refreshRegistrationCallback;
+
+    /**
+     * Create the tracker.
+     *
+     * @param id                            The ID of this broker.
+     * @param targetDirectories             The directories managed by this 
broker.
+     * @param refreshRegistrationCallback   Callback to run if we need to 
refresh the registration.
+     */
+    public BrokerRegistrationTracker(
+        int id,
+        List<Uuid> targetDirectories,
+        Runnable refreshRegistrationCallback
+    ) {
+        this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] 
").
+            logger(BrokerRegistrationTracker.class);
+        this.id = id;
+        this.refreshRegistrationCallback = refreshRegistrationCallback;
+    }
+
+    @Override
+    public String name() {
+        return "BrokerRegistrationTracker(id=" + id + ")";
+    }
+
+    @Override
+    public void onMetadataUpdate(
+        MetadataDelta delta,
+        MetadataImage newImage,
+        LoaderManifest manifest
+    ) {
+        boolean checkBrokerRegistration = false;
+        if (delta.featuresDelta() != null) {
+            if (delta.metadataVersionChanged().isPresent()) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Metadata version change is present: {}",
+                        delta.metadataVersionChanged());
+                }
+                checkBrokerRegistration = true;
+            }
+        }
+        if (delta.clusterDelta() != null) {
+            if (delta.clusterDelta().changedBrokers().get(id) != null) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Broker change is present: {}",
+                        delta.clusterDelta().changedBrokers().get(id));
+                }
+                checkBrokerRegistration = true;
+            }
+        }
+        if (checkBrokerRegistration) {
+            if 
(brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(),
+                    delta.clusterDelta().broker(id))) {
+                refreshRegistrationCallback.run();
+            }
+        }
+    }
+
+    /**
+     * Check if the current broker registration needs to be refreshed.
+     *
+     * @param registration  The current broker registration, or null if there 
is none.

Review Comment:
   I think a doc for `@param metadataVersion` would be useful as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to