This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 6cc158275d Add helper method in IT to copy exported distcp files into given dir (#5840)
6cc158275d is described below
commit 6cc158275ddc9a81a98ff6f0d330d12b23030184
Author: Dom G. <[email protected]>
AuthorDate: Fri Aug 29 11:16:00 2025 -0400
Add helper method in IT to copy exported distcp files into given dir (#5840)
---
.../apache/accumulo/test/ComprehensiveITBase.java | 15 +---
.../org/apache/accumulo/test/ImportExportIT.java | 94 ++++++++--------------
.../apache/accumulo/test/shell/ShellServerIT.java | 27 +------
3 files changed, 38 insertions(+), 98 deletions(-)
diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java b/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java
index a6406a7556..02b8bb253b 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java
@@ -28,9 +28,7 @@ import static
org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
@@ -105,7 +103,6 @@ import
org.apache.accumulo.core.spi.scan.ConfigurableScanServerSelector;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.test.util.Wait;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;
@@ -1379,17 +1376,7 @@ public abstract class ComprehensiveITBase extends
SharedMiniClusterBase {
client.tableOperations().exportTable(srcTable, exportDir);
- fs.mkdirs(new Path(importDir));
- try (var inputStream = fs.open(new Path(exportDir + "/distcp.txt"));
- var inputStreamReader = new InputStreamReader(inputStream);
- var reader = new BufferedReader(inputStreamReader)) {
- String line;
- while ((line = reader.readLine()) != null) {
- var srcPath = new Path(line);
- Path destPath = new Path(importDir, srcPath.getName());
- FileUtil.copy(fs, srcPath, fs, destPath, false, fs.getConf());
- }
- }
+ ImportExportIT.copyExportedFilesToImportDirs(fs, new Path(exportDir), new
Path(importDir));
client.tableOperations().importTable(importTable, importDir);
diff --git a/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java b/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java
index 54ef657a00..e35bece0f2 100644
--- a/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java
@@ -156,26 +156,7 @@ public class ImportExportIT extends AccumuloClusterHarness
{
// Then export it
client.tableOperations().exportTable(srcTable, exportDir.toString());
- // Make sure the distcp.txt file that exporttable creates is available
- Path distcp = new Path(exportDir, "distcp.txt");
- fs.deleteOnExit(distcp);
- assertTrue(fs.exists(distcp), "Distcp file doesn't exist");
- FSDataInputStream is = fs.open(distcp);
- BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-
- // Copy each file that was exported to one of the imports directory
- String line;
-
- while ((line = reader.readLine()) != null) {
- Path p = new Path(line.substring(5));
- assertTrue(fs.exists(p), "File doesn't exist: " + p);
- Path importDir = importDirAry[RANDOM.get().nextInt(importDirAry.length)];
- Path dest = new Path(importDir, p.getName());
- assertFalse(fs.exists(dest), "Did not expect " + dest + " to exist");
- FileUtil.copy(fs, p, fs, dest, false, fs.getConf());
- }
-
- reader.close();
+ copyExportedFilesToImportDirs(fs, exportDir, importDirAry);
log.info("Import dir A: {}", Arrays.toString(fs.listStatus(importDirA)));
log.info("Import dir B: {}", Arrays.toString(fs.listStatus(importDirB)));
@@ -301,26 +282,7 @@ public class ImportExportIT extends AccumuloClusterHarness
{
// Then export it
client.tableOperations().exportTable(srcTable, exportDir.toString());
- // Make sure the distcp.txt file that exporttable creates is available
- Path distcp = new Path(exportDir, "distcp.txt");
- fs.deleteOnExit(distcp);
- assertTrue(fs.exists(distcp), "Distcp file doesn't exist");
- FSDataInputStream is = fs.open(distcp);
- BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-
- // Copy each file that was exported to one of the imports directory
- String line;
-
- while ((line = reader.readLine()) != null) {
- Path p = new Path(line.substring(5));
- assertTrue(fs.exists(p), "File doesn't exist: " + p);
- Path importDir =
importDirAry[RANDOM.get().nextInt(importDirAry.length)];
- Path dest = new Path(importDir, p.getName());
- assertFalse(fs.exists(dest), "Did not expect " + dest + " to exist");
- FileUtil.copy(fs, p, fs, dest, false, fs.getConf());
- }
-
- reader.close();
+ copyExportedFilesToImportDirs(fs, exportDir, importDirAry);
log.info("Import dir A: {}", Arrays.toString(fs.listStatus(importDirA)));
log.info("Import dir B: {}", Arrays.toString(fs.listStatus(importDirB)));
@@ -467,26 +429,7 @@ public class ImportExportIT extends AccumuloClusterHarness
{
// Then export it
client.tableOperations().exportTable(srcTable, exportDir.toString());
- // Make sure the distcp.txt file that exporttable creates is available
- Path distcp = new Path(exportDir, "distcp.txt");
- fs.deleteOnExit(distcp);
- assertTrue(fs.exists(distcp), "Distcp file doesn't exist");
- FSDataInputStream is = fs.open(distcp);
- BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-
- // Copy each file that was exported to one of the imports directory
- String line;
-
- while ((line = reader.readLine()) != null) {
- Path p = new Path(line.substring(5));
- assertTrue(fs.exists(p), "File doesn't exist: " + p);
- Path importDir =
importDirAry[RANDOM.get().nextInt(importDirAry.length)];
- Path dest = new Path(importDir, p.getName());
- assertFalse(fs.exists(dest), "Did not expect " + dest + " to exist");
- FileUtil.copy(fs, p, fs, dest, false, fs.getConf());
- }
-
- reader.close();
+ copyExportedFilesToImportDirs(fs, exportDir, importDirAry);
log.info("Import dir A: {}", Arrays.toString(fs.listStatus(importDirA)));
log.info("Import dir B: {}", Arrays.toString(fs.listStatus(importDirB)));
@@ -617,6 +560,37 @@ public class ImportExportIT extends AccumuloClusterHarness
{
true));
}
+ /**
+ * Copy exported files from export directory to import directories by reading distcp.txt. Files
+ * are distributed randomly across the provided import directories. If only one import directory
+ * is provided, all files will go to that directory.
+ *
+ * @param fs the filesystem to use
+ * @param exportDir the export directory containing distcp.txt and exported files
+ * @param importDirs array of import directories to distribute files to (can be single dir)
+ */
+ public static void copyExportedFilesToImportDirs(FileSystem fs, Path
exportDir,
+ Path... importDirs) throws IOException {
+ for (Path importDir : importDirs) {
+ assertTrue(fs.mkdirs(importDir), "Failed to create import directory: " +
importDir);
+ }
+
+ Path distcp = new Path(exportDir, "distcp.txt");
+ assertTrue(fs.exists(distcp), "Distcp file doesn't exist: " + distcp);
+
+ try (FSDataInputStream is = fs.open(distcp); InputStreamReader in = new
InputStreamReader(is);
+ BufferedReader reader = new BufferedReader(in)) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ Path srcPath = new Path(line);
+ assertTrue(fs.exists(srcPath), "Source file doesn't exist: " +
srcPath);
+ Path importDir = importDirs[RANDOM.get().nextInt(importDirs.length)];
+ Path destPath = new Path(importDir, srcPath.getName());
+ FileUtil.copy(fs, srcPath, fs, destPath, false, fs.getConf());
+ }
+ }
+ }
+
public static Path createBaseDir(AccumuloCluster cluster, Class<?> clazz)
throws IOException {
FileSystem fs = cluster.getFileSystem();
log.info("Using FileSystem: " + fs);
diff --git a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
index 87b087e6ce..a1dcc7891a 100644
--- a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
@@ -31,7 +31,6 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
-import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
@@ -89,6 +88,7 @@ import org.apache.accumulo.core.util.format.FormatterConfig;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.ImportExportIT;
import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
@@ -187,7 +187,6 @@ public class ShellServerIT extends SharedMiniClusterBase {
java.nio.file.Path exportDir =
java.nio.file.Path.of(rootPath).resolve("ShellServerIT.export");
String exportUri = "file://" + exportDir;
- String localTmp = "file://" +
java.nio.file.Path.of(rootPath).resolve("ShellServerIT.tmp");
ts.exec("exporttable -t " + table + " " + exportUri, true);
DistCp cp = new DistCp(new Configuration(false), null);
String import_ = "file://" +
java.nio.file.Path.of(rootPath).resolve("ShellServerIT.import");
@@ -196,29 +195,9 @@ public class ShellServerIT extends SharedMiniClusterBase {
// DistCp bugs out trying to get a fs delegation token to perform the
cp. Just copy it
// ourselves by hand.
FileSystem fs = getCluster().getFileSystem();
- FileSystem localFs = FileSystem.getLocal(new Configuration(false));
-
- // Path on local fs to cp into
- Path localTmpPath = new Path(localTmp);
- localFs.mkdirs(localTmpPath);
-
- // Path in remote fs to importtable from
Path importDir = new Path(import_);
- fs.mkdirs(importDir);
-
- // Implement a poor-man's DistCp
- try (BufferedReader reader =
- Files.newBufferedReader(exportDir.resolve("distcp.txt"), UTF_8)) {
- for (String line; (line = reader.readLine()) != null;) {
- Path exportedFile = new Path(line);
- // There isn't a cp on FileSystem??
- log.info("Copying {} to {}", line, localTmpPath);
- fs.copyToLocalFile(exportedFile, localTmpPath);
- Path tmpFile = new Path(localTmpPath, exportedFile.getName());
- log.info("Moving {} to the import directory {}", tmpFile,
importDir);
- fs.moveFromLocalFile(tmpFile, importDir);
- }
- }
+ Path exportPath = new Path(exportDir.toUri());
+ ImportExportIT.copyExportedFilesToImportDirs(fs, exportPath,
importDir);
} else {
String[] distCpArgs = {"-f", exportUri + "/distcp.txt", import_};
assertEquals(0, cp.run(distCpArgs), "Failed to run distcp: " +
Arrays.toString(distCpArgs));