showuon commented on code in PR #15246:
URL: https://github.com/apache/kafka/pull/15246#discussion_r1465931069


##########
server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import javax.crypto.SecretKeyFactory;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.server.util.Csv;
+import org.junit.jupiter.api.Test;
+
+import java.security.GeneralSecurityException;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+class PasswordEncoderTest {
+
+    @Test
+    public void testEncodeDecode() throws GeneralSecurityException {
+        PasswordEncoder encoder = PasswordEncoder.encrypting(new 
Password("password-encoder-secret"),
+                Optional.empty(),
+                PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM,
+                PasswordEncoderConfigs.DEFAULT_KEY_LENGTH,
+                PasswordEncoderConfigs.DEFAULT_ITERATIONS);
+        String password = "test-password";
+        String encoded = encoder.encode(new Password(password));
+        Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
+        assertEquals("4096", encodedMap.get(PasswordEncoder.ITERATIONS));
+        assertEquals("128", encodedMap.get(PasswordEncoder.KEY_LENGTH));
+        String defaultKeyFactoryAlgorithm;
+        try {
+            SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512");
+            defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA512";
+
+        } catch (Exception e) {
+            defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA1";
+        }
+        assertEquals(defaultKeyFactoryAlgorithm, 
encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM));
+        assertEquals("AES/CBC/PKCS5Padding", 
encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM));
+        verifyEncodedPassword(encoder, password, encoded);
+    }
+
+    @Test
+    public void testEncoderConfigChange() throws GeneralSecurityException {
+        PasswordEncoder encoder = PasswordEncoder.encrypting(new 
Password("password-encoder-secret"),
+                Optional.of("PBKDF2WithHmacSHA1"),
+                "DES/CBC/PKCS5Padding",
+                64,
+                1024);
+        String password = "test-password";
+        String encoded = encoder.encode(new Password(password));
+        Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
+        assertEquals("1024", encodedMap.get(PasswordEncoder.ITERATIONS));
+        assertEquals("64", encodedMap.get(PasswordEncoder.KEY_LENGTH));
+        assertEquals("PBKDF2WithHmacSHA1", 
encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM));
+        assertEquals("DES/CBC/PKCS5Padding", 
encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM));
+
+        // Test that decoding works even if PasswordEncoder algorithm, 
iterations etc. are altered
+        PasswordEncoder decoder = PasswordEncoder.encrypting(new 
Password("password-encoder-secret"),
+                Optional.of("PBKDF2WithHmacSHA1"),
+                "AES/CBC/PKCS5Padding",
+                128,
+                2048);
+        assertEquals(password, decoder.decode(encoded).value());
+
+        // Test that decoding fails if secret is altered
+        PasswordEncoder decoder2 = PasswordEncoder.encrypting(new 
Password("secret-2"),
+                Optional.of("PBKDF2WithHmacSHA1"),
+                "AES/CBC/PKCS5Padding",
+                128,
+                1024);
+        try {
+            decoder2.decode(encoded);
+            fail("Expected ConfigException to be thrown");
+        } catch (ConfigException expected) {
+        }

Review Comment:
   Nit: Can be replaced with `assertThrows(ConfigException.class, () -> ...)`



##########
core/src/main/scala/kafka/utils/PasswordEncoder.scala:
##########
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.utils
-
-import java.nio.charset.StandardCharsets
-import java.security.{AlgorithmParameters, NoSuchAlgorithmException, 
SecureRandom}
-import java.security.spec.AlgorithmParameterSpec
-import java.util.Base64
-
-import javax.crypto.{Cipher, SecretKeyFactory}
-import javax.crypto.spec._
-import kafka.utils.PasswordEncoder._
-import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.config.types.Password
-
-import scala.collection.Map
-
-object PasswordEncoder {
-  val KeyFactoryAlgorithmProp = "keyFactoryAlgorithm"
-  val CipherAlgorithmProp = "cipherAlgorithm"
-  val InitializationVectorProp = "initializationVector"
-  val KeyLengthProp = "keyLength"
-  val SaltProp = "salt"
-  val IterationsProp = "iterations"
-  val EncryptedPasswordProp = "encryptedPassword"
-  val PasswordLengthProp = "passwordLength"
-
-  def encrypting(secret: Password,
-                 keyFactoryAlgorithm: Option[String],
-                 cipherAlgorithm: String,
-                 keyLength: Int,
-                 iterations: Int): EncryptingPasswordEncoder = {
-    new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, 
cipherAlgorithm, keyLength, iterations)
-  }
-
-  def noop(): NoOpPasswordEncoder = {
-    new NoOpPasswordEncoder()
-  }
-}
-
-trait PasswordEncoder {
-  def encode(password: Password): String
-  def decode(encodedPassword: String): Password
-
-  private[utils] def base64Decode(encoded: String): Array[Byte] = 
Base64.getDecoder.decode(encoded)
-}
-
-/**
- * A password encoder that does not modify the given password. This is used in 
KRaft mode only.
- */

Review Comment:
   This class comment is missing.



##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -211,19 +212,19 @@ object ConfigCommand extends Logging {
   }
 
   private[admin] def createPasswordEncoder(encoderConfigs: Map[String, 
String]): PasswordEncoder = {
-    encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp)
-    val encoderSecret = 
encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp,
+    encoderConfigs.get(PasswordEncoderConfigs.SECRET)
+    val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.SECRET,
       throw new IllegalArgumentException("Password encoder secret not 
specified"))
     PasswordEncoder.encrypting(new Password(encoderSecret),
-      None,
-      encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderCipherAlgorithmProp, 
Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM),
-      
encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PASSWORD_ENCODER_KEY_LENGTH),
-      
encoderConfigs.get(KafkaConfig.PasswordEncoderIterationsProp).map(_.toInt).getOrElse(Defaults.PASSWORD_ENCODER_ITERATIONS))
+      Optional.empty(),
+      encoderConfigs.getOrElse(PasswordEncoderConfigs.CIPHER_ALGORITHM, 
PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM),
+      
encoderConfigs.get(PasswordEncoderConfigs.KEY_LENGTH).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_KEY_LENGTH),
+      
encoderConfigs.get(PasswordEncoderConfigs.ITERATIONS).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_ITERATIONS))
   }
 
   /**
    * Pre-process broker configs provided to convert them to persistent format.
-   * Password configs are encrypted using the secret 
`KafkaConfig.PasswordEncoderSecretProp`.
+   * Password configs are encrypted using the secret 
`PasswordEncoderConfigs.SECRET_PROP`.

Review Comment:
   `PasswordEncoderConfigs.SECRET_PROP` -> `PasswordEncoderConfigs.SECRET` ?



##########
core/src/main/scala/kafka/utils/PasswordEncoder.scala:
##########
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.utils
-
-import java.nio.charset.StandardCharsets
-import java.security.{AlgorithmParameters, NoSuchAlgorithmException, 
SecureRandom}
-import java.security.spec.AlgorithmParameterSpec
-import java.util.Base64
-
-import javax.crypto.{Cipher, SecretKeyFactory}
-import javax.crypto.spec._
-import kafka.utils.PasswordEncoder._
-import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.config.types.Password
-
-import scala.collection.Map
-
-object PasswordEncoder {
-  val KeyFactoryAlgorithmProp = "keyFactoryAlgorithm"
-  val CipherAlgorithmProp = "cipherAlgorithm"
-  val InitializationVectorProp = "initializationVector"
-  val KeyLengthProp = "keyLength"
-  val SaltProp = "salt"
-  val IterationsProp = "iterations"
-  val EncryptedPasswordProp = "encryptedPassword"
-  val PasswordLengthProp = "passwordLength"
-
-  def encrypting(secret: Password,
-                 keyFactoryAlgorithm: Option[String],
-                 cipherAlgorithm: String,
-                 keyLength: Int,
-                 iterations: Int): EncryptingPasswordEncoder = {
-    new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, 
cipherAlgorithm, keyLength, iterations)
-  }
-
-  def noop(): NoOpPasswordEncoder = {
-    new NoOpPasswordEncoder()
-  }
-}
-
-trait PasswordEncoder {
-  def encode(password: Password): String
-  def decode(encodedPassword: String): Password
-
-  private[utils] def base64Decode(encoded: String): Array[Byte] = 
Base64.getDecoder.decode(encoded)
-}
-
-/**
- * A password encoder that does not modify the given password. This is used in 
KRaft mode only.
- */
-class NoOpPasswordEncoder extends PasswordEncoder {
-  override def encode(password: Password): String = password.value()
-  override def decode(encodedPassword: String): Password = new 
Password(encodedPassword)
-}
-
-/**
-  * Password encoder and decoder implementation. Encoded passwords are 
persisted as a CSV map
-  * containing the encoded password in base64 and along with the properties 
used for encryption.
-  *
-  * @param secret The secret used for encoding and decoding
-  * @param keyFactoryAlgorithm  Key factory algorithm if configured. By 
default, PBKDF2WithHmacSHA512 is
-  *                             used if available, PBKDF2WithHmacSHA1 
otherwise.
-  * @param cipherAlgorithm Cipher algorithm used for encoding.
-  * @param keyLength Key length used for encoding. This should be valid for 
the specified algorithms.
-  * @param iterations Iteration count used for encoding.
-  *
-  * The provided `keyFactoryAlgorithm`, `cipherAlgorithm`, `keyLength` and 
`iterations` are used for encoding passwords.
-  * The values used for encoding are stored along with the encoded password 
and the stored values are used for decoding.
-  *
-  */

Review Comment:
   This is missing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to