[
https://issues.apache.org/jira/browse/HADOOP-19604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18014797#comment-18014797
]
ASF GitHub Bot commented on HADOOP-19604:
-----------------------------------------
anujmodi2021 commented on code in PR #7853:
URL: https://github.com/apache/hadoop/pull/7853#discussion_r2284098667
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java:
##########
@@ -447,34 +473,37 @@ public void testScenario3() throws Exception {
*/
@Test
public void testScenario4() throws Exception {
- AzureBlobFileSystem abfs = getFileSystem();
- Assume.assumeFalse("Namespace enabled account does not support this test",
- getIsNamespaceEnabled(abfs));
- assumeBlobServiceType();
- Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
- NativeAzureFileSystem wasb = getWasbFileSystem();
-
- Path testFile = path("/testReadFile");
- Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID());
-
- // Write
- wasb.create(path, true);
- try (FSDataOutputStream abfsOutputStream = abfs.append(path)) {
- abfsOutputStream.write(TEST_CONTEXT.getBytes());
- abfsOutputStream.flush();
- abfsOutputStream.hsync();
- }
-
- try (FSDataOutputStream nativeFsStream = wasb.append(path)) {
- nativeFsStream.write(TEST_CONTEXT1.getBytes());
- nativeFsStream.flush();
- nativeFsStream.hsync();
+ try (AzureBlobFileSystem abfs = getFileSystem()) {
+ Assume.assumeFalse("Namespace enabled account does not support this
test",
Review Comment:
Let's move all the assume calls into a common place; they could be the first
lines in each method.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java:
##########
@@ -299,32 +311,34 @@ public void testUrlConversion() {
@Test
public void testSetWorkingDirectory() throws Exception {
//create folders
- AzureBlobFileSystem abfs = getFileSystem();
- // test only valid for non-namespace enabled account
- Assume.assumeFalse("Namespace enabled account does not support this test",
- getIsNamespaceEnabled(abfs));
-
- NativeAzureFileSystem wasb = getWasbFileSystem();
-
- Path d1 = path("/d1");
- Path d1d4 = new Path(d1 + "/d2/d3/d4");
- assertMkdirs(abfs, d1d4);
-
- //set working directory to path1
- Path path1 = new Path(d1 + "/d2");
- wasb.setWorkingDirectory(path1);
- abfs.setWorkingDirectory(path1);
- assertEquals(path1, wasb.getWorkingDirectory());
- assertEquals(path1, abfs.getWorkingDirectory());
-
- //set working directory to path2
- Path path2 = new Path("d3/d4");
- wasb.setWorkingDirectory(path2);
- abfs.setWorkingDirectory(path2);
-
- Path path3 = d1d4;
- assertEquals(path3, wasb.getWorkingDirectory());
- assertEquals(path3, abfs.getWorkingDirectory());
+ try (AzureBlobFileSystem abfs = getFileSystem()) {
+ // test only valid for non-namespace enabled account
+ Assume.assumeFalse("Namespace enabled account does not support this
test",
Review Comment:
We can use the base class method `assumeHnsDisabled()` for better readability
here and in other places.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java:
##########
@@ -481,6 +482,103 @@ public void testResetCalledOnExceptionInRemoteFlush()
throws Exception {
//expected exception
}
// Verify that reset was called on the message digest
- Mockito.verify(mockMessageDigest, Mockito.times(1)).reset();
+ if (spiedClient.isFullBlobChecksumValidationEnabled()) {
+
Assertions.assertThat(Mockito.mockingDetails(mockMessageDigest).getInvocations()
+ .stream()
+ .filter(i -> i.getMethod().getName().equals("reset"))
+ .count())
+ .as("Expected MessageDigest.reset() to be called exactly once when
checksum validation is enabled")
+ .isEqualTo(1);
+ }
+ }
+
+ /**
+ * Tests that the message digest is reset when an exception occurs during
remote flush.
+ * Simulates a failure in the flush operation and verifies reset is called
on MessageDigest.
+ */
+ @Test
+ public void testNoChecksumComputedWhenConfigFalse() throws Exception {
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, false);
+ FileSystem fileSystem = FileSystem.newInstance(conf);
+ AzureBlobFileSystem fs = (AzureBlobFileSystem) fileSystem;
+ Assume.assumeTrue(!getIsNamespaceEnabled(fs));
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ assumeBlobServiceType();
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
+
+ // Create spies for the client handler and blob client
+ AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
+ AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
+
+ // Set up the spies to return the mocked objects
+ Mockito.doReturn(clientHandler).when(store).getClientHandler();
+ Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
+ Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
+ AbfsOutputStream abfsOutputStream = Mockito.spy(
+ (AbfsOutputStream) fs.create(new
Path("/test/file")).getWrappedStream());
+ AzureIngressHandler ingressHandler = Mockito.spy(
+ abfsOutputStream.getIngressHandler());
+
Mockito.doReturn(ingressHandler).when(abfsOutputStream).getIngressHandler();
+ Mockito.doReturn(blobClient).when(ingressHandler).getClient();
+ FSDataOutputStream os = Mockito.spy(
+ new FSDataOutputStream(abfsOutputStream, null));
+ AbfsOutputStream out = (AbfsOutputStream) os.getWrappedStream();
+ byte[] bytes = new byte[1024 * 1024 * 4];
+ new Random().nextBytes(bytes);
+ // Write some bytes and attempt to flush, which should retry
+ out.write(bytes);
+ out.hsync();
+ Assertions.assertThat(Mockito.mockingDetails(blobClient).getInvocations()
+ .stream()
+ .filter(i ->
i.getMethod().getName().equals("addCheckSumHeaderForWrite"))
+ .count())
+ .as("Expected addCheckSumHeaderForWrite() to be called exactly 0
times")
+ .isZero();
+ }
+
+ /**
+ * Tests that the message digest is reset when an exception occurs during
remote flush.
+ * Simulates a failure in the flush operation and verifies reset is called
on MessageDigest.
+ */
+ @Test
+ public void testChecksumComputedWhenConfigTrue() throws Exception {
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true);
+ FileSystem fileSystem = FileSystem.newInstance(conf);
+ AzureBlobFileSystem fs = (AzureBlobFileSystem) fileSystem;
Review Comment:
A try() (try-with-resources) block is needed here, as this is a new instance.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java:
##########
@@ -113,121 +114,129 @@ public void testReadFile() throws Exception {
boolean[] createFileWithAbfs = new boolean[]{false, true, false, true};
boolean[] readFileWithAbfs = new boolean[]{false, true, true, false};
- AzureBlobFileSystem abfs = getFileSystem();
- // test only valid for non-namespace enabled account
- Assume.assumeFalse("Namespace enabled account does not support this test",
- getIsNamespaceEnabled(abfs));
- Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
-
- NativeAzureFileSystem wasb = getWasbFileSystem();
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION, true);
+ FileSystem fileSystem = FileSystem.newInstance(conf);
+ try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
+ // test only valid for non-namespace enabled account
+ Assume.assumeFalse("Namespace enabled account does not support this
test",
+ getIsNamespaceEnabled(abfs));
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
+
+ NativeAzureFileSystem wasb = getWasbFileSystem();
+
+ Path testFile = path("/testReadFile");
+ for (int i = 0; i < 4; i++) {
+ Path path = new Path(testFile + "/~12/!008/testfile" + i);
+ final FileSystem createFs = createFileWithAbfs[i] ? abfs : wasb;
+ // Read
+ final FileSystem readFs = readFileWithAbfs[i] ? abfs : wasb;
+ // Write
+ try (FSDataOutputStream nativeFsStream = createFs.create(path, true)) {
+ nativeFsStream.write(TEST_CONTEXT.getBytes());
+ nativeFsStream.flush();
+ nativeFsStream.hsync();
+ }
+
+ // Check file status
+ ContractTestUtils.assertIsFile(createFs, path);
+
+ try (BufferedReader br = new BufferedReader(
+ new InputStreamReader(readFs.open(path)))) {
+ String line = br.readLine();
+ assertEquals("Wrong text from " + readFs,
+ TEST_CONTEXT, line);
+ }
+
+ // Remove file
+ assertDeleted(readFs, path, true);
+ }
+ }
+ }
- Path testFile = path("/testReadFile");
- for (int i = 0; i < 4; i++) {
- Path path = new Path(testFile + "/~12/!008/testfile" + i);
- final FileSystem createFs = createFileWithAbfs[i] ? abfs : wasb;
- // Read
- final FileSystem readFs = readFileWithAbfs[i] ? abfs : wasb;
+ /**
+ * Flow: Create and write a file using WASB, then read and append to it
using ABFS. Finally, delete the file via ABFS after verifying content
consistency.
+ * Expected: WASB successfully creates the file and writes content. ABFS
reads, appends, and deletes the file without data loss or errors.
+ */
+ @Test
+ public void testwriteFile() throws Exception {
+ try (AzureBlobFileSystem abfs = getFileSystem()) {
Review Comment:
Nit: Here we are not creating a new instance. We are using the instance
created by the base class, which will be closed automatically during teardown,
so try() is redundant here.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java:
##########
@@ -481,6 +482,103 @@ public void testResetCalledOnExceptionInRemoteFlush()
throws Exception {
//expected exception
}
// Verify that reset was called on the message digest
- Mockito.verify(mockMessageDigest, Mockito.times(1)).reset();
+ if (spiedClient.isFullBlobChecksumValidationEnabled()) {
+
Assertions.assertThat(Mockito.mockingDetails(mockMessageDigest).getInvocations()
+ .stream()
+ .filter(i -> i.getMethod().getName().equals("reset"))
+ .count())
+ .as("Expected MessageDigest.reset() to be called exactly once when
checksum validation is enabled")
+ .isEqualTo(1);
+ }
+ }
+
+ /**
+ * Tests that the message digest is reset when an exception occurs during
remote flush.
+ * Simulates a failure in the flush operation and verifies reset is called
on MessageDigest.
+ */
+ @Test
+ public void testNoChecksumComputedWhenConfigFalse() throws Exception {
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, false);
+ FileSystem fileSystem = FileSystem.newInstance(conf);
Review Comment:
A try() (try-with-resources) block is needed here as well.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java:
##########
@@ -299,32 +311,34 @@ public void testUrlConversion() {
@Test
public void testSetWorkingDirectory() throws Exception {
//create folders
- AzureBlobFileSystem abfs = getFileSystem();
- // test only valid for non-namespace enabled account
- Assume.assumeFalse("Namespace enabled account does not support this test",
- getIsNamespaceEnabled(abfs));
-
- NativeAzureFileSystem wasb = getWasbFileSystem();
-
- Path d1 = path("/d1");
- Path d1d4 = new Path(d1 + "/d2/d3/d4");
- assertMkdirs(abfs, d1d4);
-
- //set working directory to path1
- Path path1 = new Path(d1 + "/d2");
- wasb.setWorkingDirectory(path1);
- abfs.setWorkingDirectory(path1);
- assertEquals(path1, wasb.getWorkingDirectory());
- assertEquals(path1, abfs.getWorkingDirectory());
-
- //set working directory to path2
- Path path2 = new Path("d3/d4");
- wasb.setWorkingDirectory(path2);
- abfs.setWorkingDirectory(path2);
-
- Path path3 = d1d4;
- assertEquals(path3, wasb.getWorkingDirectory());
- assertEquals(path3, abfs.getWorkingDirectory());
+ try (AzureBlobFileSystem abfs = getFileSystem()) {
+ // test only valid for non-namespace enabled account
+ Assume.assumeFalse("Namespace enabled account does not support this
test",
Review Comment:
Nit: Also, I suppose all the tests in this class need an FNS account; maybe we
can move this assume into the constructor itself to skip the whole file instead
of checking it in individual tests.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java:
##########
@@ -113,121 +114,129 @@ public void testReadFile() throws Exception {
boolean[] createFileWithAbfs = new boolean[]{false, true, false, true};
boolean[] readFileWithAbfs = new boolean[]{false, true, true, false};
- AzureBlobFileSystem abfs = getFileSystem();
- // test only valid for non-namespace enabled account
- Assume.assumeFalse("Namespace enabled account does not support this test",
- getIsNamespaceEnabled(abfs));
- Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
-
- NativeAzureFileSystem wasb = getWasbFileSystem();
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION, true);
+ FileSystem fileSystem = FileSystem.newInstance(conf);
+ try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
+ // test only valid for non-namespace enabled account
+ Assume.assumeFalse("Namespace enabled account does not support this
test",
+ getIsNamespaceEnabled(abfs));
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
+
+ NativeAzureFileSystem wasb = getWasbFileSystem();
+
+ Path testFile = path("/testReadFile");
+ for (int i = 0; i < 4; i++) {
+ Path path = new Path(testFile + "/~12/!008/testfile" + i);
+ final FileSystem createFs = createFileWithAbfs[i] ? abfs : wasb;
+ // Read
+ final FileSystem readFs = readFileWithAbfs[i] ? abfs : wasb;
+ // Write
+ try (FSDataOutputStream nativeFsStream = createFs.create(path, true)) {
+ nativeFsStream.write(TEST_CONTEXT.getBytes());
+ nativeFsStream.flush();
+ nativeFsStream.hsync();
+ }
+
+ // Check file status
+ ContractTestUtils.assertIsFile(createFs, path);
+
+ try (BufferedReader br = new BufferedReader(
+ new InputStreamReader(readFs.open(path)))) {
+ String line = br.readLine();
+ assertEquals("Wrong text from " + readFs,
+ TEST_CONTEXT, line);
+ }
+
+ // Remove file
+ assertDeleted(readFs, path, true);
+ }
+ }
+ }
- Path testFile = path("/testReadFile");
- for (int i = 0; i < 4; i++) {
- Path path = new Path(testFile + "/~12/!008/testfile" + i);
- final FileSystem createFs = createFileWithAbfs[i] ? abfs : wasb;
- // Read
- final FileSystem readFs = readFileWithAbfs[i] ? abfs : wasb;
+ /**
+ * Flow: Create and write a file using WASB, then read and append to it
using ABFS. Finally, delete the file via ABFS after verifying content
consistency.
+ * Expected: WASB successfully creates the file and writes content. ABFS
reads, appends, and deletes the file without data loss or errors.
+ */
+ @Test
+ public void testwriteFile() throws Exception {
+ try (AzureBlobFileSystem abfs = getFileSystem()) {
+ Assume.assumeFalse("Namespace enabled account does not support this
test",
+ getIsNamespaceEnabled(abfs));
+ assumeBlobServiceType();
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
+ NativeAzureFileSystem wasb = getWasbFileSystem();
+
+ Path testFile = path("/testReadFile");
+ Path path = new Path(
+ testFile + "/~12/!008/testfile_" + UUID.randomUUID());
// Write
- try (FSDataOutputStream nativeFsStream = createFs.create(path, true)) {
+ try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) {
nativeFsStream.write(TEST_CONTEXT.getBytes());
nativeFsStream.flush();
nativeFsStream.hsync();
}
// Check file status
- ContractTestUtils.assertIsFile(createFs, path);
+ ContractTestUtils.assertIsFile(wasb, path);
try (BufferedReader br = new BufferedReader(
- new InputStreamReader(readFs.open(path)))) {
+ new InputStreamReader(abfs.open(path)))) {
String line = br.readLine();
- assertEquals("Wrong text from " + readFs,
+ assertEquals("Wrong text from " + abfs,
TEST_CONTEXT, line);
}
-
+ try (FSDataOutputStream abfsOutputStream = abfs.append(path)) {
+ abfsOutputStream.write(TEST_CONTEXT.getBytes());
+ abfsOutputStream.flush();
+ abfsOutputStream.hsync();
+ }
// Remove file
- assertDeleted(readFs, path, true);
+ assertDeleted(abfs, path, true);
}
}
- /**
- * Flow: Create and write a file using WASB, then read and append to it
using ABFS. Finally, delete the file via ABFS after verifying content
consistency.
- * Expected: WASB successfully creates the file and writes content. ABFS
reads, appends, and deletes the file without data loss or errors.
- */
- @Test
- public void testwriteFile() throws Exception {
- AzureBlobFileSystem abfs = getFileSystem();
- Assume.assumeFalse("Namespace enabled account does not support this test",
- getIsNamespaceEnabled(abfs));
- assumeBlobServiceType();
- Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
- NativeAzureFileSystem wasb = getWasbFileSystem();
-
- Path testFile = path("/testReadFile");
- Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID());
- // Write
- try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) {
- nativeFsStream.write(TEST_CONTEXT.getBytes());
- nativeFsStream.flush();
- nativeFsStream.hsync();
- }
-
- // Check file status
- ContractTestUtils.assertIsFile(wasb, path);
-
- try (BufferedReader br = new BufferedReader(
- new InputStreamReader(abfs.open(path)))) {
- String line = br.readLine();
- assertEquals("Wrong text from " + abfs,
- TEST_CONTEXT, line);
- }
- try (FSDataOutputStream abfsOutputStream = abfs.append(path)) {
- abfsOutputStream.write(TEST_CONTEXT.getBytes());
- abfsOutputStream.flush();
- abfsOutputStream.hsync();
- }
- // Remove file
- assertDeleted(abfs, path, true);
- }
-
/**
* Flow: Create and write a file using ABFS, append to the file using WASB,
then write again using ABFS.
* Expected: File is created and written correctly by ABFS, appended by
WASB, and final ABFS write reflects all updates without errors.
*/
@Test
public void testwriteFile1() throws Exception {
- AzureBlobFileSystem abfs = getFileSystem();
- Assume.assumeFalse("Namespace enabled account does not support this test",
- getIsNamespaceEnabled(abfs));
- assumeBlobServiceType();
- Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
- NativeAzureFileSystem wasb = getWasbFileSystem();
-
- Path testFile = path("/testReadFile");
- Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID());
- // Write
- try (FSDataOutputStream nativeFsStream = abfs.create(path, true)) {
- nativeFsStream.write(TEST_CONTEXT.getBytes());
- nativeFsStream.flush();
- nativeFsStream.hsync();
- }
+ try (AzureBlobFileSystem abfs = getFileSystem()) {
Review Comment:
Same as above, and in a few places below: there is no need to close this.
> ABFS: Fix WASB ABFS compatibility issues
> ----------------------------------------
>
> Key: HADOOP-19604
> URL: https://issues.apache.org/jira/browse/HADOOP-19604
> Project: Hadoop Common
> Issue Type: Sub-task
> Affects Versions: 3.4.1
> Reporter: Anmol Asrani
> Assignee: Anmol Asrani
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.4.1
>
>
> Fix WASB ABFS compatibility issues. Fix issues such as:-
> # BlockId computation to be consistent across clients for PutBlock and
> PutBlockList
> # Restrict url encoding of certain json metadata during setXAttr calls.
> # Maintain the md5 hash of whole block to validate data integrity during
> flush.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]