This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 19e74e80a8 Added dynamic SSL initialization support for the Kafka 
client (#12249)
19e74e80a8 is described below

commit 19e74e80a825140a515eb9161d543b47d6870a8d
Author: Ragesh Rajagopalan <ragesh.rajagopa...@gmail.com>
AuthorDate: Tue Jan 16 09:34:35 2024 -0800

    Added dynamic SSL initialization support for the Kafka client (#12249)
---
 .../pinot-stream-ingestion/pinot-kafka-2.0/pom.xml |  13 +
 .../KafkaPartitionLevelConnectionHandler.java      |   1 +
 .../pinot/plugin/stream/kafka20/KafkaSSLUtils.java | 339 +++++++++++++++++++++
 .../plugin/stream/kafka20/KafkaSSLUtilsTest.java   | 310 +++++++++++++++++++
 4 files changed, 663 insertions(+)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml
index bd2ac3c5e0..7ee33c3b7a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml
@@ -37,6 +37,7 @@
     <pinot.root>${basedir}/../../..</pinot.root>
     <kafka.lib.version>2.8.1</kafka.lib.version>
     <phase.prop>package</phase.prop>
+    <bouncycastle.version>1.70</bouncycastle.version>
   </properties>
 
   <dependencies>
@@ -111,5 +112,17 @@
       <artifactId>metrics-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcprov-jdk15on</artifactId>
+      <version>${bouncycastle.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcpkix-jdk15on</artifactId>
+      <version>${bouncycastle.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
index 00371acaba..f0512cc8b3 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
@@ -67,6 +67,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
       consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
_config.getKafkaIsolationLevel());
     }
     consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
+    KafkaSSLUtils.initSSL(consumerProp);
     _consumer = createConsumer(consumerProp);
     _topicPartition = new TopicPartition(_topic, _partition);
     _consumer.assign(Collections.singletonList(_topicPartition));
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java
new file mode 100644
index 0000000000..63f285b785
--- /dev/null
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtils.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka20;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyFactory;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SSL utility class that initializes the Kafka client SSL configuration. It can install the provided server
+ * certificate, which enables one-way SSL, or install both the server certificate and the client certificate,
+ * which enables two-way SSL.
+ */
+public class KafkaSSLUtils {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSSLUtils.class);
+
+  // Defaults used when the corresponding property is not set
+  private static final String DEFAULT_CERTIFICATE_TYPE = "X.509";
+  private static final String DEFAULT_KEY_ALGORITHM = "RSA";
+  private static final String DEFAULT_KEYSTORE_TYPE = "PKCS12";
+  private static final String DEFAULT_TRUSTSTORE_TYPE = "jks";
+  private static final String DEFAULT_SECURITY_PROTOCOL = "SSL";
+
+  // Aliases under which the entries are stored in the trust store and the key store
+  private static final String DEFAULT_SERVER_ALIAS = "ServerAlias";
+  private static final String DEFAULT_CLIENT_ALIAS = "ClientAlias";
+
+  // Kafka client property keys
+  private static final String SECURITY_PROTOCOL = "security.protocol";
+  private static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location";
+  private static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password";
+  private static final String SSL_TRUSTSTORE_TYPE = "ssl.truststore.type";
+  private static final String SSL_KEYSTORE_LOCATION = "ssl.keystore.location";
+  private static final String SSL_KEYSTORE_PASSWORD = "ssl.keystore.password";
+  private static final String SSL_KEYSTORE_TYPE = "ssl.keystore.type";
+  private static final String SSL_KEY_PASSWORD = "ssl.key.password";
+
+  // Property keys describing the certificates and the private key to install
+  private static final String STREAM_KAFKA_SSL_SERVER_CERTIFICATE = "stream.kafka.ssl.server.certificate";
+  private static final String STREAM_KAFKA_SSL_CLIENT_CERTIFICATE = "stream.kafka.ssl.client.certificate";
+  private static final String STREAM_KAFKA_SSL_CERTIFICATE_TYPE = "stream.kafka.ssl.certificate.type";
+  private static final String STREAM_KAFKA_SSL_CLIENT_KEY = "stream.kafka.ssl.client.key";
+  private static final String STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM = "stream.kafka.ssl.client.key.algorithm";
+
+  private KafkaSSLUtils() {
+    // utility class; private on purpose
+  }
+
+  /**
+   * Sets up the SSL stores described by the given Kafka consumer properties.
+   * <p>One-way SSL (the client validates the server certificate) is set up when
+   * {@code ssl.truststore.location}, {@code ssl.truststore.password} and
+   * {@code stream.kafka.ssl.server.certificate} are all non-empty; otherwise the method returns without
+   * changing anything.
+   * <p>Two-way SSL (the server also validates the client certificate) is additionally set up when
+   * {@code ssl.keystore.location}, {@code ssl.keystore.password}, {@code ssl.key.password},
+   * {@code stream.kafka.ssl.client.certificate} and {@code stream.kafka.ssl.client.key} are all non-empty.
+   *
+   * @param consumerProps Kafka consumer properties; {@code security.protocol} is set to {@code SSL} when the
+   *                      trust store is configured and no protocol has been chosen
+   */
+  public static void initSSL(Properties consumerProps) {
+    // One-way SSL: requires the trust store location, its password and the server certificate.
+    String trustStoreLocation = consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION);
+    String trustStorePassword = consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD);
+    String serverCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_SERVER_CERTIFICATE);
+    if (StringUtils.isAnyEmpty(trustStoreLocation, trustStorePassword, serverCertificate)) {
+      LOGGER.info("Skipping auto SSL server validation since it's not configured.");
+      return;
+    }
+    if (shouldRenewTrustStore(consumerProps)) {
+      initTrustStore(consumerProps);
+    }
+
+    // Keep an explicitly configured security protocol, otherwise fall back to the default.
+    String securityProtocol = consumerProps.getProperty(SECURITY_PROTOCOL, DEFAULT_SECURITY_PROTOCOL);
+    consumerProps.setProperty(SECURITY_PROTOCOL, securityProtocol);
+
+    // Two-way SSL: the private key is required as well, because the key store entry cannot be built from the
+    // certificate alone. Checking it here skips the key store instead of failing later while decoding a null key.
+    String keyStoreLocation = consumerProps.getProperty(SSL_KEYSTORE_LOCATION);
+    String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD);
+    String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD);
+    String clientCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_CERTIFICATE);
+    String clientKey = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY);
+    if (StringUtils.isAnyEmpty(keyStoreLocation, keyStorePassword, keyPassword, clientCertificate, clientKey)) {
+      LOGGER.info("Skipping auto SSL client validation since it's not configured.");
+      return;
+    }
+    if (shouldRenewKeyStore(consumerProps)) {
+      initKeyStore(consumerProps);
+    }
+  }
+
+  /**
+   * Writes a trust store holding the configured server certificate to {@code ssl.truststore.location},
+   * replacing any file that already exists there. The effective trust store type is written back to
+   * {@code ssl.truststore.type}.
+   *
+   * @param consumerProps Kafka consumer properties carrying the trust store location, password and the
+   *                      Base64-encoded server certificate
+   * @throws RuntimeException if the certificate cannot be decoded or the trust store cannot be created or written
+   */
+  @VisibleForTesting
+  static void initTrustStore(Properties consumerProps) {
+    Path trustStorePath = getTrustStorePath(consumerProps);
+    String trustStorePassword = consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD);
+    String serverCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_SERVER_CERTIFICATE);
+    String certificateType = consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE, DEFAULT_CERTIFICATE_TYPE);
+    String trustStoreType = consumerProps.getProperty(SSL_TRUSTSTORE_TYPE, DEFAULT_TRUSTSTORE_TYPE);
+    consumerProps.setProperty(SSL_TRUSTSTORE_TYPE, trustStoreType);
+
+    LOGGER.info("Initializing the SSL trust store");
+
+    // Build the trust store in memory first, so that an invalid certificate, store type or password is reported
+    // before the existing trust store file is deleted.
+    KeyStore trustStore;
+    char[] storePassword;
+    try {
+      storePassword = trustStorePassword.toCharArray();
+
+      // Decode the Base64 string and parse it into a certificate
+      byte[] certBytes = Base64.getDecoder().decode(serverCertificate);
+      CertificateFactory certificateFactory = CertificateFactory.getInstance(certificateType);
+      Certificate certificate = certificateFactory.generateCertificate(new ByteArrayInputStream(certBytes));
+
+      // Create an empty trust store (null stream) and add the server certificate to it
+      trustStore = KeyStore.getInstance(trustStoreType);
+      trustStore.load(null, null);
+      trustStore.setCertificateEntry(DEFAULT_SERVER_ALIAS, certificate);
+    } catch (Exception ex) {
+      throw new RuntimeException("Error initializing the SSL trust store", ex);
+    }
+
+    // Replace whatever is currently stored at the trust store location
+    if (Files.exists(trustStorePath)) {
+      deleteFile(trustStorePath);
+    }
+    try {
+      createFile(trustStorePath);
+    } catch (FileAlreadyExistsException fex) {
+      LOGGER.warn("SSL trust store initialization failed as trust store already exists.");
+      return;
+    } catch (IOException iex) {
+      throw new RuntimeException(String.format("Failed to create the trust store path: %s", trustStorePath), iex);
+    }
+
+    // Save the trust store to the file
+    try (FileOutputStream fos = new FileOutputStream(trustStorePath.toString())) {
+      trustStore.store(fos, storePassword);
+    } catch (Exception ex) {
+      throw new RuntimeException("Error initializing the SSL trust store", ex);
+    }
+    LOGGER.info("Initialized the SSL trust store.");
+  }
+
+  /**
+   * Writes a key store holding the configured client private key and certificate to
+   * {@code ssl.keystore.location}, replacing any file that already exists there. The effective key store type is
+   * written back to {@code ssl.keystore.type}.
+   *
+   * @param consumerProps Kafka consumer properties carrying the key store location, the passwords and the
+   *                      Base64-encoded client certificate and PKCS#8 private key
+   * @throws RuntimeException if the key or certificate cannot be decoded or the key store cannot be created or
+   *                          written
+   */
+  @VisibleForTesting
+  static void initKeyStore(Properties consumerProps) {
+    Path keyStorePath = getKeyStorePath(consumerProps);
+    String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD);
+    String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD);
+    String clientCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_CERTIFICATE);
+    String certificateType = consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE, DEFAULT_CERTIFICATE_TYPE);
+    String privateKeyString = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY);
+    String privateKeyAlgorithm =
+        consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM, DEFAULT_KEY_ALGORITHM);
+    String keyStoreType = consumerProps.getProperty(SSL_KEYSTORE_TYPE, DEFAULT_KEYSTORE_TYPE);
+    consumerProps.setProperty(SSL_KEYSTORE_TYPE, keyStoreType);
+
+    LOGGER.info("Initializing the SSL key store");
+
+    // Build the key store in memory first, so that an invalid key, certificate, store type or password is
+    // reported before the existing key store file is deleted.
+    KeyStore keyStore;
+    char[] storePassword;
+    try {
+      storePassword = keyStorePassword.toCharArray();
+
+      // Decode the private key and certificate into bytes
+      byte[] pkBytes = Base64.getDecoder().decode(privateKeyString);
+      byte[] certBytes = Base64.getDecoder().decode(clientCertificate);
+
+      // Create the private key object from its PKCS#8 encoding
+      KeyFactory keyFactory = KeyFactory.getInstance(privateKeyAlgorithm);
+      PrivateKey privateKey = keyFactory.generatePrivate(new PKCS8EncodedKeySpec(pkBytes));
+
+      // Create the certificate object
+      CertificateFactory certFactory = CertificateFactory.getInstance(certificateType);
+      Certificate certificate = certFactory.generateCertificate(new ByteArrayInputStream(certBytes));
+
+      // Create an empty key store (null stream) and add the private key with its certificate
+      keyStore = KeyStore.getInstance(keyStoreType);
+      keyStore.load(null, null);
+      KeyStore.PrivateKeyEntry privateKeyEntry =
+          new KeyStore.PrivateKeyEntry(privateKey, new Certificate[]{certificate});
+      KeyStore.PasswordProtection keyPasswordProtection = new KeyStore.PasswordProtection(keyPassword.toCharArray());
+      keyStore.setEntry(DEFAULT_CLIENT_ALIAS, privateKeyEntry, keyPasswordProtection);
+    } catch (Exception ex) {
+      throw new RuntimeException("Error initializing the SSL key store", ex);
+    }
+
+    // Replace whatever is currently stored at the key store location
+    if (Files.exists(keyStorePath)) {
+      deleteFile(keyStorePath);
+    }
+    try {
+      createFile(keyStorePath);
+    } catch (FileAlreadyExistsException fex) {
+      LOGGER.warn("SSL key store initialization failed as key store already exists.");
+      return;
+    } catch (IOException iex) {
+      throw new RuntimeException(String.format("Failed to create the key store path: %s", keyStorePath), iex);
+    }
+
+    // Save the key store to the specified location
+    try (FileOutputStream fos = new FileOutputStream(keyStorePath.toString())) {
+      keyStore.store(fos, storePassword);
+    } catch (Exception ex) {
+      throw new RuntimeException("Error initializing the SSL key store", ex);
+    }
+    LOGGER.info("Initialized the SSL key store.");
+  }
+
+  // Resolves the configured "ssl.truststore.location" into a Path.
+  private static Path getTrustStorePath(Properties consumerProps) {
+    return Paths.get(consumerProps.getProperty(SSL_TRUSTSTORE_LOCATION));
+  }
+
+  // Resolves the configured "ssl.keystore.location" into a Path.
+  private static Path getKeyStorePath(Properties consumerProps) {
+    return Paths.get(consumerProps.getProperty(SSL_KEYSTORE_LOCATION));
+  }
+
+  /**
+   * Decides whether the trust store file has to be (re)written.
+   *
+   * @param consumerProps Kafka consumer properties carrying the trust store location, password and the
+   *                      Base64-encoded server certificate
+   * @return {@code true} if the trust store file does not exist, cannot be loaded or compared, or holds a
+   *         certificate different from the configured one under the server alias; {@code false} if the stored
+   *         certificate equals the configured one
+   */
+  private static boolean shouldRenewTrustStore(Properties consumerProps) {
+    Path trustStorePath = getTrustStorePath(consumerProps);
+    String trustStorePassword = consumerProps.getProperty(SSL_TRUSTSTORE_PASSWORD);
+    String serverCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_SERVER_CERTIFICATE);
+    String certificateType = consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE, DEFAULT_CERTIFICATE_TYPE);
+    // Open the store with the configured type (the one initTrustStore writes) rather than the JVM default type,
+    // so that a store of a non-default type can be loaded and compared.
+    String trustStoreType = consumerProps.getProperty(SSL_TRUSTSTORE_TYPE, DEFAULT_TRUSTSTORE_TYPE);
+
+    try {
+      // Load the trust store
+      KeyStore trustStore = KeyStore.getInstance(trustStoreType);
+      try (FileInputStream fis = new FileInputStream(trustStorePath.toString())) {
+        trustStore.load(fis, trustStorePassword.toCharArray());
+      }
+
+      // Decode the provided certificate
+      byte[] decodedCertBytes = Base64.getDecoder().decode(serverCertificate);
+      CertificateFactory certFactory = CertificateFactory.getInstance(certificateType);
+      Certificate providedCertificate = certFactory.generateCertificate(new ByteArrayInputStream(decodedCertBytes));
+
+      // The stored certificate is null when the alias is absent, which also counts as "different"
+      Certificate trustStoreCertificate = trustStore.getCertificate(DEFAULT_SERVER_ALIAS);
+      return !providedCertificate.equals(trustStoreCertificate);
+    } catch (FileNotFoundException fex) {
+      // create the trust store if trust store does not exist - happens the very first time
+      return true;
+    } catch (Exception ex) {
+      // renew trust store if comparison check fails
+      LOGGER.warn("Trust store certificate comparison check failed.", ex);
+      return true;
+    }
+  }
+
+  /**
+   * Decides whether the key store file has to be (re)written.
+   *
+   * @param consumerProps Kafka consumer properties carrying the key store location, the passwords and the
+   *                      Base64-encoded client certificate and PKCS#8 private key
+   * @return {@code true} if the key store file does not exist, cannot be loaded or compared, has no entry under
+   *         the client alias, or holds a certificate or private key different from the configured ones;
+   *         {@code false} if both match
+   */
+  private static boolean shouldRenewKeyStore(Properties consumerProps) {
+    Path keyStorePath = getKeyStorePath(consumerProps);
+    String keyStorePassword = consumerProps.getProperty(SSL_KEYSTORE_PASSWORD);
+    String keyPassword = consumerProps.getProperty(SSL_KEY_PASSWORD);
+    String certificateType = consumerProps.getProperty(STREAM_KAFKA_SSL_CERTIFICATE_TYPE, DEFAULT_CERTIFICATE_TYPE);
+    String clientCertificate = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_CERTIFICATE);
+    String privateKeyAlgorithm =
+        consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY_ALGORITHM, DEFAULT_KEY_ALGORITHM);
+    String privateKeyString = consumerProps.getProperty(STREAM_KAFKA_SSL_CLIENT_KEY);
+    // Open the store with the configured type (the one initKeyStore writes) rather than the JVM default type,
+    // so that a store of a non-default type can be loaded and compared.
+    String keyStoreType = consumerProps.getProperty(SSL_KEYSTORE_TYPE, DEFAULT_KEYSTORE_TYPE);
+
+    try {
+      // Load the key store
+      KeyStore keyStore = KeyStore.getInstance(keyStoreType);
+      try (FileInputStream fis = new FileInputStream(keyStorePath.toString())) {
+        keyStore.load(fis, keyStorePassword.toCharArray());
+      }
+
+      // Extract certificate and private key from the key store; both are null when the alias is absent
+      Certificate keyStoreCert = keyStore.getCertificate(DEFAULT_CLIENT_ALIAS);
+      PrivateKey keyStorePrivateKey = (PrivateKey) keyStore.getKey(DEFAULT_CLIENT_ALIAS, keyPassword.toCharArray());
+      if (keyStoreCert == null || keyStorePrivateKey == null) {
+        return true;
+      }
+
+      // Decode provided Base64 encoded certificate and private key
+      CertificateFactory certFactory = CertificateFactory.getInstance(certificateType);
+      Certificate providedCert =
+          certFactory.generateCertificate(new ByteArrayInputStream(Base64.getDecoder().decode(clientCertificate)));
+      PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(Base64.getDecoder().decode(privateKeyString));
+      PrivateKey providedPrivateKey = KeyFactory.getInstance(privateKeyAlgorithm).generatePrivate(keySpec);
+
+      // Compare certificates and private keys by their encoded forms
+      boolean isCertSame = Arrays.equals(keyStoreCert.getEncoded(), providedCert.getEncoded());
+      boolean isKeySame = Arrays.equals(keyStorePrivateKey.getEncoded(), providedPrivateKey.getEncoded());
+      return !(isCertSame && isKeySame);
+    } catch (FileNotFoundException fex) {
+      // create the key store if key store does not exist - happens the very first time
+      return true;
+    } catch (Exception ex) {
+      // renew key store if comparison check fails
+      LOGGER.warn("Key store certificate and private key comparison checks failed.", ex);
+      return true;
+    }
+  }
+
+  /**
+   * Deletes the given file on a best-effort basis: a failure is logged together with its cause and is not
+   * propagated to the caller.
+   *
+   * @param path file to delete
+   */
+  private static void deleteFile(Path path) {
+    try {
+      // deleteIfExists returns false when there was nothing to delete; only report an actual deletion
+      if (Files.deleteIfExists(path)) {
+        LOGGER.info("Successfully deleted file: {}", path);
+      }
+    } catch (IOException iex) {
+      // a trailing Throwable argument is logged as the exception, so the cause is not lost
+      LOGGER.warn("Failed to delete the file: {}", path, iex);
+    }
+  }
+
+  /**
+   * Creates the file at the given path, together with any missing parent directories. Nothing is created when
+   * the file already exists.
+   *
+   * @param path file to create
+   * @throws IOException if the directories or the file cannot be created
+   */
+  private static void createFile(Path path)
+      throws IOException {
+    Path directory = path.getParent();
+    if (directory != null) {
+      Files.createDirectories(directory);
+    }
+
+    Path absolutePath = path.toAbsolutePath();
+    if (Files.exists(absolutePath)) {
+      return;
+    }
+    Files.createFile(absolutePath);
+    LOGGER.info(String.format("Successfully created file: %s", path));
+  }
+}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtilsTest.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtilsTest.java
new file mode 100644
index 0000000000..6f61a22e82
--- /dev/null
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaSSLUtilsTest.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka20;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.SecureRandom;
+import java.security.Security;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.Properties;
+import java.util.UUID;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.cert.X509v3CertificateBuilder;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.ContentSigner;
+import org.bouncycastle.operator.OperatorCreationException;
+import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
+import org.bouncycastle.util.encoders.Base64;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class KafkaSSLUtilsTest {
+  private static final String DEFAULT_TRUSTSTORE_PASSWORD = "mytruststorepassword";
+  private static final String DEFAULT_KEYSTORE_PASSWORD = "mykeystorepassword";
+
+  // Store locations used by the test methods
+  private String _trustStorePath;
+  private String _keyStorePath;
+
+  static {
+    // Register the BouncyCastle provider; it helps generate the X509Certificate
+    Security.addProvider(new BouncyCastleProvider());
+  }
+
+  @BeforeMethod
+  private void setup() {
+    // Place the stores under the platform temp directory (instead of a hard-coded "/tmp/"), each in its own
+    // randomly named sub-directory so that test methods never share a store file.
+    String tmpDir = System.getProperty("java.io.tmpdir");
+    _trustStorePath = Paths.get(tmpDir, UUID.randomUUID().toString(), "client.truststore.jks").toString();
+    _keyStorePath = Paths.get(tmpDir, UUID.randomUUID().toString(), "client.keystore.p12").toString();
+  }
+
+  @AfterMethod
+  private void cleanup() {
+    for (String storeLocation : new String[]{_trustStorePath, _keyStorePath}) {
+      Path storePath = Paths.get(storeLocation);
+      try {
+        Files.deleteIfExists(storePath);
+        // Also remove the randomly named directory that held the store, so test runs do not pile up empty
+        // directories. Deleting a non-empty directory fails with an IOException, which is ignored below.
+        Path storeDir = storePath.getParent();
+        if (storeDir != null) {
+          Files.deleteIfExists(storeDir);
+        }
+      } catch (IOException ignored) {
+        // best-effort cleanup of temporary files; a leftover must not fail the test
+      }
+    }
+  }
+
+  @Test
+  public void testInitTrustStore()
+      throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException,
+             IOException, KeyStoreException {
+    Properties props = new Properties();
+    setTrustStoreProps(props);
+
+    // Creating the trust store must succeed and leave exactly one certificate entry in it
+    KafkaSSLUtils.initTrustStore(props);
+    validateTrustStoreCertificateCount(1);
+  }
+
+  @Test
+  public void testInitKeyStore()
+      throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException,
+             IOException, KeyStoreException {
+    Properties props = new Properties();
+    setKeyStoreProps(props);
+
+    // Creating the key store must succeed and leave exactly one key entry in it
+    KafkaSSLUtils.initKeyStore(props);
+    validateKeyStoreCertificateCount(1);
+  }
+
+  @Test
+  public void testInitSSLTrustStoreAndKeyStore()
+      throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException,
+             KeyStoreException, IOException {
+    // Configure both one-way (trust store) and two-way (key store) SSL
+    Properties props = new Properties();
+    setTrustStoreProps(props);
+    setKeyStoreProps(props);
+
+    // Must complete without throwing
+    KafkaSSLUtils.initSSL(props);
+
+    // Both stores must have been created with a single entry each
+    validateTrustStoreCertificateCount(1);
+    validateKeyStoreCertificateCount(1);
+  }
+
+  @Test
+  public void testInitSSLTrustStoreOnly()
+      throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException,
+             IOException, KeyStoreException {
+    // Only the trust store (one-way SSL) is configured
+    Properties props = new Properties();
+    setTrustStoreProps(props);
+
+    // Must complete without throwing
+    KafkaSSLUtils.initSSL(props);
+
+    // The trust store must have been created with a single certificate
+    validateTrustStoreCertificateCount(1);
+  }
+
+  @Test
+  public void testInitSSLKeyStoreOnly()
+      throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException,
+             IOException, KeyStoreException {
+    // Only the key store properties are set; the trust store properties are missing
+    Properties consumerProps = new Properties();
+    setKeyStoreProps(consumerProps);
+
+    // initSSL returns early when the trust store is not configured and must not throw
+    KafkaSSLUtils.initSSL(consumerProps);
+
+    // Assert directly that neither store was created, instead of relying on a FileNotFoundException raised by
+    // the validation helper
+    Assert.assertFalse(Files.exists(Paths.get(_trustStorePath)));
+    Assert.assertFalse(Files.exists(Paths.get(_keyStorePath)));
+  }
+
+  @Test
+  public void testInitSSLAndRenewCertificates()
+      throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException,
+             IOException, KeyStoreException {
+    // First initialization with one set of generated certificates
+    Properties props = new Properties();
+    setTrustStoreProps(props);
+    setKeyStoreProps(props);
+    KafkaSSLUtils.initSSL(props);
+
+    // Second initialization: the setters generate new certificates, so the truststore and keystore are renewed
+    setTrustStoreProps(props);
+    setKeyStoreProps(props);
+    KafkaSSLUtils.initSSL(props);
+
+    // Each store must still hold exactly one entry
+    validateTrustStoreCertificateCount(1);
+    validateKeyStoreCertificateCount(1);
+  }
+
+  @Test
+  public void testInitSSLBackwardsCompatibilityCheck()
+      throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException,
+             IOException, KeyStoreException {
+    // Initialize both stores once
+    Properties props = new Properties();
+    setTrustStoreProps(props);
+    setKeyStoreProps(props);
+    KafkaSSLUtils.initSSL(props);
+
+    validateTrustStoreCertificateCount(1);
+    validateKeyStoreCertificateCount(1);
+
+    // Generate new server and client certificates, then drop them from the properties again
+    setTrustStoreProps(props);
+    props.remove("stream.kafka.ssl.server.certificate");
+    setKeyStoreProps(props);
+    props.remove("stream.kafka.ssl.client.certificate");
+
+    // Initializing again without the required certificates must leave the existing stores untouched
+    KafkaSSLUtils.initSSL(props);
+    validateTrustStoreCertificateCount(1);
+    validateKeyStoreCertificateCount(1);
+  }
+
+  /**
+   * Loads the trust store written by the test and asserts the number of certificate entries it contains.
+   *
+   * @param expCount expected number of certificate entries
+   */
+  private void validateTrustStoreCertificateCount(int expCount)
+      throws CertificateException, IOException, NoSuchAlgorithmException, KeyStoreException {
+    // Load the trust store from the location used by the test
+    KeyStore trustStore = KeyStore.getInstance("JKS");
+    try (FileInputStream fis = new FileInputStream(_trustStorePath)) {
+      trustStore.load(fis, DEFAULT_TRUSTSTORE_PASSWORD.toCharArray());
+    }
+
+    int certCount = 0;
+    // Iterate through the aliases in the trust store and count those that refer to a certificate
+    Enumeration<String> aliases = trustStore.aliases();
+    while (aliases.hasMoreElements()) {
+      String alias = aliases.nextElement();
+      if (trustStore.isCertificateEntry(alias)) {
+        ++certCount;
+      }
+    }
+    // TestNG's assertEquals takes (actual, expected)
+    Assert.assertEquals(certCount, expCount);
+  }
+
+  /**
+   * Loads the key store written by the test and asserts the number of key entries it contains.
+   *
+   * @param expCount expected number of key entries
+   */
+  private void validateKeyStoreCertificateCount(int expCount)
+      throws CertificateException, IOException, NoSuchAlgorithmException, KeyStoreException {
+    // Load the key store from the location used by the test
+    KeyStore keyStore = KeyStore.getInstance("PKCS12");
+    try (FileInputStream fis = new FileInputStream(_keyStorePath)) {
+      keyStore.load(fis, DEFAULT_KEYSTORE_PASSWORD.toCharArray());
+    }
+
+    int keyCount = 0;
+    // Iterate through the aliases in the key store and count those that refer to a key
+    Enumeration<String> aliases = keyStore.aliases();
+    while (aliases.hasMoreElements()) {
+      String alias = aliases.nextElement();
+      if (keyStore.isKeyEntry(alias)) {
+        ++keyCount;
+      }
+    }
+    // TestNG's assertEquals takes (actual, expected)
+    Assert.assertEquals(keyCount, expCount);
+  }
+
+  /**
+   * Populates the trust store related properties with a newly generated certificate.
+   *
+   * @param consumerProps properties to populate
+   */
+  private void setTrustStoreProps(Properties consumerProps)
+      throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException {
+    String[] certCreationResult = generateSelfSignedCertificate();
+    String serverCertificate = certCreationResult[1];
+    consumerProps.setProperty("stream.kafka.ssl.server.certificate", serverCertificate);
+    // "stream.kafka.ssl.certificate.type" is the key KafkaSSLUtils reads; the previously used
+    // "stream.kafka.ssl.server.certificate.type" is not read by it.
+    consumerProps.setProperty("stream.kafka.ssl.certificate.type", "X.509");
+    consumerProps.setProperty("ssl.truststore.type", "jks");
+    consumerProps.setProperty("ssl.truststore.location", _trustStorePath);
+    consumerProps.setProperty("ssl.truststore.password", DEFAULT_TRUSTSTORE_PASSWORD);
+  }
+
+  /**
+   * Populates the key store related properties with a newly generated private key and certificate.
+   *
+   * @param consumerProps properties to populate
+   */
+  private void setKeyStoreProps(Properties consumerProps)
+      throws CertificateException, NoSuchAlgorithmException, OperatorCreationException, NoSuchProviderException {
+    // Index 0 holds the private key, index 1 the certificate
+    String[] keyAndCertificate = generateSelfSignedCertificate();
+
+    // Key store location and credentials
+    consumerProps.setProperty("ssl.keystore.location", _keyStorePath);
+    consumerProps.setProperty("ssl.keystore.password", DEFAULT_KEYSTORE_PASSWORD);
+    consumerProps.setProperty("ssl.keystore.type", "PKCS12");
+    consumerProps.setProperty("ssl.key.password", "mykeypwd");
+
+    // Client certificate and private key
+    consumerProps.setProperty("stream.kafka.ssl.certificate.type", "X.509");
+    consumerProps.setProperty("stream.kafka.ssl.client.certificate", keyAndCertificate[1]);
+    consumerProps.setProperty("stream.kafka.ssl.client.key", keyAndCertificate[0]);
+    consumerProps.setProperty("stream.kafka.ssl.client.key.algorithm", "RSA");
+  }
+
+  /**
+   * Generates an RSA key pair and an X.509 certificate for its public key, signed with its own private key.
+   *
+   * @return a two-element array: index 0 is the Base64-encoded private key, index 1 the Base64-encoded
+   *         certificate
+   */
+  private String[] generateSelfSignedCertificate()
+      throws CertificateException, OperatorCreationException, NoSuchAlgorithmException, NoSuchProviderException {
+    // Generate a 2048-bit RSA key pair with the "BC" provider
+    KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA", "BC");
+    generator.initialize(2048, new SecureRandom());
+    KeyPair keys = generator.generateKeyPair();
+
+    // The certificate is valid from now on for 1 week
+    Date validFrom = new Date();
+    Date validUntil = new Date(validFrom.getTime() + 7 * 24 * 60 * 60 * 1000L);
+
+    // Issuer DN, subject DN and serial number
+    X500Name issuer = new X500Name("CN=Test CA, O=Eng, OU=IT, L=Sunnyvale, ST=CA, C=US");
+    X500Name subject = new X500Name("CN=Test User, O=Eng, OU=IT, L=Sunnyvale, ST=CA, C=US");
+    BigInteger serialNumber = BigInteger.valueOf(System.currentTimeMillis());
+
+    // Assemble the certificate and sign it with the generated private key
+    X509v3CertificateBuilder builder =
+        new JcaX509v3CertificateBuilder(issuer, serialNumber, validFrom, validUntil, subject, keys.getPublic());
+    ContentSigner signer = new JcaContentSignerBuilder("SHA256withRSA").setProvider("BC").build(keys.getPrivate());
+    X509Certificate certificate =
+        new JcaX509CertificateConverter().setProvider("BC").getCertificate(builder.build(signer));
+
+    return new String[]{
+        Base64.toBase64String(keys.getPrivate().getEncoded()),
+        Base64.toBase64String(certificate.getEncoded())
+    };
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to