This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ae8781959c [Feature] Add an option to search input files recursively in ingestion job. The default is set to true to be backward compatible. (#9265) ae8781959c is described below commit ae8781959ce0837f89767aec08783ac71c27f25c Author: Yao Liu <y...@startree.ai> AuthorDate: Wed Aug 24 14:54:41 2022 -0700 [Feature] Add an option to search input files recursively in ingestion job. The default is set to true to be backward compatible. (#9265) --- .../segment/generation/SegmentGenerationUtils.java | 60 +++++++++++ .../generation/SegmentGenerationUtilsTest.java | 118 +++++++++++++++++++++ .../hadoop/HadoopSegmentGenerationJobRunner.java | 85 +++++---------- .../spark/SparkSegmentGenerationJobRunner.java | 49 +-------- .../spark/SparkSegmentGenerationJobRunnerTest.java | 1 + .../spark3/SparkSegmentGenerationJobRunner.java | 49 +-------- .../SparkSegmentGenerationJobRunnerTest.java | 1 + .../standalone/SegmentGenerationJobRunner.java | 35 +----- .../standalone/SegmentGenerationJobRunnerTest.java | 1 + .../batch/spec/SegmentGenerationJobSpec.java | 14 +++ .../ingestion/batch/IngestionJobLauncherTest.java | 4 + .../resources/ingestion_job_spec_template.yaml | 1 + 12 files changed, 234 insertions(+), 184 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java index 121f2da164..d25372bf37 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtils.java @@ -28,6 +28,12 @@ import java.net.URISyntaxException; import java.net.URL; import java.net.URLConnection; import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.PathMatcher; +import 
java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.TableConfig; @@ -229,4 +235,58 @@ public class SegmentGenerationUtils { } return IOUtils.toString(connection.getInputStream(), StandardCharsets.UTF_8); } + + + /** + * @param pinotFs root directory fs + * @param fileUri root directory uri + * @param includePattern optional glob patterns for files to include + * @param excludePattern optional glob patterns for files to exclude + * @param searchRecursively if true, search files recursively from directory specified in fileUri + * @return list of matching files. + * @throws IOException on IO failure for list files in root directory. + * @throws URISyntaxException for matching file URIs + * @throws RuntimeException if there is no matching file. + */ + public static List<String> listMatchedFilesWithRecursiveOption(PinotFS pinotFs, URI fileUri, + @Nullable String includePattern, @Nullable String excludePattern, boolean searchRecursively) + throws Exception { + String[] files; + // listFiles throws IOException + files = pinotFs.listFiles(fileUri, searchRecursively); + //TODO: sort input files based on creation time + PathMatcher includeFilePathMatcher = null; + if (includePattern != null) { + includeFilePathMatcher = FileSystems.getDefault().getPathMatcher(includePattern); + } + PathMatcher excludeFilePathMatcher = null; + if (excludePattern != null) { + excludeFilePathMatcher = FileSystems.getDefault().getPathMatcher(excludePattern); + } + List<String> filteredFiles = new ArrayList<>(); + for (String file : files) { + if (includeFilePathMatcher != null) { + if (!includeFilePathMatcher.matches(Paths.get(file))) { + continue; + } + } + if (excludeFilePathMatcher != null) { + if (excludeFilePathMatcher.matches(Paths.get(file))) { + continue; + } + } + if (!pinotFs.isDirectory(new URI(file))) { 
// In case PinotFS implementations list files without a scheme (e.g. hdfs://), then we may lose it in the + // input file path. Call SegmentGenerationUtils.getFileURI() to fix this up. + // getFileURI throws URISyntaxException + filteredFiles.add(SegmentGenerationUtils.getFileURI(file, fileUri).toString()); + } + } + if (filteredFiles.isEmpty()) { + throw new RuntimeException(String.format( + "No file found in the input directory: %s matching includeFileNamePattern: %s," + + " excludeFileNamePattern: %s", fileUri, includePattern, excludePattern)); + } + return filteredFiles; + } } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtilsTest.java index 525c136ca2..cf2cbaf749 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/segment/generation/SegmentGenerationUtilsTest.java @@ -19,10 +19,18 @@ package org.apache.pinot.common.segment.generation; +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; +import java.nio.file.Files; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.testng.Assert; import org.testng.annotations.Test; @@ -124,4 +132,114 @@ public class SegmentGenerationUtilsTest { throws URISyntaxException { validateFileURI(directoryURI, directoryURI.toString()); } + + // Test that we search files recursively when recursiveSearch option is set to true. 
+ @Test + public void testMatchFilesRecursiveSearchOnRecursiveInputFilePattern() + throws Exception { + File testDir = makeTestDir(); + File inputDir = new File(testDir, "input"); + File inputSubDir1 = new File(inputDir, "2009"); + inputSubDir1.mkdirs(); + + File inputFile1 = new File(inputDir, "input.csv"); + FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2", "value1,1", "value2,2")); + + File inputFile2 = new File(inputSubDir1, "input.csv"); + FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2", "value3,3", "value4,4")); + URI inputDirURI = new URI(inputDir.getAbsolutePath()); + if (inputDirURI.getScheme() == null) { + inputDirURI = new File(inputDir.getAbsolutePath()).toURI(); + } + PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); + String includePattern = "glob:" + inputDir.getAbsolutePath() + "/**.csv"; + List<String> files = + SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, includePattern, null, true); + Assert.assertEquals(files.size(), 2); + } + + // Test that we don't search files recursively when recursiveSearch option is set to false. 
+ @Test + public void testMatchFilesRecursiveSearchOnNonRecursiveInputFilePattern() + throws Exception { + File testDir = makeTestDir(); + File inputDir = new File(testDir, "dir"); + File inputSubDir1 = new File(inputDir, "2009"); + inputSubDir1.mkdirs(); + + File inputFile1 = new File(inputDir, "input.csv"); + FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2", "value1,1", "value2,2")); + + File inputFile2 = new File(inputSubDir1, "input.csv"); + FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2", "value3,3", "value4,4")); + URI inputDirURI = new URI(inputDir.getAbsolutePath()); + if (inputDirURI.getScheme() == null) { + inputDirURI = new File(inputDir.getAbsolutePath()).toURI(); + } + PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); + String includePattern = "glob:" + inputDir.getAbsolutePath() + "/*.csv"; + + List<String> files = + SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, includePattern, null, + false); + Assert.assertEquals(files.size(), 1); + } + + // Test that we exclude files that match exclude pattern. 
+ @Test + public void testMatchFilesRecursiveSearchExcludeFilePattern() + throws Exception { + File testDir = makeTestDir(); + File inputDir = new File(testDir, "dir"); + File inputSubDir1 = new File(inputDir, "2009"); + inputSubDir1.mkdirs(); + + File inputFile1 = new File(inputDir, "input1.csv"); + FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2", "value1,1", "value2,2")); + + File inputFile2 = new File(inputSubDir1, "input2.csv"); + FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2", "value3,3", "value4,4")); + URI inputDirURI = new URI(inputDir.getAbsolutePath()); + if (inputDirURI.getScheme() == null) { + inputDirURI = new File(inputDir.getAbsolutePath()).toURI(); + } + PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); + String includePattern = "glob:" + inputDir.getAbsolutePath() + "/**.csv"; + String excludePattern = "glob:" + inputDir.getAbsolutePath() + "/2009/input2.csv"; + + List<String> files = + SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, includePattern, + excludePattern, true); + Assert.assertEquals(files.size(), 1); + } + + // Test that we throw an exception when there is no file matching. 
+ @Test + public void testEmptyMatchFiles() + throws Exception { + File testDir = makeTestDir(); + File inputDir = new File(testDir, "dir"); + File inputSubDir1 = new File(inputDir, "2009"); + inputSubDir1.mkdirs(); + + File inputFile1 = new File(inputDir, "input1.csv"); + FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2", "value1,1", "value2,2")); + + File inputFile2 = new File(inputSubDir1, "input2.csv"); + FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2", "value3,3", "value4,4")); + URI inputDirURI = new File(inputDir.getAbsolutePath()).toURI(); + PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); + String includePattern = "glob:" + inputDir.getAbsolutePath() + "/**.json"; + Assert.assertThrows(RuntimeException.class, () -> { + SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, includePattern, null, true); + }); + } + + private File makeTestDir() + throws IOException { + File testDir = Files.createTempDirectory("testSegmentGeneration-").toFile(); + testDir.delete(); + testDir.mkdirs(); + return testDir; + } } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java index 0608fd7192..0ea86376ec 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java @@ -26,8 +26,6 @@ import java.io.IOException; import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; -import java.nio.file.FileSystems; 
-import java.nio.file.PathMatcher; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; @@ -142,13 +140,14 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge for (PinotFSSpec pinotFSSpec : pinotFSSpecs) { PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec)); } - - //Get pinotFS for input + //Get list of files to process URI inputDirURI = new URI(_spec.getInputDirURI()); if (inputDirURI.getScheme() == null) { inputDirURI = new File(_spec.getInputDirURI()).toURI(); } PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); + List<String> filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, + _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively()); //Get outputFS for writing output pinot segments URI outputDirURI = new URI(_spec.getOutputDirURI()); @@ -181,71 +180,35 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge Path stagingSegmentTarUri = new Path(stagingDirURI.toString(), SEGMENT_TAR_SUBDIR_NAME); outputDirFS.mkdir(stagingSegmentTarUri.toUri()); - //Get list of files to process - String[] files = inputDirFS.listFiles(inputDirURI, true); - - //TODO: sort input files based on creation time - List<String> filteredFiles = new ArrayList<>(); - PathMatcher includeFilePathMatcher = null; - if (_spec.getIncludeFileNamePattern() != null) { - includeFilePathMatcher = FileSystems.getDefault().getPathMatcher(_spec.getIncludeFileNamePattern()); - } - PathMatcher excludeFilePathMatcher = null; - if (_spec.getExcludeFileNamePattern() != null) { - excludeFilePathMatcher = FileSystems.getDefault().getPathMatcher(_spec.getExcludeFileNamePattern()); - } + // numDataFiles is guaranteed to be greater than zero since listMatchedFilesWithRecursiveOption will throw + // runtime exception if the matched files list is empty. 
+ int numDataFiles = filteredFiles.size(); - for (String file : files) { - if (includeFilePathMatcher != null) { - if (!includeFilePathMatcher.matches(Paths.get(file))) { - continue; + LOGGER.info("Creating segments with data files: {}", filteredFiles); + if (!SegmentGenerationJobUtils.useGlobalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) { + Map<String, List<String>> localDirIndex = new HashMap<>(); + for (String filteredFile : filteredFiles) { + java.nio.file.Path filteredParentPath = Paths.get(filteredFile).getParent(); + if (!localDirIndex.containsKey(filteredParentPath.toString())) { + localDirIndex.put(filteredParentPath.toString(), new ArrayList<>()); } + localDirIndex.get(filteredParentPath.toString()).add(filteredFile); } - if (excludeFilePathMatcher != null) { - if (excludeFilePathMatcher.matches(Paths.get(file))) { - continue; + for (String parentPath : localDirIndex.keySet()) { + List<String> siblingFiles = localDirIndex.get(parentPath); + Collections.sort(siblingFiles); + for (int i = 0; i < siblingFiles.size(); i++) { + URI inputFileURI = SegmentGenerationUtils.getFileURI(siblingFiles.get(i), + SegmentGenerationUtils.getDirectoryURI(parentPath)); + createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i); } } - if (!inputDirFS.isDirectory(new URI(file))) { - filteredFiles.add(file); - } - } - - int numDataFiles = filteredFiles.size(); - if (numDataFiles == 0) { - String errorMessage = String - .format("No data file founded in [%s], with include file pattern: [%s] and exclude file pattern [%s]", - _spec.getInputDirURI(), _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern()); - LOGGER.error(errorMessage); - throw new RuntimeException(errorMessage); } else { - LOGGER.info("Creating segments with data files: {}", filteredFiles); - if (!SegmentGenerationJobUtils.useGlobalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) { - Map<String, List<String>> localDirIndex = new HashMap<>(); - for (String 
filteredFile : filteredFiles) { - java.nio.file.Path filteredParentPath = Paths.get(filteredFile).getParent(); - if (!localDirIndex.containsKey(filteredParentPath.toString())) { - localDirIndex.put(filteredParentPath.toString(), new ArrayList<>()); - } - localDirIndex.get(filteredParentPath.toString()).add(filteredFile); - } - for (String parentPath : localDirIndex.keySet()) { - List<String> siblingFiles = localDirIndex.get(parentPath); - Collections.sort(siblingFiles); - for (int i = 0; i < siblingFiles.size(); i++) { - URI inputFileURI = SegmentGenerationUtils - .getFileURI(siblingFiles.get(i), SegmentGenerationUtils.getDirectoryURI(parentPath)); - createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i); - } - } - } else { - for (int i = 0; i < numDataFiles; i++) { - URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI); - createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i); - } + for (int i = 0; i < numDataFiles; i++) { + URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI); + createInputFileUriAndSeqIdFile(inputFileURI, outputDirFS, stagingInputDir, i); } } - try { // Set up the job Job job = Job.getInstance(getConf()); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index ed73a2b719..ceaf2b1b9c 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -23,9 +23,7 @@ 
import java.io.IOException; import java.io.Serializable; import java.net.URI; import java.net.URLEncoder; -import java.nio.file.FileSystems; import java.nio.file.Path; -import java.nio.file.PathMatcher; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; @@ -128,14 +126,15 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri for (PinotFSSpec pinotFSSpec : pinotFSSpecs) { PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec)); } - - //Get pinotFS for input + //Get list of files to process URI inputDirURI = new URI(_spec.getInputDirURI()); if (inputDirURI.getScheme() == null) { inputDirURI = new File(_spec.getInputDirURI()).toURI(); } PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); - + List<String> filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, + _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively()); + LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size()); //Get outputFS for writing output pinot segments URI outputDirURI = new URI(_spec.getOutputDirURI()); if (outputDirURI.getScheme() == null) { @@ -159,46 +158,6 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri } outputDirFS.mkdir(stagingDirURI); } - //Get list of files to process - String[] files = inputDirFS.listFiles(inputDirURI, true); - - //TODO: sort input files based on creation time - List<String> filteredFiles = new ArrayList<>(); - PathMatcher includeFilePathMatcher = null; - if (_spec.getIncludeFileNamePattern() != null) { - includeFilePathMatcher = FileSystems.getDefault().getPathMatcher(_spec.getIncludeFileNamePattern()); - } - PathMatcher excludeFilePathMatcher = null; - if (_spec.getExcludeFileNamePattern() != null) { - excludeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(_spec.getExcludeFileNamePattern()); - } - - for (String file : files) { - if (includeFilePathMatcher != null) { - if (!includeFilePathMatcher.matches(Paths.get(file))) { - continue; - } - } - if (excludeFilePathMatcher != null) { - if (excludeFilePathMatcher.matches(Paths.get(file))) { - continue; - } - } - if (!inputDirFS.isDirectory(new URI(file))) { - // In case PinotFS implementations list files without a scheme (e.g. hdfs://), then we may lose it in the - // input file path. Call SegmentGenerationUtils.getFileURI() to fix this up. - filteredFiles.add(SegmentGenerationUtils.getFileURI(file, inputDirURI).toString()); - } - } - - if (filteredFiles.isEmpty()) { - throw new RuntimeException( - String.format("No file found in the input directory: %s matching includeFileNamePattern: %s," - + " excludeFileNamePattern: %s", _spec.getInputDirURI(), _spec.getIncludeFileNamePattern(), - _spec.getExcludeFileNamePattern())); - } - - LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size()); try { JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate()); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunnerTest.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunnerTest.java index 3a5de7b0e1..4556f9fc65 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunnerTest.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunnerTest.java @@ -192,6 +192,7 @@ public class SparkSegmentGenerationJobRunnerTest { SegmentGenerationJobSpec 
jobSpec = new SegmentGenerationJobSpec(); jobSpec.setJobType("SegmentCreation"); jobSpec.setInputDirURI(inputDir.toURI().toString()); + jobSpec.setSearchRecursively(true); jobSpec.setOutputDirURI(outputDir.toURI().toString()); jobSpec.setOverwriteOutput(true); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java index 6300040d4e..46db5818a2 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java @@ -23,9 +23,7 @@ import java.io.IOException; import java.io.Serializable; import java.net.URI; import java.net.URLEncoder; -import java.nio.file.FileSystems; import java.nio.file.Path; -import java.nio.file.PathMatcher; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; @@ -128,14 +126,15 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri for (PinotFSSpec pinotFSSpec : pinotFSSpecs) { PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec)); } - - //Get pinotFS for input + //Get list of files to process URI inputDirURI = new URI(_spec.getInputDirURI()); if (inputDirURI.getScheme() == null) { inputDirURI = new File(_spec.getInputDirURI()).toURI(); } PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme()); - + List<String> filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(inputDirFS, inputDirURI, + _spec.getIncludeFileNamePattern(), 
_spec.getExcludeFileNamePattern(), _spec.isSearchRecursively()); + LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size()); //Get outputFS for writing output pinot segments URI outputDirURI = new URI(_spec.getOutputDirURI()); if (outputDirURI.getScheme() == null) { @@ -159,46 +158,6 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri } outputDirFS.mkdir(stagingDirURI); } - //Get list of files to process - String[] files = inputDirFS.listFiles(inputDirURI, true); - - //TODO: sort input files based on creation time - List<String> filteredFiles = new ArrayList<>(); - PathMatcher includeFilePathMatcher = null; - if (_spec.getIncludeFileNamePattern() != null) { - includeFilePathMatcher = FileSystems.getDefault().getPathMatcher(_spec.getIncludeFileNamePattern()); - } - PathMatcher excludeFilePathMatcher = null; - if (_spec.getExcludeFileNamePattern() != null) { - excludeFilePathMatcher = FileSystems.getDefault().getPathMatcher(_spec.getExcludeFileNamePattern()); - } - - for (String file : files) { - if (includeFilePathMatcher != null) { - if (!includeFilePathMatcher.matches(Paths.get(file))) { - continue; - } - } - if (excludeFilePathMatcher != null) { - if (excludeFilePathMatcher.matches(Paths.get(file))) { - continue; - } - } - if (!inputDirFS.isDirectory(new URI(file))) { - // In case PinotFS implementations list files without a scheme (e.g. hdfs://), then we may lose it in the - // input file path. Call SegmentGenerationUtils.getFileURI() to fix this up. 
- filteredFiles.add(SegmentGenerationUtils.getFileURI(file, inputDirURI).toString()); - } - } - - if (filteredFiles.isEmpty()) { - throw new RuntimeException( - String.format("No file found in the input directory: %s matching includeFileNamePattern: %s," - + " excludeFileNamePattern: %s", _spec.getInputDirURI(), _spec.getIncludeFileNamePattern(), - _spec.getExcludeFileNamePattern())); - } - - LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size()); try { JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate()); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunnerTest.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunnerTest.java index 96c11e893b..52ea6d2d9d 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunnerTest.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/test/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunnerTest.java @@ -192,6 +192,7 @@ public class SparkSegmentGenerationJobRunnerTest { SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec(); jobSpec.setJobType("SegmentCreation"); jobSpec.setInputDirURI(inputDir.toURI().toString()); + jobSpec.setSearchRecursively(true); jobSpec.setOutputDirURI(outputDir.toURI().toString()); jobSpec.setOverwriteOutput(true); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java index 51e65f45b6..a3d721ed53 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java @@ -23,8 +23,6 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; -import java.nio.file.FileSystems; -import java.nio.file.PathMatcher; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; @@ -172,37 +170,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { public void run() throws Exception { // Get list of files to process. - String[] files = _inputDirFS.listFiles(_inputDirURI, true); - - // TODO - sort input files by modification timestamp. Though this is problematic because: - // a. It can put more load on the external filesystem (e.g. S3), and - // b. The call to Collections.sort(siblingFiles) below will reorder files by name. 
- - List<String> filteredFiles = new ArrayList<>(); - PathMatcher includeFilePathMatcher = null; - if (_spec.getIncludeFileNamePattern() != null) { - includeFilePathMatcher = FileSystems.getDefault().getPathMatcher(_spec.getIncludeFileNamePattern()); - } - PathMatcher excludeFilePathMatcher = null; - if (_spec.getExcludeFileNamePattern() != null) { - excludeFilePathMatcher = FileSystems.getDefault().getPathMatcher(_spec.getExcludeFileNamePattern()); - } - - for (String file : files) { - if (includeFilePathMatcher != null) { - if (!includeFilePathMatcher.matches(Paths.get(file))) { - continue; - } - } - if (excludeFilePathMatcher != null) { - if (excludeFilePathMatcher.matches(Paths.get(file))) { - continue; - } - } - if (!_inputDirFS.isDirectory(new URI(file))) { - filteredFiles.add(file); - } - } + List<String> filteredFiles = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(_inputDirFS, _inputDirURI, + _spec.getIncludeFileNamePattern(), _spec.getExcludeFileNamePattern(), _spec.isSearchRecursively()); File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID()); try { diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java index cd506fba2c..cd1ec2256b 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java @@ -125,6 +125,7 @@ public class SegmentGenerationJobRunnerTest { File schemaFile = makeSchemaFile(testDir, schemaName); File 
tableConfigFile = makeTableConfigFile(testDir, schemaName); SegmentGenerationJobSpec jobSpec = makeJobSpec(inputDir, outputDir, schemaFile, tableConfigFile); + jobSpec.setSearchRecursively(true); SegmentGenerationJobRunner jobRunner = new SegmentGenerationJobRunner(jobSpec); jobRunner.run(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java index 145f443a4a..02e52b26c1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java @@ -45,6 +45,12 @@ public class SegmentGenerationJobSpec implements Serializable { */ private String _inputDirURI; + /** + * If true, search input files recursively from the root directory specified in _inputDirURI. + */ + // TODO: set the default value to false after all clients are aware of this. + private boolean _searchRecursively = true; + /** * include file name pattern, supported glob pattern. 
* Sample usage: @@ -161,6 +167,14 @@ public class SegmentGenerationJobSpec implements Serializable { _inputDirURI = inputDirURI; } + public boolean isSearchRecursively() { + return _searchRecursively; + } + + public void setSearchRecursively(boolean searchRecursively) { + _searchRecursively = searchRecursively; + } + public String getIncludeFileNamePattern() { return _includeFileNamePattern; } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java index cc1fafad79..94270e20d9 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java @@ -48,6 +48,8 @@ public class IngestionJobLauncherTest { null, context, _defaultEnvironmentValues); Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/05/06"); Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/05/06"); + // searchRecursively is set to false in the yaml file. + Assert.assertFalse(spec.isSearchRecursively()); } @Test @@ -56,6 +58,8 @@ public class IngestionJobLauncherTest { GroovyTemplateUtils.class.getClassLoader().getResource("ingestion_job_spec_unicode.yaml").getFile(), null, null, null); Assert.assertEquals("\ufff0", spec.getRecordReaderSpec().getConfigs().get("multiValueDelimiter")); + // searchRecursively is set to true by default. 
+ Assert.assertTrue(spec.isSearchRecursively()); } @Test diff --git a/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml b/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml index e68bec2e38..665b0be218 100644 --- a/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml +++ b/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml @@ -26,6 +26,7 @@ executionFrameworkSpec: jobType: SegmentCreationAndTarPush inputDirURI: 'file:///path/to/input/${ year }/${ month }/${ day }' +searchRecursively: false # includeFileNamePattern: include file name pattern, supported glob pattern. # Sample usage: # 'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org