mimaison commented on code in PR #20468:
URL: https://github.com/apache/kafka/pull/20468#discussion_r2324936446


##########
core/src/main/scala/kafka/server/ControllerServer.scala:
##########
@@ -38,9 +38,9 @@ import 
org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
 import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
-import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher, 
ScramPublisher}
 import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.common.CredentialProvider

Review Comment:
   Can we move this with the other org.apache.kafka.server.common import below?



##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -42,8 +42,8 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorRec
 import org.apache.kafka.coordinator.transaction.ProducerIdManager
 import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, 
MetadataPublisher}
 import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
-import org.apache.kafka.metadata.publisher.AclPublisher
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
+import org.apache.kafka.server.common.CredentialProvider

Review Comment:
   Can we move this with the other `org.apache.kafka.server.common` import 
below?



##########
metadata/src/main/java/org/apache/kafka/metadata/publisher/ScramPublisher.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.publisher;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.server.common.CredentialProvider;
+import org.apache.kafka.server.fault.FaultHandler;
+
+import java.util.Optional;
+
+public class ScramPublisher implements MetadataPublisher {
+    private final int nodeId;
+    private final FaultHandler faultHandler;
+    private final String nodeType;
+    private final CredentialProvider credentialProvider;
+
+    public ScramPublisher(int nodeId, FaultHandler faultHandler, String 
nodeType, CredentialProvider credentialProvider) {
+        this.nodeId = nodeId;
+        this.faultHandler = faultHandler;
+        this.nodeType = nodeType;
+        this.credentialProvider = credentialProvider;
+    }
+
+    @Override
+    public final String name() {
+        return "ScramPublisher " + nodeType + " id=" + nodeId;
+    }
+
+    @Override
+    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, 
LoaderManifest manifest) {
+        onMetadataUpdate(delta, newImage);
+    }
+
+    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
+        String deltaName = "MetadataDelta up to " + 
newImage.highestOffsetAndEpoch().offset();
+        try {
+            // Apply changes to SCRAM credentials.
+            Optional.ofNullable(delta.scramDelta()).ifPresent(scramDelta -> {

Review Comment:
   If wonder if doing:
   ```java
   ScramDelta scramDelta = delta.scramDelta();
   if (scramDelta != null) {
   ```
   is just more readable. WDYT?



##########
server-common/src/main/java/org/apache/kafka/server/common/CredentialProvider.java:
##########


Review Comment:
   I wonder if we should keep the package and just change the module.



##########
metadata/src/main/java/org/apache/kafka/metadata/publisher/ScramPublisher.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.publisher;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.server.common.CredentialProvider;
+import org.apache.kafka.server.fault.FaultHandler;
+
+import java.util.Optional;
+
+public class ScramPublisher implements MetadataPublisher {
+    private final int nodeId;
+    private final FaultHandler faultHandler;
+    private final String nodeType;
+    private final CredentialProvider credentialProvider;
+
+    public ScramPublisher(int nodeId, FaultHandler faultHandler, String 
nodeType, CredentialProvider credentialProvider) {
+        this.nodeId = nodeId;
+        this.faultHandler = faultHandler;
+        this.nodeType = nodeType;
+        this.credentialProvider = credentialProvider;
+    }
+
+    @Override
+    public final String name() {
+        return "ScramPublisher " + nodeType + " id=" + nodeId;
+    }
+
+    @Override
+    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, 
LoaderManifest manifest) {
+        onMetadataUpdate(delta, newImage);
+    }
+
+    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
+        String deltaName = "MetadataDelta up to " + 
newImage.highestOffsetAndEpoch().offset();
+        try {
+            // Apply changes to SCRAM credentials.
+            Optional.ofNullable(delta.scramDelta()).ifPresent(scramDelta -> {
+                scramDelta.changes().forEach((mechanism, userChanges) -> {
+                    userChanges.forEach((userName, change) -> {
+                        if (change.isPresent()) {
+                            credentialProvider.updateCredential(mechanism, 
userName, change.get().toCredential());
+                        } else {
+                            credentialProvider.removeCredentials(mechanism, 
userName);
+                        }
+                    });
+                });
+            });
+        } catch (Throwable t) {
+            faultHandler.handleFault("Uncaught exception while publishing 
SCRAM changes from " + deltaName, t);
+        }
+    }
+}

Review Comment:
   Can we add a newline at the end of the file?



##########
core/src/main/scala/kafka/tools/TestRaftServer.scala:
##########
@@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.{Exit, Time, Utils}
 import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
 import org.apache.kafka.raft.errors.NotLeaderException
 import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, 
QuorumConfig, RaftClient}
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.common.CredentialProvider

Review Comment:
   I'll stop noting it from here, the same applies to all files/imports



##########
core/src/main/scala/kafka/tools/TestRaftServer.scala:
##########
@@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.{Exit, Time, Utils}
 import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
 import org.apache.kafka.raft.errors.NotLeaderException
 import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, 
QuorumConfig, RaftClient}
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.common.CredentialProvider

Review Comment:
   Can we move this with the other org.apache.kafka.server.common import below?



##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -44,7 +44,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
 import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, 
Reconfigurable}
 import org.apache.kafka.network.{ConnectionQuotaEntity, 
ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.common.CredentialProvider

Review Comment:
   nit: We try to order import alphabetically, can we move this 1 line down?



##########
core/src/main/scala/kafka/server/KafkaBroker.scala:
##########
@@ -28,7 +28,7 @@ import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTok
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.metadata.{BrokerState, MetadataCache}
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.common.CredentialProvider

Review Comment:
   Can we move this with the other org.apache.kafka.server.common import below?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to