[
https://issues.apache.org/jira/browse/HADOOP-17912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17613699#comment-17613699
]
ASF GitHub Bot commented on HADOOP-17912:
-----------------------------------------
steveloughran commented on code in PR #3440:
URL: https://github.com/apache/hadoop/pull/3440#discussion_r989072831
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -779,6 +787,18 @@ public AbfsInputStream openFileForRead(Path path,
contentLength = Long.parseLong(
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
+ if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
+ try {
+ encryptionAdapter = new EncryptionAdapter(
+ client.getEncryptionContextProvider(), getRelativePath(path),
+              op.getResponseHeader(HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT)
+ .getBytes(StandardCharsets.UTF_8));
+ } catch (NullPointerException ex) {
+ LOG.debug("EncryptionContext missing in GetPathStatus response");
+ throw new IOException(
Review Comment:
raise a PathIOException and include the path of the file, for better diagnostics
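Presumably `org.apache.hadoop.fs.PathIOException` is meant here. For illustration only, a sketch against the quoted hunk (not the PR's code):

```java
// Sketch: null-check the header instead of catching NullPointerException,
// and raise a PathIOException carrying the file path for diagnostics.
String context = op.getResponseHeader(HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
if (context == null) {
  LOG.debug("EncryptionContext missing in GetPathStatus response");
  throw new PathIOException(path.toString(),
      "EncryptionContext missing in GetPathStatus response");
}
encryptionAdapter = new EncryptionAdapter(client.getEncryptionContextProvider(),
    getRelativePath(path), context.getBytes(StandardCharsets.UTF_8));
```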
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -599,26 +605,25 @@ public OutputStream createFile(final Path path,
* only if there is match for eTag of existing file.
* @param relativePath
* @param statistics
- * @param permission
- * @param umask
+ * @param permissions contains permission and umask
* @param isAppendBlob
* @return
* @throws AzureBlobFileSystemException
*/
  private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath,
final FileSystem.Statistics statistics,
- final String permission,
- final String umask,
+ Permissions permissions,
Review Comment:
nit: make final for consistency with the others.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -1616,16 +1647,39 @@ private void initializeClient(URI uri, String
fileSystemName,
abfsConfiguration.getRawConfiguration());
}
+ // Encryption setup
+ EncryptionContextProvider encryptionContextProvider = null;
+ if (isSecure) {
+ encryptionContextProvider =
+ abfsConfiguration.createEncryptionContextProvider();
+ if (encryptionContextProvider != null) {
+        if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) {
+          throw new IOException(
+              "Both global key and encryption context are set, only one allowed");
+ }
+ encryptionContextProvider.initialize(
+ abfsConfiguration.getRawConfiguration(), accountName,
+ fileSystemName);
+      } else if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) {
+        if (abfsConfiguration.getEncodedClientProvidedEncryptionKeySHA() != null) {
+ } else {
+ throw new IOException(
+ "Encoded SHA256 hash must be provided for global encryption");
Review Comment:
make PathIOException and include uri of the store
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -188,8 +188,14 @@ public final class ConfigurationKeys {
  public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
  /** Setting this true will make the driver use it's own RemoteIterator implementation */
  public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
-  /** Server side encryption key */
-  public static final String FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY = "fs.azure.client-provided-encryption-key";
+ /** Server side encryption key encoded in Base6format */
Review Comment:
1. add a `{@value}` reference for the javadocs to insert it (and IDEs to show it)
2. add a "." at the end of the javadocs (here and any new ones) to stop some versions of javadoc rejecting the comment
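For illustration, both suggestions applied to the removed constant from this hunk (the renamed constant's value is not shown in this excerpt, so the old name and value are used):

```java
/** Server side encryption key encoded in Base64 format: {@value}. */
public static final String FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY =
    "fs.azure.client-provided-encryption-key";
```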
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/EncryptionContextProvider.java:
##########
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.extensions;
+
+import javax.security.auth.Destroyable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.security.ABFSKey;
+
+public interface EncryptionContextProvider extends Destroyable {
Review Comment:
add javadoc for interface; in the existing ones add a "." at the end of each
first sentence
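One possible shape for the requested interface javadoc (suggested wording only, not the PR's text):

```java
/**
 * Extension point supplying the encryption context and the derived
 * encryption key that the ABFS client uses for a given path.
 */
public interface EncryptionContextProvider extends Destroyable {
```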
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -779,6 +787,18 @@ public AbfsInputStream openFileForRead(Path path,
contentLength = Long.parseLong(
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
+ if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
+ try {
Review Comment:
add a comment as to what is needed here
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -695,8 +701,17 @@ public boolean seekToNewSource(long l) throws IOException {
public synchronized void close() throws IOException {
LOG.debug("Closing {}", this);
closed = true;
- buffer = null; // de-reference the buffer so it can be GC'ed sooner
ReadBufferManager.getBufferManager().purgeBuffersForStream(this);
+ try {
+ if (encryptionAdapter != null) {
+ encryptionAdapter.destroy();
+ }
+ } catch (DestroyFailedException e) {
+ throw new IOException(
+ "Could not destroy encryptionContext: " + e.getMessage());
+ } finally {
+ buffer = null; // de-reference the buffer so it can be GC'ed sooner
Review Comment:
move that above L705 and no need to add a finally clause
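Applied literally, the suggestion amounts to something like the following sketch (not the final patch):

```java
public synchronized void close() throws IOException {
  LOG.debug("Closing {}", this);
  closed = true;
  buffer = null; // de-reference the buffer so it can be GC'ed sooner
  ReadBufferManager.getBufferManager().purgeBuffersForStream(this);
  if (encryptionAdapter != null) {
    try {
      encryptionAdapter.destroy();
    } catch (DestroyFailedException e) {
      throw new IOException("Could not destroy encryptionContext", e);
    }
  }
}
```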
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/EncryptionAdapter.java:
##########
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.security;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Base64;
+import javax.security.auth.DestroyFailedException;
+import javax.security.auth.Destroyable;
+
+import org.apache.hadoop.util.Preconditions;
+
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+
+public class EncryptionAdapter implements Destroyable {
+ private final String path;
+ private ABFSKey encryptionContext;
+ private ABFSKey encryptionKey;
+ private final EncryptionContextProvider provider;
+
+ public EncryptionAdapter(EncryptionContextProvider provider, String path,
+ byte[] encryptionContext) throws IOException {
+ this(provider, path);
+ Preconditions.checkNotNull(encryptionContext,
+ "Encryption context should not be null.");
+    this.encryptionContext = new ABFSKey(Base64.getDecoder().decode(encryptionContext));
+ Arrays.fill(encryptionContext, (byte) 0);
+ }
+
+ public EncryptionAdapter(EncryptionContextProvider provider, String path)
+ throws IOException {
+ this.provider = provider;
+ this.path = path;
+ }
+
+ private void computeKeys() throws IOException {
+ if (encryptionContext == null) {
+ encryptionContext = provider.getEncryptionContext(path);
+ }
+ Preconditions.checkNotNull(encryptionContext,
+ "Encryption context should not be null.");
+ if (encryptionKey == null) {
+ encryptionKey = provider.getEncryptionKey(path, encryptionContext);
+ }
+    Preconditions.checkNotNull(encryptionKey, "Encryption key should not be null.");
+ }
+
+ public String getEncodedKey() throws IOException {
+ computeKeys();
+ return EncodingHelper.getBase64EncodedString(encryptionKey.getEncoded());
Review Comment:
why not make this a static import?
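For illustration, a self-contained example of the static-import style being suggested, using only JDK classes (`StaticImportDemo` and `base64Encode` are made-up names, not from the PR):

```java
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Base64.getEncoder;

public class StaticImportDemo {
    // With the static import, the call site reads like a local helper,
    // which is the effect suggested for getBase64EncodedString here.
    static String base64Encode(String s) {
        return getEncoder().encodeToString(s.getBytes(UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(base64Encode("key")); // a2V5
    }
}
```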
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java:
##########
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.security.EncodingHelper;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.assertj.core.api.Assertions;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.extensions.MockEncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.security.EncryptionAdapter;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Lists;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY_SHA256;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.ENCRYPTION_KEY_LEN;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
+import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.ENCRYPTION_CONTEXT;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.GLOBAL_KEY;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.NONE;
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+
+@RunWith(Parameterized.class)
+public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {
+ private final byte[] cpk = new byte[ENCRYPTION_KEY_LEN];
+ private final String cpkSHAEncoded;
+
+ // Encryption type used by filesystem while creating file
+ @Parameterized.Parameter
+ public EncryptionType fileEncryptionType;
+
+ // Encryption type used by filesystem to call different operations
+ @Parameterized.Parameter(1)
+ public EncryptionType requestEncryptionType;
+
+ @Parameterized.Parameter(2)
+ public FSOperationType operation;
+
+ @Parameterized.Parameter(3)
+ public boolean responseHeaderServerEnc;
+
+ @Parameterized.Parameter(4)
+ public boolean responseHeaderReqServerEnc;
+
+ @Parameterized.Parameter(5)
+ public boolean isExceptionCase;
+
+ @Parameterized.Parameter(6)
+ public boolean isCpkResponseHdrExpected;
+
+
+ @Parameterized.Parameters(name = "{0} mode, {2}")
+ public static Iterable<Object[]> params() {
+ return Arrays.asList(new Object[][] {
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.READ, true, false, false, true},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.WRITE, false, true, false, true},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.APPEND, false, true, false, true},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.SET_ACL, false, false, false, false},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.GET_ATTR, true, false, false, true},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.SET_ATTR, false, true, false, true},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.LISTSTATUS, false, false, false, false},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.RENAME, false, false, false, false},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.DELETE, false, false, false, false},
+
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.WRITE, false, false, true, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.GET_ATTR, true, false, true, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.READ, false, false, true, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_ATTR, false, true, true, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.RENAME, false, false, false, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.LISTSTATUS, false, false, false, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.DELETE, false, false, false, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_ACL, false, false, false, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_PERMISSION, false, false, false, false},
+
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.READ, true, false, false, true},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.WRITE, false, true, false, true},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.APPEND, false, true, false, true},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.SET_ACL, false, false, false, false},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.LISTSTATUS, false, false, false, false},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.RENAME, false, false, false, false},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.DELETE, false, false, false, false},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.GET_ATTR, true, false, false, true},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.SET_ATTR, false, true, false, true},
+
+        {GLOBAL_KEY, NONE, FSOperationType.READ, true, false, true, true},
+        {GLOBAL_KEY, NONE, FSOperationType.WRITE, false, true, true, true},
+        {GLOBAL_KEY, NONE, FSOperationType.SET_ATTR, false, false, true, true},
+        {GLOBAL_KEY, NONE, FSOperationType.SET_ACL, false, false, false, false},
+        {GLOBAL_KEY, NONE, FSOperationType.RENAME, false, false, false, false},
+        {GLOBAL_KEY, NONE, FSOperationType.LISTSTATUS, false, false, false, false},
+        {GLOBAL_KEY, NONE, FSOperationType.DELETE, false, false, false, false},
+        {GLOBAL_KEY, NONE, FSOperationType.SET_PERMISSION, false, false, false, false},
+ });
+ }
+
+ public ITestAbfsCustomEncryption() throws Exception {
+ super();
+ Assume.assumeTrue("Account should be HNS enabled for CPK",
+ getConfiguration().getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT,
+ false));
+ new Random().nextBytes(cpk);
+ cpkSHAEncoded = EncodingHelper.getBase64EncodedString(
+ EncodingHelper.getSHA256Hash(cpk));
+ }
+
+ @Test
+ public void testCustomEncryptionCombinations() throws Exception {
+ AzureBlobFileSystem fs = getFS();
+ Path testPath = path("/testFile");
+ String relativePath = fs.getAbfsStore().getRelativePath(testPath);
+ MockEncryptionContextProvider ecp =
+ (MockEncryptionContextProvider) createEncryptedFile(testPath);
+ AbfsRestOperation op = callOperation(fs, new Path(relativePath), ecp);
+ if (op == null) {
+ return;
+ }
+ AbfsHttpOperation httpOp = op.getResult();
+ if (isCpkResponseHdrExpected) {
+ if (requestEncryptionType == ENCRYPTION_CONTEXT) {
+        String encryptionContext = ecp.getEncryptionContextForTest(relativePath);
+ String expectedKeySHA = EncodingHelper.getBase64EncodedString(
+ EncodingHelper.getSHA256Hash(
+ ecp.getEncryptionKeyForTest(encryptionContext)));
+        Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(expectedKeySHA);
+ } else { // GLOBAL_KEY
+        Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(cpkSHAEncoded);
+ }
+ } else {
+      Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(null);
+ }
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_SERVER_ENCRYPTED))
+ .isEqualTo(responseHeaderServerEnc? "true" : null);
+    Assertions.assertThat(httpOp.getResponseHeader(X_MS_REQUEST_SERVER_ENCRYPTED))
+ .isEqualTo(responseHeaderReqServerEnc? "true" : null);
+ }
+
+ /**
+ * Executes a given operation at the AbfsClient level and returns
+ * AbfsRestOperation instance to verify response headers. Asserts excetion
+ * for combinations that should not succeed.
+ * @param fs AzureBlobFileSystem instance
+ * @param testPath path of file
+   * @param ecp EncryptionContextProvider instance to support AbfsClient methods
+ * @return Rest op or null depending on whether the request is allowed
+ * @throws Exception error
+ */
+ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
+ Path testPath, EncryptionContextProvider ecp)
+ throws Exception {
+ AbfsClient client = fs.getAbfsClient();
+ client.setEncryptionContextProvider(ecp);
+ if (isExceptionCase) {
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ switch (operation) {
+ case WRITE: try (FSDataOutputStream out = fs.append(testPath)) {
+ out.write("bytes".getBytes());
+ }
+ break;
+ case READ: try (FSDataInputStream in = fs.open(testPath)) {
+ in.read(new byte[5]);
+ }
+ break;
+ case SET_ATTR: fs.setXAttr(testPath, "attribute", "value".getBytes());
+ break;
+ case GET_ATTR: fs.getXAttr(testPath, "attribute");
Review Comment:
add break;
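Why the missing `break` matters: in Java, a matched `case` without `break` keeps executing the following labels, so GET_ATTR would fall through into `default`. A self-contained demo (class and method names are illustrative, not from the PR):

```java
public class FallthroughDemo {
    // buggy: mimics the reviewed switch, where GET_ATTR has no break and
    // falls through into the default label, clobbering the result.
    static String buggy(String op) {
        String result = "none";
        switch (op) {
            case "SET_ATTR":
                result = "setXAttr";
                break;
            case "GET_ATTR":
                result = "getXAttr";
                // missing break: execution falls through into default
            default:
                result = "default";
        }
        return result;
    }

    // fixed: the break requested in the review stops the fall-through.
    static String fixed(String op) {
        String result = "none";
        switch (op) {
            case "SET_ATTR":
                result = "setXAttr";
                break;
            case "GET_ATTR":
                result = "getXAttr";
                break;
            default:
                result = "default";
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(buggy("GET_ATTR")); // fall-through yields "default"
        System.out.println(fixed("GET_ATTR")); // break preserves "getXAttr"
    }
}
```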
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java:
##########
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.security.EncodingHelper;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.assertj.core.api.Assertions;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.extensions.MockEncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.security.EncryptionAdapter;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Lists;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY_SHA256;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.ENCRYPTION_KEY_LEN;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
+import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.ENCRYPTION_CONTEXT;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.GLOBAL_KEY;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.NONE;
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+
+@RunWith(Parameterized.class)
+public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {
+ private final byte[] cpk = new byte[ENCRYPTION_KEY_LEN];
+ private final String cpkSHAEncoded;
+
+ // Encryption type used by filesystem while creating file
+ @Parameterized.Parameter
+ public EncryptionType fileEncryptionType;
+
+ // Encryption type used by filesystem to call different operations
+ @Parameterized.Parameter(1)
+ public EncryptionType requestEncryptionType;
+
+ @Parameterized.Parameter(2)
+ public FSOperationType operation;
+
+ @Parameterized.Parameter(3)
+ public boolean responseHeaderServerEnc;
+
+ @Parameterized.Parameter(4)
+ public boolean responseHeaderReqServerEnc;
+
+ @Parameterized.Parameter(5)
+ public boolean isExceptionCase;
+
+ @Parameterized.Parameter(6)
+ public boolean isCpkResponseHdrExpected;
+
+
+ @Parameterized.Parameters(name = "{0} mode, {2}")
+ public static Iterable<Object[]> params() {
+ return Arrays.asList(new Object[][] {
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.READ, true, false, false, true},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.WRITE, false, true, false, true},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.APPEND, false, true, false, true},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.SET_ACL, false, false, false, false},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.GET_ATTR, true, false, false, true},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.SET_ATTR, false, true, false, true},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.LISTSTATUS, false, false, false, false},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.RENAME, false, false, false, false},
+        {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.DELETE, false, false, false, false},
+
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.WRITE, false, false, true, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.GET_ATTR, true, false, true, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.READ, false, false, true, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_ATTR, false, true, true, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.RENAME, false, false, false, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.LISTSTATUS, false, false, false, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.DELETE, false, false, false, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_ACL, false, false, false, false},
+        {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_PERMISSION, false, false, false, false},
+
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.READ, true, false, false, true},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.WRITE, false, true, false, true},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.APPEND, false, true, false, true},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.SET_ACL, false, false, false, false},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.LISTSTATUS, false, false, false, false},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.RENAME, false, false, false, false},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.DELETE, false, false, false, false},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.GET_ATTR, true, false, false, true},
+        {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.SET_ATTR, false, true, false, true},
+
+        {GLOBAL_KEY, NONE, FSOperationType.READ, true, false, true, true},
+        {GLOBAL_KEY, NONE, FSOperationType.WRITE, false, true, true, true},
+        {GLOBAL_KEY, NONE, FSOperationType.SET_ATTR, false, false, true, true},
+        {GLOBAL_KEY, NONE, FSOperationType.SET_ACL, false, false, false, false},
+        {GLOBAL_KEY, NONE, FSOperationType.RENAME, false, false, false, false},
+        {GLOBAL_KEY, NONE, FSOperationType.LISTSTATUS, false, false, false, false},
+        {GLOBAL_KEY, NONE, FSOperationType.DELETE, false, false, false, false},
+        {GLOBAL_KEY, NONE, FSOperationType.SET_PERMISSION, false, false, false, false},
+ });
+ }
+
+ public ITestAbfsCustomEncryption() throws Exception {
+ super();
+ Assume.assumeTrue("Account should be HNS enabled for CPK",
+ getConfiguration().getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT,
+ false));
+ new Random().nextBytes(cpk);
+ cpkSHAEncoded = EncodingHelper.getBase64EncodedString(
+ EncodingHelper.getSHA256Hash(cpk));
+ }
+
+ @Test
+ public void testCustomEncryptionCombinations() throws Exception {
+ AzureBlobFileSystem fs = getFS();
+ Path testPath = path("/testFile");
+ String relativePath = fs.getAbfsStore().getRelativePath(testPath);
+ MockEncryptionContextProvider ecp =
+ (MockEncryptionContextProvider) createEncryptedFile(testPath);
+ AbfsRestOperation op = callOperation(fs, new Path(relativePath), ecp);
+ if (op == null) {
+ return;
+ }
+ AbfsHttpOperation httpOp = op.getResult();
+ if (isCpkResponseHdrExpected) {
+ if (requestEncryptionType == ENCRYPTION_CONTEXT) {
+        String encryptionContext = ecp.getEncryptionContextForTest(relativePath);
+ String expectedKeySHA = EncodingHelper.getBase64EncodedString(
+ EncodingHelper.getSHA256Hash(
+ ecp.getEncryptionKeyForTest(encryptionContext)));
+        Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(expectedKeySHA);
+ } else { // GLOBAL_KEY
+        Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(cpkSHAEncoded);
+ }
+ } else {
+      Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(null);
+ }
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_SERVER_ENCRYPTED))
+ .isEqualTo(responseHeaderServerEnc? "true" : null);
+    Assertions.assertThat(httpOp.getResponseHeader(X_MS_REQUEST_SERVER_ENCRYPTED))
+ .isEqualTo(responseHeaderReqServerEnc? "true" : null);
+ }
+
+ /**
+ * Executes a given operation at the AbfsClient level and returns
+ * AbfsRestOperation instance to verify response headers. Asserts excetion
+ * for combinations that should not succeed.
+ * @param fs AzureBlobFileSystem instance
+ * @param testPath path of file
+   * @param ecp EncryptionContextProvider instance to support AbfsClient methods
+ * @return Rest op or null depending on whether the request is allowed
+ * @throws Exception error
+ */
+ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
+ Path testPath, EncryptionContextProvider ecp)
+ throws Exception {
+ AbfsClient client = fs.getAbfsClient();
+ client.setEncryptionContextProvider(ecp);
+ if (isExceptionCase) {
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ switch (operation) {
+ case WRITE: try (FSDataOutputStream out = fs.append(testPath)) {
+ out.write("bytes".getBytes());
+ }
+ break;
+ case READ: try (FSDataInputStream in = fs.open(testPath)) {
+ in.read(new byte[5]);
+ }
+ break;
+ case SET_ATTR: fs.setXAttr(testPath, "attribute", "value".getBytes());
+ break;
+ case GET_ATTR: fs.getXAttr(testPath, "attribute");
+ default: throw new NoSuchFieldException();
+ }
+ });
+ return null;
+ } else {
+ EncryptionAdapter encryptionAdapter = null;
+ if (fileEncryptionType == ENCRYPTION_CONTEXT) {
+ encryptionAdapter = new EncryptionAdapter(ecp,
+ fs.getAbfsStore().getRelativePath(testPath),
+ Base64.getEncoder().encode(
+            ((MockEncryptionContextProvider) ecp).getEncryptionContextForTest(testPath.toString())
+ .getBytes(StandardCharsets.UTF_8)));
+ }
+ String path = testPath.toString();
+ switch (operation) {
+ case READ:
+ TracingContext tracingContext = getTestTracingContext(fs, true);
+ AbfsHttpOperation statusOp = client.getPathStatus(path, false,
+ tracingContext).getResult();
+        return client.read(path, 0, new byte[5], 0, 5,
+            statusOp.getResponseHeader(HttpHeaderConfigurations.ETAG),
+ null, encryptionAdapter, tracingContext);
+ case WRITE: return client.flush(path, 3, false, false, null,
Review Comment:
can you put the return on the next line; this is a complicated method
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -855,6 +876,15 @@ public OutputStream openFileForWrite(final Path path,
}
AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
+ EncryptionAdapter encryptionAdapter = null;
+ if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
+ byte[] encryptionContext = op.getResult()
Review Comment:
will this ever raise an NPE?
maybe you could factor out the code to go from a response to an
encryptionAdapter, (in client?), use it in both places and have unified error
handling. this would then be testable in a unit test
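The factoring the reviewer suggests can be sketched in isolation. This is a hypothetical helper, not the actual ABFS code: the class and method names are assumptions, and the real version would build an EncryptionAdapter and raise a path-aware exception type.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class EncryptionContextHeaderHelper {

  /**
   * Hypothetical factored-out step: turn a (possibly absent)
   * x-ms-encryption-context response header value into the bytes an
   * encryption adapter needs, with one path-aware error instead of
   * catching NullPointerException at each call site.
   */
  static byte[] encryptionContextFromHeader(String headerValue, String path)
      throws IOException {
    if (headerValue == null) {
      // unified error handling: name the file, fail in one place
      throw new IOException(
          "EncryptionContext missing in GetPathStatus response for " + path);
    }
    return headerValue.getBytes(StandardCharsets.UTF_8);
  }
}
```

Because the null check lives in one method, it can be covered by a plain unit test without any live store, which is the testability benefit the comment points at.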
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/ABFSKey.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.security;
+
+import javax.crypto.SecretKey;
+import java.util.Arrays;
+
+public class ABFSKey implements SecretKey {
+ private byte[] bytes;
+
+ public ABFSKey(byte[] bytes) {
+ if (bytes != null) {
Review Comment:
style: indent with two spaces in this class
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -794,13 +814,13 @@ public AbfsInputStream openFileForRead(Path path,
// Add statistics for InputStream
return new AbfsInputStream(client, statistics, relativePath,
contentLength, populateAbfsInputStreamContext(
- parameters.map(OpenFileParameters::getOptions)),
+ parameters.map(OpenFileParameters::getOptions), encryptionAdapter),
Review Comment:
nit, put on a new line
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -1666,7 +1715,8 @@ private boolean parseIsDirectory(final String resourceType) {
&& resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
}
- private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws
+ public String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String,
Review Comment:
why are you making public? for testing, or for production. either way,
1. time for some javadocs
2. add a `@VisibleForTesting` if needed
3. make the parameter a Map rather than Hashtable
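The suggested signature change might look like the sketch below. Everything here is illustrative: the real AzureBlobFileSystemStore method has different encoding details, and this only demonstrates taking a Map instead of a Hashtable plus the requested javadoc.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.StringJoiner;

public class XmsPropertySerializer {

  /**
   * Sketch of the reviewer's suggestion: a Map parameter rather than
   * Hashtable, documented. Not the actual ABFS implementation.
   *
   * @param properties user metadata to serialize into request headers
   * @return "key=base64(value)" pairs joined by commas
   */
  public static String toCommaSeparatedString(Map<String, String> properties) {
    StringJoiner joiner = new StringJoiner(",");
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      String encoded = Base64.getEncoder().encodeToString(
          entry.getValue().getBytes(StandardCharsets.UTF_8));
      joiner.add(entry.getKey() + "=" + encoded);
    }
    return joiner.toString();
  }
}
```

Accepting `Map` keeps callers free to pass any implementation (including the existing Hashtable) while no longer baking a legacy synchronized collection into the API.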
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -1616,16 +1647,39 @@ private void initializeClient(URI uri, String fileSystemName,
abfsConfiguration.getRawConfiguration());
}
+ // Encryption setup
+ EncryptionContextProvider encryptionContextProvider = null;
+ if (isSecure) {
+ encryptionContextProvider =
+ abfsConfiguration.createEncryptionContextProvider();
+ if (encryptionContextProvider != null) {
+ if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) {
+ throw new IOException(
+ "Both global key and encryption context are set, only one allowed");
+ }
+ encryptionContextProvider.initialize(
+ abfsConfiguration.getRawConfiguration(), accountName,
+ fileSystemName);
+ } else if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) {
+ if (abfsConfiguration.getEncodedClientProvidedEncryptionKeySHA() != null) {
+ } else {
Review Comment:
is some code planned here? If not, invert the condition and merge with the
one above for a simpler execution path
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -779,6 +787,18 @@ public AbfsInputStream openFileForRead(Path path,
contentLength = Long.parseLong(
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
+ if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
+ try {
+ encryptionAdapter = new EncryptionAdapter(
+ client.getEncryptionContextProvider(), getRelativePath(path),
+ op.getResponseHeader(HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT)
Review Comment:
why not make this an optional field of VersionedFileStatus, and on a
getFileStatus() call attach it if present. this would allow for the
openFile().withFileStatus() call to work, which is something we are trying to
get into parquet, avro etc, to save time opening files
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java:
##########
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.security.EncodingHelper;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.assertj.core.api.Assertions;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.extensions.MockEncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.security.EncryptionAdapter;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Lists;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY_SHA256;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.ENCRYPTION_KEY_LEN;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
+import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.ENCRYPTION_CONTEXT;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.GLOBAL_KEY;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.NONE;
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+
+@RunWith(Parameterized.class)
+public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {
+ private final byte[] cpk = new byte[ENCRYPTION_KEY_LEN];
+ private final String cpkSHAEncoded;
+
+ // Encryption type used by filesystem while creating file
+ @Parameterized.Parameter
+ public EncryptionType fileEncryptionType;
+
+ // Encryption type used by filesystem to call different operations
+ @Parameterized.Parameter(1)
+ public EncryptionType requestEncryptionType;
+
+ @Parameterized.Parameter(2)
+ public FSOperationType operation;
+
+ @Parameterized.Parameter(3)
+ public boolean responseHeaderServerEnc;
+
+ @Parameterized.Parameter(4)
+ public boolean responseHeaderReqServerEnc;
+
+ @Parameterized.Parameter(5)
+ public boolean isExceptionCase;
+
+ @Parameterized.Parameter(6)
+ public boolean isCpkResponseHdrExpected;
+
+
+ @Parameterized.Parameters(name = "{0} mode, {2}")
+ public static Iterable<Object[]> params() {
+ return Arrays.asList(new Object[][] {
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.READ, true, false, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.WRITE, false, true, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.APPEND, false, true, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.SET_ACL, false, false, false, false},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.GET_ATTR, true, false, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.SET_ATTR, false, true, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.LISTSTATUS, false, false, false, false},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.RENAME, false, false, false, false},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.DELETE, false, false, false, false},
+
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.WRITE, false, false, true, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.GET_ATTR, true, false, true, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.READ, false, false, true, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_ATTR, false, true, true, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.RENAME, false, false, false, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.LISTSTATUS, false, false, false, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.DELETE, false, false, false, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_ACL, false, false, false, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_PERMISSION, false, false, false, false},
+
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.READ, true, false, false, true},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.WRITE, false, true, false, true},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.APPEND, false, true, false, true},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.SET_ACL, false, false, false, false},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.LISTSTATUS, false, false, false, false},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.RENAME, false, false, false, false},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.DELETE, false, false, false, false},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.GET_ATTR, true, false, false, true},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.SET_ATTR, false, true, false, true},
+
+ {GLOBAL_KEY, NONE, FSOperationType.READ, true, false, true, true},
+ {GLOBAL_KEY, NONE, FSOperationType.WRITE, false, true, true, true},
+ {GLOBAL_KEY, NONE, FSOperationType.SET_ATTR, false, false, true, true},
+ {GLOBAL_KEY, NONE, FSOperationType.SET_ACL, false, false, false, false},
+ {GLOBAL_KEY, NONE, FSOperationType.RENAME, false, false, false, false},
+ {GLOBAL_KEY, NONE, FSOperationType.LISTSTATUS, false, false, false, false},
+ {GLOBAL_KEY, NONE, FSOperationType.DELETE, false, false, false, false},
+ {GLOBAL_KEY, NONE, FSOperationType.SET_PERMISSION, false, false, false, false},
+ });
+ }
+
+ public ITestAbfsCustomEncryption() throws Exception {
+ super();
Review Comment:
not needed
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java:
##########
@@ -0,0 +1,330 @@
+@RunWith(Parameterized.class)
+public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {
+ private final byte[] cpk = new byte[ENCRYPTION_KEY_LEN];
+ private final String cpkSHAEncoded;
+
+
+ public ITestAbfsCustomEncryption() throws Exception {
+ super();
+ Assume.assumeTrue("Account should be HNS enabled for CPK",
+ getConfiguration().getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT,
+ false));
+ new Random().nextBytes(cpk);
+ cpkSHAEncoded = EncodingHelper.getBase64EncodedString(
+ EncodingHelper.getSHA256Hash(cpk));
+ }
+
+ @Test
+ public void testCustomEncryptionCombinations() throws Exception {
+ AzureBlobFileSystem fs = getFS();
+ Path testPath = path("/testFile");
+ String relativePath = fs.getAbfsStore().getRelativePath(testPath);
+ MockEncryptionContextProvider ecp =
+ (MockEncryptionContextProvider) createEncryptedFile(testPath);
+ AbfsRestOperation op = callOperation(fs, new Path(relativePath), ecp);
+ if (op == null) {
+ return;
+ }
+ AbfsHttpOperation httpOp = op.getResult();
+ if (isCpkResponseHdrExpected) {
+ if (requestEncryptionType == ENCRYPTION_CONTEXT) {
+ String encryptionContext = ecp.getEncryptionContextForTest(relativePath);
+ String expectedKeySHA = EncodingHelper.getBase64EncodedString(
+ EncodingHelper.getSHA256Hash(
+ ecp.getEncryptionKeyForTest(encryptionContext)));
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(expectedKeySHA);
+ } else { // GLOBAL_KEY
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(cpkSHAEncoded);
+ }
+ } else {
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(null);
+ }
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_SERVER_ENCRYPTED))
+ .isEqualTo(responseHeaderServerEnc? "true" : null);
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_REQUEST_SERVER_ENCRYPTED))
+ .isEqualTo(responseHeaderReqServerEnc? "true" : null);
+ }
+
+ /**
+ * Executes a given operation at the AbfsClient level and returns
+ * AbfsRestOperation instance to verify response headers. Asserts exception
+ * for combinations that should not succeed.
+ * @param fs AzureBlobFileSystem instance
+ * @param testPath path of file
+ * @param ecp EncryptionContextProvider instance to support AbfsClient methods
+ * @return Rest op or null depending on whether the request is allowed
+ * @throws Exception error
+ */
+ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
+ Path testPath, EncryptionContextProvider ecp)
+ throws Exception {
+ AbfsClient client = fs.getAbfsClient();
+ client.setEncryptionContextProvider(ecp);
+ if (isExceptionCase) {
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ switch (operation) {
+ case WRITE: try (FSDataOutputStream out = fs.append(testPath)) {
+ out.write("bytes".getBytes());
+ }
+ break;
+ case READ: try (FSDataInputStream in = fs.open(testPath)) {
+ in.read(new byte[5]);
+ }
+ break;
+ case SET_ATTR: fs.setXAttr(testPath, "attribute", "value".getBytes());
+ break;
+ case GET_ATTR: fs.getXAttr(testPath, "attribute");
+ default: throw new NoSuchFieldException();
+ }
+ });
+ return null;
+ } else {
+ EncryptionAdapter encryptionAdapter = null;
+ if (fileEncryptionType == ENCRYPTION_CONTEXT) {
+ encryptionAdapter = new EncryptionAdapter(ecp,
+ fs.getAbfsStore().getRelativePath(testPath),
+ Base64.getEncoder().encode(
+ ((MockEncryptionContextProvider) ecp).getEncryptionContextForTest(testPath.toString())
+ .getBytes(StandardCharsets.UTF_8)));
+ }
+ String path = testPath.toString();
+ switch (operation) {
+ case READ:
+ TracingContext tracingContext = getTestTracingContext(fs, true);
+ AbfsHttpOperation statusOp = client.getPathStatus(path, false,
+ tracingContext).getResult();
+ return client.read(path, 0, new byte[5], 0, 5, statusOp.getResponseHeader(HttpHeaderConfigurations.ETAG),
+ null, encryptionAdapter, tracingContext);
+ case WRITE: return client.flush(path, 3, false, false, null,
+ null, encryptionAdapter, getTestTracingContext(fs, false));
+ case APPEND: return client.append(path, "val".getBytes(),
+ new AppendRequestParameters(3, 0, 3, APPEND_MODE, false, null),
+ null, encryptionAdapter, getTestTracingContext(fs, false));
+ case SET_ACL: return client.setAcl(path, AclEntry.aclSpecToString(
+ Lists.newArrayList(aclEntry(ACCESS, USER, ALL))),
+ getTestTracingContext(fs, false));
+ case LISTSTATUS: return client.listPath(path, false, 5, null,
+ getTestTracingContext(fs, true));
+ case RENAME: return client.renamePath(path, new Path(path + "_2").toString(),
+ null, getTestTracingContext(fs, true), null, false).getOp();
+ case DELETE: return client.deletePath(path, false, null,
+ getTestTracingContext(fs, false));
+ case GET_ATTR: return client.getPathStatus(path, true,
+ getTestTracingContext(fs, false));
+ case SET_ATTR:
+ Hashtable<String, String> properties = new Hashtable<>();
+ properties.put("key", "{ value: valueTest }");
+ return client.setPathProperties(path, fs.getAbfsStore()
+ .convertXmsPropertiesToCommaSeparatedString(properties),
+ getTestTracingContext(fs, false));
+ case SET_PERMISSION:
+ return client.setPermission(path, FsPermission.getDefault().toString(),
+ getTestTracingContext(fs, false));
+ default: throw new NoSuchFieldException();
+ }
+ }
+ }
+
+ private AzureBlobFileSystem getECProviderEnabledFS() throws Exception {
+ Configuration configuration = getRawConfiguration();
+ configuration.set(FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE + "."
+ + getAccountName(), MockEncryptionContextProvider.class.getCanonicalName());
+ configuration.unset(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY + "."
+ + getAccountName());
+ configuration.unset(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA + "."
+ + getAccountName());
+ return (AzureBlobFileSystem) FileSystem.newInstance(configuration);
+ }
+
+ private AzureBlobFileSystem getCPKEnabledFS() throws IOException {
+ Configuration conf = getRawConfiguration();
+ String cpkEncoded = EncodingHelper.getBase64EncodedString(cpk);
+ String cpkEncodedSHA = EncodingHelper.getBase64EncodedString(
+ EncodingHelper.getSHA256Hash(cpk));
+ conf.set(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY + "."
+ + getAccountName(), cpkEncoded);
+ conf.set(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA + "."
+ + getAccountName(), cpkEncodedSHA);
+ conf.unset(FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE);
+ return (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ }
+
+ private AzureBlobFileSystem getFS() throws Exception {
+ if (getFileSystem().getAbfsClient().getEncryptionType() == requestEncryptionType) {
+ return getFileSystem();
+ }
+ if (requestEncryptionType == ENCRYPTION_CONTEXT) {
+ return getECProviderEnabledFS();
+ } else if (requestEncryptionType == GLOBAL_KEY) {
+ return getCPKEnabledFS();
+ } else {
+ Configuration conf = getRawConfiguration();
+ conf.unset(FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE);
+ return (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ }
+ }
+
+ private EncryptionContextProvider createEncryptedFile(Path testPath) throws Exception {
+ AzureBlobFileSystem fs;
+ if (getFileSystem().getAbfsClient().getEncryptionType() == fileEncryptionType) {
+ fs = getFileSystem();
+ } else {
+ fs = fileEncryptionType == ENCRYPTION_CONTEXT
+ ? getECProviderEnabledFS()
+ : getCPKEnabledFS();
+ }
+ String relativePath = fs.getAbfsStore().getRelativePath(testPath);
+ try (FSDataOutputStream out = fs.create(new Path(relativePath))) {
+ out.write("123".getBytes());
+ }
+ // verify file is encrypted by calling getPathStatus (with properties)
+ // without encryption headers in request
+ if (fileEncryptionType != EncryptionType.NONE) {
+ fs.getAbfsClient().setEncryptionType(EncryptionType.NONE);
+ LambdaTestUtils.intercept(IOException.class, () -> fs.getAbfsClient()
Review Comment:
can you split up the args to getPathStatus so each one is on its own line
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -64,10 +66,11 @@
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
-import org.apache.hadoop.io.IOUtils;
Review Comment:
restore
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/EncryptionAdapter.java:
##########
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.security;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Base64;
+import javax.security.auth.DestroyFailedException;
+import javax.security.auth.Destroyable;
+
+import org.apache.hadoop.util.Preconditions;
+
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+
+public class EncryptionAdapter implements Destroyable {
+ private final String path;
+ private ABFSKey encryptionContext;
Review Comment:
should be final; set to null in second constructor
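The final-field fix the reviewer describes can be shown in a tiny standalone class. The class name is illustrative; the real EncryptionAdapter carries more state and logic.

```java
public class AdapterFieldSketch {
  private final String path;
  // final, as suggested; the context-less constructor passes null explicitly
  private final byte[] encryptionContext;

  AdapterFieldSketch(String path, byte[] encryptionContext) {
    this.path = path;
    this.encryptionContext = encryptionContext;
  }

  AdapterFieldSketch(String path) {
    this(path, null); // second constructor sets the final field to null
  }

  byte[] getEncryptionContext() {
    return encryptionContext;
  }

  String getPath() {
    return path;
  }
}
```

Delegating from the second constructor keeps a single initialization site, so the compiler guarantees the field is assigned exactly once on every path.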
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -87,31 +90,35 @@ public class AbfsClient implements Closeable {
private final URL baseUrl;
private final SharedKeyCredentials sharedKeyCredentials;
- private final String xMsVersion = "2019-12-12";
+ private String xMsVersion = "2019-12-12";
Review Comment:
this and the value in L133 should be constants somewhere
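Hoisting the duplicated version literal might look like the sketch below; the holder class and constant names are assumptions, not existing Hadoop identifiers.

```java
public final class AbfsVersionConstants {

  private AbfsVersionConstants() {
    // constants holder, not instantiable
  }

  // one named constant replacing the "2019-12-12" literals scattered
  // through AbfsClient
  public static final String DEFAULT_X_MS_VERSION = "2019-12-12";
}
```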
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -1616,16 +1647,39 @@ private void initializeClient(URI uri, String fileSystemName,
abfsConfiguration.getRawConfiguration());
}
+ // Encryption setup
+ EncryptionContextProvider encryptionContextProvider = null;
+ if (isSecure) {
+ encryptionContextProvider =
+ abfsConfiguration.createEncryptionContextProvider();
+ if (encryptionContextProvider != null) {
+ if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) {
+ throw new IOException(
Review Comment:
make PathIOException and include uri of the store
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -39,7 +36,11 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.store.LogExactlyOnce;
-import org.apache.hadoop.util.Preconditions;
+import org.apache.hadoop.fs.Path;
Review Comment:
move down to the line 56 block; this hadoop block is only here as a result of the move off guava, and is very fussy in backporting
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java:
##########
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.security.EncodingHelper;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.assertj.core.api.Assertions;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.extensions.MockEncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.security.EncryptionAdapter;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Lists;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY_SHA256;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.ENCRYPTION_KEY_LEN;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
+import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.ENCRYPTION_CONTEXT;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.GLOBAL_KEY;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.NONE;
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+
+@RunWith(Parameterized.class)
+public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {
+ private final byte[] cpk = new byte[ENCRYPTION_KEY_LEN];
+ private final String cpkSHAEncoded;
+
+ // Encryption type used by filesystem while creating file
+ @Parameterized.Parameter
+ public EncryptionType fileEncryptionType;
+
+ // Encryption type used by filesystem to call different operations
+ @Parameterized.Parameter(1)
+ public EncryptionType requestEncryptionType;
+
+ @Parameterized.Parameter(2)
+ public FSOperationType operation;
+
+ @Parameterized.Parameter(3)
+ public boolean responseHeaderServerEnc;
+
+ @Parameterized.Parameter(4)
+ public boolean responseHeaderReqServerEnc;
+
+ @Parameterized.Parameter(5)
+ public boolean isExceptionCase;
+
+ @Parameterized.Parameter(6)
+ public boolean isCpkResponseHdrExpected;
+
+
+ @Parameterized.Parameters(name = "{0} mode, {2}")
+ public static Iterable<Object[]> params() {
+ return Arrays.asList(new Object[][] {
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.READ, true, false, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.WRITE, false, true, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.APPEND, false, true, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.SET_ACL, false, false, false, false},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.GET_ATTR, true, false, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.SET_ATTR, false, true, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.LISTSTATUS, false, false, false, false},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.RENAME, false, false, false, false},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.DELETE, false, false, false, false},
+
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.WRITE, false, false, true, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.GET_ATTR, true, false, true, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.READ, false, false, true, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_ATTR, false, true, true, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.RENAME, false, false, false, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.LISTSTATUS, false, false, false, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.DELETE, false, false, false, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_ACL, false, false, false, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_PERMISSION, false, false, false, false},
+
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.READ, true, false, false, true},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.WRITE, false, true, false, true},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.APPEND, false, true, false, true},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.SET_ACL, false, false, false, false},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.LISTSTATUS, false, false, false, false},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.RENAME, false, false, false, false},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.DELETE, false, false, false, false},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.GET_ATTR, true, false, false, true},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.SET_ATTR, false, true, false, true},
+
+ {GLOBAL_KEY, NONE, FSOperationType.READ, true, false, true, true},
+ {GLOBAL_KEY, NONE, FSOperationType.WRITE, false, true, true, true},
+ {GLOBAL_KEY, NONE, FSOperationType.SET_ATTR, false, false, true, true},
+ {GLOBAL_KEY, NONE, FSOperationType.SET_ACL, false, false, false, false},
+ {GLOBAL_KEY, NONE, FSOperationType.RENAME, false, false, false, false},
+ {GLOBAL_KEY, NONE, FSOperationType.LISTSTATUS, false, false, false, false},
+ {GLOBAL_KEY, NONE, FSOperationType.DELETE, false, false, false, false},
+ {GLOBAL_KEY, NONE, FSOperationType.SET_PERMISSION, false, false, false, false},
+ });
+ }
+
+ public ITestAbfsCustomEncryption() throws Exception {
+ super();
+ Assume.assumeTrue("Account should be HNS enabled for CPK",
+ getConfiguration().getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT,
+ false));
+ new Random().nextBytes(cpk);
+ cpkSHAEncoded = EncodingHelper.getBase64EncodedString(
+ EncodingHelper.getSHA256Hash(cpk));
+ }
+
+ @Test
+ public void testCustomEncryptionCombinations() throws Exception {
+ AzureBlobFileSystem fs = getFS();
+ Path testPath = path("/testFile");
+ String relativePath = fs.getAbfsStore().getRelativePath(testPath);
+ MockEncryptionContextProvider ecp =
+ (MockEncryptionContextProvider) createEncryptedFile(testPath);
+ AbfsRestOperation op = callOperation(fs, new Path(relativePath), ecp);
+ if (op == null) {
+ return;
+ }
+ AbfsHttpOperation httpOp = op.getResult();
+ if (isCpkResponseHdrExpected) {
+ if (requestEncryptionType == ENCRYPTION_CONTEXT) {
+ String encryptionContext = ecp.getEncryptionContextForTest(relativePath);
+ String expectedKeySHA = EncodingHelper.getBase64EncodedString(
+ EncodingHelper.getSHA256Hash(
+ ecp.getEncryptionKeyForTest(encryptionContext)));
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(expectedKeySHA);
+ } else { // GLOBAL_KEY
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(cpkSHAEncoded);
+ }
+ } else {
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(null);
+ }
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_SERVER_ENCRYPTED))
+ .isEqualTo(responseHeaderServerEnc? "true" : null);
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_REQUEST_SERVER_ENCRYPTED))
+ .isEqualTo(responseHeaderReqServerEnc? "true" : null);
+ }
+
+ /**
+ * Executes a given operation at the AbfsClient level and returns
+ * AbfsRestOperation instance to verify response headers. Asserts exception
+ * for combinations that should not succeed.
+ * @param fs AzureBlobFileSystem instance
+ * @param testPath path of file
+ * @param ecp EncryptionContextProvider instance to support AbfsClient methods
+ * @return Rest op or null depending on whether the request is allowed
+ * @throws Exception error
+ */
+ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
+ Path testPath, EncryptionContextProvider ecp)
+ throws Exception {
+ AbfsClient client = fs.getAbfsClient();
+ client.setEncryptionContextProvider(ecp);
+ if (isExceptionCase) {
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ switch (operation) {
+ case WRITE: try (FSDataOutputStream out = fs.append(testPath)) {
+ out.write("bytes".getBytes());
+ }
+ break;
+ case READ: try (FSDataInputStream in = fs.open(testPath)) {
+ in.read(new byte[5]);
+ }
+ break;
+ case SET_ATTR: fs.setXAttr(testPath, "attribute", "value".getBytes());
+ break;
+ case GET_ATTR: fs.getXAttr(testPath, "attribute");
+ default: throw new NoSuchFieldException();
+ }
+ });
+ return null;
+ } else {
+ EncryptionAdapter encryptionAdapter = null;
+ if (fileEncryptionType == ENCRYPTION_CONTEXT) {
+ encryptionAdapter = new EncryptionAdapter(ecp,
+ fs.getAbfsStore().getRelativePath(testPath),
+ Base64.getEncoder().encode(
+ ((MockEncryptionContextProvider) ecp).getEncryptionContextForTest(testPath.toString())
+ .getBytes(StandardCharsets.UTF_8)));
+ }
+ String path = testPath.toString();
+ switch (operation) {
+ case READ:
+ TracingContext tracingContext = getTestTracingContext(fs, true);
+ AbfsHttpOperation statusOp = client.getPathStatus(path, false,
+ tracingContext).getResult();
+ return client.read(path, 0, new byte[5], 0, 5, statusOp.getResponseHeader(HttpHeaderConfigurations.ETAG),
+ null, encryptionAdapter, tracingContext);
+ case WRITE: return client.flush(path, 3, false, false, null,
+ null, encryptionAdapter, getTestTracingContext(fs, false));
+ case APPEND: return client.append(path, "val".getBytes(),
+ new AppendRequestParameters(3, 0, 3, APPEND_MODE, false, null),
+ null, encryptionAdapter, getTestTracingContext(fs, false));
+ case SET_ACL: return client.setAcl(path, AclEntry.aclSpecToString(
+ Lists.newArrayList(aclEntry(ACCESS, USER, ALL))),
+ getTestTracingContext(fs, false));
+ case LISTSTATUS: return client.listPath(path, false, 5, null,
+ getTestTracingContext(fs, true));
+ case RENAME: return client.renamePath(path, new Path(path + "_2").toString(),
+ null, getTestTracingContext(fs, true), null, false).getOp();
+ case DELETE: return client.deletePath(path, false, null,
+ getTestTracingContext(fs, false));
+ case GET_ATTR: return client.getPathStatus(path, true,
+ getTestTracingContext(fs, false));
+ case SET_ATTR:
+ Hashtable<String, String> properties = new Hashtable<>();
+ properties.put("key", "{ value: valueTest }");
+ return client.setPathProperties(path, fs.getAbfsStore()
+ .convertXmsPropertiesToCommaSeparatedString(properties),
+ getTestTracingContext(fs, false));
+ case SET_PERMISSION:
+ return client.setPermission(path, FsPermission.getDefault().toString(),
+ getTestTracingContext(fs, false));
+ default: throw new NoSuchFieldException();
+ }
+ }
+ }
+
+ private AzureBlobFileSystem getECProviderEnabledFS() throws Exception {
+ Configuration configuration = getRawConfiguration();
+ configuration.set(FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE + "."
+ + getAccountName(), MockEncryptionContextProvider.class.getCanonicalName());
+ configuration.unset(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY + "."
+ + getAccountName());
+ configuration.unset(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA + "."
+ + getAccountName());
+ return (AzureBlobFileSystem) FileSystem.newInstance(configuration);
+ }
+
+ private AzureBlobFileSystem getCPKEnabledFS() throws IOException {
+ Configuration conf = getRawConfiguration();
+ String cpkEncoded = EncodingHelper.getBase64EncodedString(cpk);
+ String cpkEncodedSHA = EncodingHelper.getBase64EncodedString(
+ EncodingHelper.getSHA256Hash(cpk));
+ conf.set(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY + "."
+ + getAccountName(), cpkEncoded);
+ conf.set(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA + "."
+ + getAccountName(), cpkEncodedSHA);
+ conf.unset(FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE);
+ return (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ }
+
+ private AzureBlobFileSystem getFS() throws Exception {
+ if (getFileSystem().getAbfsClient().getEncryptionType() == requestEncryptionType) {
+ return getFileSystem();
+ }
+ if (requestEncryptionType == ENCRYPTION_CONTEXT) {
+ return getECProviderEnabledFS();
+ } else if (requestEncryptionType == GLOBAL_KEY) {
+ return getCPKEnabledFS();
+ } else {
+ Configuration conf = getRawConfiguration();
+ conf.unset(FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE);
+ return (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ }
+ }
+
+ private EncryptionContextProvider createEncryptedFile(Path testPath) throws Exception {
Review Comment:
javadocs
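A possible wording for the missing javadoc, inferred from how the test uses the method (a suggestion, not the author's text):

```java
/**
 * Creates a file at the given path, encrypted with whichever encryption
 * type this parameterized test case is configured for.
 * @param testPath path of the file to create
 * @return the EncryptionContextProvider backing the created file's
 *         encryption, for later verification of response headers
 * @throws Exception failure to create or encrypt the file
 */
private EncryptionContextProvider createEncryptedFile(Path testPath) throws Exception {
```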
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/ABFSKey.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.security;
+
+import javax.crypto.SecretKey;
+import java.util.Arrays;
+
+public class ABFSKey implements SecretKey {
Review Comment:
make final, add javadocs, etc.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java:
##########
@@ -0,0 +1,330 @@
+ private AzureBlobFileSystem getFS() throws Exception {
Review Comment:
this may actually create a new FS.
1. name is wrong, `getOrCreateFS()`
2. needs a strategy on how to close() all newly created instances, e.g. store in a field and close in teardown
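The teardown strategy the reviewer describes could be sketched like this (hypothetical helper; `Closeable` stands in for `AzureBlobFileSystem`, which implements it):

```java
// Track every filesystem instance the test creates so teardown can
// release them all, as the reviewer suggests.
class CreatedInstanceTracker {
  private final java.util.List<java.io.Closeable> created =
      new java.util.ArrayList<>();

  // Record a newly created instance; returns it for fluent use.
  <T extends java.io.Closeable> T remember(T fs) {
    created.add(fs);
    return fs;
  }

  // Called from teardown: best-effort close of everything created.
  void closeAll() {
    for (java.io.Closeable c : created) {
      try {
        c.close();
      } catch (java.io.IOException ignored) {
        // best-effort cleanup in teardown
      }
    }
    created.clear();
  }
}
```

A `getOrCreateFS()` would then wrap each `FileSystem.newInstance(...)` call in `tracker.remember(...)`, and the test's `@After`/teardown method would call `tracker.closeAll()`.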
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/EncodingHelper.java:
##########
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.security;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+
+public class EncodingHelper {
Review Comment:
make final, add javadocs, indent to two spaces...
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -25,6 +25,7 @@
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.azurebfs.security.EncryptionAdapter;
Review Comment:
move to line 43
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java:
##########
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.lang.reflect.Field;
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
Review Comment:
move down to line 46
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/EncodingHelper.java:
##########
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.security;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+
+public class EncodingHelper {
+
+ public static byte[] getSHA256Hash(byte[] key) {
+ try {
+      final MessageDigest digester = MessageDigest.getInstance("SHA-256");

+ return digester.digest(key);
+ } catch (NoSuchAlgorithmException ignored) {
+ /**
Review Comment:
should be simple /* comment, not a javadoc /** entry
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/EncryptionAdapter.java:
##########
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.security;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Base64;
+import javax.security.auth.DestroyFailedException;
+import javax.security.auth.Destroyable;
+
+import org.apache.hadoop.util.Preconditions;
+
Review Comment:
cut the empty line and try to sort the org.apache entries
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -1856,6 +1906,35 @@ public String toString() {
}
}
+ public static class Permissions {
Review Comment:
1. needs javadoc for classes, fields, constructor. emphasise that the values
will be null if constructed in a store without namespaces
2. make final
3. add a toString()
if you add `hasPermission()` and `hasUmask()` methods returning true if the
field is non-null and non-empty, it'll be easier to use when setting permissions
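A rough sketch of the suggested shape (field and method names assumed from the diff, not the actual Hadoop source):

```java
// Hypothetical Permissions holder: final, with javadoc'd probe methods and a
// toString(). On a store without namespaces both fields may be null.
final class Permissions {
  private final String permission; // may be null on non-namespace stores
  private final String umask;      // may be null on non-namespace stores

  Permissions(String permission, String umask) {
    this.permission = permission;
    this.umask = umask;
  }

  /** @return true iff a non-empty permission string is present. */
  boolean hasPermission() {
    return permission != null && !permission.isEmpty();
  }

  /** @return true iff a non-empty umask string is present. */
  boolean hasUmask() {
    return umask != null && !umask.isEmpty();
  }

  String getPermission() {
    return permission;
  }

  String getUmask() {
    return umask;
  }

  @Override
  public String toString() {
    return "Permissions{permission=" + permission + ", umask=" + umask + '}';
  }
}
```

Callers can then guard header-setting with `if (permissions.hasPermission())` instead of repeating null/empty checks.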
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/EncryptionAdapter.java:
##########
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.security;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Base64;
+import javax.security.auth.DestroyFailedException;
+import javax.security.auth.Destroyable;
+
+import org.apache.hadoop.util.Preconditions;
+
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+
+public class EncryptionAdapter implements Destroyable {
+ private final String path;
+ private ABFSKey encryptionContext;
+ private ABFSKey encryptionKey;
+ private final EncryptionContextProvider provider;
+
+ public EncryptionAdapter(EncryptionContextProvider provider, String path,
+ byte[] encryptionContext) throws IOException {
+ this(provider, path);
+ Preconditions.checkNotNull(encryptionContext,
+ "Encryption context should not be null.");
+    this.encryptionContext = new ABFSKey(Base64.getDecoder().decode(encryptionContext));
+ Arrays.fill(encryptionContext, (byte) 0);
+ }
+
+ public EncryptionAdapter(EncryptionContextProvider provider, String path)
+ throws IOException {
+ this.provider = provider;
+ this.path = path;
+ }
+
+ private void computeKeys() throws IOException {
+ if (encryptionContext == null) {
+ encryptionContext = provider.getEncryptionContext(path);
+ }
+ Preconditions.checkNotNull(encryptionContext,
+ "Encryption context should not be null.");
+ if (encryptionKey == null) {
+ encryptionKey = provider.getEncryptionKey(path, encryptionContext);
Review Comment:
if you use Objects.requireNonNull() you can integrate the check with the
assignment...look for other places we do this
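A small illustration of the `Objects.requireNonNull` pattern being suggested (all names here are stand-ins, not the real ABFS types):

```java
import java.util.Objects;

// requireNonNull returns its argument, so the null check and the assignment
// collapse into one expression.
class RequireNonNullDemo {
  // hypothetical provider lookup; returns null for an empty path
  static String lookup(String path) {
    return path.isEmpty() ? null : "ctx-for-" + path;
  }

  static String resolveContext(String path) {
    // check-and-assign in a single expression
    return Objects.requireNonNull(lookup(path),
        "Encryption context should not be null for " + path);
  }
}
```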
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -160,42 +168,30 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
+ final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext)
throws IOException {
- this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
+ this(baseUrl, sharedKeyCredentials, abfsConfiguration,
+ encryptionContextProvider, abfsClientContext);
this.tokenProvider = tokenProvider;
}
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final SASTokenProvider sasTokenProvider,
+ final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext)
throws IOException {
- this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
+ this(baseUrl, sharedKeyCredentials, abfsConfiguration,
+ encryptionContextProvider, abfsClientContext);
this.sasTokenProvider = sasTokenProvider;
}
- private byte[] getSHA256Hash(String key) throws IOException {
- try {
- final MessageDigest digester = MessageDigest.getInstance("SHA-256");
- return digester.digest(key.getBytes(StandardCharsets.UTF_8));
- } catch (NoSuchAlgorithmException e) {
- throw new IOException(e);
- }
- }
-
- private String getBase64EncodedString(String key) {
- return getBase64EncodedString(key.getBytes(StandardCharsets.UTF_8));
- }
-
- private String getBase64EncodedString(byte[] bytes) {
- return Base64.getEncoder().encodeToString(bytes);
- }
-
@Override
public void close() throws IOException {
if (tokenProvider instanceof Closeable) {
- IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider);
+ org.apache.hadoop.io.IOUtils.cleanupWithLogger(LOG,
Review Comment:
restore
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/EncryptionAdapter.java:
##########
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.security;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Base64;
+import javax.security.auth.DestroyFailedException;
+import javax.security.auth.Destroyable;
+
+import org.apache.hadoop.util.Preconditions;
+
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+
+public class EncryptionAdapter implements Destroyable {
+ private final String path;
+ private ABFSKey encryptionContext;
+ private ABFSKey encryptionKey;
+ private final EncryptionContextProvider provider;
+
+ public EncryptionAdapter(EncryptionContextProvider provider, String path,
+ byte[] encryptionContext) throws IOException {
+ this(provider, path);
+ Preconditions.checkNotNull(encryptionContext,
+ "Encryption context should not be null.");
+    this.encryptionContext = new ABFSKey(Base64.getDecoder().decode(encryptionContext));
+ Arrays.fill(encryptionContext, (byte) 0);
+ }
+
+ public EncryptionAdapter(EncryptionContextProvider provider, String path)
+ throws IOException {
+ this.provider = provider;
+ this.path = path;
+ }
+
+ private void computeKeys() throws IOException {
Review Comment:
does this need to be synchronized
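If concurrent callers are possible, the usual lazy-init guard looks roughly like this (a sketch with a `String` stand-in for the real `ABFSKey` fields):

```java
// Stand-in for the lazily computed key fields; synchronized so two threads
// cannot both run the (possibly expensive) provider call.
class LazyKeys {
  private String encryptionContext;

  synchronized String getContext() {
    if (encryptionContext == null) {
      // provider.getEncryptionContext(path) in the real code
      encryptionContext = "derived-once";
    }
    return encryptionContext;
  }
}
```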
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/EncryptionAdapter.java:
##########
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.security;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Base64;
+import javax.security.auth.DestroyFailedException;
+import javax.security.auth.Destroyable;
+
+import org.apache.hadoop.util.Preconditions;
+
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+
+public class EncryptionAdapter implements Destroyable {
Review Comment:
add javadocs, etc. Adding more on architectural issues in the summary
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/EncodingHelper.java:
##########
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.security;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+
+public class EncodingHelper {
+
+ public static byte[] getSHA256Hash(byte[] key) {
+ try {
+      final MessageDigest digester = MessageDigest.getInstance("SHA-256");
+ return digester.digest(key);
+ } catch (NoSuchAlgorithmException ignored) {
+ /**
+       * This exception can be ignored. Reason being SHA-256 is a valid algorithm, and it is constant for all
+       * method calls.
+ */
+ return null;
Review Comment:
if it can never happen, just throw a RuntimeException. somehow, somewhere,
it may fail...
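A sketch of the fail-fast alternative (the real class may prefer a different unchecked exception type):

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Fail-fast variant: SHA-256 is mandatory in every JRE, so if it is missing
// something is badly wrong and returning null would only hide it.
final class EncodingHelperSketch {
  static byte[] getSHA256Hash(byte[] key) {
    try {
      final MessageDigest digester = MessageDigest.getInstance("SHA-256");
      return digester.digest(key);
    } catch (NoSuchAlgorithmException e) {
      // surface the "impossible" failure instead of returning null
      throw new IllegalStateException("SHA-256 unavailable", e);
    }
  }
}
```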
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/EncryptionType.java:
##########
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+/**
+ * Enum EncryptionType to represent the level of encryption applied
Review Comment:
nit add .
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -494,17 +499,26 @@ public synchronized void close() throws IOException {
// See HADOOP-16785
throw wrapException(path, e.getMessage(), e);
} finally {
- if (hasLease()) {
- lease.free();
- lease = null;
- }
- lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- buffer = null;
- bufferIndex = 0;
- closed = true;
- writeOperations.clear();
- if (hasActiveBlock()) {
- clearActiveBlock();
+ try {
+ if (encryptionAdapter != null) {
+ encryptionAdapter.destroy();
+ }
+ } catch (DestroyFailedException e) {
Review Comment:
can cut once EncryptionAdapter.destroy() changes signature.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/EncryptionAdapter.java:
##########
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.security;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Base64;
+import javax.security.auth.DestroyFailedException;
+import javax.security.auth.Destroyable;
+
+import org.apache.hadoop.util.Preconditions;
+
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+
+public class EncryptionAdapter implements Destroyable {
+ private final String path;
+ private ABFSKey encryptionContext;
+ private ABFSKey encryptionKey;
+ private final EncryptionContextProvider provider;
+
+ public EncryptionAdapter(EncryptionContextProvider provider, String path,
+ byte[] encryptionContext) throws IOException {
+ this(provider, path);
+ Preconditions.checkNotNull(encryptionContext,
+ "Encryption context should not be null.");
+    this.encryptionContext = new ABFSKey(Base64.getDecoder().decode(encryptionContext));
+ Arrays.fill(encryptionContext, (byte) 0);
+ }
+
+ public EncryptionAdapter(EncryptionContextProvider provider, String path)
+ throws IOException {
+ this.provider = provider;
+ this.path = path;
+ }
+
+ private void computeKeys() throws IOException {
+ if (encryptionContext == null) {
+ encryptionContext = provider.getEncryptionContext(path);
+ }
+ Preconditions.checkNotNull(encryptionContext,
+ "Encryption context should not be null.");
+ if (encryptionKey == null) {
+ encryptionKey = provider.getEncryptionKey(path, encryptionContext);
+ }
+    Preconditions.checkNotNull(encryptionKey, "Encryption key should not be null.");
+ }
+
+ public String getEncodedKey() throws IOException {
+ computeKeys();
+ return EncodingHelper.getBase64EncodedString(encryptionKey.getEncoded());
+ }
+
+ public String getEncodedKeySHA() throws IOException {
+ computeKeys();
+    return EncodingHelper.getBase64EncodedString(EncodingHelper.getSHA256Hash(encryptionKey.getEncoded()));
+ }
+
+ public String getEncodedContext() throws IOException {
+ computeKeys();
+    return EncodingHelper.getBase64EncodedString(encryptionContext.getEncoded());
+ }
+
+ public void destroy() throws DestroyFailedException {
Review Comment:
ABFSKey doesn't throw DestroyFailedException; this doesn't need to. remove
it and invocations in the code get a lot easier
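Roughly what the simplified signature buys (with a minimal stand-in for `ABFSKey`):

```java
import java.util.Arrays;

// Minimal stand-in for ABFSKey: destroy() cannot actually fail, so the
// checked DestroyFailedException disappears from the signature and callers
// need no try/catch.
final class SecretBytes {
  private byte[] bytes;

  SecretBytes(byte[] source) {
    this.bytes = source.clone();
  }

  boolean isDestroyed() {
    return bytes == null;
  }

  void destroy() { // no "throws DestroyFailedException"
    if (bytes != null) {
      Arrays.fill(bytes, (byte) 0); // wipe key material before dropping it
      bytes = null;
    }
  }
}
```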
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -770,7 +778,7 @@ public AbfsInputStream openFileForRead(Path path,
if (fileStatus != null) {
LOG.debug(
"Fallback to getPathStatus REST call as provided filestatus "
- + "is not of type VersionedFileStatus");
+              + "is not of type VersionedFileStatus, or file is encrypted");
Review Comment:
why does a new HEAD request need to be issued? is there something wrong with
the value returned by getFileStatus/list, or is it that you need extra information?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -45,6 +46,8 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import javax.security.auth.DestroyFailedException;
Review Comment:
move to line 26
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockEncryptionContextProvider.java:
##########
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.extensions;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.classification.VisibleForTesting;
Review Comment:
no need for this annotation in test code
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java:
##########
@@ -25,6 +25,7 @@
import java.util.EnumSet;
import java.util.UUID;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
Review Comment:
review import placement/order
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/EncryptionType.java:
##########
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+/**
+ * Enum EncryptionType to represent the level of encryption applied
+ */
+public enum EncryptionType {
+ GLOBAL_KEY, // encrypt all files with the same client-provided key
Review Comment:
make these javadocs
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java:
##########
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.security.EncodingHelper;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.assertj.core.api.Assertions;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.extensions.MockEncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.security.EncryptionAdapter;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Lists;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY_SHA256;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.ENCRYPTION_KEY_LEN;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
+import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.ENCRYPTION_CONTEXT;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.GLOBAL_KEY;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.NONE;
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+
+@RunWith(Parameterized.class)
+public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {
Review Comment:
there's a lot of configurations to test here. How long does it take?
a lot of the time will be setup/teardown costs, which, the way the itests
are done, happens for every test method.
would it be possible to take params 2-6 and store in a single list param,
then have a single test case read this list in and go through each one. that
would massively reduce the overheads. But what you have here is the more
elegant one, so it's only worth doing if the performance is bad or expected to
get worse
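A rough sketch of the consolidation idea, with illustrative stand-in types rather than the real JUnit/ABFS classes:

```java
import java.util.Arrays;
import java.util.List;

// Keep only the encryption-type pair as the JUnit parameter and fold the
// per-operation expectations into a list iterated inside one test method,
// so filesystem setup/teardown runs once per pair instead of once per row.
class ConsolidatedCases {
  // one row of the former parameter table
  static final class OpCase {
    final String operation;
    final boolean expectCpkHeader;

    OpCase(String operation, boolean expectCpkHeader) {
      this.operation = operation;
      this.expectCpkHeader = expectCpkHeader;
    }
  }

  static final List<OpCase> CASES = Arrays.asList(
      new OpCase("READ", true),
      new OpCase("SET_ACL", false),
      new OpCase("DELETE", false));

  // stand-in for the real per-operation header check
  static boolean cpkHeaderSent(String operation) {
    return operation.equals("READ");
  }

  // a single "test" walks every case against one shared setup
  static int runAll() {
    int failures = 0;
    for (OpCase c : CASES) {
      if (cpkHeaderSent(c.operation) != c.expectCpkHeader) {
        failures++;
      }
    }
    return failures;
  }
}
```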
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -64,10 +66,11 @@
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import javax.security.auth.DestroyFailedException;
Review Comment:
-> line 22 block
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java:
##########
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.security.EncodingHelper;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.assertj.core.api.Assertions;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.extensions.MockEncryptionContextProvider;
+import org.apache.hadoop.fs.azurebfs.security.EncryptionAdapter;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Lists;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY_SHA256;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.ENCRYPTION_KEY_LEN;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
+import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.ENCRYPTION_CONTEXT;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.GLOBAL_KEY;
+import static org.apache.hadoop.fs.azurebfs.utils.EncryptionType.NONE;
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+
+@RunWith(Parameterized.class)
+public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {
+ private final byte[] cpk = new byte[ENCRYPTION_KEY_LEN];
+ private final String cpkSHAEncoded;
+
+ // Encryption type used by filesystem while creating file
+ @Parameterized.Parameter
+ public EncryptionType fileEncryptionType;
+
+ // Encryption type used by filesystem to call different operations
+ @Parameterized.Parameter(1)
+ public EncryptionType requestEncryptionType;
+
+ @Parameterized.Parameter(2)
+ public FSOperationType operation;
+
+ @Parameterized.Parameter(3)
+ public boolean responseHeaderServerEnc;
+
+ @Parameterized.Parameter(4)
+ public boolean responseHeaderReqServerEnc;
+
+ @Parameterized.Parameter(5)
+ public boolean isExceptionCase;
+
+ @Parameterized.Parameter(6)
+ public boolean isCpkResponseHdrExpected;
+
+
+ @Parameterized.Parameters(name = "{0} mode, {2}")
+ public static Iterable<Object[]> params() {
+ return Arrays.asList(new Object[][] {
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.READ, true, false, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.WRITE, false, true, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.APPEND, false, true, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.SET_ACL, false, false, false, false},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.GET_ATTR, true, false, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.SET_ATTR, false, true, false, true},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.LISTSTATUS, false, false, false, false},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.RENAME, false, false, false, false},
+ {ENCRYPTION_CONTEXT, ENCRYPTION_CONTEXT, FSOperationType.DELETE, false, false, false, false},
+
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.WRITE, false, false, true, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.GET_ATTR, true, false, true, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.READ, false, false, true, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_ATTR, false, true, true, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.RENAME, false, false, false, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.LISTSTATUS, false, false, false, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.DELETE, false, false, false, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_ACL, false, false, false, false},
+ {ENCRYPTION_CONTEXT, NONE, FSOperationType.SET_PERMISSION, false, false, false, false},
+
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.READ, true, false, false, true},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.WRITE, false, true, false, true},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.APPEND, false, true, false, true},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.SET_ACL, false, false, false, false},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.LISTSTATUS, false, false, false, false},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.RENAME, false, false, false, false},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.DELETE, false, false, false, false},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.GET_ATTR, true, false, false, true},
+ {GLOBAL_KEY, GLOBAL_KEY, FSOperationType.SET_ATTR, false, true, false, true},
+
+ {GLOBAL_KEY, NONE, FSOperationType.READ, true, false, true, true},
+ {GLOBAL_KEY, NONE, FSOperationType.WRITE, false, true, true, true},
+ {GLOBAL_KEY, NONE, FSOperationType.SET_ATTR, false, false, true, true},
+ {GLOBAL_KEY, NONE, FSOperationType.SET_ACL, false, false, false, false},
+ {GLOBAL_KEY, NONE, FSOperationType.RENAME, false, false, false, false},
+ {GLOBAL_KEY, NONE, FSOperationType.LISTSTATUS, false, false, false, false},
+ {GLOBAL_KEY, NONE, FSOperationType.DELETE, false, false, false, false},
+ {GLOBAL_KEY, NONE, FSOperationType.SET_PERMISSION, false, false, false, false},
+ });
+ }
+
+ public ITestAbfsCustomEncryption() throws Exception {
+ super();
+ Assume.assumeTrue("Account should be HNS enabled for CPK",
+ getConfiguration().getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT,
+ false));
+ new Random().nextBytes(cpk);
+ cpkSHAEncoded = EncodingHelper.getBase64EncodedString(
+ EncodingHelper.getSHA256Hash(cpk));
+ }
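The constructor above derives cpkSHAEncoded by SHA-256-hashing the client-provided key and Base64-encoding the digest via EncodingHelper. A plain-JDK sketch of that derivation (hypothetical standalone code, assuming EncodingHelper wraps MessageDigest and Base64 — its real internals are not quoted in this PR):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class CpkShaSketch {
  // Hypothetical equivalent of EncodingHelper.getBase64EncodedString(
  // EncodingHelper.getSHA256Hash(cpk)): hash the key bytes, then
  // Base64-encode the 32-byte digest.
  static String base64Sha256(byte[] key) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      return Base64.getEncoder().encodeToString(digest.digest(key));
    } catch (NoSuchAlgorithmException e) {
      // SHA-256 is a mandatory JDK algorithm, so this cannot happen.
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    byte[] cpk = "0123456789abcdef0123456789abcdef"
        .getBytes(StandardCharsets.UTF_8);
    String sha = base64Sha256(cpk);
    // A 32-byte digest always Base64-encodes to 44 characters.
    System.out.println(sha.length()); // 44
    System.out.println(sha);
  }
}
```

This is the value the x-ms-encryption-key-sha256 request/response header carries, which is why the test compares it against cpkSHAEncoded.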
+
+ @Test
+ public void testCustomEncryptionCombinations() throws Exception {
+ AzureBlobFileSystem fs = getFS();
+ Path testPath = path("/testFile");
+ String relativePath = fs.getAbfsStore().getRelativePath(testPath);
+ MockEncryptionContextProvider ecp =
+ (MockEncryptionContextProvider) createEncryptedFile(testPath);
+ AbfsRestOperation op = callOperation(fs, new Path(relativePath), ecp);
+ if (op == null) {
+ return;
+ }
+ AbfsHttpOperation httpOp = op.getResult();
+ if (isCpkResponseHdrExpected) {
+ if (requestEncryptionType == ENCRYPTION_CONTEXT) {
+ String encryptionContext = ecp.getEncryptionContextForTest(relativePath);
+ String expectedKeySHA = EncodingHelper.getBase64EncodedString(
+ EncodingHelper.getSHA256Hash(
+ ecp.getEncryptionKeyForTest(encryptionContext)));
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(expectedKeySHA);
+ } else { // GLOBAL_KEY
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(cpkSHAEncoded);
+ }
+ } else {
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_ENCRYPTION_KEY_SHA256))
+ .isEqualTo(null);
+ }
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_SERVER_ENCRYPTED))
+ .isEqualTo(responseHeaderServerEnc ? "true" : null);
+ Assertions.assertThat(httpOp.getResponseHeader(X_MS_REQUEST_SERVER_ENCRYPTED))
+ .isEqualTo(responseHeaderReqServerEnc ? "true" : null);
+ }
+
+ /**
+ * Executes a given operation at the AbfsClient level and returns
+ * AbfsRestOperation instance to verify response headers. Asserts an
+ * exception for combinations that should not succeed.
+ * @param fs AzureBlobFileSystem instance
+ * @param testPath path of file
+ * @param ecp EncryptionContextProvider instance to support AbfsClient methods
+ * @return Rest op or null depending on whether the request is allowed
+ * @throws Exception error
+ */
+ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
+ Path testPath, EncryptionContextProvider ecp)
+ throws Exception {
+ AbfsClient client = fs.getAbfsClient();
+ client.setEncryptionContextProvider(ecp);
+ if (isExceptionCase) {
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ switch (operation) {
+ case WRITE: try (FSDataOutputStream out = fs.append(testPath)) {
+ out.write("bytes".getBytes());
+ }
+ break;
+ case READ: try (FSDataInputStream in = fs.open(testPath)) {
+ in.read(new byte[5]);
+ }
+ break;
+ case SET_ATTR: fs.setXAttr(testPath, "attribute", "value".getBytes());
+ break;
+ case GET_ATTR: fs.getXAttr(testPath, "attribute");
+ break;
+ default: throw new NoSuchFieldException();
+ }
+ });
+ return null;
+ } else {
+ EncryptionAdapter encryptionAdapter = null;
+ if (fileEncryptionType == ENCRYPTION_CONTEXT) {
+ encryptionAdapter = new EncryptionAdapter(ecp,
+ fs.getAbfsStore().getRelativePath(testPath),
+ Base64.getEncoder().encode(
+ ((MockEncryptionContextProvider) ecp)
+ .getEncryptionContextForTest(testPath.toString())
+ .getBytes(StandardCharsets.UTF_8)));
+ }
+ String path = testPath.toString();
+ switch (operation) {
+ case READ:
+ TracingContext tracingContext = getTestTracingContext(fs, true);
+ AbfsHttpOperation statusOp = client.getPathStatus(path, false,
+ tracingContext).getResult();
+ return client.read(path, 0, new byte[5], 0, 5,
+ statusOp.getResponseHeader(HttpHeaderConfigurations.ETAG),
+ null, encryptionAdapter, tracingContext);
+ case WRITE: return client.flush(path, 3, false, false, null,
+ null, encryptionAdapter, getTestTracingContext(fs, false));
+ case APPEND: return client.append(path, "val".getBytes(),
+ new AppendRequestParameters(3, 0, 3, APPEND_MODE, false, null),
+ null, encryptionAdapter, getTestTracingContext(fs, false));
+ case SET_ACL: return client.setAcl(path, AclEntry.aclSpecToString(
+ Lists.newArrayList(aclEntry(ACCESS, USER, ALL))),
+ getTestTracingContext(fs, false));
+ case LISTSTATUS: return client.listPath(path, false, 5, null,
+ getTestTracingContext(fs, true));
+ case RENAME: return client.renamePath(path,
+ new Path(path + "_2").toString(),
+ null, getTestTracingContext(fs, true), null, false).getOp();
+ case DELETE: return client.deletePath(path, false, null,
+ getTestTracingContext(fs, false));
+ case GET_ATTR: return client.getPathStatus(path, true,
+ getTestTracingContext(fs, false));
+ case SET_ATTR:
+ Hashtable<String, String> properties = new Hashtable<>();
+ properties.put("key", "{ value: valueTest }");
+ return client.setPathProperties(path, fs.getAbfsStore()
+ .convertXmsPropertiesToCommaSeparatedString(properties),
+ getTestTracingContext(fs, false));
+ case SET_PERMISSION:
+ return client.setPermission(path, FsPermission.getDefault().toString(),
+ getTestTracingContext(fs, false));
+ default: throw new NoSuchFieldException();
+ }
+ }
+ }
+
+ private AzureBlobFileSystem getECProviderEnabledFS() throws Exception {
+ Configuration configuration = getRawConfiguration();
+ configuration.set(FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE + "."
+ + getAccountName(),
+ MockEncryptionContextProvider.class.getCanonicalName());
+ configuration.unset(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY + "."
+ + getAccountName());
+ configuration.unset(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA
+ + "." + getAccountName());
+ return (AzureBlobFileSystem) FileSystem.newInstance(configuration);
+ }
+
+ private AzureBlobFileSystem getCPKEnabledFS() throws IOException {
+ Configuration conf = getRawConfiguration();
+ String cpkEncoded = EncodingHelper.getBase64EncodedString(cpk);
+ String cpkEncodedSHA = EncodingHelper.getBase64EncodedString(
+ EncodingHelper.getSHA256Hash(cpk));
+ conf.set(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY + "."
+ + getAccountName(), cpkEncoded);
+ conf.set(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA + "."
+ + getAccountName(), cpkEncodedSHA);
+ conf.unset(FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE);
+ return (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ }
+
+ private AzureBlobFileSystem getFS() throws Exception {
+ if (getFileSystem().getAbfsClient().getEncryptionType()
+ == requestEncryptionType) {
+ return getFileSystem();
+ }
+ if (requestEncryptionType == ENCRYPTION_CONTEXT) {
+ return getECProviderEnabledFS();
+ } else if (requestEncryptionType == GLOBAL_KEY) {
+ return getCPKEnabledFS();
+ } else {
+ Configuration conf = getRawConfiguration();
+ conf.unset(FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE);
+ return (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ }
+ }
+
+ private EncryptionContextProvider createEncryptedFile(Path testPath)
+ throws Exception {
+ AzureBlobFileSystem fs;
+ if (getFileSystem().getAbfsClient().getEncryptionType()
+ == fileEncryptionType) {
+ fs = getFileSystem();
+ } else {
+ fs = fileEncryptionType == ENCRYPTION_CONTEXT
+ ? getECProviderEnabledFS()
+ : getCPKEnabledFS();
+ }
+ String relativePath = fs.getAbfsStore().getRelativePath(testPath);
+ try (FSDataOutputStream out = fs.create(new Path(relativePath))) {
+ out.write("123".getBytes());
+ }
+ // verify file is encrypted by calling getPathStatus (with properties)
+ // without encryption headers in request
+ if (fileEncryptionType != EncryptionType.NONE) {
+ fs.getAbfsClient().setEncryptionType(EncryptionType.NONE);
Review Comment:
store the AbfsClient in a local variable and reuse it
> ABFS: Support for Encryption Context
> ------------------------------------
>
> Key: HADOOP-17912
> URL: https://issues.apache.org/jira/browse/HADOOP-17912
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.3.1
> Reporter: Sumangala Patki
> Assignee: Pranav Saxena
> Priority: Major
> Labels: pull-request-available
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Support for customer-provided encryption keys at the file level, superseding
> the global (account-level) key support added in HADOOP-17536.
> The ABFS driver will support an "EncryptionContext" plugin for retrieving
> encryption information; the implementation is to be provided by the client.
> The keys/context retrieved will be sent via request headers to the server,
> which will store the encryption context. Subsequent REST calls to the server
> that access data or user metadata of the file will require fetching the
> encryption context through a GetFileProperties call and retrieving the key
> from the custom provider before sending the request.
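The create-time context / later key-lookup handshake described above can be sketched with a toy in-memory provider. This is a hypothetical illustration only: the class and method names below are invented and are not the real ABFS EncryptionContextProvider API.

```java
import java.security.SecureRandom;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Toy stand-in for the client-supplied plugin: mints a per-file context
// at create time and resolves it back to a key for later operations.
public class InMemoryEncryptionContextProvider {
  private final Map<String, byte[]> keysByContext = new ConcurrentHashMap<>();
  private final SecureRandom random = new SecureRandom();

  // Create-time: mint a context token and a per-file key. The context
  // (never the key itself) is what travels in request headers and is
  // stored by the service alongside the file.
  public String newEncryptionContext(String path) {
    String context = UUID.randomUUID().toString();
    byte[] key = new byte[32]; // 256-bit per-file key
    random.nextBytes(key);
    keysByContext.put(context, key);
    return context;
  }

  // Later operations: the driver first fetches the stored context from
  // the service (GetFileProperties), then asks the provider for the key.
  public byte[] keyForContext(String context) {
    byte[] key = keysByContext.get(context);
    if (key == null) {
      throw new IllegalStateException("Unknown encryption context: " + context);
    }
    return key;
  }

  public static void main(String[] args) {
    InMemoryEncryptionContextProvider provider =
        new InMemoryEncryptionContextProvider();
    String context = provider.newEncryptionContext("/data/file1");
    // Later request: the service returned `context`; resolve the key.
    byte[] key = provider.keyForContext(context);
    System.out.println(context + " -> " + key.length + "-byte key");
  }
}
```

A production provider would derive or fetch keys from a key-management service rather than holding them in memory, but the two-phase shape (emit context on create, resolve context to key on access) is the same.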
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]