This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 51e152a85d Gets the correct filesystem when writing an rfile (#5296) 51e152a85d is described below commit 51e152a85d8406ba1e8c88de5cedb56d94f71120 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Jan 31 15:47:57 2025 -0500 Gets the correct filesystem when writing an rfile (#5296) * Gets the correct filesystem when writing an rfile Modified the rfile client code to get the filesystem based on the path being written or read. This covers the case of having multiple filesystems defined in hadoop config. --------- Co-authored-by: Daniel Roberts ddanielr <ddani...@gmail.com> --- .../accumulo/core/client/rfile/FSConfArgs.java | 10 +++++- .../core/client/rfile/RFileScannerBuilder.java | 4 +-- .../core/client/rfile/RFileWriterBuilder.java | 2 +- .../core/client/rfile/RFileClientTest.java | 41 ++++++++++++++++++++++ 4 files changed, 53 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java index 4062b4c02b..58aa64671e 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java @@ -22,15 +22,23 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; class FSConfArgs { FileSystem fs; Configuration conf; + FileSystem getFileSystem(Path path) throws IOException { + if (fs == null) { + return path.getFileSystem(getConf()); + } + return fs; + } + FileSystem getFileSystem() throws IOException { if (fs == null) { - fs = FileSystem.get(getConf()); + return FileSystem.get(getConf()); } return fs; } diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java 
index 02b88d1d62..d9ad691cac 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java @@ -56,8 +56,8 @@ class RFileScannerBuilder implements RFile.InputArguments, RFile.ScannerFSOption if (sources == null) { sources = new RFileSource[paths.length]; for (int i = 0; i < paths.length; i++) { - sources[i] = new RFileSource(getFileSystem().open(paths[i]), - getFileSystem().getFileStatus(paths[i]).getLen()); + sources[i] = new RFileSource(getFileSystem(paths[i]).open(paths[i]), + getFileSystem(paths[i]).getFileStatus(paths[i]).getLen()); } } else { for (int i = 0; i < sources.length; i++) { diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java index be1850c8c3..6382d568b5 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java @@ -119,7 +119,7 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions visCacheSize); } else { return new RFileWriter(fileops.newWriterBuilder() - .forFile(out.path.toString(), out.getFileSystem(), out.getConf(), cs) + .forFile(out.path.toString(), out.getFileSystem(out.path), out.getConf(), cs) .withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize); } } diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java index ec07de4ac0..716803b356 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java @@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.fail; import java.io.File; import 
java.io.IOException; +import java.net.ConnectException; import java.security.SecureRandom; import java.util.AbstractMap; import java.util.ArrayList; @@ -67,6 +68,8 @@ import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; @@ -840,4 +843,42 @@ public class RFileClientTest { assertEquals(testData, toMap(scanner)); scanner.close(); } + + @Test + public void testFileSystemFromUri() throws Exception { + String localFsClass = "LocalFileSystem"; + + String remoteFsHost = "127.0.0.5:8080"; + String fileUri = "hdfs://" + remoteFsHost + "/bulk-xyx/file1.rf"; + // There was a bug in the code where the default hadoop file system was always used. This test + // checks that the hadoop filesystem used is based on the URI and not the default filesystem. In + // this env the default file system is the local hadoop file system. + var exception = + assertThrows(ConnectException.class, () -> RFile.newWriter().to(fileUri).build()); + assertTrue(exception.getMessage().contains("to " + remoteFsHost + + " failed on connection exception: java.net.ConnectException: Connection refused")); + // Ensure the DistributedFileSystem was used. 
+ assertTrue(Arrays.stream(exception.getStackTrace()) + .anyMatch(ste -> ste.getClassName().contains(DistributedFileSystem.class.getName()))); + assertTrue(Arrays.stream(exception.getStackTrace()) + .noneMatch(ste -> ste.getClassName().contains(localFsClass))); + + var exception2 = assertThrows(RuntimeException.class, () -> { + var scanner = RFile.newScanner().from(fileUri).build(); + scanner.iterator(); + }); + assertTrue(exception2.getMessage().contains("to " + remoteFsHost + + " failed on connection exception: java.net.ConnectException: Connection refused")); + assertTrue(Arrays.stream(exception2.getCause().getStackTrace()) + .anyMatch(ste -> ste.getClassName().contains(DistributedFileSystem.class.getName()))); + assertTrue(Arrays.stream(exception2.getCause().getStackTrace()) + .noneMatch(ste -> ste.getClassName().contains(localFsClass))); + + // verify the assumptions this test is making about the local filesystem being the default. + var exception3 = assertThrows(IllegalArgumentException.class, + () -> FileSystem.get(new Configuration()).open(new Path(fileUri))); + assertTrue(exception3.getMessage().contains("Wrong FS: " + fileUri + ", expected: file:///")); + assertTrue(Arrays.stream(exception3.getStackTrace()) + .anyMatch(ste -> ste.getClassName().contains(localFsClass))); + } }