This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 19e74e80a8 Added dynamic SSL initialization support for the Kafka client (#12249) 19e74e80a8 is described below commit 19e74e80a825140a515eb9161d543b47d6870a8d Author: Ragesh Rajagopalan <ragesh.rajagopa...@gmail.com> AuthorDate: Tue Jan 16 09:34:35 2024 -0800 Added dynamic SSL initialization support for the Kafka client (#12249) --- .../pinot-stream-ingestion/pinot-kafka-2.0/pom.xml | 13 + .../KafkaPartitionLevelConnectionHandler.java | 1 + .../pinot/plugin/stream/kafka20/KafkaSSLUtils.java | 339 +++++++++++++++++++++ .../plugin/stream/kafka20/KafkaSSLUtilsTest.java | 310 +++++++++++++++++++ 4 files changed, 663 insertions(+) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml index bd2ac3c5e0..7ee33c3b7a 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml @@ -37,6 +37,7 @@ <pinot.root>${basedir}/../../..</pinot.root> <kafka.lib.version>2.8.1</kafka.lib.version> <phase.prop>package</phase.prop> + <bouncycastle.version>1.70</bouncycastle.version> </properties> <dependencies> @@ -111,5 +112,17 @@ <artifactId>metrics-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcprov-jdk15on</artifactId> + <version>${bouncycastle.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcpkix-jdk15on</artifactId> + <version>${bouncycastle.version}</version> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java index 00371acaba..f0512cc8b3 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java @@ -67,6 +67,7 @@ public abstract class KafkaPartitionLevelConnectionHandler { consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel()); } consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId); + KafkaSSLUtils.initSSL(consumerProp); _consumer = createConsumer(consumerProp); _topicPartition = new TopicPartition(_topic, _partition); _consumer.assign(Collections.singletonList(_topicPartition)); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java new file mode 100644 index 0000000000..63f285b785 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.kafka20; + +import com.google.common.annotations.VisibleForTesting; +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyFactory; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.security.spec.PKCS8EncodedKeySpec; +import java.util.Arrays; +import java.util.Base64; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * SSL utils class which helps in initialization of Kafka client SSL configuration. The class can install the + * provided server certificate enabling one-way SSL or it can install the server certificate and the + * client certificates enabling two-way SSL. + */ +public class KafkaSSLUtils { + + private KafkaSSLUtils() { + // private on purpose + } + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSSLUtils.class); + // Value constants + private static final String DEFAULT_CERTIFICATE_TYPE = "X.509"; + private static final String DEFAULT_KEY_ALGORITHM = "RSA"; + private static final String DEFAULT_KEYSTORE_TYPE = "PKCS12"; + private static final String DEFAULT_SECURITY_PROTOCOL = "SSL"; + private static final String DEFAULT_TRUSTSTORE_TYPE = "jks"; + private static final String DEFAULT_SERVER_ALIAS = "ServerAlias"; + private static final String DEFAULT_CLIENT_ALIAS = "ClientAlias"; + // Key constants + private static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location"; + private static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password"; + private static final String SECURITY_PROTOCOL = "security.protocol"; + private static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location"; + private static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password"; + private static final String SSL_KEY_PASSWORD = "ssl.key.password"; + private static final String STREAM_KAFKA_SSL_SERVER_CERTIFICATE = "stream.kafka.ssl.server.certificate"; + private static final String STREAM_KAFKA_SSL_CERTIFICATE_TYPE = "stream.kafka.ssl.certificate.type"; + private static final String SSL_TRUSTSTORE_TYPE = "ssl.truststore.type"; + private static final String STREAM_KAFKA_SSL_CLIENT_CERTIFICATE = "stream.kafka.ssl.client.certificate"; + private static final String STREAM_KAFKA_SSL_CLIENT_KEY = "stream.kafka.ssl.client.key"; + private static final String STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM = "stream.kafka.ssl.client.key.algorithm"; + private static final String SSL_KEYSTORE_TYPE = "ssl.keystore.type"; + + public static void initSSL(Properties consumerProps) { + // Check if one-way SSL is enabled. In this scenario, the client validates the server certificate. + String trustStoreLocation = consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION); + String trustStorePassword = consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD); + String serverCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_SERVER_CERTIFICATE); + if (StringUtils.isAnyEmpty(trustStoreLocation, trustStorePassword, serverCertificate)) { + LOGGER.info("Skipping auto SSL server validation since it's not configured."); + return; + } + if (shouldRenewTrustStore(consumerProps)) { + initTrustStore(consumerProps); + } + + // Set the security protocol + String securityProtocol = consumerProps.getProperty(SECURITY_PROTOCOL, DEFAULT_SECURITY_PROTOCOL); + consumerProps.setProperty(SECURITY_PROTOCOL, securityProtocol); + + // Check if two-way SSL is enabled. In this scenario, the client validates the server's certificate and the server + // validates the client's certificate. + String keyStoreLocation = consumerProps.getProperty(SSL_KEYSTORE_LOCATION); + String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD); + String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD); + String clientCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_CERTIFICATE); + if (StringUtils.isAnyEmpty(keyStoreLocation, keyStorePassword, keyPassword, clientCertificate)) { + LOGGER.info("Skipping auto SSL client validation since it's not configured."); + return; + } + if (shouldRenewKeyStore(consumerProps)) { + initKeyStore(consumerProps); + } + } + + @VisibleForTesting + static void initTrustStore(Properties consumerProps) { + Path trustStorePath = getTrustStorePath(consumerProps); + if (Files.exists(trustStorePath)) { + deleteFile(trustStorePath); + } + LOGGER.info("Initializing the SSL trust store"); + try { + // Create the trust store path + createFile(trustStorePath); + } catch (FileAlreadyExistsException fex) { + LOGGER.warn("SSL trust store initialization failed as trust store already exists."); + return; + } catch (IOException iex) { + throw new RuntimeException(String.format("Failed to create the trust store path: %s", trustStorePath), iex); + } + + try { + String trustStorePassword = consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD); + String serverCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_SERVER_CERTIFICATE); + String certificateType = consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE, DEFAULT_CERTIFICATE_TYPE); + String trustStoreType = consumerProps.getProperty(SSL_TRUSTSTORE_TYPE, DEFAULT_TRUSTSTORE_TYPE); + consumerProps.setProperty(SSL_TRUSTSTORE_TYPE, trustStoreType); + + // Decode the Base64 string + byte[] certBytes = Base64.getDecoder().decode(serverCertificate); + InputStream certInputStream = new ByteArrayInputStream(certBytes); + + // Create a Certificate object + CertificateFactory certificateFactory = CertificateFactory.getInstance(certificateType); + Certificate certificate = certificateFactory.generateCertificate(certInputStream); + + // Create a TrustStore and load the default TrustStore + KeyStore trustStore = KeyStore.getInstance(trustStoreType); + + // Initialize the TrustStore + trustStore.load(null, null); + + // Add the server certificate to the truststore + trustStore.setCertificateEntry(DEFAULT_SERVER_ALIAS, certificate); + + // Save the keystore to a file + try (FileOutputStream fos = new FileOutputStream(trustStorePath.toString())) { + trustStore.store(fos, trustStorePassword.toCharArray()); + } + LOGGER.info("Initialized the SSL trust store."); + } catch (Exception ex) { + throw new RuntimeException("Error initializing the SSL trust store", ex); + } + } + + @VisibleForTesting + static void initKeyStore(Properties consumerProps) { + Path keyStorePath = getKeyStorePath(consumerProps); + if (Files.exists(keyStorePath)) { + deleteFile(keyStorePath); + } + LOGGER.info("Initializing the SSL key store"); + try { + // Create the key store path + createFile(keyStorePath); + } catch (FileAlreadyExistsException fex) { + LOGGER.warn("SSL key store initialization failed as key store already exists."); + return; + } catch (IOException iex) { + throw new RuntimeException(String.format("Failed to create the key store path: %s", keyStorePath), iex); + } + + String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD); + String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD); + String clientCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_CERTIFICATE); + String certificateType = consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE, DEFAULT_CERTIFICATE_TYPE); + String privateKeyString = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY); + String privateKeyAlgorithm = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM, + DEFAULT_KEY_ALGORITHM); + String keyStoreType = consumerProps.getProperty(SSL_KEYSTORE_TYPE, DEFAULT_KEYSTORE_TYPE); + consumerProps.setProperty(SSL_KEYSTORE_TYPE, keyStoreType); + + try { + // decode the private key and certificate into bytes + byte[] pkBytes = Base64.getDecoder().decode(privateKeyString); + byte[] certBytes = Base64.getDecoder().decode(clientCertificate); + + // Create the private key object + PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(pkBytes); + KeyFactory keyFactory = KeyFactory.getInstance(privateKeyAlgorithm); + PrivateKey privateKey = keyFactory.generatePrivate(keySpec); + + // Create the Certificate object + CertificateFactory certFactory = CertificateFactory.getInstance(certificateType); + InputStream certInputStream = new ByteArrayInputStream(certBytes); + Certificate certificate = certFactory.generateCertificate(certInputStream); + + // Create a KeyStore object and load a new empty keystore + KeyStore keyStore = KeyStore.getInstance(keyStoreType); + keyStore.load(null, null); + + // Add the key pair and certificate to the keystore + KeyStore.PrivateKeyEntry privateKeyEntry = new KeyStore.PrivateKeyEntry( + privateKey, new Certificate[]{certificate} + ); + KeyStore.PasswordProtection keyPasswordProtection = new KeyStore.PasswordProtection(keyPassword.toCharArray()); + keyStore.setEntry(DEFAULT_CLIENT_ALIAS, privateKeyEntry, keyPasswordProtection); + + // Save the keystore to the specified location + try (FileOutputStream fos = new FileOutputStream(keyStorePath.toString())) { + keyStore.store(fos, keyStorePassword.toCharArray()); + } + LOGGER.info("Initialized the SSL key store."); + } catch (Exception ex) { + throw new RuntimeException("Error initializing the SSL key store", ex); + } + } + + private static Path getTrustStorePath(Properties consumerProps) { + String trustStoreLocation = consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION); + return Paths.get(trustStoreLocation); + } + + private static Path getKeyStorePath(Properties consumerProps) { + String keyStoreLocation = consumerProps.getProperty(SSL_KEYSTORE_LOCATION); + return Paths.get(keyStoreLocation); + } + + // Renew the trust store if needed. + private static boolean shouldRenewTrustStore(Properties consumerProps) { + boolean renewTrustStore; + Path trustStorePath = getTrustStorePath(consumerProps); + String trustStorePassword = consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD); + String serverCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_SERVER_CERTIFICATE); + String certificateType = consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE, DEFAULT_CERTIFICATE_TYPE); + + try { + // Load the trust store + KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + try (FileInputStream fis = new FileInputStream(trustStorePath.toString())) { + trustStore.load(fis, trustStorePassword.toCharArray()); + } + + // Decode the provided certificate + byte[] decodedCertBytes = Base64.getDecoder().decode(serverCertificate); + CertificateFactory certFactory = CertificateFactory.getInstance(certificateType); + Certificate providedCertificate = certFactory.generateCertificate(new ByteArrayInputStream(decodedCertBytes)); + + // Get the certificate from the trust store + Certificate trustStoreCertificate = trustStore.getCertificate(DEFAULT_SERVER_ALIAS); + + // Compare the certificates + renewTrustStore = !providedCertificate.equals(trustStoreCertificate); + } catch (FileNotFoundException fex) { + // create the trust store if trust store does not exist – happens the very first time + renewTrustStore = true; + } catch (Exception ex) { + // renew trust store if comparison check fails + renewTrustStore = true; + LOGGER.warn("Trust store certificate comparison check failed.", ex); + } + + return renewTrustStore; + } + + // Renew the key store if needed. + private static boolean shouldRenewKeyStore(Properties consumerProps) { + boolean renewKeyStore; + Path keyStorePath = getKeyStorePath(consumerProps); + String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD); + String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD); + String certificateType = consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE, DEFAULT_CERTIFICATE_TYPE); + String clientCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_CERTIFICATE); + String privateKeyAlgorithm = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM, + DEFAULT_KEY_ALGORITHM); + String privateKeyString = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY); + try { + // Load the KeyStore + KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); + try (FileInputStream fis = new FileInputStream(keyStorePath.toString())) { + keyStore.load(fis, keyStorePassword.toCharArray()); + } + + // Extract certificate and private key from KeyStore + Certificate keyStoreCert = keyStore.getCertificate(DEFAULT_CLIENT_ALIAS); + PrivateKey keyStorePrivateKey = (PrivateKey) keyStore.getKey(DEFAULT_CLIENT_ALIAS, keyPassword.toCharArray()); + + // Decode provided Base64 encoded certificate and private key + CertificateFactory certFactory = CertificateFactory.getInstance(certificateType); + Certificate providedCert = certFactory.generateCertificate(new ByteArrayInputStream( + Base64.getDecoder().decode(clientCertificate))); + PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(Base64.getDecoder().decode(privateKeyString)); + KeyFactory keyFactory = KeyFactory.getInstance(privateKeyAlgorithm); + PrivateKey providedPrivateKey = keyFactory.generatePrivate(keySpec); + + // Compare certificates and private keys + boolean isCertSame = Arrays.equals(keyStoreCert.getEncoded(), providedCert.getEncoded()); + boolean isKeySame = Arrays.equals(keyStorePrivateKey.getEncoded(), providedPrivateKey.getEncoded()); + renewKeyStore = !(isCertSame && isKeySame); + } catch (FileNotFoundException fex) { + // create the key store if key store does not exist – happens the very first time + renewKeyStore = true; + } catch (Exception ex) { + // renew key store if comparison check fails + renewKeyStore = true; + LOGGER.warn("Key store certificate and private key comparison checks failed.", ex); + } + return renewKeyStore; + } + + private static void deleteFile(Path path) { + try { + Files.deleteIfExists(path); + LOGGER.info(String.format("Successfully deleted file: %s", path)); + } catch (IOException iex) { + LOGGER.warn(String.format("Failed to delete the file: %s", path)); + } + } + + private static void createFile(Path path) + throws IOException { + Path parentDir = path.getParent(); + if (parentDir != null) { + Files.createDirectories(parentDir); + } + Path filePath = path.toAbsolutePath(); + if (!Files.exists(filePath)) { + Files.createFile(filePath); + LOGGER.info(String.format("Successfully created file: %s", path)); + } + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtilsTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtilsTest.java new file mode 100644 index 0000000000..6f61a22e82 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtilsTest.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.kafka20; + +import java.io.FileInputStream; +import java.io.IOException; +import java.math.BigInteger; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.NoSuchProviderException; +import java.security.SecureRandom; +import java.security.Security; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Date; +import java.util.Enumeration; +import java.util.Properties; +import java.util.UUID; +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.cert.X509v3CertificateBuilder; +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; +import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.operator.ContentSigner; +import org.bouncycastle.operator.OperatorCreationException; +import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; +import org.bouncycastle.util.encoders.Base64; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class KafkaSSLUtilsTest { + private String _trustStorePath; + private String _keyStorePath; + private static final String DEFAULT_TRUSTSTORE_PASSWORD = "mytruststorepassword"; + private static final String DEFAULT_KEYSTORE_PASSWORD = "mykeystorepassword"; + + static { + // helps generate the X509Certificate + Security.addProvider(new BouncyCastleProvider()); + } + + @BeforeMethod + private void setup() { + _trustStorePath = "/tmp/" + UUID.randomUUID() + "/client.truststore.jks"; + _keyStorePath = "/tmp/" + UUID.randomUUID() + "/client.keystore.p12"; + } + + @AfterMethod + private void cleanup() { + Path trustStorePath = Paths.get(_trustStorePath); + try { + Files.deleteIfExists(trustStorePath); + } catch (IOException ex) { + // ignored + } + + Path keyStorePath = Paths.get(_keyStorePath); + try { + Files.deleteIfExists(keyStorePath); + } catch (IOException ex) { + // ignored + } + } + + @Test + public void testInitTrustStore() + throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException, + IOException, KeyStoreException { + Properties consumerProps = new Properties(); + setTrustStoreProps(consumerProps); + + // should not throw any exceptions + KafkaSSLUtils.initTrustStore(consumerProps); + validateTrustStoreCertificateCount(1); + } + + @Test + public void testInitKeyStore() + throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException, + IOException, KeyStoreException { + Properties consumerProps = new Properties(); + setKeyStoreProps(consumerProps); + + // should not throw any exceptions + KafkaSSLUtils.initKeyStore(consumerProps); + validateKeyStoreCertificateCount(1); + } + + @Test + public void testInitSSLTrustStoreAndKeyStore() + throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException, + KeyStoreException, IOException { + Properties consumerProps = new Properties(); + setTrustStoreProps(consumerProps); + setKeyStoreProps(consumerProps); + + // should not throw any exceptions + KafkaSSLUtils.initSSL(consumerProps); + + // validate + validateTrustStoreCertificateCount(1); + validateKeyStoreCertificateCount(1); + } + + @Test + public void testInitSSLTrustStoreOnly() + throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException, + IOException, KeyStoreException { + Properties consumerProps = new Properties(); + setTrustStoreProps(consumerProps); + + // should not throw any exceptions + KafkaSSLUtils.initSSL(consumerProps); + + // validate + validateTrustStoreCertificateCount(1); + } + + @Test (expectedExceptions = java.io.FileNotFoundException.class) + public void testInitSSLKeyStoreOnly() + throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException, + IOException, KeyStoreException { + Properties consumerProps = new Properties(); + setKeyStoreProps(consumerProps); + + // should not throw any exceptions + KafkaSSLUtils.initSSL(consumerProps); + + // Validate that no certificates are installed + validateTrustStoreCertificateCount(0); + } + + @Test + public void testInitSSLAndRenewCertificates() + throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException, + IOException, KeyStoreException { + Properties consumerProps = new Properties(); + setTrustStoreProps(consumerProps); + setKeyStoreProps(consumerProps); + KafkaSSLUtils.initSSL(consumerProps); + + // renew the truststore and keystore + setTrustStoreProps(consumerProps); + setKeyStoreProps(consumerProps); + KafkaSSLUtils.initSSL(consumerProps); + + // validate + validateTrustStoreCertificateCount(1); + validateKeyStoreCertificateCount(1); + } + + @Test + public void testInitSSLBackwardsCompatibilityCheck() + throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException, + IOException, KeyStoreException { + Properties consumerProps = new Properties(); + setTrustStoreProps(consumerProps); + setKeyStoreProps(consumerProps); + KafkaSSLUtils.initSSL(consumerProps); + + // validate + validateTrustStoreCertificateCount(1); + validateKeyStoreCertificateCount(1); + + setTrustStoreProps(consumerProps); // new server certificate is generated + consumerProps.remove("stream.kafka.ssl.server.certificate"); + setKeyStoreProps(consumerProps); // new client certificate is generated + consumerProps.remove("stream.kafka.ssl.client.certificate"); + + // Attempt to initialize the trust store and key store again without passing the required certificates + KafkaSSLUtils.initSSL(consumerProps); + // validate again that the existing certificates are untouched. + validateTrustStoreCertificateCount(1); + validateKeyStoreCertificateCount(1); + } + + private void validateTrustStoreCertificateCount(int expCount) + throws CertificateException, IOException, NoSuchAlgorithmException, KeyStoreException { + // Validate that certificate is installed in the trust store + KeyStore trustStore = KeyStore.getInstance("JKS"); + try (FileInputStream fis = new FileInputStream(_trustStorePath)) { + trustStore.load(fis, DEFAULT_TRUSTSTORE_PASSWORD.toCharArray()); + } + + int certCount = 0; + // Iterate through the aliases in the TrustStore + Enumeration<String> aliases = trustStore.aliases(); + while (aliases.hasMoreElements()) { + String alias = aliases.nextElement(); + // Check if the alias refers to a certificate + if (trustStore.isCertificateEntry(alias)) { + ++certCount; + } + } + Assert.assertEquals(expCount, certCount); + } + + private void validateKeyStoreCertificateCount(int expCount) + throws CertificateException, IOException, NoSuchAlgorithmException, KeyStoreException { + // Validate that certificate is installed in the key store + KeyStore keyStore = KeyStore.getInstance("PKCS12"); + try (FileInputStream fis = new FileInputStream(_keyStorePath)) { + keyStore.load(fis, DEFAULT_KEYSTORE_PASSWORD.toCharArray()); + } + + int certCount = 0; + // Iterate through the aliases in the TrustStore + Enumeration<String> aliases = keyStore.aliases(); + while (aliases.hasMoreElements()) { + String alias = aliases.nextElement(); + // Check if the alias refers to a key + if (keyStore.isKeyEntry(alias)) { + ++certCount; + } + } + Assert.assertEquals(expCount, certCount); + } + + private void setTrustStoreProps(Properties consumerProps) + throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException { + String[] certCreationResult = generateSelfSignedCertificate(); + String serverCertificate = certCreationResult[1]; + consumerProps.setProperty("stream.kafka.ssl.server.certificate", serverCertificate); + consumerProps.setProperty("stream.kafka.ssl.server.certificate.type", "X.509"); + consumerProps.setProperty("ssl.truststore.type", "jks"); + consumerProps.setProperty("ssl.truststore.location", _trustStorePath); + consumerProps.setProperty("ssl.truststore.password", DEFAULT_TRUSTSTORE_PASSWORD); + } + + private void setKeyStoreProps(Properties consumerProps) + throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException { + String[] certCreationResult = generateSelfSignedCertificate(); + String privateKey = certCreationResult[0]; + String clientCertificate = certCreationResult[1]; + consumerProps.setProperty("ssl.keystore.location", _keyStorePath); + consumerProps.setProperty("ssl.keystore.password", DEFAULT_KEYSTORE_PASSWORD); + consumerProps.setProperty("ssl.keystore.type", "PKCS12"); + consumerProps.setProperty("ssl.key.password", "mykeypwd"); + consumerProps.setProperty("stream.kafka.ssl.certificate.type", "X.509"); + consumerProps.setProperty("stream.kafka.ssl.client.certificate", clientCertificate); + consumerProps.setProperty("stream.kafka.ssl.client.key", privateKey); + consumerProps.setProperty("stream.kafka.ssl.client.key.algorithm", "RSA"); + } + + private String[] generateSelfSignedCertificate() + throws CertificateException, OperatorCreationException, NoSuchAlgorithmException, NoSuchProviderException { + String[] certCreationResult = new String[2]; + // Generate a key pair + KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA", "BC"); + keyPairGenerator.initialize(2048, new SecureRandom()); + KeyPair keyPair = keyPairGenerator.generateKeyPair(); + // set the private key into the result object + certCreationResult[0] = Base64.toBase64String(keyPair.getPrivate().getEncoded()); + + // Validity of the certificate + Date notBefore = new Date(); + Date notAfter = new Date(notBefore.getTime() + 7 * 24 * 60 * 60 * 1000L); // 1 week + + // Issuer and Subject DN + X500Name issuerName = new X500Name("CN=Test CA, O=Eng, OU=IT, L=Sunnyvale, ST=CA, C=US"); + X500Name subjectName = new X500Name("CN=Test User, O=Eng, OU=IT, L=Sunnyvale, ST=CA, C=US"); + + // Serial Number + BigInteger serial = BigInteger.valueOf(System.currentTimeMillis()); + + // Create the certificate builder + X509v3CertificateBuilder certBuilder = new JcaX509v3CertificateBuilder( + issuerName, + serial, + notBefore, + notAfter, + subjectName, + keyPair.getPublic()); + + // Create a signer + ContentSigner signer = new JcaContentSignerBuilder("SHA256withRSA").setProvider("BC").build(keyPair.getPrivate()); + + // Build the certificate + X509Certificate cert = new JcaX509CertificateConverter().setProvider("BC") + .getCertificate(certBuilder.build(signer)); + // set the encoded certificate string into the result object + certCreationResult[1] = Base64.toBase64String(cert.getEncoded()); + return certCreationResult; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org