This is an automated email from the ASF dual-hosted git repository. nixon pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new e8c9bd8 ATLAS-4263: Fix KafkaUtils to always enclose property values in double-quotes (#137) e8c9bd8 is described below commit e8c9bd8216e451a3db3afeda562e8986527bc6b4 Author: Vlad Glinsky <61428392+vladhlin...@users.noreply.github.com> AuthorDate: Tue May 4 12:55:07 2021 +0300 ATLAS-4263: Fix KafkaUtils to always enclose property values in double-quotes (#137) (cherry picked from commit 21606bdc20e3890ddf9d26555fc71e4175310950) --- .../java/org/apache/atlas/utils/KafkaUtils.java | 19 +---- .../org/apache/atlas/utils/KafkaUtilsTest.java | 89 ++++++++++++++++++---- 2 files changed, 80 insertions(+), 28 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java index eea3311..1674422 100644 --- a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java +++ b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java @@ -318,20 +318,9 @@ public class KafkaUtils implements AutoCloseable { return optionVal; } - String ret = optionVal; - - // For property values which have special chars like "@" or "/", we need to enclose it in - // double quotes, so that Kafka can parse it - // If the property is already enclosed in double quotes, then do nothing. - if (optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length() - 1) != '"') { - // If the string as special characters like except _,- - final String SPECIAL_CHAR_LIST = "/!@#%^&*"; - - if (StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) { - ret = String.format("\"%s\"", optionVal); - } - } - - return ret; + // Enclose property values in double quotes, so that Kafka can parse it. + // Escape all double quotes that may occur in the property value. + String doubleQuoteEscaped = optionVal.replace("\"", "\\\""); + return String.format("\"%s\"", doubleQuoteEscaped); } } \ No newline at end of file diff --git a/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java index 14739cd..562e28a 100644 --- a/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java +++ b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java @@ -20,11 +20,15 @@ package org.apache.atlas.utils; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.security.JaasContext; import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.annotations.Test; - import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import static org.testng.Assert.assertNull; @@ -55,9 +59,10 @@ public class KafkaUtilsTest { assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property"); assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property"); - assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match"); - assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match"); - assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("useKeyTab=\"" + optionUseKeyTab + "\""), "useKeyTab not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("storeKey=\""+ optionStoreKey + "\""), "storeKey not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("serviceName=\"" + optionServiceName + "\""), "serviceName not present in new property or value doesn't match"); + assertJaaSConfigLoadable(newPropertyValue); } @@ -82,9 +87,10 @@ public class KafkaUtilsTest { assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property"); assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property"); - assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match"); - assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match"); - assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("useKeyTab=\"" + optionUseKeyTab + "\""), "useKeyTab not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("storeKey=\""+ optionStoreKey + "\""), "storeKey not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("serviceName=\"" + optionServiceName + "\""), "serviceName not present in new property or value doesn't match"); + assertJaaSConfigLoadable(newPropertyValue); } @@ -134,6 +140,7 @@ public class KafkaUtilsTest { assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property"); assertTrue(newPropertyValue.contains("keyTabPath=\"" + optionKeyTabPath + "\"")); assertTrue(newPropertyValue.contains("principal=\""+ updatedPrincipalValue + "\"")); + assertJaaSConfigLoadable(newPropertyValue); } catch (IOException e) { fail("Failed while getting updated principal value with exception : " + e.getMessage()); @@ -170,9 +177,10 @@ public class KafkaUtilsTest { String newPropertyValue = properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY); assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property"); assertTrue(newPropertyValue.contains(loginModuleControlFlag), "loginModuleControlFlag not present in new property"); - assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match"); - assertTrue(newPropertyValue.contains("storeKey=" + optionStoreKey), "storeKey not present in new property or value doesn't match"); - assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("useKeyTab=\"" + optionUseKeyTab + "\""), "useKeyTab not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("storeKey=\"" + optionStoreKey + "\""), "storeKey not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("serviceName=\"" + optionServiceName + "\""), "serviceName not present in new property or value doesn't match"); + assertJaaSConfigLoadable(newPropertyValue); } } @@ -204,9 +212,64 @@ public class KafkaUtilsTest { String newPropertyValue = properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY); assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property"); assertTrue(newPropertyValue.contains(loginModuleControlFlag), "loginModuleControlFlag not present in new property"); - assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match"); - assertTrue(newPropertyValue.contains("storeKey=" + optionStoreKey), "storeKey not present in new property or value doesn't match"); - assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("useKeyTab=\"" + optionUseKeyTab + "\""), "useKeyTab not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("storeKey=\"" + optionStoreKey + "\""), "storeKey not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("serviceName=\"" + optionServiceName + "\""), "serviceName not present in new property or value doesn't match"); + assertJaaSConfigLoadable(newPropertyValue); + } + } + + @Test + public void testSetKafkaJAASPropertiesForTokenAuthConfig() { + Properties properties = new Properties(); + Configuration configuration = new PropertiesConfiguration(); + + final String loginModuleName = "org.apache.kafka.common.security.scram.ScramLoginModule"; + final String loginModuleControlFlag = "required"; + final String optionUseKeyTab = "false"; + final String optionStoreKey = "false"; + final String optionServiceName = "kafka"; + final String optionTokenAuth = "true"; + final String optionUsername = "30CQ4q1hQMy0dB6X0eXfxQ"; + final String optionPassword = "KdaUQ4FlKWlDxwQrAeFGUVbb6sR0P+zoqOZDZjtIRP1wseXbSbhiTjz3QI9Ur9o4LTYZSv8TE1QqUC4FSwnoTA=="; + + configuration.setProperty("atlas.kafka.bootstrap.servers", "localhost:9100"); + configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName); + configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag); + configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab); + configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey); + configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName", optionServiceName); + configuration.setProperty("atlas.jaas.KafkaClient.option.tokenauth", optionTokenAuth); + configuration.setProperty("atlas.jaas.KafkaClient.option.username", optionUsername); + configuration.setProperty("atlas.jaas.KafkaClient.option.password", optionPassword); + + try (MockedStatic mockedKafkaUtilsClass = Mockito.mockStatic(KafkaUtils.class)) { + mockedKafkaUtilsClass.when(() -> KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod(); + mockedKafkaUtilsClass.when(() -> KafkaUtils.setKafkaJAASProperties(configuration, properties)).thenCallRealMethod(); + + KafkaUtils.setKafkaJAASProperties(configuration, properties); + + String newPropertyValue = properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY); + assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property"); + assertTrue(newPropertyValue.contains(loginModuleControlFlag), "loginModuleControlFlag not present in new property"); + assertTrue(newPropertyValue.contains("useKeyTab=\"" + optionUseKeyTab + "\""), "useKeyTab not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("storeKey=\"" + optionStoreKey + "\""), "storeKey not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("serviceName=\"" + optionServiceName + "\""), "serviceName not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("tokenauth=\"" + optionTokenAuth + "\""), "tokenauth not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("username=\"" + optionUsername + "\""), "username not present in new property or value doesn't match"); + assertTrue(newPropertyValue.contains("password=\"" + optionPassword + "\""), "password not present in new property or value doesn't match"); + assertJaaSConfigLoadable(newPropertyValue); + } + } + + private void assertJaaSConfigLoadable(String jaasConfig) { + // Ensure that JaaS config can be loaded + Map<String, Password> jaasConfigs = new HashMap<>(); + jaasConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfig)); + try { + JaasContext.loadClientContext(jaasConfigs); + } catch (IllegalArgumentException e) { + fail(String.format("JaaS config '%s' can not be loaded", jaasConfig), e); } }