OmniaGM commented on code in PR #16579: URL: https://github.com/apache/kafka/pull/16579#discussion_r1691320313
########## core/src/test/java/kafka/security/JaasTestUtils.java: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security; + +import kafka.utils.TestUtils; + +import org.apache.kafka.clients.admin.ScramMechanism; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.Java; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.common.config.SaslConfigs.GSSAPI_MECHANISM; +import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM; +import static org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM; + +public class JaasTestUtils { + public static class JaasSection { + private final String contextName; + private final List<JaasModule> modules; + + public JaasSection(String contextName, List<JaasModule> modules) { + this.contextName = contextName; + this.modules = 
modules; + } + + public List<JaasModule> getModules() { + return modules; + } + + public String getContextName() { + return contextName; + } + + @Override + public String toString() { + return String.format("%s {\n %s\n};\n", + contextName, + String.join("\n ", modules.stream().map(Object::toString).toArray(String[]::new))); + } + } + + private static final boolean IS_IBM_SECURITY = Java.isIbmJdk() && !Java.isIbmJdkSemeru(); + + private static final String ZK_SERVER_CONTEXT_NAME = "Server"; + private static final String ZK_CLIENT_CONTEXT_NAME = "Client"; + private static final String ZK_USER_SUPER_PASSWD = "adminpasswd"; + private static final String ZK_USER = "fpj"; + private static final String ZK_USER_PASSWORD = "fpjsecret"; + + public static final String KAFKA_SERVER_CONTEXT_NAME = "KafkaServer"; + public static final String KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME = "kafka"; + private static final String KAFKA_SERVER_PRINCIPAL = KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME + "/[email protected]"; + public static final String KAFKA_CLIENT_CONTEXT_NAME = "KafkaClient"; + public static final String KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME = "client"; + private static final String KAFKA_CLIENT_PRINCIPAL = KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME + "@EXAMPLE.COM"; + public static final String KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2 = "client2"; + private static final String KAFKA_CLIENT_PRINCIPAL_2 = KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2 + "@EXAMPLE.COM"; + + public static final String KAFKA_PLAIN_USER = "plain-user"; + private static final String KAFKA_PLAIN_PASSWORD = "plain-user-secret"; + public static final String KAFKA_PLAIN_USER_2 = "plain-user2"; + public static final String KAFKA_PLAIN_PASSWORD_2 = "plain-user2-secret"; + public static final String KAFKA_PLAIN_ADMIN = "plain-admin"; + private static final String KAFKA_PLAIN_ADMIN_PASSWORD = "plain-admin-secret"; + + public static final String KAFKA_SCRAM_USER = "scram-user"; + public static final String 
KAFKA_SCRAM_PASSWORD = "scram-user-secret"; + public static final String KAFKA_SCRAM_USER_2 = "scram-user2"; + public static final String KAFKA_SCRAM_PASSWORD_2 = "scram-user2-secret"; + public static final String KAFKA_SCRAM_ADMIN = "scram-admin"; + public static final String KAFKA_SCRAM_ADMIN_PASSWORD = "scram-admin-secret"; + + public static final String KAFKA_OAUTH_BEARER_USER = "oauthbearer-user"; + public static final String KAFKA_OAUTH_BEARER_USER_2 = "oauthbearer-user2"; + public static final String KAFKA_OAUTH_BEARER_ADMIN = "oauthbearer-admin"; + + public static final String SERVICE_NAME = "kafka"; + + public static Properties saslConfigs(Optional<Properties> saslProperties) { + Properties result = saslProperties.orElse(new Properties()); + if (IS_IBM_SECURITY && !result.containsKey(SaslConfigs.SASL_KERBEROS_SERVICE_NAME)) { + result.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, SERVICE_NAME); + } + return result; + } + + public static File writeJaasContextsToFile(List<JaasSection> jaasSections) throws IOException { + File jaasFile = TestUtils.tempFile(); + writeToFile(jaasFile, jaasSections); + return jaasFile; + } + + public static String scramClientLoginModule(String mechanism, String scramUser, String scramPassword) { + if (ScramMechanism.fromMechanismName(mechanism) == ScramMechanism.UNKNOWN) { + throw new IllegalArgumentException("Unsupported SCRAM mechanism " + mechanism); + } + return JaasModule.scramLoginModule(scramUser, scramPassword, false, new HashMap<>()).toString(); + } + + public static String clientLoginModule(String mechanism, Optional<File> keytabLocation, String serviceName) { + return kafkaClientModule(mechanism, keytabLocation, KAFKA_CLIENT_PRINCIPAL, KAFKA_PLAIN_USER, KAFKA_PLAIN_PASSWORD, KAFKA_SCRAM_USER, KAFKA_SCRAM_PASSWORD, KAFKA_OAUTH_BEARER_USER, serviceName).toString(); Review Comment: This line is very long can you please break it into multiple lines ########## core/src/test/java/kafka/security/JaasModule.java: ########## @@ 
-0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class JaasModule { + public static JaasModule zkDigestModule(boolean debug, Map<String, String> entries) { + String name = "org.apache.zookeeper.server.auth.DigestLoginModule"; + return new JaasModule( + name, + debug, + entries, + String.format("%s required\n debug=%b\n %s;\n", name, debug, entries.entrySet().stream() + .map(e -> e.getKey() + "=\"" + e.getValue() + "\"") + .reduce((e1, e2) -> e1 + "\n " + e2) + .orElse("")) + ); + } + + public static JaasModule krb5LoginModule(boolean useKeyTab, boolean storeKey, String keyTab, String principal, boolean debug, Optional<String> serviceName, boolean isIbmSecurity) { + String name = isIbmSecurity ? 
"com.ibm.security.auth.module.Krb5LoginModule" : "com.sun.security.auth.module.Krb5LoginModule"; + + Map<String, String> entries = new HashMap<>(); + if (isIbmSecurity) { + entries.put("principal", principal); + entries.put("credsType", "both"); + if (useKeyTab) { + entries.put("useKeytab", "file:" + keyTab); + } + } else { + entries.put("useKeyTab", Boolean.toString(useKeyTab)); + entries.put("storeKey", Boolean.toString(storeKey)); + entries.put("keyTab", keyTab); + entries.put("principal", principal); + serviceName.ifPresent(s -> entries.put("serviceName", s)); + } + + return new JaasModule( + name, + debug, + entries, + String.format("%s required\n debug=%b\n %s;\n", name, debug, entries.entrySet().stream() + .map(e -> e.getKey() + "=\"" + e.getValue() + "\"") + .reduce((e1, e2) -> e1 + "\n " + e2) + .orElse("")) + ); + } + + public static JaasModule oAuthBearerLoginModule(String username, boolean debug) { + String name = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule"; + + Map<String, String> entries = new HashMap<>(); + entries.put("unsecuredLoginStringClaim_sub", username); + + return new JaasModule( + name, + debug, + entries, + String.format("%s required\n debug=%b\n %s;\n", name, debug, entries.entrySet().stream() + .map(e -> e.getKey() + "=\"" + e.getValue() + "\"") + .reduce((e1, e2) -> e1 + "\n " + e2) + .orElse("")) + ); + } + + public static JaasModule plainLoginModule(String username, String password) { + return plainLoginModule(username, password, false, Collections.emptyMap()); + } + + public static JaasModule plainLoginModule(String username, String password, boolean debug, Map<String, String> validUsers) { + String name = "org.apache.kafka.common.security.plain.PlainLoginModule"; + + Map<String, String> entries = new HashMap<>(); + entries.put("username", username); + entries.put("password", password); + validUsers.forEach((user, pass) -> entries.put("user_" + user, pass)); + + return new JaasModule( + name, + debug, + 
entries, + String.format("%s required\n debug=%b\n %s;\n", name, debug, entries.entrySet().stream() + .map(e -> e.getKey() + "=\"" + e.getValue() + "\"") + .reduce((e1, e2) -> e1 + "\n " + e2) + .orElse("")) + ); + } + + public static JaasModule scramLoginModule(String username, String password) { + return scramLoginModule(username, password, false, Collections.emptyMap()); + } + + public static JaasModule scramLoginModule(String username, String password, boolean debug, Map<String, String> tokenProps) { + String name = "org.apache.kafka.common.security.scram.ScramLoginModule"; + + Map<String, String> entries = new HashMap<>(); + entries.put("username", username); + entries.put("password", password); + entries.putAll(tokenProps); + + return new JaasModule( + name, + debug, + entries, + String.format("%s required\n debug=%b\n %s;\n", name, debug, entries.entrySet().stream() + .map(e -> e.getKey() + "=\"" + e.getValue() + "\"") + .reduce((e1, e2) -> e1 + "\n " + e2) + .orElse("")) + ); + } + + private final String name; + + private final boolean debug; + + private final Map<String, String> entries; + + private final String toString; + + private JaasModule(String name, boolean debug, Map<String, String> entries, String toString) { + this.name = name; + this.debug = debug; + this.entries = entries; + this.toString = toString; + } + + public String name() { + return name; + } + + public boolean debug() { + return debug; + } + + Map<String, String> entries() { Review Comment: Is this method really used? ########## core/src/test/java/kafka/security/JaasTestUtils.java: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security; + +import kafka.utils.TestUtils; + +import org.apache.kafka.clients.admin.ScramMechanism; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.Java; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.common.config.SaslConfigs.GSSAPI_MECHANISM; +import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM; +import static org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM; + +public class JaasTestUtils { + public static class JaasSection { + private final String contextName; + private final List<JaasModule> modules; + + public JaasSection(String contextName, List<JaasModule> modules) { + this.contextName = contextName; + this.modules = modules; + } + + public List<JaasModule> getModules() { + return modules; + } + + public String getContextName() { + return contextName; + } + + @Override + public String toString() { + return String.format("%s {\n %s\n};\n", + contextName, + String.join("\n ", 
modules.stream().map(Object::toString).toArray(String[]::new))); + } + } + + private static final boolean IS_IBM_SECURITY = Java.isIbmJdk() && !Java.isIbmJdkSemeru(); + + private static final String ZK_SERVER_CONTEXT_NAME = "Server"; + private static final String ZK_CLIENT_CONTEXT_NAME = "Client"; + private static final String ZK_USER_SUPER_PASSWD = "adminpasswd"; + private static final String ZK_USER = "fpj"; + private static final String ZK_USER_PASSWORD = "fpjsecret"; + + public static final String KAFKA_SERVER_CONTEXT_NAME = "KafkaServer"; + public static final String KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME = "kafka"; + private static final String KAFKA_SERVER_PRINCIPAL = KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME + "/[email protected]"; + public static final String KAFKA_CLIENT_CONTEXT_NAME = "KafkaClient"; + public static final String KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME = "client"; + private static final String KAFKA_CLIENT_PRINCIPAL = KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME + "@EXAMPLE.COM"; + public static final String KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2 = "client2"; + private static final String KAFKA_CLIENT_PRINCIPAL_2 = KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2 + "@EXAMPLE.COM"; + + public static final String KAFKA_PLAIN_USER = "plain-user"; + private static final String KAFKA_PLAIN_PASSWORD = "plain-user-secret"; + public static final String KAFKA_PLAIN_USER_2 = "plain-user2"; + public static final String KAFKA_PLAIN_PASSWORD_2 = "plain-user2-secret"; + public static final String KAFKA_PLAIN_ADMIN = "plain-admin"; + private static final String KAFKA_PLAIN_ADMIN_PASSWORD = "plain-admin-secret"; + + public static final String KAFKA_SCRAM_USER = "scram-user"; + public static final String KAFKA_SCRAM_PASSWORD = "scram-user-secret"; + public static final String KAFKA_SCRAM_USER_2 = "scram-user2"; + public static final String KAFKA_SCRAM_PASSWORD_2 = "scram-user2-secret"; + public static final String KAFKA_SCRAM_ADMIN = "scram-admin"; + public static 
final String KAFKA_SCRAM_ADMIN_PASSWORD = "scram-admin-secret"; + + public static final String KAFKA_OAUTH_BEARER_USER = "oauthbearer-user"; + public static final String KAFKA_OAUTH_BEARER_USER_2 = "oauthbearer-user2"; + public static final String KAFKA_OAUTH_BEARER_ADMIN = "oauthbearer-admin"; + + public static final String SERVICE_NAME = "kafka"; + + public static Properties saslConfigs(Optional<Properties> saslProperties) { + Properties result = saslProperties.orElse(new Properties()); + if (IS_IBM_SECURITY && !result.containsKey(SaslConfigs.SASL_KERBEROS_SERVICE_NAME)) { + result.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, SERVICE_NAME); + } + return result; + } + + public static File writeJaasContextsToFile(List<JaasSection> jaasSections) throws IOException { + File jaasFile = TestUtils.tempFile(); + writeToFile(jaasFile, jaasSections); + return jaasFile; + } + + public static String scramClientLoginModule(String mechanism, String scramUser, String scramPassword) { + if (ScramMechanism.fromMechanismName(mechanism) == ScramMechanism.UNKNOWN) { + throw new IllegalArgumentException("Unsupported SCRAM mechanism " + mechanism); + } + return JaasModule.scramLoginModule(scramUser, scramPassword, false, new HashMap<>()).toString(); + } + + public static String clientLoginModule(String mechanism, Optional<File> keytabLocation, String serviceName) { + return kafkaClientModule(mechanism, keytabLocation, KAFKA_CLIENT_PRINCIPAL, KAFKA_PLAIN_USER, KAFKA_PLAIN_PASSWORD, KAFKA_SCRAM_USER, KAFKA_SCRAM_PASSWORD, KAFKA_OAUTH_BEARER_USER, serviceName).toString(); + } + + public static String clientLoginModule(String mechanism, Optional<File> keytabLocation) { + return clientLoginModule(mechanism, keytabLocation, SERVICE_NAME); + } + + public static String adminLoginModule(String mechanism, Optional<File> keytabLocation, String serviceName) { + return kafkaClientModule(mechanism, keytabLocation, KAFKA_SERVER_PRINCIPAL, KAFKA_PLAIN_ADMIN, KAFKA_PLAIN_ADMIN_PASSWORD, 
KAFKA_SCRAM_ADMIN, KAFKA_SCRAM_ADMIN_PASSWORD, KAFKA_OAUTH_BEARER_ADMIN, serviceName).toString(); + } + + public static String adminLoginModule(String mechanism, Optional<File> keytabLocation) { + return adminLoginModule(mechanism, keytabLocation, SERVICE_NAME); + } + + public static String tokenClientLoginModule(String tokenId, String password) { + Map<String, String> tokenProps = new HashMap<>(); + tokenProps.put("tokenauth", "true"); + return JaasModule.scramLoginModule(tokenId, password, false, tokenProps).toString(); + } + + public static List<JaasSection> zkSections() { + Map<String, String> zkServerEntries = new HashMap<>(); + zkServerEntries.put("user_super", ZK_USER_SUPER_PASSWD); + zkServerEntries.put("user_" + ZK_USER, ZK_USER_PASSWORD); + JaasSection zkServerSection = new JaasSection(ZK_SERVER_CONTEXT_NAME, Collections.singletonList(JaasModule.zkDigestModule(false, zkServerEntries))); + + Map<String, String> zkClientEntries = new HashMap<>(); + zkClientEntries.put("username", ZK_USER); + zkClientEntries.put("password", ZK_USER_PASSWORD); + JaasSection zkClientSection = new JaasSection(ZK_CLIENT_CONTEXT_NAME, Collections.singletonList(JaasModule.zkDigestModule(false, zkClientEntries))); + + return Arrays.asList(zkServerSection, zkClientSection); + } + + public static JaasSection kafkaServerSection(String contextName, List<String> mechanisms, Optional<File> keytabLocation) { + List<JaasModule> modules = new ArrayList<>(); + for (String mechanism : mechanisms) { + switch (mechanism) { + case GSSAPI_MECHANISM: + modules.add(JaasModule.krb5LoginModule( + true, + true, + keytabLocation.orElseThrow(() -> new IllegalArgumentException("Keytab location not specified for GSSAPI")).getAbsolutePath(), + KAFKA_SERVER_PRINCIPAL, + true, + Optional.of(SERVICE_NAME), + IS_IBM_SECURITY + )); + break; + case PLAIN_MECHANISM: + Map<String, String> validUsers = new HashMap<>(); + validUsers.put(KAFKA_PLAIN_ADMIN, KAFKA_PLAIN_ADMIN_PASSWORD); + 
validUsers.put(KAFKA_PLAIN_USER, KAFKA_PLAIN_PASSWORD); + validUsers.put(KAFKA_PLAIN_USER_2, KAFKA_PLAIN_PASSWORD_2); + modules.add(JaasModule.plainLoginModule(KAFKA_PLAIN_ADMIN, KAFKA_PLAIN_ADMIN_PASSWORD, false, validUsers)); + break; + case OAUTHBEARER_MECHANISM: + modules.add(JaasModule.oAuthBearerLoginModule(KAFKA_OAUTH_BEARER_ADMIN, false)); + break; + default: + if (ScramMechanism.fromMechanismName(mechanism) != ScramMechanism.UNKNOWN) { + modules.add(JaasModule.scramLoginModule(KAFKA_SCRAM_ADMIN, KAFKA_SCRAM_ADMIN_PASSWORD, false, new HashMap<>())); + } else { + throw new IllegalArgumentException("Unsupported server mechanism " + mechanism); + } + break; + } + } + return new JaasSection(contextName, modules); + } + + private static JaasModule kafkaClientModule(String mechanism, Optional<File> keytabLocation, String clientPrincipal, String plainUser, String plainPassword, String scramUser, String scramPassword, String oauthBearerUser, String serviceName) { Review Comment: Can you also break the list of these attributes to multiple lines ########## core/src/test/java/kafka/security/JaasModule.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.security; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class JaasModule { + public static JaasModule zkDigestModule(boolean debug, Map<String, String> entries) { + String name = "org.apache.zookeeper.server.auth.DigestLoginModule"; + return new JaasModule( + name, + debug, + entries, + String.format("%s required\n debug=%b\n %s;\n", name, debug, entries.entrySet().stream() + .map(e -> e.getKey() + "=\"" + e.getValue() + "\"") + .reduce((e1, e2) -> e1 + "\n " + e2) + .orElse("")) + ); + } + + public static JaasModule krb5LoginModule(boolean useKeyTab, boolean storeKey, String keyTab, String principal, boolean debug, Optional<String> serviceName, boolean isIbmSecurity) { + String name = isIbmSecurity ? "com.ibm.security.auth.module.Krb5LoginModule" : "com.sun.security.auth.module.Krb5LoginModule"; + + Map<String, String> entries = new HashMap<>(); + if (isIbmSecurity) { + entries.put("principal", principal); + entries.put("credsType", "both"); + if (useKeyTab) { + entries.put("useKeytab", "file:" + keyTab); + } + } else { + entries.put("useKeyTab", Boolean.toString(useKeyTab)); + entries.put("storeKey", Boolean.toString(storeKey)); + entries.put("keyTab", keyTab); + entries.put("principal", principal); + serviceName.ifPresent(s -> entries.put("serviceName", s)); + } + + return new JaasModule( + name, + debug, + entries, + String.format("%s required\n debug=%b\n %s;\n", name, debug, entries.entrySet().stream() + .map(e -> e.getKey() + "=\"" + e.getValue() + "\"") + .reduce((e1, e2) -> e1 + "\n " + e2) + .orElse("")) + ); + } + + public static JaasModule oAuthBearerLoginModule(String username, boolean debug) { + String name = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule"; + + Map<String, String> entries = new HashMap<>(); + entries.put("unsecuredLoginStringClaim_sub", username); + + return new JaasModule( + name, + debug, + entries, + 
String.format("%s required\n debug=%b\n %s;\n", name, debug, entries.entrySet().stream() + .map(e -> e.getKey() + "=\"" + e.getValue() + "\"") + .reduce((e1, e2) -> e1 + "\n " + e2) + .orElse("")) + ); + } + + public static JaasModule plainLoginModule(String username, String password) { + return plainLoginModule(username, password, false, Collections.emptyMap()); + } + + public static JaasModule plainLoginModule(String username, String password, boolean debug, Map<String, String> validUsers) { + String name = "org.apache.kafka.common.security.plain.PlainLoginModule"; + + Map<String, String> entries = new HashMap<>(); + entries.put("username", username); + entries.put("password", password); + validUsers.forEach((user, pass) -> entries.put("user_" + user, pass)); + + return new JaasModule( + name, + debug, + entries, + String.format("%s required\n debug=%b\n %s;\n", name, debug, entries.entrySet().stream() + .map(e -> e.getKey() + "=\"" + e.getValue() + "\"") + .reduce((e1, e2) -> e1 + "\n " + e2) + .orElse("")) + ); + } + + public static JaasModule scramLoginModule(String username, String password) { + return scramLoginModule(username, password, false, Collections.emptyMap()); + } + + public static JaasModule scramLoginModule(String username, String password, boolean debug, Map<String, String> tokenProps) { + String name = "org.apache.kafka.common.security.scram.ScramLoginModule"; + + Map<String, String> entries = new HashMap<>(); + entries.put("username", username); + entries.put("password", password); + entries.putAll(tokenProps); + + return new JaasModule( + name, + debug, + entries, + String.format("%s required\n debug=%b\n %s;\n", name, debug, entries.entrySet().stream() + .map(e -> e.getKey() + "=\"" + e.getValue() + "\"") + .reduce((e1, e2) -> e1 + "\n " + e2) + .orElse("")) + ); + } + + private final String name; + + private final boolean debug; + + private final Map<String, String> entries; + + private final String toString; + + private JaasModule(String 
name, boolean debug, Map<String, String> entries, String toString) { Review Comment: Why are we passing `toString` into the class instead of generating it in the `toString` method? Especially since, as far as I can see, all the passed `toString` values have the same format. ########## core/src/test/java/kafka/security/JaasTestUtils.java: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.security; + +import kafka.utils.TestUtils; + +import org.apache.kafka.clients.admin.ScramMechanism; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.Java; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.common.config.SaslConfigs.GSSAPI_MECHANISM; +import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM; +import static org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM; + +public class JaasTestUtils { + public static class JaasSection { + private final String contextName; + private final List<JaasModule> modules; + + public JaasSection(String contextName, List<JaasModule> modules) { + this.contextName = contextName; + this.modules = modules; + } + + public List<JaasModule> getModules() { + return modules; + } + + public String getContextName() { + return contextName; + } + + @Override + public String toString() { + return String.format("%s {\n %s\n};\n", + contextName, + String.join("\n ", modules.stream().map(Object::toString).toArray(String[]::new))); + } + } + + private static final boolean IS_IBM_SECURITY = Java.isIbmJdk() && !Java.isIbmJdkSemeru(); + + private static final String ZK_SERVER_CONTEXT_NAME = "Server"; + private static final String ZK_CLIENT_CONTEXT_NAME = "Client"; + private static final String ZK_USER_SUPER_PASSWD = "adminpasswd"; + private static final String ZK_USER = "fpj"; + private static final String ZK_USER_PASSWORD = "fpjsecret"; + + public static final String KAFKA_SERVER_CONTEXT_NAME = "KafkaServer"; + public static final String KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME = "kafka"; + private 
static final String KAFKA_SERVER_PRINCIPAL = KAFKA_SERVER_PRINCIPAL_UNQUALIFIED_NAME + "/[email protected]"; + public static final String KAFKA_CLIENT_CONTEXT_NAME = "KafkaClient"; + public static final String KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME = "client"; + private static final String KAFKA_CLIENT_PRINCIPAL = KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME + "@EXAMPLE.COM"; + public static final String KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2 = "client2"; + private static final String KAFKA_CLIENT_PRINCIPAL_2 = KAFKA_CLIENT_PRINCIPAL_UNQUALIFIED_NAME_2 + "@EXAMPLE.COM"; + + public static final String KAFKA_PLAIN_USER = "plain-user"; + private static final String KAFKA_PLAIN_PASSWORD = "plain-user-secret"; + public static final String KAFKA_PLAIN_USER_2 = "plain-user2"; + public static final String KAFKA_PLAIN_PASSWORD_2 = "plain-user2-secret"; + public static final String KAFKA_PLAIN_ADMIN = "plain-admin"; + private static final String KAFKA_PLAIN_ADMIN_PASSWORD = "plain-admin-secret"; + + public static final String KAFKA_SCRAM_USER = "scram-user"; + public static final String KAFKA_SCRAM_PASSWORD = "scram-user-secret"; + public static final String KAFKA_SCRAM_USER_2 = "scram-user2"; + public static final String KAFKA_SCRAM_PASSWORD_2 = "scram-user2-secret"; + public static final String KAFKA_SCRAM_ADMIN = "scram-admin"; + public static final String KAFKA_SCRAM_ADMIN_PASSWORD = "scram-admin-secret"; + + public static final String KAFKA_OAUTH_BEARER_USER = "oauthbearer-user"; + public static final String KAFKA_OAUTH_BEARER_USER_2 = "oauthbearer-user2"; + public static final String KAFKA_OAUTH_BEARER_ADMIN = "oauthbearer-admin"; + + public static final String SERVICE_NAME = "kafka"; + + public static Properties saslConfigs(Optional<Properties> saslProperties) { + Properties result = saslProperties.orElse(new Properties()); + if (IS_IBM_SECURITY && !result.containsKey(SaslConfigs.SASL_KERBEROS_SERVICE_NAME)) { + result.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, 
SERVICE_NAME); + } + return result; + } + + public static File writeJaasContextsToFile(List<JaasSection> jaasSections) throws IOException { + File jaasFile = TestUtils.tempFile(); + writeToFile(jaasFile, jaasSections); + return jaasFile; + } + + public static String scramClientLoginModule(String mechanism, String scramUser, String scramPassword) { + if (ScramMechanism.fromMechanismName(mechanism) == ScramMechanism.UNKNOWN) { + throw new IllegalArgumentException("Unsupported SCRAM mechanism " + mechanism); + } + return JaasModule.scramLoginModule(scramUser, scramPassword, false, new HashMap<>()).toString(); + } + + public static String clientLoginModule(String mechanism, Optional<File> keytabLocation, String serviceName) { + return kafkaClientModule(mechanism, keytabLocation, KAFKA_CLIENT_PRINCIPAL, KAFKA_PLAIN_USER, KAFKA_PLAIN_PASSWORD, KAFKA_SCRAM_USER, KAFKA_SCRAM_PASSWORD, KAFKA_OAUTH_BEARER_USER, serviceName).toString(); + } + + public static String clientLoginModule(String mechanism, Optional<File> keytabLocation) { + return clientLoginModule(mechanism, keytabLocation, SERVICE_NAME); + } + + public static String adminLoginModule(String mechanism, Optional<File> keytabLocation, String serviceName) { + return kafkaClientModule(mechanism, keytabLocation, KAFKA_SERVER_PRINCIPAL, KAFKA_PLAIN_ADMIN, KAFKA_PLAIN_ADMIN_PASSWORD, KAFKA_SCRAM_ADMIN, KAFKA_SCRAM_ADMIN_PASSWORD, KAFKA_OAUTH_BEARER_ADMIN, serviceName).toString(); Review Comment: same -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
