mimaison commented on code in PR #20468:
URL: https://github.com/apache/kafka/pull/20468#discussion_r2324936446
##########
core/src/main/scala/kafka/server/ControllerServer.scala:
##########
@@ -38,9 +38,9 @@ import
org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
-import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher,
ScramPublisher}
import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.common.CredentialProvider
Review Comment:
Can we move this with the other org.apache.kafka.server.common import below?
##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -42,8 +42,8 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorRec
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker,
MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
-import org.apache.kafka.metadata.publisher.AclPublisher
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
+import org.apache.kafka.server.common.CredentialProvider
Review Comment:
Can we move this with the other `org.apache.kafka.server.common` import
below?
##########
metadata/src/main/java/org/apache/kafka/metadata/publisher/ScramPublisher.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.publisher;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.server.common.CredentialProvider;
+import org.apache.kafka.server.fault.FaultHandler;
+
+import java.util.Optional;
+
+public class ScramPublisher implements MetadataPublisher {
+ private final int nodeId;
+ private final FaultHandler faultHandler;
+ private final String nodeType;
+ private final CredentialProvider credentialProvider;
+
+ public ScramPublisher(int nodeId, FaultHandler faultHandler, String
nodeType, CredentialProvider credentialProvider) {
+ this.nodeId = nodeId;
+ this.faultHandler = faultHandler;
+ this.nodeType = nodeType;
+ this.credentialProvider = credentialProvider;
+ }
+
+ @Override
+ public final String name() {
+ return "ScramPublisher " + nodeType + " id=" + nodeId;
+ }
+
+ @Override
+ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage,
LoaderManifest manifest) {
+ onMetadataUpdate(delta, newImage);
+ }
+
+ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
+ String deltaName = "MetadataDelta up to " +
newImage.highestOffsetAndEpoch().offset();
+ try {
+ // Apply changes to SCRAM credentials.
+ Optional.ofNullable(delta.scramDelta()).ifPresent(scramDelta -> {
Review Comment:
I wonder if doing:
```java
ScramDelta scramDelta = delta.scramDelta();
if (scramDelta != null) {
```
is just more readable. WDYT?
##########
server-common/src/main/java/org/apache/kafka/server/common/CredentialProvider.java:
##########
Review Comment:
I wonder if we should keep the package and just change the module.
##########
metadata/src/main/java/org/apache/kafka/metadata/publisher/ScramPublisher.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.publisher;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.server.common.CredentialProvider;
+import org.apache.kafka.server.fault.FaultHandler;
+
+import java.util.Optional;
+
+public class ScramPublisher implements MetadataPublisher {
+ private final int nodeId;
+ private final FaultHandler faultHandler;
+ private final String nodeType;
+ private final CredentialProvider credentialProvider;
+
+ public ScramPublisher(int nodeId, FaultHandler faultHandler, String
nodeType, CredentialProvider credentialProvider) {
+ this.nodeId = nodeId;
+ this.faultHandler = faultHandler;
+ this.nodeType = nodeType;
+ this.credentialProvider = credentialProvider;
+ }
+
+ @Override
+ public final String name() {
+ return "ScramPublisher " + nodeType + " id=" + nodeId;
+ }
+
+ @Override
+ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage,
LoaderManifest manifest) {
+ onMetadataUpdate(delta, newImage);
+ }
+
+ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
+ String deltaName = "MetadataDelta up to " +
newImage.highestOffsetAndEpoch().offset();
+ try {
+ // Apply changes to SCRAM credentials.
+ Optional.ofNullable(delta.scramDelta()).ifPresent(scramDelta -> {
+ scramDelta.changes().forEach((mechanism, userChanges) -> {
+ userChanges.forEach((userName, change) -> {
+ if (change.isPresent()) {
+ credentialProvider.updateCredential(mechanism,
userName, change.get().toCredential());
+ } else {
+ credentialProvider.removeCredentials(mechanism,
userName);
+ }
+ });
+ });
+ });
+ } catch (Throwable t) {
+ faultHandler.handleFault("Uncaught exception while publishing
SCRAM changes from " + deltaName, t);
+ }
+ }
+}
Review Comment:
Can we add a newline at the end of the file?
##########
core/src/main/scala/kafka/tools/TestRaftServer.scala:
##########
@@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch,
QuorumConfig, RaftClient}
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.common.CredentialProvider
Review Comment:
I'll stop noting it from here; the same applies to all files/imports.
##########
core/src/main/scala/kafka/tools/TestRaftServer.scala:
##########
@@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch,
QuorumConfig, RaftClient}
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.common.CredentialProvider
Review Comment:
Can we move this with the other org.apache.kafka.server.common import below?
##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -44,7 +44,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName,
Reconfigurable}
import org.apache.kafka.network.{ConnectionQuotaEntity,
ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.common.CredentialProvider
Review Comment:
nit: We try to order imports alphabetically; can we move this one line down?
##########
core/src/main/scala/kafka/server/KafkaBroker.scala:
##########
@@ -28,7 +28,7 @@ import
org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.{BrokerState, MetadataCache}
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.common.CredentialProvider
Review Comment:
Can we move this with the other org.apache.kafka.server.common import below?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]