This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 51e152a85d Gets the correct filesystem when writing an rfile (#5296)
51e152a85d is described below

commit 51e152a85d8406ba1e8c88de5cedb56d94f71120
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Jan 31 15:47:57 2025 -0500

    Gets the correct filesystem when writing an rfile (#5296)
    
    * Gets the correct filesystem when writing an rfile
    
    Modified the RFile client code to get the filesystem from the path
    being written or read, instead of using the default Hadoop filesystem
    when no filesystem was explicitly supplied. This covers the case of
    having multiple filesystems defined in the Hadoop config.
    
    ---------
    
    Co-authored-by: Daniel Roberts ddanielr <ddani...@gmail.com>
---
 .../accumulo/core/client/rfile/FSConfArgs.java     | 10 +++++-
 .../core/client/rfile/RFileScannerBuilder.java     |  4 +--
 .../core/client/rfile/RFileWriterBuilder.java      |  2 +-
 .../core/client/rfile/RFileClientTest.java         | 41 ++++++++++++++++++++++
 4 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java
index 4062b4c02b..58aa64671e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java
@@ -22,15 +22,23 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 class FSConfArgs {
 
   FileSystem fs;
   Configuration conf;
 
+  FileSystem getFileSystem(Path path) throws IOException {
+    if (fs == null) {
+      return path.getFileSystem(getConf());
+    }
+    return fs;
+  }
+
   FileSystem getFileSystem() throws IOException {
     if (fs == null) {
-      fs = FileSystem.get(getConf());
+      return FileSystem.get(getConf());
     }
     return fs;
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
index 02b88d1d62..d9ad691cac 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
@@ -56,8 +56,8 @@ class RFileScannerBuilder implements RFile.InputArguments, RFile.ScannerFSOption
       if (sources == null) {
         sources = new RFileSource[paths.length];
         for (int i = 0; i < paths.length; i++) {
-          sources[i] = new RFileSource(getFileSystem().open(paths[i]),
-              getFileSystem().getFileStatus(paths[i]).getLen());
+          sources[i] = new RFileSource(getFileSystem(paths[i]).open(paths[i]),
+              getFileSystem(paths[i]).getFileStatus(paths[i]).getLen());
         }
       } else {
         for (int i = 0; i < sources.length; i++) {
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
index be1850c8c3..6382d568b5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
@@ -119,7 +119,7 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions
           visCacheSize);
     } else {
       return new RFileWriter(fileops.newWriterBuilder()
-          .forFile(out.path.toString(), out.getFileSystem(), out.getConf(), cs)
+          .forFile(out.path.toString(), out.getFileSystem(out.path), 
out.getConf(), cs)
           .withTableConfiguration(acuconf).withStartDisabled().build(), 
visCacheSize);
     }
   }
diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
index ec07de4ac0..716803b356 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java
@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.ConnectException;
 import java.security.SecureRandom;
 import java.util.AbstractMap;
 import java.util.ArrayList;
@@ -67,6 +68,8 @@ import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.Test;
 
@@ -840,4 +843,42 @@ public class RFileClientTest {
     assertEquals(testData, toMap(scanner));
     scanner.close();
   }
+
+  @Test
+  public void testFileSystemFromUri() throws Exception {
+    String localFsClass = "LocalFileSystem";
+
+    String remoteFsHost = "127.0.0.5:8080";
+    String fileUri = "hdfs://" + remoteFsHost + "/bulk-xyx/file1.rf";
+    // There was a bug in the code where the default hadoop file system was 
always used. This test
+    // checks that the hadoop filesystem used it based on the URI and not the 
default filesystem. In
+    // this env the default file system is the local hadoop file system.
+    var exception =
+        assertThrows(ConnectException.class, () -> 
RFile.newWriter().to(fileUri).build());
+    assertTrue(exception.getMessage().contains("to " + remoteFsHost
+        + " failed on connection exception: java.net.ConnectException: 
Connection refused"));
+    // Ensure the DistributedFileSystem was used.
+    assertTrue(Arrays.stream(exception.getStackTrace())
+        .anyMatch(ste -> 
ste.getClassName().contains(DistributedFileSystem.class.getName())));
+    assertTrue(Arrays.stream(exception.getStackTrace())
+        .noneMatch(ste -> ste.getClassName().contains(localFsClass)));
+
+    var exception2 = assertThrows(RuntimeException.class, () -> {
+      var scanner = RFile.newScanner().from(fileUri).build();
+      scanner.iterator();
+    });
+    assertTrue(exception2.getMessage().contains("to " + remoteFsHost
+        + " failed on connection exception: java.net.ConnectException: 
Connection refused"));
+    assertTrue(Arrays.stream(exception2.getCause().getStackTrace())
+        .anyMatch(ste -> 
ste.getClassName().contains(DistributedFileSystem.class.getName())));
+    assertTrue(Arrays.stream(exception2.getCause().getStackTrace())
+        .noneMatch(ste -> ste.getClassName().contains(localFsClass)));
+
+    // verify the assumptions this test is making about the local filesystem 
being the default.
+    var exception3 = assertThrows(IllegalArgumentException.class,
+        () -> FileSystem.get(new Configuration()).open(new Path(fileUri)));
+    assertTrue(exception3.getMessage().contains("Wrong FS: " + fileUri + ", 
expected: file:///"));
+    assertTrue(Arrays.stream(exception3.getStackTrace())
+        .anyMatch(ste -> ste.getClassName().contains(localFsClass)));
+  }
 }

Reply via email to