This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 14adbad94 ATLAS-4788 : Kafka password is in clear text in application.properties 14adbad94 is described below commit 14adbad94a0de95f6d390b2d8c80d9bd914c0438 Author: chaitali <chaitali.bor...@freestoneinfotech.com> AuthorDate: Mon Oct 30 17:13:07 2023 +0530 ATLAS-4788 : Kafka password is in clear text in application.properties Signed-off-by: Pinal Shah <pinal.s...@freestoneinfotech.com> --- .../java/org/apache/atlas/utils/KafkaUtils.java | 17 ++- .../org/apache/atlas/utils/KafkaUtilsTest.java | 123 ++++++++++++++++++++- 2 files changed, 138 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java index 167442259..672caa8e6 100644 --- a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java +++ b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java @@ -19,6 +19,7 @@ package org.apache.atlas.utils; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.security.SecurityUtil; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -43,6 +44,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static org.apache.atlas.security.SecurityProperties.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH; public class KafkaUtils implements AutoCloseable { @@ -62,6 +64,9 @@ public class KafkaUtils implements AutoCloseable { public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka"; public static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config"; + public static final String JAAS_PASSWORD_SUFFIX = "password"; + private static final String JAAS_MASK_PASSWORD = "********"; + final protected Properties kafkaConfiguration; final protected AdminClient adminClient; final protected boolean importInternalTopics; @@ -254,6 +259,7 @@ public class KafkaUtils implements AutoCloseable { String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + "."; String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP; + String passwordOptionKey = optionPrefix + JAAS_PASSWORD_SUFFIX; int optionPrefixLen = optionPrefix.length(); StringBuffer optionStringBuffer = new StringBuffer(); @@ -271,7 +277,16 @@ public class KafkaUtils implements AutoCloseable { } catch (IOException e) { LOG.warn("Failed to build serverPrincipal. Using provided value:[{}]", optionVal); } - + if (key.equalsIgnoreCase(passwordOptionKey)) { + String jaasKafkaClientConfigurationProperty = "atlas.jaas.KafkaClient.option.password"; + if (JAAS_MASK_PASSWORD.equals(configuration.getString(jaasKafkaClientConfigurationProperty))) { + try { + optionVal = SecurityUtil.getPassword(configuration, jaasKafkaClientConfigurationProperty, HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH); + } catch (Exception e) { + LOG.error("Error in getting secure password ", e); + } + } + } optionVal = surroundWithQuotes(optionVal); optionStringBuffer.append(String.format(" %s=%s", key.substring(optionPrefixLen), optionVal)); diff --git a/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java index 562e28ae1..9b4f093e7 100644 --- a/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java +++ b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java @@ -20,13 +20,20 @@ package org.apache.atlas.utils; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.alias.CredentialProvider; +import org.apache.hadoop.security.alias.CredentialProviderFactory; +import org.apache.hadoop.security.alias.JavaKeyStoreProvider; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.security.JaasContext; import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.annotations.Test; + +import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -37,6 +44,11 @@ import static org.testng.Assert.fail; public class KafkaUtilsTest { + protected Path jksPath; + protected String providerUrl; + + protected static final String JAAS_MASKED_PASSWORD = "keypass"; + @Test public void testSetKafkaJAASPropertiesForAllProperValues() { Properties properties = new Properties(); @@ -262,6 +274,94 @@ public class KafkaUtilsTest { } } + @Test + public void testSetKafkaJAASPropertiesForClearTextPassword() throws Exception { + Properties properties = new Properties(); + Configuration configuration = new PropertiesConfiguration(); + setupCredentials(); + final String loginModuleName = "org.apache.kafka.common.security.scram.ScramLoginModule"; + final String loginModuleControlFlag = "required"; + final String optionUseKeyTab = "false"; + final String optionStoreKey = "false"; + final String optionServiceName = "kafka"; + final String optionTokenAuth = "true"; + final String optionUsername = "30CQ4q1hQMy0dB6X0eXfxQ"; + final String optionPassword = "admin123"; + + configuration.setProperty("atlas.kafka.bootstrap.servers", "localhost:9100"); + configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName); + configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag); + configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab); + configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey); + configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName", optionServiceName); + configuration.setProperty("atlas.jaas.KafkaClient.option.tokenauth", optionTokenAuth); + configuration.setProperty("atlas.jaas.KafkaClient.option.username", optionUsername); + configuration.setProperty("atlas.jaas.KafkaClient.option.password", optionPassword); + configuration.setProperty("hadoop.security.credential.provider.path", providerUrl); + + try (MockedStatic mockedKafkaUtilsClass = Mockito.mockStatic(KafkaUtils.class)) { + mockedKafkaUtilsClass.when(() -> KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod(); + mockedKafkaUtilsClass.when(() -> KafkaUtils.setKafkaJAASProperties(configuration, properties)).thenCallRealMethod(); + + KafkaUtils.setKafkaJAASProperties(configuration, properties); + + String newPropertyValue = properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY); + assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property"); + assertTrue(newPropertyValue.contains(loginModuleControlFlag), "loginModuleControlFlag not present in new property"); + assertTrue(newPropertyValue.contains("useKeyTab=\"" + optionUseKeyTab + "\""), "useKeyTab not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("storeKey=\"" + optionStoreKey + "\""), "storeKey not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("serviceName=\"" + optionServiceName + "\""), "serviceName not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("tokenauth=\"" + optionTokenAuth + "\""), "tokenauth not pres////.ent in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("username=\"" + optionUsername + "\""), "username not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("password=\"" + optionPassword + "\""), "password not present in new property or value doesn't match"); + assertJaaSConfigLoadable(newPropertyValue); + } + } + + @Test + public void testSetKafkaJAASPropertiesForPasswordEncryption() throws Exception { + Properties properties = new Properties(); + Configuration configuration = new PropertiesConfiguration(); + setupCredentials(); + final String loginModuleName = "org.apache.kafka.common.security.scram.ScramLoginModule"; + final String loginModuleControlFlag = "required"; + final String optionUseKeyTab = "false"; + final String optionStoreKey = "false"; + final String optionServiceName = "kafka"; + final String optionTokenAuth = "true"; + final String optionUsername = "30CQ4q1hQMy0dB6X0eXfxQ"; + final String optionPassword = "********"; + + configuration.setProperty("atlas.kafka.bootstrap.servers", "localhost:9100"); + configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName); + configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag); + configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab); + configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey); + configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName", optionServiceName); + configuration.setProperty("atlas.jaas.KafkaClient.option.tokenauth", optionTokenAuth); + configuration.setProperty("atlas.jaas.KafkaClient.option.username", optionUsername); + configuration.setProperty("atlas.jaas.KafkaClient.option.password", optionPassword); + configuration.setProperty("hadoop.security.credential.provider.path", providerUrl); + + try (MockedStatic mockedKafkaUtilsClass = Mockito.mockStatic(KafkaUtils.class)) { + mockedKafkaUtilsClass.when(() -> KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod(); + mockedKafkaUtilsClass.when(() -> KafkaUtils.setKafkaJAASProperties(configuration, properties)).thenCallRealMethod(); + + KafkaUtils.setKafkaJAASProperties(configuration, properties); + + String newPropertyValue = properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY); + assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property"); + assertTrue(newPropertyValue.contains(loginModuleControlFlag), "loginModuleControlFlag not present in new property"); + assertTrue(newPropertyValue.contains("useKeyTab=\"" + optionUseKeyTab + "\""), "useKeyTab not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("storeKey=\"" + optionStoreKey + "\""), "storeKey not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("serviceName=\"" + optionServiceName + "\""), "serviceName not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("tokenauth=\"" + optionTokenAuth + "\""), "tokenauth not pres////.ent in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("username=\"" + optionUsername + "\""), "username not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("password=\"" + JAAS_MASKED_PASSWORD + "\""), "password not present in new property or value doesn't match"); + assertJaaSConfigLoadable(newPropertyValue); + } + } + private void assertJaaSConfigLoadable(String jaasConfig) { // Ensure that JaaS config can be loaded Map<String, Password> jaasConfigs = new HashMap<>(); @@ -273,6 +373,27 @@ public class KafkaUtilsTest { } } + protected void setupCredentials() throws Exception { + jksPath = new Path(Files.createTempDirectory("tempproviders").toString(), "kafka.jceks"); + providerUrl = JavaKeyStoreProvider.SCHEME_NAME + "://file/" + jksPath.toUri(); + org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(false); + + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, providerUrl); + + CredentialProvider provider = CredentialProviderFactory.getProviders(conf).get(0); + + // create new aliases + try { + provider.createCredentialEntry("atlas.jaas.KafkaClient.option.password", JAAS_MASKED_PASSWORD.toCharArray()); -} + // write out so that it can be found in checks + provider.flush(); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + + +}