This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f1e9dc2a4b3 [SPARK-45408][CORE] Add RPC SSL settings to TransportConf
f1e9dc2a4b3 is described below
commit f1e9dc2a4b31f597f7b72e6eda137e990c7b3980
Author: Hasnain Lakhani <[email protected]>
AuthorDate: Thu Oct 5 11:40:38 2023 -0500
[SPARK-45408][CORE] Add RPC SSL settings to TransportConf
### What changes were proposed in this pull request?
This change adds new settings to `TransportConf` that are needed for the
RPC SSL functionality to work. It also adds sample configurations that are
used by tests in follow-up PRs (see
https://github.com/apache/spark/pull/42685 for the full context).
### Why are the changes needed?
These changes are needed so that other modules can easily access these
configurations, and so that the sample configurations are easily accessible
to tests.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a test, then ran:
```
./build/sbt
> project network-common
> testOnly org.apache.spark.network.TransportConfSuite
```
More follow-up tests are coming (see
https://github.com/apache/spark/pull/42685).
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43220 from hasnain-db/spark-tls-configs-low.
Authored-by: Hasnain Lakhani <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../apache/spark/network/util/TransportConf.java | 152 +++++++++++++
.../src/test/java/TransportConfSuite.java | 88 ++++++++
.../apache/spark/network/ssl/SslSampleConfigs.java | 235 +++++++++++++++++++++
3 files changed, 475 insertions(+)
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index b8d8f6b85a4..3ebb38e310f 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -17,6 +17,7 @@
package org.apache.spark.network.util;
+import java.io.File;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -257,6 +258,157 @@ public class TransportConf {
conf.get("spark.network.ssl.maxEncryptedBlockSize", "64k")));
}
+ /**
+  * Reports whether Secure (SSL/TLS) RPC, including the Block Transfer Service, is
+  * turned on. Backed by {@code spark.ssl.rpc.enabled}; false when unset.
+  */
+ public boolean sslRpcEnabled() {
+   final boolean rpcSslOn = conf.getBoolean("spark.ssl.rpc.enabled", false);
+   return rpcSslOn;
+ }
+
+ /**
+  * The SSL protocol to use, as supported by Java (remember that SSLv3 was
+  * compromised). Backed by {@code spark.ssl.rpc.protocol}; null when unset.
+  */
+ public String sslRpcProtocol() {
+   final String protocol = conf.get("spark.ssl.rpc.protocol", null);
+   return protocol;
+ }
+
+ /**
+  * The ciphers requested for RPC SSL, parsed from the comma separated list in
+  * {@code spark.ssl.rpc.enabledAlgorithms}.
+  *
+  * Whitespace around each entry is discarded, so "A, B" and "A,B" yield the same
+  * names.
+  *
+  * @return the requested cipher names, or null when the setting is absent
+  */
+ public String[] sslRpcRequestedCiphers() {
+   String ciphers = conf.get("spark.ssl.rpc.enabledAlgorithms", null);
+   if (ciphers == null) {
+     return null;
+   }
+   // A plain split(",") keeps the blank in "A, B" and yields " B", which can never
+   // match a cipher name. Trim the ends and let the delimiter absorb inner whitespace.
+   return ciphers.trim().split("\\s*,\\s*");
+ }
+
+ /**
+  * The key-store file named by {@code spark.ssl.rpc.keyStore}; the path can be
+  * relative to the current directory.
+  *
+  * @return the key-store file, or null when the setting is absent
+  */
+ public File sslRpcKeyStore() {
+   String path = conf.get("spark.ssl.rpc.keyStore", null);
+   return path == null ? null : new File(path);
+ }
+
+ /**
+  * The password to the key-store file, read from
+  * {@code spark.ssl.rpc.keyStorePassword}; null when unset.
+  */
+ public String sslRpcKeyStorePassword() {
+   final String storePassword = conf.get("spark.ssl.rpc.keyStorePassword", null);
+   return storePassword;
+ }
+
+ /**
+  * A PKCS#8 private key file in PEM format, named by
+  * {@code spark.ssl.rpc.privateKey}; the path can be relative to the current
+  * directory.
+  *
+  * @return the private key file, or null when the setting is absent
+  */
+ public File sslRpcPrivateKey() {
+   String path = conf.get("spark.ssl.rpc.privateKey", null);
+   return path == null ? null : new File(path);
+ }
+
+ /**
+  * The password to the private key, read from {@code spark.ssl.rpc.keyPassword};
+  * null when unset.
+  */
+ public String sslRpcKeyPassword() {
+   final String keyPassword = conf.get("spark.ssl.rpc.keyPassword", null);
+   return keyPassword;
+ }
+
+ /**
+  * A X.509 certificate chain file in PEM format, named by
+  * {@code spark.ssl.rpc.certChain}; the path can be relative to the current
+  * directory.
+  *
+  * @return the certificate chain file, or null when the setting is absent
+  */
+ public File sslRpcCertChain() {
+   String path = conf.get("spark.ssl.rpc.certChain", null);
+   return path == null ? null : new File(path);
+ }
+
+ /**
+  * The trust-store file named by {@code spark.ssl.rpc.trustStore}; the path can be
+  * relative to the current directory.
+  *
+  * @return the trust-store file, or null when the setting is absent
+  */
+ public File sslRpcTrustStore() {
+   String path = conf.get("spark.ssl.rpc.trustStore", null);
+   return path == null ? null : new File(path);
+ }
+
+ /**
+  * The password to the trust-store file, read from
+  * {@code spark.ssl.rpc.trustStorePassword}; null when unset.
+  */
+ public String sslRpcTrustStorePassword() {
+   final String storePassword = conf.get("spark.ssl.rpc.trustStorePassword", null);
+   return storePassword;
+ }
+
+ /**
+  * Whether a trust-store that reloads its configuration is in use. If true, the
+  * trust-store is reloaded when its file on disk changes. Backed by
+  * {@code spark.ssl.rpc.trustStoreReloadingEnabled}; false when unset.
+  */
+ public boolean sslRpcTrustStoreReloadingEnabled() {
+   final boolean reloading =
+     conf.getBoolean("spark.ssl.rpc.trustStoreReloadingEnabled", false);
+   return reloading;
+ }
+
+ /**
+  * The interval, in milliseconds, at which the trust-store will reload its
+  * configuration. Backed by {@code spark.ssl.rpc.trustStoreReloadIntervalMs};
+  * 10000 when unset.
+  */
+ public int sslRpctrustStoreReloadIntervalMs() {
+   final int reloadIntervalMs =
+     conf.getInt("spark.ssl.rpc.trustStoreReloadIntervalMs", 10000);
+   return reloadIntervalMs;
+ }
+
+ /**
+  * Whether the OpenSSL implementation is enabled (if available on the host
+  * system). OpenSSL requires both the privateKey and certChain settings. Backed by
+  * {@code spark.ssl.rpc.openSslEnabled}; false when unset.
+  */
+ public boolean sslRpcOpenSslEnabled() {
+   final boolean openSslOn = conf.getBoolean("spark.ssl.rpc.openSslEnabled", false);
+   return openSslOn;
+ }
+
+ /**
+  * Checks that RPC encryption is switched on and that the key material it needs is
+  * present on disk.
+  *
+  * @return true if and only if RPC encryption is enabled and the relevant keys exist
+  */
+ public boolean sslRpcEnabledAndKeysAreValid() {
+   if (!sslRpcEnabled()) {
+     return false;
+   }
+   if (!sslRpcOpenSslEnabled()) {
+     // Only the key store is mandatory here. It's fine for the trust store to be
+     // missing, we would default to trusting all.
+     File keyStore = sslRpcKeyStore();
+     return keyStore != null && keyStore.exists();
+   }
+   // OpenSSL requires both the privateKey and certChain
+   File privateKey = sslRpcPrivateKey();
+   if (privateKey == null || !privateKey.exists()) {
+     return false;
+   }
+   File certChain = sslRpcCertChain();
+   return certChain != null && certChain.exists();
+ }
+
+ /**
+  * Whether we can dangerously fall back to unencrypted connections if RPC over SSL
+  * is enabled but the key files are not present. Backed by
+  * {@code spark.ssl.rpc.dangerouslyFallbackIfKeysNotPresent}; false when unset.
+  */
+ public boolean sslRpcDangerouslyFallbackIfKeysNotPresent() {
+   final String key = "spark.ssl.rpc.dangerouslyFallbackIfKeysNotPresent";
+   return conf.getBoolean(key, false);
+ }
+
/**
* Flag indicating whether to share the pooled ByteBuf allocators between
the different Netty
* channels. If enabled then only two pooled ByteBuf allocators are created:
one where caching
diff --git a/common/network-common/src/test/java/TransportConfSuite.java
b/common/network-common/src/test/java/TransportConfSuite.java
new file mode 100644
index 00000000000..1537f67e98d
--- /dev/null
+++ b/common/network-common/src/test/java/TransportConfSuite.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network;
+
+import java.io.File;
+
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.ssl.SslSampleConfigs;
+
+public class TransportConfSuite {
+
+  // Built from the sample settings that are placed directly in the spark.ssl.rpc
+  // namespace, under the "shuffle" module name.
+  private final TransportConf transportConf =
+    new TransportConf(
+      "shuffle", SslSampleConfigs.createDefaultConfigProviderForRpcNamespace());
+
+  @Test
+  public void testKeyStorePath() {
+    File expected = new File(SslSampleConfigs.keyStorePath);
+    assertEquals(expected, transportConf.sslRpcKeyStore());
+  }
+
+  @Test
+  public void testPrivateKeyPath() {
+    File expected = new File(SslSampleConfigs.privateKeyPath);
+    assertEquals(expected, transportConf.sslRpcPrivateKey());
+  }
+
+  @Test
+  public void testCertChainPath() {
+    File expected = new File(SslSampleConfigs.certChainPath);
+    assertEquals(expected, transportConf.sslRpcCertChain());
+  }
+
+  @Test
+  public void testTrustStorePath() {
+    File expected = new File(SslSampleConfigs.trustStorePath);
+    assertEquals(expected, transportConf.sslRpcTrustStore());
+  }
+
+  @Test
+  public void testTrustStoreReloadingEnabled() {
+    boolean reloading = transportConf.sslRpcTrustStoreReloadingEnabled();
+    assertFalse(reloading);
+  }
+
+  @Test
+  public void testOpenSslEnabled() {
+    boolean openSsl = transportConf.sslRpcOpenSslEnabled();
+    assertFalse(openSsl);
+  }
+
+  @Test
+  public void testSslRpcEnabled() {
+    boolean enabled = transportConf.sslRpcEnabled();
+    assertTrue(enabled);
+  }
+
+  @Test
+  public void testSslKeyStorePassword() {
+    String actual = transportConf.sslRpcKeyStorePassword();
+    assertEquals("password", actual);
+  }
+
+  @Test
+  public void testSslKeyPassword() {
+    String actual = transportConf.sslRpcKeyPassword();
+    assertEquals("password", actual);
+  }
+
+  @Test
+  public void testSslTrustStorePassword() {
+    String actual = transportConf.sslRpcTrustStorePassword();
+    assertEquals("password", actual);
+  }
+
+  @Test
+  public void testSsltrustStoreReloadIntervalMs() {
+    int actual = transportConf.sslRpctrustStoreReloadIntervalMs();
+    assertEquals(10000, actual);
+  }
+}
diff --git
a/common/network-common/src/test/java/org/apache/spark/network/ssl/SslSampleConfigs.java
b/common/network-common/src/test/java/org/apache/spark/network/ssl/SslSampleConfigs.java
new file mode 100644
index 00000000000..3c81b0af318
--- /dev/null
+++
b/common/network-common/src/test/java/org/apache/spark/network/ssl/SslSampleConfigs.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.ssl;
+
+import javax.security.auth.x500.X500Principal;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.GeneralSecurityException;
+import java.security.InvalidKeyException;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.SignatureException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateEncodingException;
+import java.security.cert.X509Certificate;
+import java.util.*;
+
+import org.bouncycastle.x509.X509V1CertificateGenerator;
+
+import org.apache.spark.network.util.ConfigProvider;
+import org.apache.spark.network.util.MapConfigProvider;
+
+
+/**
+ * Sample SSL settings and key-material helpers for the RPC SSL tests.
+ *
+ * The path constants are resolved from classpath resources when the class is
+ * loaded. The remaining helpers build configuration maps and providers, and create
+ * key stores, trust stores, key pairs and self-signed certificates.
+ */
+public class SslSampleConfigs {
+
+  public static final String keyStorePath = getAbsolutePath("/keystore");
+  public static final String privateKeyPath = getAbsolutePath("/key.pem");
+  public static final String certChainPath = getAbsolutePath("/certchain.pem");
+  public static final String untrustedKeyStorePath =
+    getAbsolutePath("/untrusted-keystore");
+  public static final String trustStorePath = getAbsolutePath("/truststore");
+
+  /**
+   * Creates a config map containing the settings needed to enable the RPC SSL
+   * feature. All the settings (except the enabled one) are intentionally set on the
+   * parent namespace so that we can verify settings inheritance works.
+   *
+   * @return a new, mutable map of setting name to value
+   */
+  public static Map<String, String> createDefaultConfigMap() {
+    Map<String, String> confMap = new HashMap<>();
+    confMap.put("spark.ssl.rpc.enabled", "true");
+    // Need this so the other settings get parsed
+    confMap.put("spark.ssl.enabled", "true");
+    confMap.put("spark.ssl.trustStoreReloadingEnabled", "false");
+    confMap.put("spark.ssl.openSslEnabled", "false");
+    confMap.put("spark.ssl.trustStoreReloadIntervalMs", "10000");
+    confMap.put("spark.ssl.keyStore", SslSampleConfigs.keyStorePath);
+    confMap.put("spark.ssl.keyStorePassword", "password");
+    confMap.put("spark.ssl.privateKey", SslSampleConfigs.privateKeyPath);
+    confMap.put("spark.ssl.keyPassword", "password");
+    confMap.put("spark.ssl.certChain", SslSampleConfigs.certChainPath);
+    confMap.put("spark.ssl.trustStore", SslSampleConfigs.trustStorePath);
+    confMap.put("spark.ssl.trustStorePassword", "password");
+    return confMap;
+  }
+
+  /**
+   * Similar to {@link #createDefaultConfigMap()}, but sets the settings directly in
+   * the spark.ssl.rpc namespace. This is needed for testing in the lower level
+   * modules (like network-common) where inheritance does not work as there is no
+   * access to SSLOptions.
+   *
+   * @return a new, mutable map of setting name to value
+   */
+  public static Map<String, String> createDefaultConfigMapForRpcNamespace() {
+    Map<String, String> confMap = new HashMap<>();
+    confMap.put("spark.ssl.rpc.enabled", "true");
+    confMap.put("spark.ssl.rpc.trustStoreReloadingEnabled", "false");
+    confMap.put("spark.ssl.rpc.openSslEnabled", "false");
+    confMap.put("spark.ssl.rpc.trustStoreReloadIntervalMs", "10000");
+    confMap.put("spark.ssl.rpc.keyStore", SslSampleConfigs.keyStorePath);
+    confMap.put("spark.ssl.rpc.keyStorePassword", "password");
+    confMap.put("spark.ssl.rpc.privateKey", SslSampleConfigs.privateKeyPath);
+    confMap.put("spark.ssl.rpc.keyPassword", "password");
+    confMap.put("spark.ssl.rpc.certChain", SslSampleConfigs.certChainPath);
+    confMap.put("spark.ssl.rpc.trustStore", SslSampleConfigs.trustStorePath);
+    confMap.put("spark.ssl.rpc.trustStorePassword", "password");
+    return confMap;
+  }
+
+  /**
+   * Creates a ConfigProvider backed by
+   * {@link #createDefaultConfigMapForRpcNamespace()}.
+   */
+  public static ConfigProvider createDefaultConfigProviderForRpcNamespace() {
+    return new MapConfigProvider(createDefaultConfigMapForRpcNamespace());
+  }
+
+  /**
+   * Creates a ConfigProvider backed by
+   * {@link #createDefaultConfigMapForRpcNamespace()} with extra entries added on top.
+   *
+   * @param entries settings to add; an entry whose key is already present replaces
+   *                the default value
+   */
+  public static ConfigProvider
+      createDefaultConfigProviderForRpcNamespaceWithAdditionalEntries(
+        Map<String, String> entries) {
+    Map<String, String> confMap = createDefaultConfigMapForRpcNamespace();
+    confMap.putAll(entries);
+    return new MapConfigProvider(confMap);
+  }
+
+  /**
+   * Creates a trust store holding a single certificate and saves it to a file.
+   *
+   * @param trustStore File to save the trust store to
+   * @param password   String store password to set on the trust store
+   * @param alias      String alias to register the certificate under
+   * @param cert       Certificate to store
+   * @throws GeneralSecurityException for any error with the security APIs
+   * @throws IOException if there is an I/O error saving the file
+   */
+  public static void createTrustStore(
+      File trustStore, String password, String alias, Certificate cert)
+    throws GeneralSecurityException, IOException {
+    KeyStore ks = createEmptyKeyStore();
+    ks.setCertificateEntry(alias, cert);
+    saveKeyStore(ks, trustStore, password);
+  }
+
+  /**
+   * Creates a trust store holding several certificates and saves it to a file.
+   *
+   * @param trustStore File to save the trust store to
+   * @param password   String store password to set on the trust store
+   * @param certs      certificates to store, keyed by alias
+   * @throws GeneralSecurityException for any error with the security APIs
+   * @throws IOException if there is an I/O error saving the file
+   */
+  public static <T extends Certificate> void createTrustStore(
+      File trustStore, String password, Map<String, T> certs)
+    throws GeneralSecurityException, IOException {
+    KeyStore ks = createEmptyKeyStore();
+    for (Map.Entry<String, T> cert : certs.entrySet()) {
+      ks.setCertificateEntry(cert.getKey(), cert.getValue());
+    }
+    saveKeyStore(ks, trustStore, password);
+  }
+
+  /**
+   * Create a self-signed X.509 Certificate.
+   *
+   * @param dn the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB"
+   * @param pair the KeyPair
+   * @param days how many days from now the Certificate is valid for
+   * @param algorithm the signing algorithm, eg "SHA1withRSA"
+   * @return the self-signed certificate
+   */
+  @SuppressWarnings("deprecation")
+  public static X509Certificate generateCertificate(
+      String dn, KeyPair pair, int days, String algorithm)
+    throws CertificateEncodingException, InvalidKeyException, IllegalStateException,
+      NoSuchAlgorithmException, SignatureException {
+
+    Date from = new Date();
+    // The long literal keeps the day-to-millisecond conversion out of int range.
+    Date to = new Date(from.getTime() + days * 86400000L);
+    BigInteger sn = new BigInteger(64, new SecureRandom());
+    X509V1CertificateGenerator certGen = new X509V1CertificateGenerator();
+    // Self-signed: the same name is used as both issuer and subject.
+    X500Principal dnName = new X500Principal(dn);
+
+    certGen.setSerialNumber(sn);
+    certGen.setIssuerDN(dnName);
+    certGen.setNotBefore(from);
+    certGen.setNotAfter(to);
+    certGen.setSubjectDN(dnName);
+    certGen.setPublicKey(pair.getPublic());
+    certGen.setSignatureAlgorithm(algorithm);
+
+    return certGen.generate(pair.getPrivate());
+  }
+
+  /**
+   * Generates a key pair for the given algorithm, initialized with a size of 1024.
+   *
+   * @param algorithm the key algorithm name passed to KeyPairGenerator
+   * @return the generated key pair
+   * @throws NoSuchAlgorithmException if no provider supports the algorithm
+   */
+  public static KeyPair generateKeyPair(String algorithm)
+    throws NoSuchAlgorithmException {
+    KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm);
+    keyGen.initialize(1024);
+    return keyGen.genKeyPair();
+  }
+
+  /**
+   * Creates a keystore with a single key and saves it to a file.
+   *
+   * @param keyStore    File keystore to save
+   * @param password    String store password to set on keystore
+   * @param keyPassword String key password to set on key
+   * @param alias       String alias to use for the key
+   * @param privateKey  Key to save in keystore
+   * @param cert        Certificate to use as certificate chain associated to key
+   * @throws GeneralSecurityException for any error with the security APIs
+   * @throws IOException if there is an I/O error saving the file
+   */
+  public static void createKeyStore(
+      File keyStore, String password, String keyPassword,
+      String alias, Key privateKey, Certificate cert)
+    throws GeneralSecurityException, IOException {
+    KeyStore ks = createEmptyKeyStore();
+    ks.setKeyEntry(alias, privateKey, keyPassword.toCharArray(),
+      new Certificate[]{cert});
+    saveKeyStore(ks, keyStore, password);
+  }
+
+  /**
+   * Creates a keystore with a single key and saves it to a file, using the store
+   * password as the key password as well.
+   *
+   * @param keyStore   File keystore to save
+   * @param password   String password to set on both the keystore and the key
+   * @param alias      String alias to use for the key
+   * @param privateKey Key to save in keystore
+   * @param cert       Certificate to use as certificate chain associated to key
+   * @throws GeneralSecurityException for any error with the security APIs
+   * @throws IOException if there is an I/O error saving the file
+   */
+  public static void createKeyStore(
+      File keyStore, String password,
+      String alias, Key privateKey, Certificate cert)
+    throws GeneralSecurityException, IOException {
+    createKeyStore(keyStore, password, password, alias, privateKey, cert);
+  }
+
+  /** Creates an initialized, empty KeyStore of the platform default type. */
+  private static KeyStore createEmptyKeyStore()
+    throws GeneralSecurityException, IOException {
+    KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+    ks.load(null, null); // initialize
+    return ks;
+  }
+
+  /** Writes the KeyStore to the given file, protected by the given password. */
+  private static void saveKeyStore(
+      KeyStore ks, File keyStore, String password)
+    throws GeneralSecurityException, IOException {
+    // try-with-resources closes the stream on every path and keeps a close() failure
+    // from masking an exception thrown by store().
+    try (FileOutputStream out = new FileOutputStream(keyStore)) {
+      ks.store(out, password.toCharArray());
+    }
+  }
+
+  /**
+   * Resolves a classpath resource to the canonical path of its file.
+   *
+   * @param path resource name, resolved through this class (eg "/keystore")
+   * @return the canonical file system path of the resource
+   * @throws IllegalArgumentException if no such resource is on the classpath
+   * @throws RuntimeException if the canonical path cannot be computed
+   */
+  public static String getAbsolutePath(String path) {
+    java.net.URL resource = SslSampleConfigs.class.getResource(path);
+    if (resource == null) {
+      // Fail with the resource name rather than an anonymous NullPointerException.
+      throw new IllegalArgumentException("Resource not found on classpath: " + path);
+    }
+    try {
+      return toFile(resource).getCanonicalPath();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to resolve path " + path, e);
+    }
+  }
+
+  /**
+   * Maps a resource URL to a File. URL.getFile() leaves percent-encoding in place
+   * (eg "%20" for a space), so file: URLs go through their URI form, which decodes
+   * it. Anything else falls back to the raw URL path.
+   */
+  private static File toFile(java.net.URL resource) {
+    if ("file".equals(resource.getProtocol())) {
+      try {
+        return new File(resource.toURI());
+      } catch (java.net.URISyntaxException | IllegalArgumentException e) {
+        // Not a well-formed hierarchical file URI; use the raw path below.
+      }
+    }
+    return new File(resource.getFile());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]