This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 6cc158275d Add helper method in IT to copy exported distcp files into given dir (#5840)
6cc158275d is described below

commit 6cc158275ddc9a81a98ff6f0d330d12b23030184
Author: Dom G. <[email protected]>
AuthorDate: Fri Aug 29 11:16:00 2025 -0400

    Add helper method in IT to copy exported distcp files into given dir (#5840)
---
 .../apache/accumulo/test/ComprehensiveITBase.java  | 15 +---
 .../org/apache/accumulo/test/ImportExportIT.java   | 94 ++++++++--------------
 .../apache/accumulo/test/shell/ShellServerIT.java  | 27 +------
 3 files changed, 38 insertions(+), 98 deletions(-)

diff --git 
a/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java 
b/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java
index a6406a7556..02b8bb253b 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java
@@ -28,9 +28,7 @@ import static 
org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -105,7 +103,6 @@ import 
org.apache.accumulo.core.spi.scan.ConfigurableScanServerSelector;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.test.util.Wait;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.Test;
@@ -1379,17 +1376,7 @@ public abstract class ComprehensiveITBase extends 
SharedMiniClusterBase {
 
     client.tableOperations().exportTable(srcTable, exportDir);
 
-    fs.mkdirs(new Path(importDir));
-    try (var inputStream = fs.open(new Path(exportDir + "/distcp.txt"));
-        var inputStreamReader = new InputStreamReader(inputStream);
-        var reader = new BufferedReader(inputStreamReader)) {
-      String line;
-      while ((line = reader.readLine()) != null) {
-        var srcPath = new Path(line);
-        Path destPath = new Path(importDir, srcPath.getName());
-        FileUtil.copy(fs, srcPath, fs, destPath, false, fs.getConf());
-      }
-    }
+    ImportExportIT.copyExportedFilesToImportDirs(fs, new Path(exportDir), new 
Path(importDir));
 
     client.tableOperations().importTable(importTable, importDir);
 
diff --git a/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java 
b/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java
index 54ef657a00..e35bece0f2 100644
--- a/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java
@@ -156,26 +156,7 @@ public class ImportExportIT extends AccumuloClusterHarness 
{
     // Then export it
     client.tableOperations().exportTable(srcTable, exportDir.toString());
 
-    // Make sure the distcp.txt file that exporttable creates is available
-    Path distcp = new Path(exportDir, "distcp.txt");
-    fs.deleteOnExit(distcp);
-    assertTrue(fs.exists(distcp), "Distcp file doesn't exist");
-    FSDataInputStream is = fs.open(distcp);
-    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-
-    // Copy each file that was exported to one of the imports directory
-    String line;
-
-    while ((line = reader.readLine()) != null) {
-      Path p = new Path(line.substring(5));
-      assertTrue(fs.exists(p), "File doesn't exist: " + p);
-      Path importDir = importDirAry[RANDOM.get().nextInt(importDirAry.length)];
-      Path dest = new Path(importDir, p.getName());
-      assertFalse(fs.exists(dest), "Did not expect " + dest + " to exist");
-      FileUtil.copy(fs, p, fs, dest, false, fs.getConf());
-    }
-
-    reader.close();
+    copyExportedFilesToImportDirs(fs, exportDir, importDirAry);
 
     log.info("Import dir A: {}", Arrays.toString(fs.listStatus(importDirA)));
     log.info("Import dir B: {}", Arrays.toString(fs.listStatus(importDirB)));
@@ -301,26 +282,7 @@ public class ImportExportIT extends AccumuloClusterHarness 
{
       // Then export it
       client.tableOperations().exportTable(srcTable, exportDir.toString());
 
-      // Make sure the distcp.txt file that exporttable creates is available
-      Path distcp = new Path(exportDir, "distcp.txt");
-      fs.deleteOnExit(distcp);
-      assertTrue(fs.exists(distcp), "Distcp file doesn't exist");
-      FSDataInputStream is = fs.open(distcp);
-      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-
-      // Copy each file that was exported to one of the imports directory
-      String line;
-
-      while ((line = reader.readLine()) != null) {
-        Path p = new Path(line.substring(5));
-        assertTrue(fs.exists(p), "File doesn't exist: " + p);
-        Path importDir = 
importDirAry[RANDOM.get().nextInt(importDirAry.length)];
-        Path dest = new Path(importDir, p.getName());
-        assertFalse(fs.exists(dest), "Did not expect " + dest + " to exist");
-        FileUtil.copy(fs, p, fs, dest, false, fs.getConf());
-      }
-
-      reader.close();
+      copyExportedFilesToImportDirs(fs, exportDir, importDirAry);
 
       log.info("Import dir A: {}", Arrays.toString(fs.listStatus(importDirA)));
       log.info("Import dir B: {}", Arrays.toString(fs.listStatus(importDirB)));
@@ -467,26 +429,7 @@ public class ImportExportIT extends AccumuloClusterHarness 
{
       // Then export it
       client.tableOperations().exportTable(srcTable, exportDir.toString());
 
-      // Make sure the distcp.txt file that exporttable creates is available
-      Path distcp = new Path(exportDir, "distcp.txt");
-      fs.deleteOnExit(distcp);
-      assertTrue(fs.exists(distcp), "Distcp file doesn't exist");
-      FSDataInputStream is = fs.open(distcp);
-      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-
-      // Copy each file that was exported to one of the imports directory
-      String line;
-
-      while ((line = reader.readLine()) != null) {
-        Path p = new Path(line.substring(5));
-        assertTrue(fs.exists(p), "File doesn't exist: " + p);
-        Path importDir = 
importDirAry[RANDOM.get().nextInt(importDirAry.length)];
-        Path dest = new Path(importDir, p.getName());
-        assertFalse(fs.exists(dest), "Did not expect " + dest + " to exist");
-        FileUtil.copy(fs, p, fs, dest, false, fs.getConf());
-      }
-
-      reader.close();
+      copyExportedFilesToImportDirs(fs, exportDir, importDirAry);
 
       log.info("Import dir A: {}", Arrays.toString(fs.listStatus(importDirA)));
       log.info("Import dir B: {}", Arrays.toString(fs.listStatus(importDirB)));
@@ -617,6 +560,37 @@ public class ImportExportIT extends AccumuloClusterHarness 
{
             true));
   }
 
+  /**
+   * Copy exported files from export directory to import directories by reading distcp.txt. Files
+   * are distributed randomly across the provided import directories. If only one import directory
+   * is provided, all files will go to that directory.
+   *
+   * @param fs the filesystem to use
+   * @param exportDir the export directory containing distcp.txt and exported files
+   * @param importDirs array of import directories to distribute files to (can be single dir)
+   */
+  public static void copyExportedFilesToImportDirs(FileSystem fs, Path 
exportDir,
+      Path... importDirs) throws IOException {
+    for (Path importDir : importDirs) {
+      assertTrue(fs.mkdirs(importDir), "Failed to create import directory: " + 
importDir);
+    }
+
+    Path distcp = new Path(exportDir, "distcp.txt");
+    assertTrue(fs.exists(distcp), "Distcp file doesn't exist: " + distcp);
+
+    try (FSDataInputStream is = fs.open(distcp); InputStreamReader in = new 
InputStreamReader(is);
+        BufferedReader reader = new BufferedReader(in)) {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        Path srcPath = new Path(line);
+        assertTrue(fs.exists(srcPath), "Source file doesn't exist: " + 
srcPath);
+        Path importDir = importDirs[RANDOM.get().nextInt(importDirs.length)];
+        Path destPath = new Path(importDir, srcPath.getName());
+        FileUtil.copy(fs, srcPath, fs, destPath, false, fs.getConf());
+      }
+    }
+  }
+
   public static Path createBaseDir(AccumuloCluster cluster, Class<?> clazz) 
throws IOException {
     FileSystem fs = cluster.getFileSystem();
     log.info("Using FileSystem: " + fs);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java 
b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
index 87b087e6ce..a1dcc7891a 100644
--- a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
@@ -31,7 +31,6 @@ import static org.junit.jupiter.api.Assertions.fail;
 import static org.junit.jupiter.api.Assumptions.assumeFalse;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
-import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
@@ -89,6 +88,7 @@ import org.apache.accumulo.core.util.format.FormatterConfig;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.ImportExportIT;
 import org.apache.accumulo.test.functional.SlowIterator;
 import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
@@ -187,7 +187,6 @@ public class ShellServerIT extends SharedMiniClusterBase {
       java.nio.file.Path exportDir =
           java.nio.file.Path.of(rootPath).resolve("ShellServerIT.export");
       String exportUri = "file://" + exportDir;
-      String localTmp = "file://" + 
java.nio.file.Path.of(rootPath).resolve("ShellServerIT.tmp");
       ts.exec("exporttable -t " + table + " " + exportUri, true);
       DistCp cp = new DistCp(new Configuration(false), null);
       String import_ = "file://" + 
java.nio.file.Path.of(rootPath).resolve("ShellServerIT.import");
@@ -196,29 +195,9 @@ public class ShellServerIT extends SharedMiniClusterBase {
         // DistCp bugs out trying to get a fs delegation token to perform the 
cp. Just copy it
         // ourselves by hand.
         FileSystem fs = getCluster().getFileSystem();
-        FileSystem localFs = FileSystem.getLocal(new Configuration(false));
-
-        // Path on local fs to cp into
-        Path localTmpPath = new Path(localTmp);
-        localFs.mkdirs(localTmpPath);
-
-        // Path in remote fs to importtable from
         Path importDir = new Path(import_);
-        fs.mkdirs(importDir);
-
-        // Implement a poor-man's DistCp
-        try (BufferedReader reader =
-            Files.newBufferedReader(exportDir.resolve("distcp.txt"), UTF_8)) {
-          for (String line; (line = reader.readLine()) != null;) {
-            Path exportedFile = new Path(line);
-            // There isn't a cp on FileSystem??
-            log.info("Copying {} to {}", line, localTmpPath);
-            fs.copyToLocalFile(exportedFile, localTmpPath);
-            Path tmpFile = new Path(localTmpPath, exportedFile.getName());
-            log.info("Moving {} to the import directory {}", tmpFile, 
importDir);
-            fs.moveFromLocalFile(tmpFile, importDir);
-          }
-        }
+        Path exportPath = new Path(exportDir.toUri());
+        ImportExportIT.copyExportedFilesToImportDirs(fs, exportPath, 
importDir);
       } else {
         String[] distCpArgs = {"-f", exportUri + "/distcp.txt", import_};
         assertEquals(0, cp.run(distCpArgs), "Failed to run distcp: " + 
Arrays.toString(distCpArgs));

Reply via email to